FastAPI's built-in BackgroundTasks run inside the web process after the response is sent, with no persistence, retries, or scheduling, so developers often turn to a dedicated queue such as ARQ for long-running or scheduled jobs.
Let’s see ARQ in action. Imagine a scenario where you need to process an uploaded image – resizing, applying filters, and storing metadata. This is a perfect candidate for a background job.
Here’s a simplified setup:
main.py (FastAPI application):
from contextlib import asynccontextmanager

from fastapi import FastAPI, File, UploadFile
from arq import create_pool
from arq.connections import RedisSettings

redis_settings = RedisSettings(host='localhost', port=6379, database=0)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ArqRedis is not instantiated directly; create_pool() returns a connected ArqRedis
    app.state.redis = await create_pool(redis_settings)
    yield
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

@app.post("/upload-image/")
async def upload_image_endpoint(file: UploadFile = File(...)):
    contents = await file.read()
    # Enqueue the background job by name; the worker resolves 'process_image' to the function
    await app.state.redis.enqueue_job('process_image', file.filename, contents)
    return {"message": f"Image '{file.filename}' uploaded and processing started."}
your_tasks.py (ARQ worker tasks):
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def process_image(ctx, filename: str, content: bytes):
    # arq passes a context dict (ctx) as the first argument to every job function
    logger.info(f"Starting to process image: {filename}")
    # Simulate image processing (e.g., resizing, filtering)
    await asyncio.sleep(10)  # Simulate a 10-second task
    logger.info(f"Finished processing image: {filename}")
    # In a real app, you'd save the processed image and metadata here
worker.py (ARQ worker startup):
from arq import run_worker
from arq.connections import RedisSettings
from your_tasks import process_image  # Import your tasks

async def startup(ctx):
    # Optional: any setup needed when the worker starts (ctx is a dict shared with jobs)
    pass

async def shutdown(ctx):
    # Optional: any cleanup needed when the worker shuts down
    pass

class WorkerSettings:
    # Functions are registered under their own names, so this must match
    # the 'process_image' string used in enqueue_job
    functions = [process_image]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = RedisSettings(host='localhost', port=6379, database=0)

if __name__ == '__main__':
    # Same effect as running `arq worker.WorkerSettings` from the command line
    run_worker(WorkerSettings)
To run this, you’d start Redis, then run python worker.py in one terminal, and uvicorn main:app --reload in another. Uploading an image via a POST request to /upload-image/ will trigger the background process_image task. You’ll see logs in the worker.py terminal as the image is processed.
ARQ decouples long-running tasks from your web server, so user requests are not blocked while the work runs. It uses Redis as both message broker and job queue. When you call enqueue_job, ARQ serializes the function name and arguments, stores them under a per-job key, and adds the job ID to a Redis sorted set scored by the time the job should run. The ARQ worker, running in a separate process, polls this set for jobs that are due. When it finds one, it fetches the job data, deserializes the arguments, looks up the function registered under that name, and executes it. This asynchronous, queue-based architecture is the core of its power.
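To make that flow concrete, here is a toy in-memory model of it. This is a sketch for intuition only, not ARQ's implementation: the two dictionaries stand in for the Redis structures, and every name in it (queue, payloads, poll_once, shout) is hypothetical.

```python
import asyncio
import pickle
import time

# Toy stand-ins for Redis; NOT how arq is actually implemented.
queue: dict[str, float] = {}     # job_id -> earliest run time (plays the sorted set)
payloads: dict[str, bytes] = {}  # job_id -> serialized (function name, args)

def enqueue_job(job_id: str, function: str, *args, defer_by: float = 0.0) -> None:
    """Serialize the call and schedule it, like a web process would."""
    payloads[job_id] = pickle.dumps((function, args))
    queue[job_id] = time.monotonic() + defer_by

async def poll_once(functions: dict) -> list:
    """Run every job whose scheduled time has passed, earliest first."""
    now = time.monotonic()
    due = sorted((score, job_id) for job_id, score in queue.items() if score <= now)
    results = []
    for _, job_id in due:
        del queue[job_id]
        name, args = pickle.loads(payloads.pop(job_id))
        # Jobs are looked up by name, which is why names must match on both sides
        results.append(await functions[name](*args))
    return results

async def shout(text: str) -> str:
    return text.upper()

async def demo() -> list:
    enqueue_job("a", "shout", "hello")
    enqueue_job("b", "shout", "later", defer_by=60)  # scheduled for the future
    return await poll_once({"shout": shout})

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only job "a" runs on this poll; job "b" stays queued until its scheduled time passes, which is how a time-scored queue supports deferred jobs.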
The ArqRedis object is your primary interface to the ARQ system from application code, and it is normally obtained from create_pool() rather than constructed directly. You use it to enqueue jobs; enqueue_job returns a Job handle through which you can check status and retrieve results. The RedisSettings object configures the connection to your Redis instance, specifying the host, port, and database. In worker.py, the worker is given the list of functions it is allowed to run (in this case process_image), the optional startup and shutdown hooks, and the same connection details; ARQ conventionally reads these from a WorkerSettings class.
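As a rough illustration of the status and result side, the helper below enqueues the job and then waits for it. It is a sketch under assumptions: enqueue_and_wait is a hypothetical name, and redis is assumed to be an ArqRedis pool created elsewhere with create_pool(). Waiting inside a request handler would defeat the purpose of a queue, so this fits scripts and tests better than endpoints.

```python
from typing import Any, Optional

async def enqueue_and_wait(
    redis: Any, filename: str, contents: bytes, timeout: float = 30.0
) -> Optional[dict]:
    """Enqueue process_image, then report the job's id, status and result.

    `redis` is assumed to be an ArqRedis pool from arq.create_pool().
    """
    job = await redis.enqueue_job("process_image", filename, contents)
    if job is None:
        # enqueue_job returns None when a job with the same explicit id already exists
        return None
    status = await job.status()  # status immediately after enqueueing
    # result() polls until the job finishes; it re-raises the job's exception on
    # failure and raises a timeout error if the job takes longer than `timeout`
    result = await job.result(timeout=timeout)
    return {"job_id": job.job_id, "status_at_enqueue": status, "result": result}
```

Because process_image in this article returns nothing, the result field would simply be None here; a task that returned metadata would surface it the same way.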
Beyond plain background processing, ARQ also handles scheduled tasks and retries. Recurring jobs are declared with the cron() helper, which takes cron-style fields such as hour and minute, and a single job can be delayed with the _defer_by or _defer_until arguments to enqueue_job. Retries are explicit: a job asks to be run again by raising Retry, optionally with a defer delay. The current attempt number is available as ctx['job_try'], so a backoff can be computed from it, and the worker's max_tries setting caps the number of attempts. This keeps transient failures from turning into permanent data loss or application instability. Cron jobs and max_tries are configured in the worker settings, while deferral and retry are decided per job.
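ARQ leaves the retry delay up to you, so exponential backoff is something you compute yourself. A minimal sketch of one way to do it follows; backoff_seconds and its default values are hypothetical, and the function is plain Python with no ARQ dependency.

```python
def backoff_seconds(job_try: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Delay before the next attempt: base * 2**(job_try - 1), capped at `cap`.

    job_try is the 1-based attempt number, as exposed by arq in ctx['job_try'].
    """
    if job_try < 1:
        raise ValueError("job_try starts at 1")
    return min(cap, base * 2 ** (job_try - 1))

# Inside a job function one might then write (Retry is imported from arq):
#     raise Retry(defer=backoff_seconds(ctx["job_try"]))

if __name__ == "__main__":
    print([backoff_seconds(n) for n in range(1, 6)])
```

The cap matters because the delay doubles on every attempt; without it, a job with a generous max_tries could end up deferred for hours.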
What most people miss is that ARQ serializes jobs with pickle by default. While convenient, unpickling can execute arbitrary code, so this is a security risk if your Redis instance is exposed or if anyone untrusted can write job payloads. For production, secure your Redis instance, and consider switching to a safer format such as JSON or MessagePack if your task arguments and return values are compatible. Raw bytes, such as the image contents passed to process_image, need explicit encoding before they can go into JSON. ARQ accepts a pair of callables for this, job_serializer and job_deserializer, and they must be set consistently both when calling create_pool and in the worker settings.
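One possible shape for such a serializer pair is sketched below, using only the standard library. The function names and the "__bytes_b64__" marker are hypothetical; the sketch assumes ARQ hands the serializer a plain dict and expects bytes back, and it wraps bytes values in base64 so that image contents survive the JSON round trip.

```python
import base64
import json
from typing import Any

_BYTES_TAG = "__bytes_b64__"  # hypothetical marker key for encoded bytes

def _encode(value: Any) -> Any:
    # Called by json.dumps only for values it cannot serialize itself
    if isinstance(value, (bytes, bytearray)):
        return {_BYTES_TAG: base64.b64encode(bytes(value)).decode("ascii")}
    raise TypeError(f"{type(value).__name__} is not JSON serializable")

def _decode(obj: dict) -> Any:
    # Called by json.loads for every decoded JSON object
    if set(obj) == {_BYTES_TAG}:
        return base64.b64decode(obj[_BYTES_TAG])
    return obj

def json_serializer(job: dict) -> bytes:
    return json.dumps(job, default=_encode).encode("utf-8")

def json_deserializer(data: bytes) -> dict:
    return json.loads(data.decode("utf-8"), object_hook=_decode)

if __name__ == "__main__":
    raw = json_serializer({"f": "process_image", "a": ("cat.png", b"\x89PNG")})
    print(json_deserializer(raw))
```

The pair would then be passed as job_serializer and job_deserializer on both the enqueueing side and the worker side. Note that JSON turns tuples into lists, and that anything else JSON cannot represent, including non-trivial return values, raises TypeError instead of being silently pickled.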
The next hurdle in building a robust background job system is handling job idempotency and ensuring tasks can be safely retried without unintended side effects.