Flask apps usually feel like a single, synchronous conversation. You ask for something, it gives it back. But what if you want to do something after a request, without making the user wait? Or trigger an action in a completely different part of your app based on what just happened? That’s where event-driven architecture comes in, and we can build it into Flask using signals and queues.
Let’s see it in action. Imagine a simple Flask app that handles user signups. When a user signs up, we want to do a few things: send a welcome email, update a marketing list, and maybe even trigger a background job to generate a user profile. Doing all of this synchronously in the request handler would make signup slow.
from flask import Flask, request, jsonify
from blinker import Namespace
from redis import Redis
import json
import uuid
app = Flask(__name__)
signals = Namespace()
# Define signals
user_signed_up = signals.signal('user-signed-up')
# Redis connection for queuing
redis_client = Redis(host='localhost', port=6379, db=0)
@app.route('/signup', methods=['POST'])
def signup():
data = request.get_json()
user_id = str(uuid.uuid4())
username = data.get('username')
email = data.get('email')
# Simulate user creation
print(f"User created: {username} ({email}) with ID: {user_id}")
# Send a signal that the user has signed up
user_signed_up.send(app, user_id=user_id, username=username, email=email)
return jsonify({"message": "Signup successful", "user_id": user_id}), 201
# --- Signal Receivers ---
@user_signed_up.connect
def send_welcome_email(sender, **extra):
user_id = extra['user_id']
email = extra['email']
print(f"Queuing welcome email for {email} (User ID: {user_id})...")
# In a real app, this would enqueue a task for an email worker
redis_client.lpush('email_queue', json.dumps({
'type': 'welcome_email',
'user_id': user_id,
'email': email
}))
@user_signed_up.connect
def update_marketing_list(sender, **extra):
user_id = extra['user_id']
email = extra['email']
print(f"Queuing marketing list update for {email} (User ID: {user_id})...")
# In a real app, this would enqueue a task for a marketing worker
redis_client.lpush('marketing_queue', json.dumps({
'type': 'add_to_list',
'user_id': user_id,
'email': email
}))
# --- Background Worker Simulation ---
# In a real system, these would be separate processes/services
def process_email_queue():
while True:
message = redis_client.brpop('email_queue', timeout=1)[1]
if message:
task = json.loads(message)
print(f"Processing email task: {task}")
# Simulate sending email
print(f"Sent welcome email to {task['email']}")
redis_client.incr('emails_sent')
def process_marketing_queue():
while True:
message = redis_client.brpop('marketing_queue', timeout=1)[1]
if message:
task = json.loads(message)
print(f"Processing marketing task: {task}")
# Simulate adding to marketing list
print(f"Added {task['email']} to marketing list")
redis_client.incr('marketing_list_updates')
if __name__ == '__main__':
import threading
# Simulate background workers
email_worker = threading.Thread(target=process_email_queue, daemon=True)
marketing_worker = threading.Thread(target=process_marketing_queue, daemon=True)
email_worker.start()
marketing_worker.start()
print("Starting Flask app...")
app.run(debug=True)
This example uses blinker for signals, which are like in-app event notifications. When user_signed_up.send() is called, any function decorated with @user_signed_up.connect will be executed. This is great for decoupling components within the same process.
However, for tasks that are time-consuming or need to run independently of the Flask request cycle, we need a queue. Here, Redis acts as a simple message broker. When a user signs up, we not only send a signal but also push messages onto Redis lists (email_queue, marketing_queue). These messages represent tasks that external worker processes (simulated here with threads) will pick up and execute. The Flask app remains fast because it only needs to enqueue the work, not perform it.
The core idea is to separate what happened (the event) from what should be done about it (the reactions). Signals handle reactions within the same application process, allowing different modules to communicate without direct imports. Queues handle reactions that are external, asynchronous, or resource-intensive, allowing your Flask app to stay responsive. The sender in signal.send(sender, ...) is typically your Flask application instance, making it easy to identify the source of the signal if you have multiple applications or components emitting signals. The **extra dictionary is where you pass the actual payload of your event – the data needed by the listeners.
A common pattern is to use signals to trigger the enqueuing of tasks. The signal receiver doesn’t do the work (like sending an email); it just puts a job description onto a queue. This keeps your signal receivers lean and fast. The actual heavy lifting is delegated to dedicated worker processes that constantly monitor these queues. This architecture is the foundation of robust, scalable applications where different parts can evolve independently.
You might be tempted to put complex logic directly into your signal receivers. Resist this. The signal receiver’s job is to observe an event and initiate a follow-up action, typically by placing a message on a queue. If the receiver itself performs the action, you’ve just moved the synchronous bottleneck from your request handler to your signal handler, defeating the purpose. The true power comes from the separation of concerns: Flask handles requests, signals notify of in-app occurrences, and queues orchestrate asynchronous, external processing.
The next step is managing the workers. What happens when a worker crashes mid-task?