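Airflow has no withParam; that name comes from Argo Workflows. The Airflow feature that generates parallel steps at runtime is dynamic task mapping (Airflow 2.3+). You pass a list, here a list of dictionaries, to expand() or expand_kwargs(), and Airflow creates one task instance per element, with each dictionary supplying the parameters for that instance.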
Let’s see this in action with a concrete example. Imagine you have a list of files you need to process, and for each file, you want to run a Python function that simulates processing.
```python
from __future__ import annotations

import pendulum

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator


@dag(
    dag_id="dynamic_parallel_steps_with_param",
    schedule=None,
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    catchup=False,
    tags=["example", "dynamic"],
)
def dynamic_parallel_dag():
    start = EmptyOperator(task_id="start")

    def process_file(file_name: str, processing_date: str):
        """Simulates processing a file."""
        print(f"Processing file: {file_name} for date: {processing_date}")
        # In a real scenario, this would involve reading, transforming, and writing data.

    @task
    def get_files_to_process():
        """
        This task would typically fetch a list of files or data items
        that need to be processed. For this example, we'll hardcode it.
        """
        return [
            {"file_name": "data_2023-10-26.csv", "processing_date": "2023-10-26"},
            {"file_name": "report_2023-10-27.xlsx", "processing_date": "2023-10-27"},
            {"file_name": "log_2023-10-28.txt", "processing_date": "2023-10-28"},
        ]

    files_info = get_files_to_process()

    # Dynamic task mapping: partial() fixes the arguments shared by every
    # instance, and expand() creates one mapped instance per element of
    # files_info at runtime. Each dictionary becomes that instance's op_kwargs,
    # so its keys are passed as keyword arguments to process_file.
    process_tasks = PythonOperator.partial(
        task_id="process_file",
        python_callable=process_file,
    ).expand(op_kwargs=files_info)

    end = EmptyOperator(task_id="end")

    start >> files_info >> process_tasks >> end


dynamic_parallel_dag()
```
In this DAG, get_files_to_process returns a list of dictionaries, each holding the file_name and processing_date for one file. The key line is PythonOperator.partial(...).expand(op_kwargs=files_info). partial fixes the arguments that every instance shares (task_id and python_callable), and expand maps over files_info at runtime, creating one instance of the process_file task per dictionary in the list. Because each dictionary is passed as that instance's op_kwargs, its keys (file_name, processing_date) arrive as keyword arguments to the python_callable.
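To make the fan-out concrete without a running scheduler, here is a plain-Python model of what mapping a list of dictionaries over op_kwargs (PythonOperator.partial(...).expand(op_kwargs=...)) does. It is a sketch, not Airflow's implementation: simulate_expand_op_kwargs and MappedRun are hypothetical names, and real mapped instances run as separate task instances, possibly in parallel on different workers, rather than in a loop.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class MappedRun:
    """One simulated mapped task instance (hypothetical, not an Airflow class)."""
    task_id: str
    map_index: int
    result: Any


def simulate_expand_op_kwargs(
    task_id: str,
    python_callable: Callable[..., Any],
    op_kwargs_list: list[dict[str, Any]],
) -> list[MappedRun]:
    """Hypothetical model of partial(...).expand(op_kwargs=op_kwargs_list)."""
    runs = []
    for map_index, op_kwargs in enumerate(op_kwargs_list):
        # Each dictionary is unpacked into keyword arguments for the callable.
        result = python_callable(**op_kwargs)
        # All instances share one task_id; the list position becomes map_index.
        runs.append(MappedRun(task_id=task_id, map_index=map_index, result=result))
    return runs


def process_file(file_name: str, processing_date: str) -> str:
    return f"Processing file: {file_name} for date: {processing_date}"


files_info = [
    {"file_name": "data_2023-10-26.csv", "processing_date": "2023-10-26"},
    {"file_name": "report_2023-10-27.xlsx", "processing_date": "2023-10-27"},
    {"file_name": "log_2023-10-28.txt", "processing_date": "2023-10-28"},
]

runs = simulate_expand_op_kwargs("process_file", process_file, files_info)
for run in runs:
    print(run.task_id, run.map_index, run.result)
```

The number of runs is simply the length of the upstream list, which is why the degree of parallelism can change from one DAG run to the next.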
This pattern is incredibly useful when you have a variable number of items to process in parallel, such as processing files from an S3 bucket, handling messages from a Kafka topic, or running a report for different geographical regions. The number of parallel tasks is determined at runtime by the output of the upstream task, making your DAGs adaptable to changing data volumes.
The core problem this solves is avoiding hardcoded task IDs, or a massive static list of tasks, when the number of parallel operations is only known at runtime. Instead of writing task_1 = PythonOperator(...), task_2 = PythonOperator(...), …, task_n = PythonOperator(...), you define a template task with partial and let expand instantiate it once for each item in your dynamic list. The mapped instances do not get separate task_ids: they all share the base task_id ("process_file" in this case) and are distinguished by a map_index (0, 1, 2, …) that follows the order of the input list. In Airflow 2.9 and later, the map_index_template argument lets you render a more readable label for each instance in the UI, such as the file name.
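Passing one list of dictionaries, rather than one list per parameter, matters because of how expand treats multiple keyword lists: Airflow maps over their cross product. The plain-Python sketch below contrasts the two shapes. It assumes a hypothetical @task-decorated process_file (in the DAG above it is a plain callable), and the two input lists are illustrative.

```python
from __future__ import annotations

from itertools import product

# Illustrative inputs kept as two separate lists instead of a list of dicts.
file_names = ["data_2023-10-26.csv", "report_2023-10-27.xlsx", "log_2023-10-28.txt"]
processing_dates = ["2023-10-26", "2023-10-27", "2023-10-28"]

# Model of process_file.expand(file_name=file_names, processing_date=processing_dates)
# for a hypothetical @task-decorated process_file: Airflow maps over the cross
# product, so every file is paired with every date.
cross_product = [
    {"file_name": f, "processing_date": d}
    for f, d in product(file_names, processing_dates)
]

# Model of mapping over a single list of dicts, as with expand(op_kwargs=...)
# or expand_kwargs(...): exactly one instance per dictionary.
one_per_dict = [
    {"file_name": f, "processing_date": d}
    for f, d in zip(file_names, processing_dates)
]

print(len(cross_product), len(one_per_dict))
```

With three files, the cross product yields nine instances, including mismatched pairs such as the CSV file with the 2023-10-28 date, while the list of dictionaries yields the intended three.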
One of the most powerful, yet often overlooked, aspects of dynamic task mapping is that the mapped dictionaries can hold complex nested structures. You aren't limited to simple strings or numbers; values can be lists or nested dictionaries. Because the upstream task's return value travels through XCom, every value must be serializable by your XCom backend: JSON-compatible structures work with the default backend, while arbitrary Python objects only work if the backend knows how to serialize them. This allows highly sophisticated parameterization of parallel tasks, such as processing different types of data with a different configuration in each parallel branch.
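A quick, conservative way to check whether a parameter structure is safe to return from the upstream task is a JSON round trip. This is a sketch under stated assumptions: Airflow's default XCom serialization is JSON-based, but its encoder also understands some extra types depending on the version, so this check is stricter than Airflow itself. json_round_trips and FileSpec are hypothetical names, and the nested options are illustrative values.

```python
from __future__ import annotations

import json
from typing import Any

# Nested per-file configurations (illustrative values).
nested_params = [
    {
        "file_name": "data_2023-10-26.csv",
        "options": {"delimiter": ",", "columns": ["id", "amount"]},
    },
    {
        "file_name": "report_2023-10-27.xlsx",
        "options": {"sheets": ["summary", "detail"], "skip_rows": 2},
    },
]


class FileSpec:
    """Hypothetical custom class with no serialization support."""

    def __init__(self, path: str) -> None:
        self.path = path


def json_round_trips(value: Any) -> bool:
    """Return True if value survives a JSON encode/decode unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except TypeError:
        # json.dumps raises TypeError for objects it cannot encode.
        return False


nested_ok = json_round_trips(nested_params)
custom_ok = json_round_trips([{"spec": FileSpec("data_2023-10-26.csv")}])
print(nested_ok, custom_ok)
```

Nested lists and dictionaries pass; the custom object fails, which is a signal to convert it to plain data (for example a dictionary of its fields) before returning it from the upstream task.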
The next logical step is to explore how to handle dependencies between these dynamically generated tasks and other parts of your DAG, especially when the exact number of tasks isn’t known until runtime.