Kafka Chess App
Building a web-based chess game with real-time analytics
kafka-chess is a project that builds a web-based chess game with real-time analytics. Players interact with an interactive chessboard in a web application, and every move and game event is published to a Kafka event bus. An Apache Flink engine continuously processes these events to calculate live game statistics, which are then displayed on a dynamic Streamlit dashboard.

Interactive Chessboard

Streamlit Dashboard
Source Repository: https://github.com/cyshen11/kafka-chess
flowchart TD
A0["Chess Game Engine"]
A1["Interactive Chessboard"]
A2["Web Application Backend"]
A3["Kafka Event Bus"]
A4["Flink Analytics Engine"]
A5["Streamlit Analytics Dashboard"]
A6["Game Interaction Controller"]
A7["Live Data Update Workers"]
A6 -- "Consults rules" --> A0
A6 -- "Updates display" --> A1
A1 -- "Triggers events" --> A6
A6 -- "Sends game data" --> A2
A2 -- "Publishes events to" --> A3
A4 -- "Consumes data from" --> A3
A4 -- "Provides analytics to" --> A7
A7 -- "Updates UI of" --> A5
Tech Stack
- Frontend: Flask, Streamlit
- Event Bus: Kafka
- Analytics Engine: Flink
Getting Started
- Clone the source repository:
git clone https://github.com/cyshen11/kafka-chess.git
- Create a Python environment and install the packages listed in requirements.txt.
- Start the interactive chessboard, then navigate to 127.0.0.1:5000 to view it:
flask --app flaskr run --debug
- Change to the kafka directory and start the Kafka server in a Docker container:
cd kafka
docker compose up
- Change back to the parent directory:
cd ../
- Start the analytics dashboard, then navigate to 127.0.0.1:8501 to view it:
streamlit run streamlit/app.py
Chapters
- Web Application Backend
- Interactive Chessboard
- Streamlit Analytics Dashboard
- Game Interaction Controller
- Chess Game Engine
- Kafka Event Bus
- Flink Analytics Engine
- Live Data Update Workers
Chapter 1: Web Application Backend
Welcome to kafka-chess! In this project, we’re building a complete chess game that uses some really powerful tools. Think of it like building a complex machine: it has many different parts, and each part has a specific job.
This first chapter introduces you to the Web Application Backend. Imagine you’re playing chess on your computer. You open your web browser, and poof! a chessboard appears. You click on a pawn, and it moves. How does all of this happen? That’s exactly what the Web Application Backend helps to manage!
What is the Web Application Backend?
At its heart, the Web Application Backend is like the control center for your entire web experience. It has two main jobs:
- Serving the Webpage: It’s responsible for showing you the index.html page, which is where you see the chessboard and all the game controls. Think of it as the restaurant that gives you the menu.
- Receiving Your Actions: When you make a move, click a button, or surrender in the game, your web browser needs a way to tell the “brain” of the application what you did. The backend provides special “API endpoints” (like different phone numbers for different departments) where your browser can send this information.
But it does one more very important thing: it acts like a post office. When you make a move, the backend doesn’t try to figure out if it’s a legal move or what the new board position should be. Instead, it simply takes the information about your move and sends it off to a special “mail system” called Kafka. This allows other parts of our chess system to pick up these “letters” (game events) and process them in real-time.
Use Case: Making a Chess Move
Let’s use a simple example: You, the player, move a pawn on the screen. How does the Web Application Backend help make this happen?
- You interact with the chessboard in your browser.
- Your browser (the “frontend”) tells the backend, “Hey, I just moved a piece from square ‘e2’ to ‘e4’!”
- The backend receives this message.
- Instead of processing the move itself, the backend immediately sends this move information into our “mail system” (Kafka) so other components can react to it.
This process ensures that the backend is fast and efficient, simply passing along information to where it needs to go.
How the Backend Works (Simplified)
Let’s break down how the backend fulfills its role. We use a Python tool called Flask to build our web application.
1. Showing You the Chessboard
When you visit the website, the backend’s first job is to show you the main chess game page. This is handled by a function that “routes” requests to the correct page.
# In flaskr/__init__.py
from flask import Flask, render_template


def create_app(test_config=None):
    app = Flask(__name__, instance_relative_config=True)
    # ... (other setup code)

    @app.route("/")  # This means "when someone visits the main page /"
    def main():
        # This renders 'index.html' and sends it to the user's browser
        return render_template("index.html", move={})

    return app
In this code:
- app = Flask(__name__, ...) creates our web application.
- @app.route("/") is like setting up a street sign that says “If you’re looking for the main page, come this way!”
- def main(): is the function that runs when someone visits that main page.
- return render_template("index.html", ...) tells Flask to send the index.html file (which contains all the HTML, CSS, and JavaScript for our chessboard) to your web browser. This is how you see the game!
2. Setting Up the “Post Office” (Kafka Producer)
Before the backend can send messages to Kafka, it needs a way to connect. This is done by creating a Producer object, which is like setting up a connection to the main post office.
# In flaskr/__init__.py
from confluent_kafka import Producer

KAFKA_SERVER = "localhost:9092"  # Where our Kafka "post office" is located

# Set up connection details for the producer
config = {
    "bootstrap.servers": KAFKA_SERVER,
    "acks": "all",  # Make sure messages are safely delivered
}

# Create the Producer instance
producer = Producer(config)
Here:
- We import Producer from confluent_kafka, which is a library that helps Python talk to Kafka.
- KAFKA_SERVER tells our producer where to find the Kafka system (usually on localhost:9092 if you’re running it on your own computer).
- producer = Producer(config) creates the actual “post office worker” object that will send our messages.
3. Receiving Your Moves and Sending Them to Kafka
Now, let’s see how the backend receives your move and sends it off.
First, on the frontend (the JavaScript code running in your web browser), when you make a move, a function is called to send that information to our backend:
// In flaskr/static/js/chess_actions.js (simplified)
function createMove() {
  // Get details of the last move (the final entry in the move history)
  const history = game.history({ verbose: true });
  const lastMove = history[history.length - 1];
  const key = uuid.v4(); // Generate a unique ID for this move
  // Build one quoted, comma-separated record describing the move
  const value =
    '"' +
    key +
    '","' +
    sessionStorage.getItem("gameId") +
    '","' +
    "Player" +
    '","' +
    "Pawn from e2 to e4" +
    '","' + // Example move details
    toISOStringLocal(new Date()) +
    '"'; // When the move happened
  console.log(value);
  // Send the move data to the backend's /add_move API endpoint
  axios.post("http://127.0.0.1:5000/add_move/" + key + "/" + value);
}
This JavaScript code uses axios.post to send an HTTP POST request to our Flask backend. It sends the key (a unique ID for the move) and value (details about the move) as part of the web address.
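The value string built above is a single line of quoted, comma-separated fields. As a minimal sketch of how a downstream reader might split it back apart, Python's standard csv module can parse it. The field names (move_id, game_id, player, move, moved_at) and the sample values are assumptions for illustration; the source only shows the order of the fields.

```python
import csv
from typing import NamedTuple


class MoveEvent(NamedTuple):
    # Field names are hypothetical; only the field order comes from createMove().
    move_id: str
    game_id: str
    player: str
    move: str
    moved_at: str


def parse_move_value(value: str) -> MoveEvent:
    """Split one quoted, comma-separated move record into its five fields."""
    fields = next(csv.reader([value]))
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    return MoveEvent(*fields)


# Sample record with made-up IDs and timestamp.
sample = '"m-1","g-42","Player","Pawn from e2 to e4","2024-01-01T12:00:00"'
event = parse_move_value(sample)
print(event.move)
```

Using a CSV parser rather than splitting on commas keeps the record intact even if a field itself contains a comma.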
Next, on the backend side, Flask has another route that is specifically designed to listen for these add_move requests:
# In flaskr/__init__.py (simplified)
@app.route("/add_move/<key>/<value>", methods=["POST"])
def add_move(key, value):
    topic = "moves"  # We send chess moves to the 'moves' topic
    # Use our Kafka producer to queue the message for delivery
    producer.produce(topic, value, key, on_delivery=delivery_callback)
    # Serve pending delivery callbacks (the poll timeout is given in seconds)
    producer.poll(10000)
    # Block until every queued message has been delivered
    producer.flush()
    return ""  # Send an empty response back to the frontend
Here’s what happens step-by-step:
- @app.route("/add_move/<key>/<value>", methods=["POST"]) sets up another “street sign.” This one says, “If you’re sending a POST request to /add_move with a key and value, come this way!”
- def add_move(key, value): is the function that runs when your browser sends a move. The key and value from the browser are passed directly into this function.
- topic = "moves": Kafka uses “topics” to organize messages, like different departments in a post office. All chess moves go into the moves topic.
- producer.produce(topic, value, key, ...) is the crucial line! This tells our “post office worker” (producer) to send the value (the move details) to the topic (moves), using the key as an identifier. on_delivery=delivery_callback is just a way to check if the message was delivered successfully.
- producer.poll() and producer.flush() ensure that the message is actually sent out before the function returns: poll() runs any pending delivery callbacks, and flush() waits until all queued messages have been delivered.
There’s also a similar endpoint, /add_game, which works in the same way to send information about when a game starts or ends to a different Kafka topic called games.
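The add_move snippet passes a delivery_callback that is not shown in the simplified code. The following is a minimal sketch of what such a callback could look like, not the project's actual implementation. It relies on the confluent_kafka convention that the on_delivery callback is invoked as callback(err, msg), with err set to None on success. The delivery_log list and the FakeMessage class are hypothetical helpers so the sketch runs without a broker.

```python
delivery_log = []  # hypothetical in-memory log, for illustration only


def delivery_callback(err, msg):
    """Record whether Kafka accepted a produced message.

    confluent_kafka invokes the on_delivery callback as callback(err, msg)
    from poll() or flush(); err is None when delivery succeeded.
    """
    if err is not None:
        delivery_log.append(f"FAILED: {err}")
    else:
        key = msg.key().decode("utf-8")  # message keys arrive as bytes
        delivery_log.append(f"OK: {msg.topic()} key={key}")


class FakeMessage:
    """Stand-in for a Kafka message so the sketch runs without a broker."""

    def topic(self):
        return "moves"

    def key(self):
        return b"abc-123"


delivery_callback(None, FakeMessage())
delivery_callback("broker unavailable", FakeMessage())
print(delivery_log)
```

In a real application the callback would more likely write to a logger than to a list.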
Internal Workflow
Let’s visualize the simplified interaction when you make a move:
sequenceDiagram
participant User
participant Frontend as Interactive Chessboard
participant Backend as Web Application Backend
participant Kafka as Kafka Event Bus
User->>Frontend: Makes a move (e.g., clicks a square)
Frontend->>Backend: Sends move data via HTTP POST to /add_move
Backend->>Kafka: Produces 'move' event to 'moves' topic
Kafka-->>Backend: Confirms event received
Backend-->>Frontend: Sends empty response (acknowledgment)
In this flow:
- User makes a move: You interact with the visual chessboard.
- Frontend sends move data: The JavaScript code on your webpage detects the move and uses axios.post to send the details to the backend’s /add_move endpoint.
- Backend produces event to Kafka: The add_move function in Flask receives the move data. It doesn’t process the game logic itself; instead, it uses the producer to send this raw move information into the “moves” topic within Kafka.
- Kafka confirms receipt: Kafka acknowledges that it has received the message.
- Backend responds to Frontend: The backend sends a simple “OK” (an empty response) back to the frontend, letting the browser know the message was received.
This design makes the backend very lightweight and efficient, as its main job is just to route information quickly!
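To make the “post office” idea concrete, here is a toy in-memory stand-in for an event bus. It is only a sketch of the decoupling described above: the class name ToyEventBus and its methods are hypothetical and are not Kafka's API. The point it illustrates is that the producer only appends events, while each reader decides independently where to start reading.

```python
from collections import defaultdict


class ToyEventBus:
    """In-memory stand-in for an event bus; NOT Kafka's API."""

    def __init__(self):
        self._topics = defaultdict(list)  # topic name -> append-only log

    def produce(self, topic, value, key=None):
        """Append an event and return its position (offset) in the topic."""
        self._topics[topic].append((key, value))
        return len(self._topics[topic]) - 1

    def read_from(self, topic, offset=0):
        """Return every event at or after the given offset."""
        return self._topics[topic][offset:]


bus = ToyEventBus()
bus.produce("moves", "e2 to e4", key="m-1")
bus.produce("moves", "e7 to e5", key="m-2")

# Two independent readers choose their own offsets, so neither affects the other.
dashboard_view = bus.read_from("moves", offset=0)
late_reader_view = bus.read_from("moves", offset=1)
print(dashboard_view, late_reader_view)
```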
Conclusion
You’ve now learned about the Web Application Backend, which is the “control center” that serves our chess webpage, listens for your actions (like making a move), and acts as a “post office” to send these actions to Kafka for further processing. This allows our backend to stay fast and focused on getting information where it needs to go.
Next up, we’ll dive into the Interactive Chessboard itself, to see how the web page handles your clicks and displays the game!
Chapter 2: Interactive Chessboard
Welcome back! In Chapter 1: Web Application Backend, we learned how our backend serves the index.html page to your web browser and acts as a “post office” for game events. But once that index.html page loads, how does the actual chessboard appear? How do you interact with it, dragging and dropping pieces? That’s where the Interactive Chessboard comes in!
What is the Interactive Chessboard?
Imagine the chessboard you see on your screen. It’s not just a static image; it’s a dynamic playground where you move pieces around. The Interactive Chessboard is the part of our system responsible for making this visual experience possible.
Its main jobs are:
- Drawing the Board and Pieces: It’s like the artist who paints the chessboard, draws all the pawns, knights, and kings, and places them in their starting positions.
- Listening to Your Actions: When you click a square, drag a piece, or tap your screen, the Interactive Chessboard detects these actions.
- Translating Actions into Events: It doesn’t know the rules of chess (like if a knight can jump over pieces). Instead, it’s like a messenger that tells another part of our system, “Hey, the user just tried to move a piece from square ‘e2’ to ‘e4’!”
The chessboard2.js library is the specific tool we use to create this interactive visual element. It’s the “front-end” part that runs directly in your web browser.
Use Case: Moving a Chess Piece Visually
Let’s use our familiar example: You, the player, want to move a pawn on the screen. How does the Interactive Chessboard make this visual interaction happen?
- You drag a pawn from e2 to e4 with your mouse.
- The chessboard2.js library detects this “drag and drop” action.
- It then calls a special function (a “callback”) that we provide, telling it, “A piece was dropped from ‘e2’ to ‘e4’!”
- Our special function (which lives in chess_actions.js, covered in a later chapter) then checks if this is a legal move using the Chess Game Engine.
- If it’s legal, our function tells chessboard2.js to update its display to show the pawn now on e4.
Notice a key point: chessboard2.js is only concerned with showing things and reporting user interactions. It doesn’t know the game rules itself. It relies on other parts of our system to tell it what to display.
How the Interactive Chessboard Works (Simplified)
The chessboard2.js library is initialized and configured directly within your index.html file, using JavaScript code.
1. Setting Up the Visual Board
When the index.html page loads in your browser, JavaScript code runs to create the chessboard.
<!-- In flaskr/templates/index.html (simplified) -->
<div id="myBoard" style="width: 400px; margin: 20px 0;"></div>
<script>
  // ... (other setup code)
  // Chessboard Configuration
  const boardConfig = {
    draggable: true, // Allow piece dragging
    onDragStart, // Function called when drag starts
    onTouchSquare, // Function for touch/click interactions
    onDrop, // Function called when piece is dropped
    onSnapEnd, // Function called after piece animation ends
    position: game.fen(), // Set initial board position from game state
    touchMove: true, // Enable touch-based moves on mobile
  };
  // Initialize the visual chessboard in the myBoard div
  const board = Chessboard2("myBoard", boardConfig);
  // ... (more code)
</script>
In this snippet:
- The <div id="myBoard"></div> is a placeholder on your webpage where the chessboard will be drawn.
- boardConfig is an object (like a shopping list) that tells chessboard2.js how we want our board to behave.
  - draggable: true means you can pick up pieces.
  - position: game.fen() tells chessboard2.js the current state of the game board using a special code called FEN (Forsyth-Edwards Notation, which the Chess Game Engine understands).
  - onDragStart, onTouchSquare, onDrop, and onSnapEnd are callback functions. These are functions we write in another file (chess_actions.js, as you’ll see in Chapter 4: Game Interaction Controller). chessboard2.js will call these functions when a user performs a specific action (like starting a drag or dropping a piece).
- const board = Chessboard2("myBoard", boardConfig) is the line that actually creates the chessboard! It tells the chessboard2.js library: “Hey, draw a chessboard inside the div with the ID ‘myBoard’, and use these boardConfig settings.”
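Since the board's position is handed over as a FEN string, it can help to see what that string encodes. The sketch below expands the piece-placement field of the standard starting position into an 8x8 grid. It is an illustration in Python, not code from the project, and the function name fen_to_grid is hypothetical.

```python
START_FEN = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"


def fen_to_grid(fen: str) -> list[list[str]]:
    """Expand the piece-placement field of a FEN string into an 8x8 grid.

    Rows run from rank 8 down to rank 1; "." marks an empty square.
    """
    placement = fen.split()[0]  # the first field holds the piece placement
    grid = []
    for rank in placement.split("/"):
        row = []
        for ch in rank:
            if ch.isdigit():
                row.extend("." * int(ch))  # a digit means that many empty squares
            else:
                row.append(ch)  # uppercase = White, lowercase = Black
        grid.append(row)
    return grid


grid = fen_to_grid(START_FEN)
for row in grid:
    print(" ".join(row))
```

The remaining FEN fields (side to move, castling rights, en passant square, and move counters) are ignored here because only the placement is needed to draw the board.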
2. Reacting to Your Visual Moves
When you drag and drop a piece, chessboard2.js doesn’t know if it’s a legal chess move. It just reports the action. It calls the onDrop function (which we defined in chess_actions.js).
Let’s look at a simplified onDrop function:
// In flaskr/static/js/chess_actions.js (simplified)
function onDrop(dropEvt) {
  // 1. Try to make the move using the Chess Game Engine (chess.js)
  const move = game.move({
    from: dropEvt.source, // Where the piece came from (e.g., 'e2')
    to: dropEvt.target, // Where the piece was dropped (e.g., 'e4')
    promotion: "q", // (simplified) always promote to a Queen
  });
  // 2. If the move was legal (chess.js returned a result)
  if (move) {
    // 3. Tell chessboard2.js to update its visual position
    board.position(game.fen()).then(() => {
      // 4. Update other game elements (like move history)
      updatePGN();
      updateStatus();
      // 5. Send the move to the Flask backend, which publishes it to Kafka (Chapter 1)
      createMove();
      // Make a random move for the AI (for single-player demo)
      window.setTimeout(makeRandomMove, 250);
    });
  } else {
    // If the move was illegal, snap the piece back to its original square
    return "snapback";
  }
}
In this code:
- The onDrop function receives dropEvt (an event object) from chessboard2.js. This object contains source (the starting square) and target (the ending square).
- game.move(...) is a function from the Chess Game Engine (chess.js). This is where the actual chess rules are checked.
- If game.move() returns a valid move (meaning the move was legal), then:
  - board.position(game.fen()) tells chessboard2.js to redraw the board to the new game state (provided by game.fen()). This visually updates the board.
  - updatePGN() and updateStatus() update the text you see on the page.
  - createMove() (as we saw in Chapter 1: Web Application Backend) sends the move data to our Flask backend, which then pushes it to Kafka.
Internal Workflow
Let’s visualize the steps involved when you interact with the chessboard:
sequenceDiagram
participant User
participant A as Interactive Chessboard (chessboard2.js)
participant B as Game Interaction Controller (chess_actions.js)
participant C as Chess Game Engine (chess.js)
User->>A: Drags a piece from e2 to e4
A->>B: Calls onDrop(source: 'e2', target: 'e4')
B->>C: Calls game.move({from: 'e2', to: 'e4'})
C-->>B: Returns move object (if legal)
B->>A: Calls board.position(game.fen())
A-->>User: Visually updates board
In this flow:
- User Interacts: You physically drag a chess piece on the screen.
- Interactive Chessboard Detects: chessboard2.js detects this interaction and figures out which squares were involved.
- Calls Callback: chessboard2.js calls the onDrop function, passing details of the move attempt.
- Game Logic: The onDrop function (in chess_actions.js) uses the game object (from chess.js) to check if the move is legal and to update the actual game state.
- Visual Update: If the move is valid, the onDrop function tells chessboard2.js to update its visual display to reflect the new board position.
This separation of concerns is powerful: chessboard2.js focuses on the “graphics” and user input, while chess.js handles the “brain” (game rules). Our chess_actions.js acts as the bridge between them, as you’ll learn more about in Chapter 4: Game Interaction Controller.
Conclusion
You’ve now seen how the Interactive Chessboard (powered by chessboard2.js) provides the visual interface for our chess game. It handles drawing the board and pieces, and, crucially, it acts as the primary listener for your interactions, passing those “events” to our custom JavaScript functions. This allows us to keep the visual display separate from the actual game logic.
Next up, we’ll shift gears and look at how we visualize and analyze game data using the Streamlit Analytics Dashboard.
Chapter 3: Streamlit Analytics Dashboard
Welcome back! In Chapter 1: Web Application Backend, we learned how our chess system handles sending messages about moves to Kafka. Then, in Chapter 2: Interactive Chessboard, we saw how the chessboard appears in your browser and lets you interact with it.
Now, imagine we have many people playing chess on our platform. We’re collecting all these moves and game starts/ends. But how can we see what’s happening across all games? How many games are active right now? What are the most recent moves being played? We need a way to display and understand this information easily.
What is the Streamlit Analytics Dashboard?
The Streamlit Analytics Dashboard is like a “control panel” or a “TV screen” for our kafka-chess project. Its main job is to show you the important numbers and recent activities from all the chess games being played. Instead of just playing a single game, this dashboard lets you observe the bigger picture.
It acts as the main user interface for observing game trends. It uses a tool called Streamlit to create a web-based dashboard that is visually organized to present game statistics and a live feed of recent moves. This makes insights from our data (which is processed by the Flink Analytics Engine, covered later) accessible and easy to understand.
Use Case: Monitoring Live Chess Game Trends
Let’s say you’re the person running the kafka-chess platform, or just a curious user. You want to quickly answer questions like:
- How many chess games are currently happening?
- How many total players have joined?
- What were the last few moves made across any game?
The Streamlit Analytics Dashboard solves this by providing a simple webpage that constantly updates with these answers.
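To give a feel for the kind of answer the dashboard shows, here is a small sketch that derives "active" and "completed" counts from a list of game events. In the project these numbers come from the Flink Analytics Engine, covered later; the event shape used here, a (game_id, status) pair with status "started" or "ended", is purely an assumption for illustration.

```python
def count_games(events):
    """Return (active, completed) counts from an ordered list of game events.

    Each event is a hypothetical (game_id, status) pair, where status is
    either "started" or "ended".
    """
    active = set()
    completed = 0
    for game_id, status in events:
        if status == "started":
            active.add(game_id)
        elif status == "ended" and game_id in active:
            active.remove(game_id)
            completed += 1
    return len(active), completed


events = [("g1", "started"), ("g2", "started"), ("g1", "ended"), ("g3", "started")]
print(count_games(events))
```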
How the Dashboard Works (Simplified)
The core of our dashboard is the app.py script located in the streamlit directory. This script uses the Streamlit library to build a webpage that looks nice and shows data.
1. Setting Up the Web Page Layout
First, app.py tells Streamlit how the web page should look. It sets a title and divides the page into sections, like arranging furniture in a room.
# In streamlit/app.py (simplified)
import streamlit as st
# Configure the Streamlit page layout and metadata
st.set_page_config(page_title="Chess Analytics", page_icon="♟️", layout="wide")
st.title("♟️ Chess Web App Analytics")
st.write("Built with Streamlit and PyFlink")
# Create main content columns - col1 for stats, col2 for moves
col1, col2 = st.columns([1, 2])
col1.subheader("📊 Game Statistics")
col2.subheader("🔄 Recent Moves")
# ... (more code for data workers)
In this code:
- import streamlit as st brings in the Streamlit library.
- st.set_page_config(...) sets up basic things like the tab title in your browser and the overall layout.
- st.title(...) puts a big title at the top of our page.
- st.columns([1, 2]) is a powerful command! It creates two “columns” on our webpage. The [1, 2] means the second column (col2) will be twice as wide as the first (col1). We then give each column its own subheader (col1.subheader, col2.subheader) to label its content.
2. Getting and Displaying Live Data
The dashboard needs to constantly update with new information. It doesn’t just show a static picture; it’s like a live news feed. To do this, we have special “worker threads” (imagine them as small, independent reporters) that constantly fetch data and update specific parts of the dashboard.
The app.py script starts these workers:
# In streamlit/app.py (simplified)
from utils.utils_table_env import TableEnvCustomized
from utils.utils_worker import run_workers

# ... (page layout code; col3 and col4 are assumed to be defined there)

# Initialize PyFlink table environment for data processing
table_env = TableEnvCustomized()
table_env.create_source_tables()  # Set up connections to data sources

# Start the data processing workers
try:
    run_workers(col1, col2, col3, col4, table_env)
except Exception:
    st.info("Unable to fetch live data.")
Here:
- TableEnvCustomized() sets up the connection to our data processing engine (Flink Analytics Engine). Think of TableEnvCustomized() as the “data manager” that knows how to ask for processed game information.
- run_workers(...) is the key function that kicks off all our “reporter” threads. It tells them which “column” on the Streamlit page they should update (col1, col2, etc.) and gives them the table_env so they can ask for data.
Internal Workflow
Let’s visualize how the dashboard gets and displays its live information:
sequenceDiagram
participant User
participant A as Streamlit App (app.py)
participant B as Worker Threads (utils_worker.py)
participant C as Table Environment (Flink Engine)
User->>A: Opens Streamlit Dashboard
A->>B: Starts Worker Threads (e.g., for stats, for moves)
loop Continuous Updates
B->>C: Worker asks for latest "Game Stats"
C-->>B: Provides "Active Games", "Completed Games"
B->>A: Worker updates `col1` with metrics (st.metric)
B->>C: Worker asks for latest "Recent Moves"
C-->>B: Provides table of recent moves
B->>A: Worker updates `col2` with data table (st.dataframe)
end
In this flow:
- User Opens Dashboard: You navigate to the Streamlit app in your web browser.
- Streamlit App Starts Workers: The app.py script runs, sets up the page, and then calls run_workers to start background tasks.
- Workers Request Data: These “worker threads” (defined in utils_worker.py) constantly ask the Table Environment (which represents the Flink Analytics Engine’s processed data) for the latest statistics or recent moves.
- Flink Provides Data: The Table Environment sends back the processed data (e.g., “3 active games,” “a list of 5 recent moves”).
- Workers Update Dashboard: Each worker then takes its data and uses Streamlit commands (st.metric, st.dataframe) to update the specific sections of the web page it’s responsible for. This happens continuously, giving you a live view!
Diving into the Worker Code
Let’s look at a simplified example of one of these “reporter” worker threads from streamlit/utils/utils_worker.py.
Here’s WorkerThread1, which handles displaying game statistics:
# In streamlit/utils/utils_worker.py (simplified)
import time  # Needed for the start-up delay in run()
from threading import Thread  # For running tasks in the background

import streamlit as st


class WorkerThread1(Thread):
    """Worker thread for displaying game statistics metrics."""

    def __init__(self, delay, target, table_env):
        super().__init__()
        self.delay = delay
        self.target = target  # This is a Streamlit column (e.g., col1)
        self.table_env = table_env  # Our data manager

    def run(self):
        """Execute the worker thread to display game statistics."""
        # Initial delay to let other things start
        time.sleep(self.delay)
        # Ask the data manager for the game stats stream
        stream = self.table_env.get_games_stats()
        # Loop forever, getting new data as it comes in
        for chunk in stream:
            # Use the target column to display the data
            with self.target.container():
                # Display metrics like "Active Games"
                st.metric("Active Games", chunk[0])
                st.metric("Games Completed Today", chunk[1])
Explanation:
- class WorkerThread1(Thread): means this is a special kind of task that can run in the background without freezing the rest of our app.
- self.target is the col1 (or similar) Streamlit object we passed to it.
- self.table_env.get_games_stats() is where this worker asks the Flink Analytics Engine for the live game statistics.
- for chunk in stream: This loop is crucial! It continuously waits for new data to arrive from Flink. As soon as Flink sends updated stats, this loop gets a chunk of data.
- with self.target.container(): ensures that whatever we display (st.metric) appears inside the specific column (col1) assigned to this worker.
- st.metric("Active Games", chunk[0]) displays a nice, big number (a “metric”) labeled “Active Games” using the first piece of data (chunk[0]) from Flink.
Similarly, WorkerThread2 is responsible for showing the “Recent Moves” table:
# In streamlit/utils/utils_worker.py (simplified)
# ... (imports and other worker classes)
class WorkerThread2(Thread):
    """Worker thread for displaying chess moves data table."""

    def __init__(self, delay, target, table_env):
        super().__init__()
        self.delay = delay
        self.target = target
        self.table_env = table_env

    def run(self):
        """Execute the worker thread to display moves data."""
        time.sleep(self.delay)
        stream = self.table_env.get_moves()  # Get stream of recent moves
        for chunk in stream:
            # Display moves data as a scrollable dataframe
            self.target.container().dataframe(chunk, height=400)
Here:
- self.table_env.get_moves() asks Flink for the list of recent moves.
- self.target.container().dataframe(chunk, height=400) takes the received chunk (which is a table of move data) and displays it nicely as a scrollable table within col2 on the dashboard.
Finally, the run_workers function brings it all together:
# In streamlit/utils/utils_worker.py (simplified)
# ... (worker classes)
from streamlit.runtime.scriptrunner_utils.script_run_context import (
    add_script_run_ctx,
    get_script_run_ctx,
)


def run_workers(col1, col2, col3, col4, table_env):
    """Initialize and start all worker threads for the Streamlit dashboard."""
    threads = [
        WorkerThread1(1.2, col1.empty(), table_env),  # Stats worker for col1
        WorkerThread2(1.1, col2.empty(), table_env),  # Moves worker for col2
        # WorkerThread3 (disabled for now)
        WorkerThread4(1.3, col4.empty()),  # Timestamp worker for col4
    ]
    # Start all threads
    for thread in threads:
        # Crucial: this allows threads to update the Streamlit page
        add_script_run_ctx(thread, get_script_run_ctx())
        thread.start()
    # Wait for all threads to complete (they run forever in this case)
    for thread in threads:
        thread.join()
This run_workers function:
- Creates instances of our WorkerThread reporters, giving them their specific col (where to display) and table_env (where to get data). col1.empty() creates a placeholder in that column that the worker can continuously update.
- Calls add_script_run_ctx(...), which is important technical glue for Streamlit: it tells Streamlit that these background threads are allowed to update the web page. Without it, Streamlit wouldn’t let the threads change what’s on the screen.
- Calls thread.start(), which actually starts each reporter thread running in the background.
- Calls thread.join(), which makes the main program wait for these threads, effectively keeping the dashboard running.
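The worker pattern described above (wait briefly, then copy each chunk from a stream into a placeholder) can be tried without Streamlit or Flink. The sketch below uses only the standard library. ListPlaceholder, StreamWorker, and fake_stats_stream are hypothetical stand-ins for a Streamlit placeholder, a worker class, and a live data stream; the fake stream is finite so that join() returns.

```python
import time
from threading import Thread


class ListPlaceholder:
    """Hypothetical stand-in for a Streamlit placeholder; keeps the latest value."""

    def __init__(self):
        self.latest = None
        self.updates = 0

    def write(self, value):
        self.latest = value
        self.updates += 1


class StreamWorker(Thread):
    """Wait for `delay` seconds, then copy each item of a stream to a placeholder."""

    def __init__(self, delay, placeholder, stream):
        super().__init__()
        self.delay = delay
        self.placeholder = placeholder
        self.stream = stream

    def run(self):
        time.sleep(self.delay)
        for chunk in self.stream:
            self.placeholder.write(chunk)


def fake_stats_stream():
    """Finite stand-in for a live stream of (active, completed) pairs."""
    yield (1, 0)
    yield (2, 0)
    yield (1, 1)


stats_box = ListPlaceholder()
worker = StreamWorker(0.01, stats_box, fake_stats_stream())
worker.start()
worker.join()  # returns here because the fake stream ends
print(stats_box.latest, stats_box.updates)
```

With a stream that never ends, as in the dashboard, join() would block indefinitely, which is what keeps the page alive.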
Conclusion
You’ve now learned about the Streamlit Analytics Dashboard. It’s our real-time “control panel” built with Streamlit, showing live statistics and recent moves from our chess games. We saw how app.py sets up the page layout, and how “worker threads” in utils_worker.py continuously fetch processed data from the Flink Analytics Engine to keep the dashboard updated. This provides a clear, visual way to understand what’s happening across our kafka-chess system.
Next, we’ll dive into the Game Interaction Controller, which acts as the “middleman” between your clicks on the chessboard and the game’s core logic.
Chapter 4: Game Interaction Controller
Welcome back! In Chapter 2: Interactive Chessboard, we explored how the chessboard2.js library draws the board and lets you click or drag pieces. We also touched on how it sends signals (like onDrop or onTouchSquare) when you interact. Before that, in Chapter 1: Web Application Backend, we learned how our Flask backend acts as a “post office,” ready to receive these game events and send them to Kafka.
But there’s a missing piece! When you drag a piece on the visual board, how does the system know if it’s a legal chess move? And if it is, how does that legal move get communicated to the backend to be sent to Kafka?
This is where the Game Interaction Controller comes in! Think of it as the “player’s assistant” or the “traffic cop” for your chess moves.
What is the Game Interaction Controller?
The Game Interaction Controller (GIC) is like the central coordinator for your live chess game. It’s not the one drawing the board, and it doesn’t know all the chess rules by itself. Instead, its main jobs are:
- Interpreting Your Actions: It receives raw interactions from the visual chessboard (like “piece moved from e2 to e4”).
- Consulting the Rulebook: It asks the real chess “brain” (the Chess Game Engine) if your proposed move is legal.
- Updating the World: If the move is legal, it tells the visual chessboard to update its display, updates other parts of the game (like showing “checkmate!”), and, crucially, tells the Web Application Backend to send the move information to Kafka.
The chess_actions.js file is where this “assistant” lives. It’s a JavaScript file that connects all the pieces of our frontend chess game.
Use Case: Making a Valid Chess Move
Let’s use our familiar example: You, the player, try to move a pawn from e2 to e4. How does the Game Interaction Controller manage this?
- You drag the pawn visually on the board.
- The Interactive Chessboard (chessboard2.js) detects this and calls the onDrop function (which is part of our Game Interaction Controller in chess_actions.js).
- The Game Interaction Controller takes the e2 and e4 information and asks the Chess Game Engine (chess.js), “Is moving a pawn from e2 to e4 a legal move right now?”
- The Chess Game Engine checks its rules and says, “Yes, that’s a legal move!”
- The Game Interaction Controller then:
  - Tells the visual board (chessboard2.js) to permanently show the pawn on e4.
  - Updates the text on the screen (like “White to move” or “Check!”).
  - Tells the Web Application Backend to record this move by sending it to Kafka.
If the move wasn’t legal (e.g., trying to move a pawn sideways), the Game Interaction Controller would tell the visual board to “snap back” the piece to its original square.
How the Game Interaction Controller Works (Simplified)
The chess_actions.js file contains various functions that act as the Game Interaction Controller. These functions are often “callback” functions that the chessboard2.js library calls when a user interacts.
1. Connecting to the “Chess Brain” and “Visual Board”
At the very start of our index.html file, we initialize the Chess game object and the Chessboard2 visual board:
// In flaskr/templates/index.html (inside <script> tags)
// Create new chess game instance using chess.js library
const game = new Chess(); // This is our Chess Game Engine!
// Chessboard Configuration
const boardConfig = {
draggable: true, // Allow piece dragging
onDragStart, // Function called when drag starts
onDrop, // Function called when piece is dropped
// ... other callbacks
};
// Initialize the visual chessboard
const board = Chessboard2("myBoard", boardConfig); // This is our Interactive Chessboard!
Here:
- const game = new Chess(); creates an object that understands all the rules of chess. This is our direct link to the Chess Game Engine.
- const board = Chessboard2('myBoard', boardConfig) creates the visual board. Notice onDragStart and onDrop are listed in boardConfig. These tell chessboard2.js which functions to call in chess_actions.js when the user drags or drops a piece. The board object also has functions like board.position() that the GIC will use to update the visual board.
2. Processing a Drag-and-Drop Move (onDrop)
When you drag a piece and drop it, chessboard2.js calls the onDrop function in chess_actions.js.
// In flaskr/static/js/chess_actions.js (simplified)
function onDrop(dropEvt) {
// 1. Try to make the move using the Chess Game Engine (chess.js)
const move = game.move({
from: dropEvt.source, // Where the piece came from (e.g., 'e2')
to: dropEvt.target, // Where the piece was dropped (e.g., 'e4')
promotion: "q", // (for simplicity) always promote to a Queen
});
// 2. If the move was legal (chess.js returned a result)
if (move) {
// 3. Tell chessboard2.js to update its visual position
board.position(game.fen()).then(() => {
// 4. Update other game elements (like move history & status)
updatePGN();
updateStatus();
// 5. Send the valid move to the backend for Kafka
createMove(); // Sends move to Flask backend (Chapter 1)
// Make AI move (for demo purposes)
window.setTimeout(makeRandomMove, 250);
});
} else {
// If the move was illegal, snap the piece back to its original square
return "snapback";
}
}
Let’s break down this simplified onDrop function:
- function onDrop (dropEvt): This function is called by chessboard2.js. It receives dropEvt, which contains details like dropEvt.source (the starting square like ‘e2’) and dropEvt.target (the ending square like ‘e4’).
- game.move(...): This is the crucial line! It asks the Chess Game Engine (chess.js) if the move from dropEvt.source to dropEvt.target is valid. If it is, game.move() makes the move internally and returns information about it. If it’s not valid, it returns null.
- if (move): This checks if the move was legal.
  - If move is a move object (legal):
    - board.position(game.fen()): The GIC tells the board object (our Interactive Chessboard) to redraw itself to match the new state of the game (represented by game.fen(), a special chess notation).
    - updatePGN() and updateStatus(): These (other functions in chess_actions.js) update the text on the page, like the move history or game status.
    - createMove(): This function, also part of chess_actions.js, is called to send the move data to our Web Application Backend, which then sends it to Kafka.
  - If move is null (illegal): return 'snapback' tells chessboard2.js to visually return the piece to its original square.
3. Sending the Move to the Backend (createMove)
The createMove function is the part of the Game Interaction Controller that talks to our Flask backend.
// In flaskr/static/js/chess_actions.js (simplified)
function createMove() {
const moves = game.history({ verbose: true });
const lastMove = moves[moves.length - 1]; // Get details of the very last move
const key = uuid.v4(); // Generate a unique ID for this move
const player = lastMove.color == "w" ? "Player" : "AI"; // Who made the move?
const moveDetails = `${getPieceName(lastMove.piece)} from ${
lastMove.from
} to ${lastMove.to}`; // Formatted move
// Construct the data to send (as a string, simple for this example)
const value =
'"' +
key +
'","' +
sessionStorage.getItem("gameId") +
'","' +
player +
'","' +
moveDetails +
'","' +
toISOStringLocal(new Date()) +
'"';
console.log(value);
// Send the move data to the backend's /add_move API endpoint using Axios
axios.post("http://127.0.0.1:5000/add_move/" + key + "/" + value);
}
Here:
- const lastMove = ...: We get the details of the move that just happened from the game object (our Chess Game Engine).
- const key = uuid.v4();: A unique ID is generated for each move.
- const value = ...: The GIC formats all the important details about the move (move ID, game ID, player, move description, timestamp) into a single comma-separated string with every field wrapped in double quotes. This is the “letter” we’re going to send.
- axios.post(...): This line sends an HTTP POST request to our Web Application Backend. It calls the /add_move endpoint we saw in Chapter 1: Web Application Backend, passing the key and value of the move in the URL path. The backend then takes this information and publishes it to Kafka.
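To make the shape of that value string concrete, here is a minimal Python sketch of the same five-field, fully quoted CSV line, together with a parser that splits it back into named fields. The field names are borrowed from the moves table defined later in the tutorial; the helper names and the timestamp format are assumptions for illustration, since the output of toISOStringLocal() is not shown.

```python
import csv
import io
import uuid
from datetime import datetime

# Column order assumed to match the quoted string built in createMove().
FIELDS = ["move_id", "game_id", "player", "move", "move_time"]


def build_move_value(game_id, player, move, when):
    """Return (key, value); value is one CSV line with every field quoted."""
    key = str(uuid.uuid4())
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="")
    # Placeholder timestamp format; the real toISOStringLocal() output may differ.
    writer.writerow([key, game_id, player, move, when.isoformat(timespec="seconds")])
    return key, buffer.getvalue()


def parse_move_value(value):
    """Split one CSV line back into a dict of named fields."""
    row = next(csv.reader([value]))
    return dict(zip(FIELDS, row))


key, value = build_move_value(
    "game-1", "Player", "Pawn from e2 to e4", datetime(2025, 1, 1, 12, 0, 0)
)
record = parse_move_value(value)
print(value)
print(record["move"])
```

Quoting every field means a comma inside a move description would not be mistaken for a field separator by a CSV reader.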
There’s also an onTouchSquare function that handles moves made by clicking squares instead of dragging. Its logic is a bit more complex as it manages “pending moves” (where you click a piece, then click a destination square), but it follows the same core idea: ask game.move() for legality, then update board.position() and call createMove().
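The “pending move” idea can be sketched as a tiny two-click state machine. This is only an illustration in Python, not the actual onTouchSquare code: the class name, the try_move callback (standing in for game.move()), and the choice to clear the pending square after an illegal second click are all assumptions.

```python
from typing import Callable, Optional, Tuple


class PendingMoveTracker:
    """Toy two-click move selector; names are hypothetical, not from chess_actions.js."""

    def __init__(self, try_move: Callable[[str, str], bool]):
        self.try_move = try_move            # stand-in for game.move(): True if legal
        self.pending: Optional[str] = None  # square chosen by the first click

    def touch(self, square: str) -> Optional[Tuple[str, str]]:
        """Handle one click; return (source, target) when a legal move completes."""
        if self.pending is None:
            self.pending = square  # first click: remember the source square
            return None
        source, self.pending = self.pending, None  # second click clears the pending move
        if self.try_move(source, square):
            return (source, square)
        return None


# Stub legality check that only accepts e2 -> e4.
tracker = PendingMoveTracker(lambda src, dst: (src, dst) == ("e2", "e4"))
first = tracker.touch("e2")
second = tracker.touch("e4")
print(first, second)
```

When touch() returns a move, the caller would then update the board and publish the move, exactly as onDrop does after a legal drag.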
Internal Workflow
Let’s visualize the full journey of a legal move with the Game Interaction Controller acting as the central hub:
sequenceDiagram
participant User
participant A as Interactive Chessboard (chessboard2.js)
participant B as Game Interaction Controller (chess_actions.js)
participant C as Chess Game Engine (chess.js)
participant D as Web Application Backend (Flask)
participant E as Kafka Event Bus
User->>A: Drags piece from e2 to e4
A->>B: Calls onDrop(source: 'e2', target: 'e4')
B->>C: Asks game.move({from: 'e2', to: 'e4'})
C-->>B: Returns valid move object
B->>A: Tells board.position(game.fen())
A-->>User: Visually updates board
B->>B: Calls createMove() (formats data)
B->>D: Sends move data via Axios POST to /add_move
D->>E: Produces 'move' event to 'moves' topic
E-->>D: Confirms event received
D-->>B: Sends empty response (acknowledgment)
In this flow:
- User Interacts: You drag a piece on the visual chessboard.
- Interactive Chessboard Notifies: chessboard2.js detects the drag-and-drop and calls the onDrop function in the Game Interaction Controller.
- Game Interaction Controller Consults Engine: The onDrop function immediately asks the game object (the Chess Game Engine) if the move is legal.
- Engine Responds: The Chess Game Engine checks its rules and confirms the move is legal, returning a move object.
- GIC Updates Visuals: Because the move is legal, the Game Interaction Controller tells chessboard2.js to visually update the board with board.position(game.fen()).
- GIC Prepares for Backend: The GIC then calls its createMove() function to format the move details.
- GIC Sends to Backend: The createMove() function uses Axios to send this formatted move data to the /add_move endpoint of the Web Application Backend.
- Backend Publishes to Kafka: The Web Application Backend receives the data and, without processing it, immediately publishes it as a ‘move’ event to the ‘moves’ topic in the Kafka Event Bus.
- Kafka Confirms & Backend Responds: Kafka acknowledges receipt, and the backend sends an empty response back to the Game Interaction Controller as an acknowledgment.
This intricate dance ensures that your actions are validated by the game rules, visually reflected on the board, and then reliably sent into our data stream via Kafka.
Conclusion
You’ve now learned about the Game Interaction Controller, residing in chess_actions.js. This crucial component acts as the “player’s assistant,” mediating between the visual chessboard, the chess game rules, and the backend server. It interprets your moves, validates them with the Chess Game Engine, updates the visual display, and then ensures the move data is sent to the Web Application Backend for Kafka publication.
Next up, we’ll dive deeper into the “brain” of our chess game: the Chess Game Engine itself, to see how it knows all the complex rules of chess!
Chapter 5: Chess Game Engine
Welcome back! In Chapter 4: Game Interaction Controller, we saw how our chess_actions.js file acts as a “player’s assistant,” taking your clicks and drags from the Interactive Chessboard. But when you try to move a piece, how does our system know if that move is actually allowed by the rules of chess? Can a knight jump over other pieces? Can a pawn move backward? What happens if your king is in danger?
This is where the Chess Game Engine comes in! It’s the ultimate “brain” or “rulebook” of our chess game.
What is the Chess Game Engine?
Think of the Chess Game Engine as the official arbiter of all chess rules. It doesn’t care about how the board looks visually, or how your actions are sent over the internet. Its single, crucial job is to understand and enforce all the rules of chess.
Here’s what it knows:
- Piece Movements: How each type of piece (pawn, knight, bishop, rook, queen, king) is allowed to move.
- Captures: How pieces capture each other.
- Special Moves: Complex rules like castling, en passant, and pawn promotion.
- Game State: What constitutes a check, checkmate, or a draw.
When a player (or the AI) tries to make a move, the Chess Game Engine is asked: “Is this move legal from the current board position?” It then checks all the rules and responds with a clear “Yes, that’s legal!” or “No, that’s not allowed.” If the move is legal, it updates its internal record of the board’s state.
In our kafka-chess project, this powerful brain is provided by a JavaScript library called chess.js.
Use Case: Checking if a Move is Legal
Let’s use our familiar example: You, the player, try to move a pawn from e2 to e4. How does the Chess Game Engine help determine if this is a legal move?
- Your visual drag (from Interactive Chessboard) is picked up by the Game Interaction Controller.
- The Game Interaction Controller then sends the proposed move (“pawn from e2 to e4”) to the Chess Game Engine.
- The Chess Game Engine looks at the current board, applies its deep knowledge of chess rules, and determines if the pawn can indeed move from e2 to e4.
- It then tells the Game Interaction Controller the verdict.
How the Chess Game Engine Works (Simplified)
The chess.js library is initialized once when our web page loads, and then the Game Interaction Controller constantly talks to it.
1. Starting a Chess Game Instance
In our index.html file, right when the page loads, we create a new Chess object. This is like opening the rulebook and setting up the board to its starting position.
// In flaskr/templates/index.html (simplified)
// Create new chess game instance using chess.js library
const game = new Chess(); // This is our Chess Game Engine!
// ... (other code for chessboard setup)
The const game = new Chess(); line creates an instance of the Chess object. From this point on, game is our direct line to the Chess Game Engine’s brain.
2. Asking the Engine to Make a Move
When you drop a piece on the board, the onDrop function (part of our Game Interaction Controller) is called. This function’s first job is to ask the game object (our Chess Game Engine) if the move is legal.
// In flaskr/static/js/chess_actions.js (simplified onDrop function)
function onDrop(dropEvt) {
// 1. Try to make the move using the Chess Game Engine (chess.js)
const move = game.move({
from: dropEvt.source, // e.g., 'e2'
to: dropEvt.target, // e.g., 'e4'
promotion: "q", // (simplified) always promote to a Queen
});
// 2. Check if the move was legal
if (move) {
// Move was legal! The engine updated its internal state.
// Now, tell the visual board to update, send to backend, etc.
// ...
} else {
// Move was illegal!
return "snapback"; // Tell the visual board to return the piece
}
}
In this code snippet:
- game.move({ from: dropEvt.source, to: dropEvt.target, ... }) is how we ask the Chess Game Engine to try to make a move.
- If the move is legal, game.move() performs the move internally (changes the engine’s record of the board state) and returns a move object (containing details about the move).
- If the move is illegal, game.move() does not change the internal board state and returns null. This allows our Game Interaction Controller to decide what to do next (e.g., snap the piece back).
3. Getting the Board’s State
After a legal move is made, the Chess Game Engine’s internal representation of the board changes. To tell the Interactive Chessboard to update its visual display, we need to get the engine’s current board state.
// In flaskr/static/js/chess_actions.js (simplified after a legal move)
// ... inside onDrop function, after game.move() returns a valid 'move'
// 3. Tell chessboard2.js to update its visual position
board.position(game.fen()).then(() => {
// ... (update other elements, send to backend)
});
Here:
- game.fen() is a powerful function provided by chess.js. It returns the current state of the board as a single string using a standard notation called FEN (Forsyth-Edwards Notation). This string contains all the necessary information: piece positions, whose turn it is, castling rights, en passant square, halfmove clock, and fullmove number.
- The Interactive Chessboard (board.position(...)) then uses this FEN string to redraw the visual board to match the engine’s state.
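To see what those six FEN fields look like in practice, here is a small Python sketch that splits the standard starting-position FEN and expands each rank into eight squares. The function names are made up for illustration; chess.js and chessboard2.js do their own parsing internally.

```python
START_FEN = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"


def parse_fen(fen):
    """Split a FEN string into its six space-separated fields."""
    placement, turn, castling, en_passant, halfmove, fullmove = fen.split(" ")
    return {
        "ranks": placement.split("/"),  # rank 8 first, rank 1 last
        "turn": turn,                   # "w" or "b"
        "castling": castling,           # e.g. "KQkq", or "-" if none remain
        "en_passant": en_passant,       # "-" when there is no en passant square
        "halfmove_clock": int(halfmove),
        "fullmove_number": int(fullmove),
    }


def expand_rank(rank):
    """Expand digits into that many empty squares, e.g. '4P3' -> '....P...'."""
    return "".join("." * int(ch) if ch.isdigit() else ch for ch in rank)


state = parse_fen(START_FEN)
for rank in state["ranks"]:
    print(expand_rank(rank))
```

Uppercase letters are White’s pieces, lowercase letters are Black’s, and a digit stands for that many consecutive empty squares.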
4. Checking Game Status
The Chess Game Engine also provides functions to check the overall status of the game:
// In flaskr/static/js/chess_actions.js (simplified updateStatus function)
function updateStatus() {
let statusHTML = "";
if (!game.game_over()) {
// Is the game still ongoing?
if (game.in_check()) statusHTML = " is in check! "; // Is someone in check?
// ...
} else if (game.in_checkmate()) {
// Is it checkmate?
statusHTML = "Game over: ... is in checkmate.";
} else if (game.in_stalemate()) {
// Is it a stalemate?
statusHTML = "Game is drawn. ... is stalemated.";
}
// ... other draw conditions like game.in_threefold_repetition(), game.insufficient_material(), game.in_draw()
document.getElementById("gameStatus").innerHTML = statusHTML;
}
Functions like game.game_over(), game.in_check(), game.in_checkmate(), game.in_stalemate(), etc., are all part of the Chess Game Engine. They are the engine’s way of telling us the current strategic situation of the game. Our updateStatus function then takes this information and displays it to the player.
Internal Workflow
Let’s visualize how the Chess Game Engine interacts when you try to make a move:
sequenceDiagram
participant User
participant A as Interactive Chessboard
participant B as Game Interaction Controller
participant C as Chess Game Engine
User->>A: Drags piece from e2 to e4
A->>B: Calls onDrop(source: 'e2', target: 'e4')
B->>C: Calls game.move({from: 'e2', to: 'e4'}) - "Is this legal?"
C-->>B: If Legal: Returns move object & updates internal board state
C-->>B: If Illegal: Returns null & internal board state unchanged
alt If move was Legal
B->>A: Calls board.position(game.fen()) - "Update visual!"
A-->>User: Visually updates board
else If move was Illegal
B-->>A: Returns 'snapback' - "Put piece back!"
A-->>User: Piece snaps back to original square
end
In this flow:
- User initiates move: You drag a piece on the visual board.
- Interactive Chessboard notifies: chessboard2.js (Interactive Chessboard) detects the drag and drop and calls the onDrop function in chess_actions.js (Game Interaction Controller).
- Game Interaction Controller asks the Engine: The onDrop function calls game.move() on the Chess object, providing the source and target squares. This is the moment it asks the Chess Game Engine: “Is this move allowed?”
- Chess Game Engine processes: The chess.js library (Chess Game Engine) performs all its complex rule checks.
  - If Legal: It updates its own internal board state to reflect the move and returns a move object to the Game Interaction Controller.
  - If Illegal: It leaves its internal board state unchanged and returns null to the Game Interaction Controller.
- Game Interaction Controller reacts:
  - If Legal: It then tells the Interactive Chessboard to visually update using board.position(game.fen()), as the engine’s internal state has changed.
  - If Illegal: It tells the Interactive Chessboard to ‘snapback’ the piece, as the engine did not accept the move.
Diving Deeper into chess.js (The Engine’s Core)
While we don’t write the chess.js library ourselves, it’s helpful to know that it contains thousands of lines of JavaScript code specifically designed to handle every possible chess scenario. It’s a highly optimized and thoroughly tested set of algorithms.
For example, when game.move() is called, internally, the chess.js library might perform steps like:
- generate_moves(): Calculates all pseudo-legal moves for the current player (moves that follow piece movement rules, but might leave the king in check).
- attacked(): Checks if a specific square is currently under attack by the opposing player’s pieces. This is crucial for determining if a king is in check.
- king_attacked(): A special check to see if the current player’s king is attacked after a move. If so, that move is illegal.
- make_move(): If a move is legal, this function actually updates the internal 0x88 board representation (a clever way chess.js stores the board) by moving the piece, handling captures, castling, en passant, etc.
- undo_move(): A vital function for “trying out” moves to see if they result in check, and then reverting the board state.
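The 0x88 representation mentioned above is easier to appreciate with a small example. The Python sketch below assumes the common layout in which a8 is index 0 and each row occupies 16 slots (only the first 8 are real squares); the function names are illustrative and this is not code from chess.js. The payoff is that a single bitwise test, index & 0x88, tells you whether a computed square has fallen off the board.

```python
FILES = "abcdefgh"
KNIGHT_OFFSETS = (-33, -31, -18, -14, 14, 18, 31, 33)


def to_index(square):
    """Map 'a8' -> 0 ... 'h1' -> 119 (16 slots per row, only the first 8 used)."""
    file = FILES.index(square[0])
    row = 8 - int(square[1])  # row 0 is rank 8
    return row * 16 + file


def to_square(index):
    """Inverse of to_index for on-board indices."""
    return FILES[index & 7] + str(8 - (index >> 4))


def on_board(index):
    """A square is off the board if either the 0x80 or the 0x08 bit is set."""
    return (index & 0x88) == 0


def knight_targets(square):
    """Knight destinations that stay on the board (ignores other pieces)."""
    origin = to_index(square)
    return sorted(
        to_square(origin + off) for off in KNIGHT_OFFSETS if on_board(origin + off)
    )


print(knight_targets("b1"))  # ['a3', 'c3', 'd2']
```

Without the unused half of each row, a knight stepping off the right edge would silently wrap onto the next rank; the padding turns that mistake into a set bit that is cheap to detect.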
You can see these functions referenced in the flaskr/static/js/chess.js file, though their internal logic is quite complex. The key takeaway is that chess.js handles all the intricate details so we don’t have to! We just use its public functions like game.move(), game.fen(), and game.in_check().
Conclusion
You’ve now learned about the Chess Game Engine, powered by the chess.js library. It’s the “brain” of our application, understanding and enforcing all the rules of chess. It validates moves, updates the game state, and tells us whether the game is in check, checkmate, or a draw. By using chess.js, we can build a functional chess game without having to implement the complex rules ourselves.
Next, we’ll explore how all the different parts of our kafka-chess system communicate with each other using a central “post office” called the Kafka Event Bus!
Chapter 6: Kafka Event Bus
Welcome back! In Chapter 5: Chess Game Engine, we learned how chess.js acts as the “brain” of our game, understanding all the complex rules. We also saw in Chapter 4: Game Interaction Controller how the controller validates your moves and, if a move is legal, tells the Web Application Backend to record it.
But how does this “recording” work behind the scenes? And more importantly, if the backend just “records” a move, how do other parts of our system, like the Streamlit Analytics Dashboard, know about that move instantly without being directly connected to the game? This is a challenge!
Imagine you’re trying to send a letter to many different friends, but you don’t want to write a separate letter for each one. You want a single place where you can drop off your letter, and then anyone who cares about what you wrote can pick it up. This is exactly the problem that the Kafka Event Bus solves!
What is the Kafka Event Bus?
The Kafka Event Bus is like a super-efficient central data highway or a high-tech post office for all the real-time events happening in our kafka-chess system. When something important happens – like a player making a move, or a game starting or ending – this information is immediately sent to Kafka.
Its main jobs are:
- Collecting Events (Producing): It receives messages from various parts of our application (like the Web Application Backend when a move is made). Think of this as putting a letter into a specific mailbox.
- Organizing Events (Topics): It neatly stores these messages in different “topics.” A topic is like a specific mail slot or department in the post office. All chess moves go into a moves topic, and game start/end events go into a games topic.
- Broadcasting Events (Consuming): It allows different parts of the application (like the Flink Analytics Engine or the Live Data Update Workers) to “subscribe” to these topics and automatically receive the messages as they happen. This means senders don’t need to know who the receivers are!
This “bus” or “post office” setup makes our system very flexible. Each part can focus on its own job without worrying about how to directly talk to every other part.
Use Case: Broadcasting a Chess Move
Let’s use our familiar example: You, the player, move a pawn from e2 to e4.
- The Game Interaction Controller validates the move and tells the Web Application Backend about it.
- The Web Application Backend then immediately sends this move information as a “message” (also called an “event”) to the Kafka Event Bus. It sends it to the moves topic.
- Now, the Kafka Event Bus holds onto this message.
- The Flink Analytics Engine (which is always “listening” to the moves topic) receives this message and processes it for statistics.
- The Streamlit Analytics Dashboard (which gets its data from Flink) then updates its “Recent Moves” section to show your move!
Notice that the Web Application Backend never directly talks to Flink or Streamlit. It just drops the message into Kafka, and Kafka handles the rest.
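The decoupling described here can be modelled in a few lines of plain Python. The sketch below is a toy in-memory stand-in, not a Kafka client: the class, its methods, and the consumer names are invented for illustration. It shows the two ideas that matter, namely that producers only append to a named topic, and that each consumer tracks its own read position, so several consumers can read the same messages independently.

```python
from collections import defaultdict


class ToyEventBus:
    """In-memory stand-in for topic-based publish/subscribe; not a Kafka client."""

    def __init__(self):
        self.topics = defaultdict(list)  # topic name -> append-only list of messages
        self.offsets = defaultdict(int)  # (consumer, topic) -> next index to read

    def produce(self, topic, key, value):
        """Append a message to a topic; the producer knows nothing about readers."""
        self.topics[topic].append((key, value))

    def consume(self, consumer, topic):
        """Return messages this consumer has not seen yet and advance its offset."""
        log = self.topics[topic]
        start = self.offsets[(consumer, topic)]
        self.offsets[(consumer, topic)] = len(log)
        return log[start:]


bus = ToyEventBus()
bus.produce("moves", "m1", "Pawn from e2 to e4")
bus.produce("games", "g1", "game started")

analytics_batch = bus.consume("analytics", "moves")
audit_batch = bus.consume("audit", "moves")
print(analytics_batch)
print(bus.consume("analytics", "moves"))  # nothing new yet, so an empty list
```

Adding a new reader requires no change to the producing side, which is the property the chapter relies on.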
How the Kafka Event Bus Works (Simplified)
Kafka isn’t a part of our Python or JavaScript code directly. Instead, it’s a powerful separate program that runs in the background. We tell Docker (a tool that helps run programs in isolated “containers”) to start Kafka for us.
1. Setting Up the Kafka “Post Office” (Configuration)
We define Kafka as a service in our docker-compose.yml file. This file tells Docker how to build and run all the different parts of our system, including Kafka.
# In kafka/docker-compose.yml (simplified)
services:
  broker:
    image: apache/kafka:latest # Use the official Kafka image
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092" # This makes Kafka available on your computer at port 9092
    environment:
      # ... other Kafka settings (like where it stores data, how it communicates)
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092"
In this snippet:
- image: apache/kafka:latest tells Docker to download and use the latest version of Kafka.
- ports: - '9092:9092' is very important! It means that whatever traffic goes to port 9092 on your computer will be sent to port 9092 inside the Kafka container. This is how our other applications (like Flask) can connect to Kafka using localhost:9092.
- KAFKA_ADVERTISED_LISTENERS is like Kafka announcing its address so other applications know where to send messages. Here, localhost:9092 is the address other applications outside the Docker network will use, while broker:19092 is the address for clients running inside it.
This docker-compose.yml file effectively sets up the entire “post office building” for our event bus.
2. Sending Messages to Kafka (Producing)
As we saw briefly in Chapter 1: Web Application Backend, our Flask backend is responsible for “producing” (sending) messages to Kafka.
First, the Flask application needs to connect to Kafka by creating a Producer object.
# In flaskr/__init__.py (simplified)
from confluent_kafka import Producer
KAFKA_SERVER = "localhost:9092" # The address of our Kafka "post office"
config = {
"bootstrap.servers": KAFKA_SERVER, # Tell the producer where Kafka is
"acks": "all", # Make sure messages are safely delivered
}
# Create the Producer instance, our "post office worker"
producer = Producer(config)
Here:
- Producer(config) creates a special object that knows how to send messages to Kafka.
- bootstrap.servers is the key setting that tells the producer where to find Kafka (using the localhost:9092 address we exposed in docker-compose.yml).
Once the producer is set up, the Flask backend can send a message whenever a move or game event happens.
# In flaskr/__init__.py (simplified add_move route)
@app.route("/add_move/<key>/<value>", methods=["POST"])
def add_move(key, value):
    topic = "moves"  # Our "mailbox" for chess moves
    # Use our Kafka producer to send the message
    # value is the actual move data, key is a unique ID
    producer.produce(topic, value, key)
    # Make sure the message is sent before continuing
    producer.poll(10000)
    producer.flush()
    return ""  # Send an empty response back to the frontend
In this code:
- topic = "moves": This specifies which “mailbox” or “department” in Kafka this message belongs to.
- producer.produce(topic, value, key): This is the critical line! It tells our producer to put the value (the move details) into the moves topic, using key as a unique identifier for that message.
- producer.poll() and producer.flush(): These lines ensure the message is actually sent out and confirmed by Kafka before the backend finishes its job.
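Because the move travels inside the URL path (/add_move/<key>/<value>), its quotes, commas, and spaces have to be percent-encoded in transit. Browsers and HTTP clients typically do this automatically, and the web framework decodes the path before calling the route, but it helps to see the round trip. The Python sketch below uses only the standard library; the sample IDs and timestamp are made up.

```python
from urllib.parse import quote, unquote

# Example value in the quoted, comma-separated shape used by createMove();
# the IDs and timestamp are made up for illustration.
value = '"id-123","game-1","Player","Pawn from e2 to e4","2025-01-01T12:00:00"'

# safe="" also encodes "/" so the value stays a single path segment.
encoded = quote(value, safe="")
url = "http://127.0.0.1:5000/add_move/id-123/" + encoded
decoded = unquote(encoded)

print(url)
print(decoded == value)  # True
```

A value containing a raw "/" would be split into extra path segments and miss the route, which is one reason a request body is often a more robust carrier than the path for free-form data.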
The add_game route works in a very similar way, but it sends game start/end events to the games topic instead.
3. Receiving Messages from Kafka (Consuming)
Other parts of our application, like the Flink Analytics Engine, are designed to “consume” (read) messages from Kafka topics. They act like subscribers who get new mail as soon as it arrives in their specific mailbox.
In our system, the Flink Analytics Engine uses powerful SQL-like commands to connect to Kafka and start reading the data stream.
# In streamlit/utils/utils_table_env.py (simplified)
class TableEnvCustomized:
    # ... (initialization of Flink table environment)
    def create_source_tables(self):
        """Create Kafka source tables for games and moves data."""
        self.table_env.execute_sql(
            f"""
            CREATE TABLE games (
                game_id VARCHAR
                ,start_time TIMESTAMP(3)
                ,end_time TIMESTAMP(3)
            ) WITH (
                'connector' = 'kafka', -- Tell Flink to connect to Kafka
                'topic' = 'games', -- Which Kafka topic to read from
                'properties.bootstrap.servers' = 'localhost:9092', -- Kafka's address
                'scan.startup.mode' = 'timestamp', -- Start reading from a specific time
                'value.format' = 'csv' -- How the messages are formatted
            )
            """
        )
        self.table_env.execute_sql(
            f"""
            CREATE TABLE moves (
                move_id VARCHAR
                ,game_id VARCHAR
                ,player VARCHAR
                ,move VARCHAR
                ,move_time TIMESTAMP(0)
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'moves',
                'properties.bootstrap.servers' = 'localhost:9092',
                'scan.startup.mode' = 'timestamp',
                'value.format' = 'csv'
            )
            """
        )
Here:
- CREATE TABLE games (...) WITH (...): This is a special SQL command telling Flink to set up a “virtual table” that is actually linked to a Kafka topic.
- 'connector' = 'kafka': This explicitly tells Flink, “I want to connect to Kafka!”
- 'topic' = 'games' (or 'moves'): This tells Flink which specific Kafka topic to listen to. Flink will only receive messages from this topic.
- 'properties.bootstrap.servers' = 'localhost:9092': Again, this is the address where Flink knows to find our Kafka “post office.”
- 'value.format' = 'csv': This tells Flink how to understand the data inside the messages (we’re sending them as simple comma-separated values).
Once these “source tables” are created, Flink can then start reading the incoming messages as a continuous stream of data, ready to perform analytics on them.
Internal Workflow
Let’s visualize how a chess move travels through our system, with Kafka as the central hub:
sequenceDiagram
participant B as Web Application Backend (Flask)
participant K as Kafka Event Bus
participant F as Flink Analytics Engine
participant S as Streamlit Analytics Dashboard
B->>K: Publishes 'move' event to 'moves' topic (Producer)
K-->>B: Acknowledges receipt
K->>F: Streams 'move' events from 'moves' topic (Consumer)
F->>F: Processes and aggregates move data
F->>S: Provides processed data to Streamlit workers
S-->>User: Displays updated "Recent Moves"
In this flow:
- Backend Publishes: When you make a move, the Web Application Backend sends that move as a message (an “event”) to the Kafka Event Bus, specifically to the moves topic.
- Kafka Acknowledges: Kafka confirms it has received and stored the message safely.
- Flink Consumes: The Flink Analytics Engine is constantly “listening” to the moves topic in Kafka. As soon as a new message arrives, Flink automatically picks it up.
- Flink Processes: Flink then uses its powerful capabilities to process this raw move data (e.g., counting moves, calculating statistics).
- Flink Feeds Streamlit: Flink makes its processed data available to the Streamlit Analytics Dashboard through its workers.
- Streamlit Displays: The Streamlit Analytics Dashboard updates its display with the latest information, including your recent move.
This is the beauty of the event bus: each part of the system is loosely connected. The backend doesn’t need to know about Flink or Streamlit; it just knows how to drop messages onto the Kafka highway. And Flink and Streamlit don’t need to know about the backend; they just know how to pick up messages from Kafka. This makes the system robust and easy to expand.
Conclusion
You’ve now learned about the Kafka Event Bus, which acts as the central “data highway” for all real-time events in our kafka-chess system. We saw how it’s configured using docker-compose.yml, how the Web Application Backend sends messages (produces them) to specific Kafka topics, and how other parts like the Flink Analytics Engine read messages (consume them) from these topics. This event-driven architecture makes our system scalable, flexible, and real-time.
Next, we’ll dive into the Flink Analytics Engine to see how it takes the raw data from Kafka and turns it into useful insights for our analytics dashboard!
Chapter 7: Flink Analytics Engine
Welcome back! In Chapter 6: Kafka Event Bus, we learned how our chess system uses Kafka as a central “data highway” to send real-time information about chess moves and game starts/ends. So now, all this raw event data is flowing into Kafka topics like moves and games.
But what’s the point of having all this data if we can’t understand it? How do we take that raw stream of “player A moved pawn from e2 to e4” and turn it into useful insights like: “There are currently 5 active games,” or “Here are the last 10 moves made across all games”? We need a powerful tool to continuously analyze this live stream of data.
This is where the Flink Analytics Engine comes in!
What is the Flink Analytics Engine?
Imagine you have a super-smart data scientist who works tirelessly, day and night, watching every single piece of information that flows through Kafka. This data scientist’s job isn’t to play chess, but to constantly look for patterns, count things, and summarize what’s happening right now in all the games.
The Flink Analytics Engine is our project’s version of that tireless data scientist. It uses Apache Flink, a powerful tool designed for real-time data processing.
Its main jobs are:
- Listening to Kafka (Data Ingestion): It continuously connects to our Kafka Event Bus and reads every new chess move and game event as soon as it appears.
- Processing Data (Analysis): It then takes this raw stream of events and applies special “recipes” (called queries) to count, filter, and aggregate the data. For example, it can count how many games are currently in progress or list the most recent moves.
- Providing Insights (Results): It makes these live statistics and processed data available for other parts of our system, particularly the Streamlit Analytics Dashboard.
In our project, the TableEnvCustomized class (found in streamlit/utils/utils_table_env.py) represents this “data scientist.” It’s the part of our code that sets up and uses Apache Flink.
Use Case: Calculating Live Chess Statistics
Let’s say you’re looking at the Streamlit Analytics Dashboard and you see numbers like “Active Games: 3” or a list of “Recent Moves.” How does the system know these numbers and lists in real-time?
The Flink Analytics Engine is constantly doing the work:
- It’s always connected to Kafka, receiving every new move and game event.
- It runs continuous calculations in the background:
  - “How many unique game IDs have started but not yet ended?” (for active games)
  - “What are the last N entries in the moves stream?” (for recent moves)
- It then provides these updated results to the dashboard, ensuring you always see the latest information.
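The two questions above can be sketched in plain Python before bringing Flink into the picture. This is only an illustration of the logic, not how the project computes it: the event tuples and the value of `N` are made up for the example.

```python
from collections import deque

# Hypothetical event tuples: (kind, game_id, payload)
events = [
    ("game_start", "g1", None),
    ("move", "g1", "e2e4"),
    ("game_start", "g2", None),
    ("move", "g2", "d2d4"),
    ("move", "g1", "e7e5"),
    ("game_end", "g1", None),
]

N = 2
active_games = set()            # game IDs that have started but not ended
recent_moves = deque(maxlen=N)  # automatically keeps only the newest N entries

for kind, game_id, payload in events:
    if kind == "game_start":
        active_games.add(game_id)
    elif kind == "game_end":
        active_games.discard(game_id)
    elif kind == "move":
        recent_moves.append((game_id, payload))

print(len(active_games), list(recent_moves))
```

Flink answers the same kind of question, but over an unbounded stream and expressed as SQL rather than a hand-written loop.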
How the Flink Analytics Engine Works (Simplified)
The core of our Flink Analytics Engine is the TableEnvCustomized class, which uses PyFlink (the Python API for Apache Flink) to interact with our data streams.
1. Setting Up the “Data Scientist’s Workbench”
Before Flink can start analyzing, it needs to be configured. This involves setting up a TableEnvironment (Flink’s main workplace) and telling it where to find special connector files (JARs) that allow it to talk to Kafka.
# In streamlit/utils/utils_table_env.py (simplified __init__)
from pyflink.table import EnvironmentSettings, TableEnvironment
import os  # For accessing environment variables

class TableEnvCustomized:
    def __init__(self):
        # Create a streaming table environment - this is Flink's main workspace
        self.table_env = TableEnvironment.create(
            EnvironmentSettings.in_streaming_mode()
        )
        # Tell Flink where to find the Kafka connector
        self.table_env.get_config().set(
            "pipeline.jars",
            f"file:////{os.getenv('ABSOLUTE_PATH_TO_JAR_FILE')}",
        )
In this code:
- TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) creates the special Flink environment designed for continuous, real-time data processing (streaming).
- self.table_env.get_config().set("pipeline.jars", ...) is a crucial line. It tells Flink, “To connect to Kafka, you’ll need this special tool (JAR file).” This JAR file is downloaded separately and its path is set as an environment variable in our setup.
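The "pipeline.jars" value is a list of file URLs separated by semicolons. A small sketch of building such a value with the standard library follows. The JAR locations are hypothetical, and the example assumes a POSIX-style filesystem; it is not the project's own setup code.

```python
from pathlib import Path

# Hypothetical locations; substitute wherever the connector JARs were downloaded.
jar_paths = [
    Path("/opt/flink/lib/flink-sql-connector-kafka.jar"),
    Path("/opt/flink/lib/another-connector.jar"),
]

# as_uri() requires an absolute path and returns a file:// URL.
# It does not check that the file exists.
pipeline_jars = ";".join(path.as_uri() for path in jar_paths)
print(pipeline_jars)
```

The resulting string could then be passed as the second argument to `get_config().set("pipeline.jars", ...)`.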
2. Teaching Flink About Kafka Data (“Source Tables”)
Next, our Flink “data scientist” needs to understand the structure of the data coming from Kafka. We do this by defining “virtual tables” that are actually connected to our Kafka topics. Think of this as giving Flink a “schema” or a “blueprint” for the data it’s about to receive.
# In streamlit/utils/utils_table_env.py (simplified create_source_tables)
def create_source_tables(self):
    """Create Kafka source tables for games and moves data."""
    # Comments inside the SQL strings use SQL's "--" syntax, not Python's "#"
    self.table_env.execute_sql(
        f"""
        CREATE TABLE games (
            game_id VARCHAR
            ,start_time TIMESTAMP(3)
            ,end_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka', -- Tell Flink: use the Kafka connector
            'topic' = 'games', -- Listen to the 'games' Kafka topic
            'properties.bootstrap.servers' = 'localhost:9092', -- Kafka's address
            -- Start reading from a specific time (this mode also expects a
            -- 'scan.startup.timestamp-millis' option, not shown in this simplified listing)
            'scan.startup.mode' = 'timestamp',
            'value.format' = 'csv' -- Data format (simple comma-separated)
        )
        """
    )
    self.table_env.execute_sql(
        f"""
        CREATE TABLE moves (
            move_id VARCHAR
            ,game_id VARCHAR
            ,player VARCHAR
            ,move VARCHAR
            ,move_time TIMESTAMP(0)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'moves',
            'properties.bootstrap.servers' = 'localhost:9092',
            'scan.startup.mode' = 'timestamp',
            'value.format' = 'csv'
        )
        """
    )
In this code:
- CREATE TABLE ... WITH (...) is a SQL-like command that registers a new “table” within Flink. But this isn’t a normal table; it’s a streaming table that represents our Kafka data.
- 'connector' = 'kafka' explicitly tells Flink to use the Kafka connector we linked earlier.
- 'topic' = 'games' (or 'moves') specifies which Kafka topic Flink should continuously read messages from.
- 'properties.bootstrap.servers' = 'localhost:9092' is the address of our Kafka Event Bus.
- 'value.format' = 'csv' indicates that the data arriving in Kafka is in a simple comma-separated format.
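One detail the simplified listing leaves out: when 'scan.startup.mode' is 'timestamp', Flink's Kafka connector also expects a 'scan.startup.timestamp-millis' option holding the starting point in epoch milliseconds. How the project computes that value is not shown, so the sketch below is just one plausible way to produce it. The helper name `to_epoch_millis` and the chosen date are assumptions for illustration.

```python
from datetime import datetime, timezone


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return int(moment.timestamp() * 1000)


# Hypothetical choice: read everything published since midnight UTC on this day.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)

# Text that could be placed inside the WITH (...) clause of a CREATE TABLE statement.
option_line = f"'scan.startup.timestamp-millis' = '{to_epoch_millis(start)}'"
print(option_line)
```

Requiring a timezone-aware datetime avoids a silent shift caused by the local timezone of whichever machine runs the dashboard.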
After these commands run, Flink knows about both the games and moves tables. It starts reading from the matching Kafka topics as soon as a query runs against them, processing the data as it arrives.
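To picture what a single CSV message on the moves topic might look like, here is a small round trip using only the standard library. The field values are invented, and the timestamp text format is an assumption; the column order follows the CREATE TABLE moves definition above.

```python
import csv
import io
from datetime import datetime

# Hypothetical values for one record, in the same order as the table columns.
row = ["m-001", "g-42", "white", "e2e4", "2024-01-01 10:00:00"]

# Producer side: serialise the fields as one comma-separated line.
buffer = io.StringIO()
csv.writer(buffer, lineterminator="").writerow(row)
message = buffer.getvalue()

# Consumer side: split the line back into columns in declaration order.
move_id, game_id, player, move, move_time = next(csv.reader([message]))
parsed_time = datetime.strptime(move_time, "%Y-%m-%d %H:%M:%S")

print(message)
```

Because CSV carries no column names, the producer and the table definition have to agree on the order of the fields.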
3. Asking Flink for Real-Time Insights (Queries)
Once Flink is connected and understands the data, we can ask it questions (queries) about the live streams. The TableEnvCustomized class has methods to get specific insights.
Here’s how we get the count of active and completed games:
# In streamlit/utils/utils_table_env.py (simplified get_games_stats)
def get_games_stats(self):
    """Get count of active and completed games."""
    with self.table_env.execute_sql(
        """
        SELECT
            SUM(CASE WHEN record_count = 1 THEN 1 ELSE 0 END) active_game_count
            ,SUM(CASE WHEN record_count = 2 THEN 1 ELSE 0 END) completed_game_count
        FROM (
            SELECT game_id, COUNT(*) record_count FROM games
            GROUP BY game_id
        )
        """
    ).collect() as results:
        for result in results:
            yield (result)  # Continuously yield new results
Explanation:
- self.table_env.execute_sql(...) tells Flink to run a SQL query.
- The inner SELECT game_id, COUNT(*) record_count FROM games GROUP BY game_id counts how many records exist for each game_id: one published when the game starts and, later, one when it ends. A game that only has its start record will have record_count = 1 (an active game). A game that also has its end record will have record_count = 2 (a completed game).
- The outer SELECT SUM(CASE WHEN record_count = 1 THEN 1 ELSE 0 END) ... then sums these up to give us the total active and completed game counts.
- .collect() starts a continuous stream of results. As soon as a new game starts or ends (and new data arrives in Kafka), Flink re-calculates, and a new result is sent.
- yield (result) means this function doesn’t return just one answer; it keeps providing new answers as they become available.
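The query's logic can be tried without a Flink cluster by running the same SQL shape against an in-memory SQLite table. This is a batch stand-in, not PyFlink: re-running the query after every insert only imitates the way Flink keeps emitting updated results. The sample records are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE games (game_id TEXT, start_time TEXT, end_time TEXT)")

STATS_SQL = """
SELECT
    SUM(CASE WHEN record_count = 1 THEN 1 ELSE 0 END) AS active_game_count
    ,SUM(CASE WHEN record_count = 2 THEN 1 ELSE 0 END) AS completed_game_count
FROM (
    SELECT game_id, COUNT(*) AS record_count FROM games
    GROUP BY game_id
)
"""

# Hypothetical records: one row when a game starts, a second row when it ends.
incoming = [
    ("g1", "2024-01-01 10:00:00", None),
    ("g2", "2024-01-01 10:05:00", None),
    ("g1", "2024-01-01 10:00:00", "2024-01-01 10:30:00"),
]

snapshots = []
for record in incoming:
    conn.execute("INSERT INTO games VALUES (?, ?, ?)", record)
    # Re-run the query after each new record to imitate continuous updates.
    snapshots.append(conn.execute(STATS_SQL).fetchone())

print(snapshots)  # [(1, 0), (2, 0), (1, 1)]
```

Each tuple is (active, completed): the third record turns g1 from active into completed, so the active count drops from 2 to 1.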
Similarly, to get the list of recent moves, we simply select from the moves table:
# In streamlit/utils/utils_table_env.py (simplified get_moves)
import pandas as pd  # Needed for the DataFrame built below

def get_moves(self):
    """Get all chess moves data formatted for display."""
    with self.table_env.execute_sql(
        "SELECT game_id, player, move, move_time FROM moves"
    ).collect() as results:
        moves = []
        for result in results:
            # Filter out initial dummy records (result[1] is the player column)
            if result[1] != "dummy":
                moves.append(result)
            # Convert to a Pandas DataFrame for Streamlit display
            df = pd.DataFrame(
                moves, columns=["Game ID", "Player", "Move", "Move Timestamp"]
            )
            yield (df)  # Continuously yield new DataFrames
This get_moves() function simply asks Flink to give it all the data from the moves Kafka stream. As new moves arrive, Flink adds them to the stream, and our function receives them, bundles them into a Pandas DataFrame, and yields them. The Streamlit Analytics Dashboard (Live Data Update Workers specifically) then receives these DataFrames and updates the display.
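The "accumulate, then yield a fresh snapshot" pattern can be sketched as a plain generator, with lists standing in for DataFrames. The function name `growing_snapshots` and the sample rows are hypothetical; this sketch yields only when a row is accepted, which may differ from the project's exact behaviour.

```python
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[str, str, str, str]  # (game_id, player, move, move_time)


def growing_snapshots(rows: Iterable[Row]) -> Iterator[List[Row]]:
    """Yield a copy of all non-dummy rows seen so far, once per accepted row."""
    seen: List[Row] = []
    for row in rows:
        if row[1] == "dummy":  # skip placeholder records, keyed on the player column
            continue
        seen.append(row)
        yield list(seen)  # copy, so later appends do not change earlier snapshots


rows = [
    ("g0", "dummy", "-", "2024-01-01 09:59:59"),
    ("g1", "white", "e2e4", "2024-01-01 10:00:00"),
    ("g1", "black", "e7e5", "2024-01-01 10:00:07"),
]
snapshots = list(growing_snapshots(rows))
print(len(snapshots), snapshots[-1])
```

Each snapshot contains every move so far, so the consumer can simply redraw the whole table instead of patching it row by row.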
Internal Workflow
Let’s visualize how the Flink Analytics Engine acts as the central processor, taking data from Kafka and preparing it for the dashboard:
sequenceDiagram
participant K as Kafka Event Bus
participant F as Flink Analytics Engine (TableEnvCustomized)
participant W as Live Data Update Workers
participant S as Streamlit Analytics Dashboard
K->>F: Streams new 'move' & 'game' events continuously
loop Real-time Analysis
F->>F: Flink executes continuous SQL queries (e.g., SUM, COUNT, SELECT)
F->>F: Aggregates and processes data
W->>F: Worker calls Flink's get_games_stats() or get_moves()
F-->>W: Flink yields updated results (e.g., (3, 2) or DataFrame of moves)
W->>S: Worker updates Streamlit UI with new data
end
In this flow:
- Kafka Streams Events to Flink: The Kafka Event Bus continuously sends new game and move events to the Flink Analytics Engine. Flink is always listening.
- Flink Processes Continuously: Flink uses the “source tables” and “queries” we defined to constantly re-calculate statistics and update the list of recent moves as new data arrives. This happens automatically and in real-time.
- Workers Request Data: The Live Data Update Workers (running as part of our Streamlit Analytics Dashboard) call TableEnvCustomized’s get_games_stats() and get_moves() methods and then iterate over the streams those methods return.
- Flink Yields Results: Because these Flink queries are continuous streams, Flink yields (provides) the latest calculated results to the workers.
- Workers Update Dashboard: The workers then take these fresh results and use Streamlit commands to update the metrics and tables on the Streamlit Analytics Dashboard.
This continuous cycle is what makes our dashboard “live” and constantly updated with the latest chess activity.
Conclusion
You’ve now learned about the Flink Analytics Engine, which is the “data scientist” of our kafka-chess project. It uses Apache Flink to continuously read raw game and move data from the Kafka Event Bus, apply powerful SQL-like queries to analyze it in real-time, and make the processed insights available for our dashboard. This allows us to see live statistics like active games and recent moves, transforming raw events into meaningful information.
Next up, we’ll dive into the Live Data Update Workers, which are the actual “reporters” that fetch this processed data from Flink and display it on the Streamlit Analytics Dashboard!
Chapter 8: Live Data Update Workers
Welcome back! In Chapter 7: Flink Analytics Engine, we learned how Apache Flink acts as our “data scientist,” tirelessly analyzing the raw game and move data flowing through the Kafka Event Bus and preparing useful insights. But once Flink has calculated “3 active games” or “the last 5 moves,” how does that information actually get onto your screen in the Streamlit Analytics Dashboard and keep updating automatically?
Imagine you’re watching a news channel that needs to show live scores or breaking news. You don’t want to press a “refresh” button every few seconds, right? You want the numbers and headlines to update by themselves, instantly. This is exactly the job of the Live Data Update Workers!
What are the Live Data Update Workers?
The Live Data Update Workers are like our dashboard’s very own “auto-refresh crew” or “dedicated news reporters.” They are special Python programs that run quietly in the background, making sure the Streamlit Analytics Dashboard always shows the most current information.
Their main jobs are:
- Continuous Fetching: They regularly ask the Flink Analytics Engine for the very latest processed data (like updated game counts or new moves).
- Targeted Updating: Each worker is responsible for updating a specific part of the Streamlit dashboard. One worker might update the “Game Statistics” numbers, another updates the “Recent Moves” list, and so on.
- Real-Time Display: By continuously fetching and updating, they ensure that the dashboard remains live and dynamic, without you ever needing to click a refresh button.
These workers are implemented as WorkerThread classes in the streamlit/utils/utils_worker.py file. They use Python’s threading module to run multiple tasks at the same time, keeping your dashboard responsive.
Use Case: Keeping the Dashboard Live
Let’s say you’re watching the Streamlit Analytics Dashboard. You see the “Active Games” count, “Games Completed Today,” and a “Recent Moves” table. How do these constantly update?
- One Live Data Update Worker (let’s call it Worker A) is constantly asking the Flink Analytics Engine for the latest game statistics.
- As soon as Flink has new stats (e.g., a game starts or ends), Worker A receives them.
- Worker A immediately tells Streamlit to update the “Active Games” and “Games Completed Today” numbers on the dashboard.
- At the same time, another Live Data Update Worker (Worker B) is doing the same for recent moves, fetching them from Flink and updating the “Recent Moves” table on the dashboard.
This happens continuously, giving you a smooth, real-time view of all the chess activity.
How the Live Data Update Workers Work (Simplified)
The core streamlit/app.py script starts these workers when the dashboard loads. The workers then run independently in the background.
1. Starting the “Auto-Refresh Crew”
In streamlit/app.py, after setting up the page layout and connecting to Flink, we call a special function run_workers to kick off all the data update threads.
# In streamlit/app.py (simplified)
import streamlit as st
from utils.utils_table_env import TableEnvCustomized
from utils.utils_worker import run_workers  # Import our worker startup function
# ... (page layout code)
# Initialize PyFlink table environment (our Flink "data scientist")
table_env = TableEnvCustomized()
table_env.create_source_tables()  # Set up connections to data sources
# Start the data processing workers
try:
    run_workers(col1, col2, col3, col4, table_env)  # Pass columns and Flink connection
except Exception:
    st.info("Unable to fetch live data.")
Here:
- run_workers(...) is the key function that creates and starts our “reporter” threads.
- It passes the columns col1 through col4 (which are specific sections of our Streamlit dashboard page) to the workers, so each worker knows where on the dashboard to put its updates.
- It also passes table_env (our connection to the Flink Analytics Engine), so workers can ask Flink for data.
2. What a Worker Does (Inside the run method)
Each WorkerThread class (like WorkerThread1 for game stats or WorkerThread2 for recent moves) inherits from Python’s Thread class. This allows them to run in the background. The most important part of any worker is its run method.
Let’s look at a simplified WorkerThread1 which updates game statistics:
# In streamlit/utils/utils_worker.py (simplified)
import time
from threading import Thread  # For running tasks in the background
import streamlit as st  # To update the Streamlit UI

class WorkerThread1(Thread):
    """Worker thread for displaying game statistics metrics."""

    def __init__(self, delay, target, table_env):
        super().__init__()
        self.delay = delay  # Initial wait time
        self.target = target  # The Streamlit column (e.g., col1) to update
        self.table_env = table_env  # Our Flink "data scientist" connection

    def run(self):
        """Execute the worker thread to display game statistics."""
        time.sleep(self.delay)  # Wait a bit before starting (prevents clutter)
        # Ask the Flink "data scientist" for the live game stats stream
        stream = self.table_env.get_games_stats()
        # Loop forever, getting new data as it comes in from Flink
        for chunk in stream:
            # Use the target column to display the data
            with self.target.container():
                st.metric("Active Games", chunk[0])  # Update "Active Games"
                st.metric("Games Completed Today", chunk[1])  # Update "Games Completed"
In this code:
- class WorkerThread1(Thread): declares this class as a background task.
- self.target is the specific placeholder in the Streamlit UI (e.g., col1.empty()) that this worker will update. col1.empty() creates an empty container that the worker can fill and re-fill.
- self.table_env.get_games_stats() is how this worker asks the Flink Analytics Engine for the live game statistics. Remember from Chapter 7 that Flink’s collect() method returns a continuous stream of results.
- for chunk in stream: This loop is crucial! It continuously waits for new data to arrive from Flink. As soon as Flink re-calculates and sends updated stats, this loop gets a chunk of data.
- with self.target.container(): ensures that the Streamlit commands (st.metric) display their content inside the specific column (col1) assigned to this worker.
- st.metric("Active Games", chunk[0]) displays a nice, big number (a “metric”) labeled “Active Games” using the first piece of data (chunk[0]) from Flink.
WorkerThread2 works similarly but fetches recent moves and displays them as a table using st.dataframe.
# In streamlit/utils/utils_worker.py (simplified WorkerThread2)
class WorkerThread2(Thread):
    """Worker thread for displaying chess moves data table."""

    def __init__(self, delay, target, table_env):
        super().__init__()
        self.delay = delay
        self.target = target
        self.table_env = table_env

    def run(self):
        time.sleep(self.delay)
        stream = self.table_env.get_moves()  # Get stream of recent moves from Flink
        for chunk in stream:
            # Display moves data as a scrollable table in the target column
            self.target.container().dataframe(chunk, height=400)
Here, self.table_env.get_moves() asks Flink for the list of recent moves, and self.target.container().dataframe(chunk, height=400) takes the received chunk (which is a table of move data) and displays it nicely as a scrollable table within its assigned column (col2).
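Both workers follow the same shape: take a stream, loop over it, and overwrite one slot on the page with each chunk. The sketch below reproduces that shape without Streamlit or Flink so it can be run anywhere. `FakePlaceholder` and `StreamWorker` are hypothetical names, and the sample streams are finite lists rather than live queries.

```python
import threading
from typing import Any, Iterable, List


class FakePlaceholder:
    """Hypothetical stand-in for a page slot: holds one value, overwritten on update."""

    def __init__(self) -> None:
        self.current: Any = None
        self.history: List[Any] = []

    def show(self, value: Any) -> None:
        self.current = value
        self.history.append(value)


class StreamWorker(threading.Thread):
    """Consume a stream and push each chunk into this worker's own placeholder."""

    def __init__(self, target: FakePlaceholder, stream: Iterable[Any]) -> None:
        super().__init__(daemon=True)
        self.target = target
        self.stream = stream

    def run(self) -> None:
        for chunk in self.stream:  # waits until the stream produces the next chunk
            self.target.show(chunk)


stats_slot, moves_slot = FakePlaceholder(), FakePlaceholder()
workers = [
    StreamWorker(stats_slot, iter([(1, 0), (2, 0), (1, 1)])),
    StreamWorker(moves_slot, iter([["e2e4"], ["e2e4", "e7e5"]])),
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()  # returns here only because the sample streams are finite

print(stats_slot.current, moves_slot.current)
```

Giving each worker its own placeholder means the threads never write to the same slot, so no locking is needed for the display itself.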
Internal Workflow
Let’s visualize how the Live Data Update Workers continuously bring fresh data to your dashboard:
sequenceDiagram
participant User
participant SA as Streamlit App (app.py)
participant W as Live Data Update Workers (utils_worker.py)
participant FA as Flink Analytics Engine (TableEnvCustomized)
User->>SA: Opens Streamlit Dashboard
SA->>W: Calls run_workers() to start Worker Threads
loop Continuous Dashboard Updates
W->>FA: Worker requests latest processed data (e.g., get_games_stats())
FA-->>W: Flink yields updated results (e.g., (3 active, 2 completed))
W->>SA: Worker uses st.metric() or st.dataframe() to update its target section
SA-->>User: User sees updated numbers/tables on dashboard
end
In this flow:
- User Opens Dashboard: You navigate to the Streamlit app in your web browser.
- Streamlit App Starts Workers: The app.py script runs, sets up the page, and then calls run_workers to start the background worker threads.
- Workers Request Data from Flink: Each “worker thread” (WorkerThread1, WorkerThread2, etc.) continuously asks the Flink Analytics Engine (via the table_env object) for the latest processed data related to its specific task.
- Flink Provides Data: The Flink Analytics Engine (which is always processing data from Kafka) yields (provides) the latest updated results to the waiting workers.
- Workers Update Dashboard UI: Each worker takes the fresh data it received and uses Streamlit commands (st.metric, st.dataframe) to update the specific sections of the web page it’s responsible for. Streamlit then re-renders those parts of the page.
- User Sees Updates: You see the numbers and tables on your dashboard change in real-time, reflecting the latest activity across all games!
The Importance of add_script_run_ctx
There’s one crucial detail when making threads update a Streamlit app. Streamlit runs your script in a thread that carries information about the current session, and threads you create yourself do not have it by default. For background threads to be able to tell Streamlit to update the UI, they need a special “permission slip.” This is handled by the add_script_run_ctx function.
# In streamlit/utils/utils_worker.py (simplified run_workers)
from streamlit.runtime.scriptrunner_utils.script_run_context import (
    add_script_run_ctx, get_script_run_ctx,
)

def run_workers(col1, col2, col3, col4, table_env):
    """Initialize and start all worker threads for the Streamlit dashboard."""
    threads = [
        WorkerThread1(1.2, col1.empty(), table_env),  # Stats worker
        WorkerThread2(1.1, col2.empty(), table_env),  # Moves worker
        WorkerThread4(1.3, col4.empty()),  # Timestamp worker
    ]
    # Start all threads
    for thread in threads:
        # Crucial: This tells Streamlit that this background thread
        # is allowed to update the Streamlit page!
        add_script_run_ctx(thread, get_script_run_ctx())
        thread.start()
    # Wait for all threads to complete (they run forever in this case)
    for thread in threads:
        thread.join()
Here:
- get_script_run_ctx() gets the special “permission slip” from Streamlit’s main thread.
- add_script_run_ctx(thread, get_script_run_ctx()) then attaches this permission slip to each of our WorkerThread objects before they start. Without this, Streamlit would not allow the threads to modify the dashboard, and your page would never update!
- thread.start() actually begins the execution of the worker’s run() method in a separate thread.
- thread.join() makes the main Streamlit program wait for these threads indefinitely, ensuring the dashboard keeps running and updating.
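The “permission slip” idea can be illustrated with a toy analogy in plain Python. This is not Streamlit's code: `attach_ctx`, `current_ctx`, the attribute name, and the session label are all hypothetical. It only shows the general pattern of tagging a thread object with context before it starts, and of work being skipped when that tag is missing.

```python
import threading
from typing import List, Optional

CTX_ATTR = "toy_run_ctx"  # hypothetical attribute name


def attach_ctx(thread: threading.Thread, ctx: Optional[str]) -> None:
    """Store the context on the thread object before it starts."""
    setattr(thread, CTX_ATTR, ctx)


def current_ctx() -> Optional[str]:
    """Look up the context of whichever thread is running this call."""
    return getattr(threading.current_thread(), CTX_ATTR, None)


results: List[str] = []
lock = threading.Lock()


def update_ui(label: str) -> None:
    ctx = current_ctx()
    outcome = f"updated {ctx}" if ctx else "ignored (no context)"
    with lock:  # protect the shared list from concurrent appends
        results.append(f"{label}: {outcome}")


with_ctx = threading.Thread(target=update_ui, args=("worker-with-ctx",))
without_ctx = threading.Thread(target=update_ui, args=("worker-without-ctx",))
attach_ctx(with_ctx, "session-1")  # only this thread receives the "permission slip"

for thread in (with_ctx, without_ctx):
    thread.start()
for thread in (with_ctx, without_ctx):
    thread.join()

print(sorted(results))
```

The thread that never received a context cannot tell which session it belongs to, which mirrors why the workers above are tagged before thread.start() is called.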
Conclusion
You’ve now learned about the Live Data Update Workers! These WorkerThread classes are the “auto-refresh crew” for our Streamlit Analytics Dashboard. They run in separate background threads, continuously fetching the latest analytical results from the Flink Analytics Engine and using Streamlit commands to update specific sections of the UI. This ensures that our dashboard always displays real-time data without you needing to lift a finger!
This concludes our journey through the kafka-chess project’s architecture! You’ve seen how each component, from the chessboard to the data analytics, plays a vital role in creating a powerful, real-time chess application.
Generated by AI Codebase Knowledge Builder
Afterthoughts
I took about 3 weeks to complete this project. At the end of it, I felt satisfied that I had learned a lot: Kafka, ChessboardJS, Flink, Streamlit (multithreading), and Pocketflow (used to generate the chapters above). I also discovered that it is difficult to set up my own Kafka server in the cloud on EC2, as the node frequently disconnected from the controller. The managed Kafka service is also expensive as of now.
Lessons Learned
- How to set up a Kafka server locally
- How to create a chess web app
- How Kafka works
- How to set up Flink to connect to Kafka
- How to create a real-time application using Streamlit
- How to generate tutorials using Pocketflow
Future Enhancements
- Update the interactive chessboard app AI to be based on machine learning instead of random moves
- Add online learning for the chessboard AI