BigQuery isn’t just a data warehouse; it’s a distributed query engine that can act as both a source and a sink for your batch and streaming data pipelines.
Let’s see how we can read from BigQuery, process that data with Apache Beam, and write it back to BigQuery.
Here’s a simple pipeline that reads customer data from a BigQuery table, adds a "processing_timestamp" field, and writes it to a new table.
```python
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def add_processing_timestamp(element):
    # Return a new dict; Beam transforms should not mutate their input elements.
    return {**element, 'processing_timestamp': datetime.now(timezone.utc).isoformat()}


def run():
    # Define pipeline options
    options = PipelineOptions(
        runner='DataflowRunner',  # Use Dataflow for managed execution
        project='your-gcp-project-id',
        region='us-central1',
        temp_location='gs://your-bucket/temp',
        staging_location='gs://your-bucket/staging',
        job_name='bigquery-processing-job',
    )

    # Define input and output BigQuery table specifications
    input_table = 'your-gcp-project-id.your_dataset.input_customers'  # standard SQL form
    output_table = 'your-gcp-project-id:your_dataset.output_customers'  # PROJECT:DATASET.TABLE

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
                # Backticks are required because the project ID contains hyphens.
                query=f'SELECT * FROM `{input_table}`',
                use_standard_sql=True,  # the default is legacy SQL
            )
            | 'AddTimestamp' >> beam.Map(add_processing_timestamp)
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                output_table,
                schema='customer_id:STRING,name:STRING,email:STRING,processing_timestamp:TIMESTAMP',  # Define schema
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == '__main__':
    run()
```
To run this, you’ll need:
- A Google Cloud project with billing enabled.
- The `apache-beam[gcp]` Python package installed (`pip install apache-beam[gcp]`).
- A BigQuery dataset and table (`input_customers`) with some data.
- A Google Cloud Storage bucket (`your-bucket`) for staging and temporary files.
The magic here is `beam.io.ReadFromBigQuery` and `beam.io.WriteToBigQuery`. Beam handles the heavy lifting of interacting with BigQuery’s API, translating your data into a format Beam can process, and then writing it back. The `DataflowRunner` setting sends the pipeline to Google’s managed Dataflow service, which scales the number of workers automatically.
The core problem this solves is moving and transforming data at scale without managing infrastructure. You define the logic in Beam, and Dataflow executes it across as many workers as the job needs. `beam.io.ReadFromBigQuery` runs the SQL query and, in the Python SDK, emits each result row as a dictionary keyed by column name. The `beam.Map` transform applies our `add_processing_timestamp` function to each row. Finally, `beam.io.WriteToBigQuery` takes the transformed rows and writes them to the output table using the schema you provide.
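Because the per-row logic is an ordinary Python function, it can be checked without touching BigQuery or Dataflow. The sketch below is one way to do that, under a few assumptions: it uses a non-mutating version of `add_processing_timestamp`, the sample row is made up, and `run_local_check` is a hypothetical helper that runs the transform on Beam’s local DirectRunner with `beam.Create` standing in for the BigQuery read.

```python
from datetime import datetime, timezone


def add_processing_timestamp(element):
    """Return a copy of the row with a UTC ISO 8601 processing timestamp."""
    return {**element, 'processing_timestamp': datetime.now(timezone.utc).isoformat()}


def run_local_check(rows):
    """Hypothetical helper: run the transform on in-memory rows with the DirectRunner."""
    # Imported here so the pure function above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.testing.util import assert_that

    def check(actual):
        # Every input row must produce one output row carrying the new field.
        assert len(actual) == len(rows)
        assert all('processing_timestamp' in row for row in actual)

    with beam.Pipeline() as pipeline:  # no options given: runs locally
        output = (
            pipeline
            | 'CreateRows' >> beam.Create(rows)  # stands in for ReadFromBigQuery
            | 'AddTimestamp' >> beam.Map(add_processing_timestamp)
        )
        assert_that(output, check)


# Made-up sample row using the columns from the example pipeline.
sample = {'customer_id': 'c-1', 'name': 'Ada', 'email': 'ada@example.com'}
stamped = add_processing_timestamp(sample)
```

Calling `run_local_check([sample])` exercises the same `beam.Map` step inside a real pipeline, which is usually enough to catch type or key errors before submitting a Dataflow job.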
Setting `create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED` tells the sink to create the table if it doesn’t exist, and `write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND` ensures new rows are added to the table rather than overwriting it.
The schema definition for `WriteToBigQuery` is crucial. With `CREATE_IF_NEEDED`, the sink needs a schema to create the table when it doesn’t exist yet, and the Python SDK does not derive one from the dictionaries flowing through the pipeline. Any field you add, such as `processing_timestamp`, must appear in the schema with a type BigQuery accepts, or BigQuery will reject the rows.
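The comma-separated string is one of the schema forms `WriteToBigQuery` accepts; it also takes a dictionary with a `fields` list, which is easier to extend and lets you set a mode per field. A minimal sketch, assuming the four columns from the example; `build_schema`, `schema_string`, and `missing_fields` are hypothetical helpers, not part of Beam:

```python
# Column names and types taken from the schema string in the example pipeline.
COLUMNS = [
    ('customer_id', 'STRING'),
    ('name', 'STRING'),
    ('email', 'STRING'),
    ('processing_timestamp', 'TIMESTAMP'),
]


def build_schema(columns, mode='NULLABLE'):
    """Build the dict form of a BigQuery schema: {'fields': [{name, type, mode}, ...]}."""
    return {
        'fields': [{'name': name, 'type': type_, 'mode': mode} for name, type_ in columns]
    }


def schema_string(columns):
    """Build the equivalent 'name:TYPE,name:TYPE' string form."""
    return ','.join(f'{name}:{type_}' for name, type_ in columns)


def missing_fields(row, schema):
    """Return the schema field names that a row dictionary does not contain."""
    return [f['name'] for f in schema['fields'] if f['name'] not in row]


schema = build_schema(COLUMNS)
row = {'customer_id': 'c-1', 'name': 'Ada', 'email': 'ada@example.com'}
print(schema_string(COLUMNS))
print(missing_fields(row, schema))  # the timestamp has not been added yet
```

Either `schema` or the string form can be passed as the `schema=` argument. Keeping the column list in one place makes it harder for the transform and the schema to drift apart when a field is added.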
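What most people miss is that Beam’s BigQuery I/O offers several paths into and out of BigQuery, and the defaults are not always the most direct ones. In the Python SDK, `ReadFromBigQuery` by default runs an export job that writes the table or query result to files in Cloud Storage and then reads those files. Passing `method=beam.io.ReadFromBigQuery.Method.DIRECT_READ` switches to the BigQuery Storage Read API, which reads from BigQuery’s storage in parallel without the intermediate export. On the write side, batch pipelines default to load jobs staged through Cloud Storage, while streaming pipelines default to streaming inserts; the Storage Write API is available as an alternative aimed at high throughput.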
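The read and write paths are selected with the `method` argument on each transform. The sketch below shows one way to request the Storage Read API for reads and to make the write method explicit. The `build_io` and `default_write_method` helpers are illustrative assumptions rather than Beam functions, and the table arguments are placeholders.

```python
def default_write_method(streaming):
    """Name of the method WriteToBigQuery uses when none is specified."""
    # Streaming pipelines default to streaming inserts, batch pipelines to load jobs.
    return 'STREAMING_INSERTS' if streaming else 'FILE_LOADS'


def build_io(input_table, output_table, schema, streaming=False):
    """Hypothetical helper returning (read, write) transforms with explicit methods."""
    # Imported lazily so this module loads even where Beam is not installed.
    import apache_beam as beam

    read = beam.io.ReadFromBigQuery(
        table=input_table,  # 'PROJECT:DATASET.TABLE'
        # Read through the BigQuery Storage Read API instead of a Cloud Storage export.
        method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
    )
    write = beam.io.WriteToBigQuery(
        output_table,
        schema=schema,
        # Spell out the method that would otherwise be chosen implicitly.
        method=default_write_method(streaming),
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
    return read, write
```

Making the method explicit costs one line and documents which BigQuery quota and pricing path the job takes, which differ between load jobs, streaming inserts, and the Storage APIs.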
The next step is often handling schema evolution or dealing with malformed data during the read process.
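As a starting point for the malformed-data case, rows can be validated before the write and the failures set aside for inspection rather than failing the job. A minimal sketch, assuming that "malformed" means a missing or empty required field; `REQUIRED_FIELDS`, `is_valid`, and `split_rows` are hypothetical names, and in a real pipeline the same predicate could drive `beam.Partition` or a tagged output.

```python
# Assumption: these columns must be present and non-empty for a row to be usable.
REQUIRED_FIELDS = ('customer_id', 'email')


def is_valid(row):
    """True when every required field is present and non-empty."""
    return all(row.get(field) not in (None, '') for field in REQUIRED_FIELDS)


def split_rows(rows):
    """Separate rows into (valid, rejected) lists without dropping anything."""
    valid, rejected = [], []
    for row in rows:
        (valid if is_valid(row) else rejected).append(row)
    return valid, rejected


# Made-up sample rows: one clean, one with a null key, one missing a column.
rows = [
    {'customer_id': 'c-1', 'name': 'Ada', 'email': 'ada@example.com'},
    {'customer_id': None, 'name': 'Bob', 'email': 'bob@example.com'},
    {'customer_id': 'c-3', 'name': 'Cy'},
]
valid, rejected = split_rows(rows)
print(len(valid), len(rejected))
```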