Flink SQL UDFs are surprisingly similar to regular SQL UDFs, but with a few key differences related to state management and execution environments.

Let’s see a simple UDF in action. Imagine we have a stream of sensor readings with device IDs and temperature values. We want to flag readings that are unusually high.

-- Register a Python UDF that checks if a temperature is above a threshold.
-- The name after AS is resolved as module.attribute, so it should point at the
-- wrapped UDF object defined in udf.py rather than at the bare class.
CREATE TEMPORARY FUNCTION high_temperature AS 'udf.high_temperature' LANGUAGE PYTHON;

-- Use the UDF in a SELECT statement
SELECT
    device_id,
    temperature,
    high_temperature(temperature) AS is_high
FROM sensor_readings;

Here’s the Python code for udf.py:

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


class HighTemperatureUDF(ScalarFunction):
    def __init__(self):
        self.threshold = 30.0  # Default threshold

    def open(self, context):
        # Called once when the UDF instance is initialized.
        # You can load configuration or other resources here.
        pass

    def eval(self, temperature):
        # A SQL NULL arrives as None; treat it as "not high".
        if temperature is None:
            return False
        return temperature > self.threshold

    def close(self):
        # Called once when the UDF instance is shut down.
        # Clean up resources if necessary.
        pass


# Wrap an instance with udf() so Flink knows the result type.
# The CREATE FUNCTION statement refers to this object as 'udf.high_temperature'.
high_temperature = udf(HighTemperatureUDF(), result_type=DataTypes.BOOLEAN())

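And here’s how you’d make udf.py available to the Flink SQL Client:

# Assuming you have your Python UDF code in a file named udf.py,
# pass the file to the SQL Client when you start it:
./bin/sql-client.sh -pyfs /path/to/your/udf.py

-- Or, from inside a running session, set the equivalent option:
SET 'python.files' = '/path/to/your/udf.py';

-- After running the CREATE TEMPORARY FUNCTION statement above,
-- you can use the function in your SQL queries
SELECT device_id, temperature, high_temperature(temperature) AS is_high FROM sensor_readings;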

The high_temperature function will now be available within your Flink SQL session. For each row in sensor_readings, it will call the eval method of your HighTemperatureUDF instance, passing the temperature value. The UDF will return True if the temperature exceeds 30.0, and False otherwise.
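
Because eval is ordinary Python, the thresholding logic can be checked locally before it goes anywhere near a cluster. The sketch below uses a standalone stand-in class (HighTemperatureCheck is a hypothetical name, not part of PyFlink) that copies the eval logic, and the sample rows are made up for illustration.

```python
# Standalone copy of the eval logic so it can be exercised without Flink.
class HighTemperatureCheck:
    """Hypothetical stand-in mirroring HighTemperatureUDF.eval from udf.py."""

    def __init__(self, threshold=30.0):
        self.threshold = threshold

    def eval(self, temperature):
        # A SQL NULL arrives as None; treat it as "not high".
        if temperature is None:
            return False
        return temperature > self.threshold


check = HighTemperatureCheck()

# Made-up sample rows: (device_id, temperature)
rows = [("dev-1", 22.5), ("dev-2", 30.0), ("dev-3", 41.2), ("dev-4", None)]

flags = [(device_id, check.eval(temp)) for device_id, temp in rows]
print(flags)
# [('dev-1', False), ('dev-2', False), ('dev-3', True), ('dev-4', False)]
```

Note that a reading of exactly 30.0 is not flagged, since the comparison is strictly greater-than.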

The core problem Flink SQL UDFs solve is extending SQL’s capabilities with custom logic that isn’t built-in. This could be complex string manipulations, domain-specific calculations, or even integrating with external services (though be cautious with I/O in UDFs). Flink supports UDFs in Python, Java, and Scala, giving you flexibility based on your team’s expertise and existing libraries.

Internally, Flink serializes your UDF and distributes it to the TaskManagers where your job is running. For Python UDFs, the function by default runs in a separate Python worker process that the TaskManager starts next to the JVM operator. On each parallel instance, Flink creates the UDF object, calls its open() method once for initialization, and then calls its eval() method for each input row. When the job finishes or the operator shuts down, the close() method is invoked once for cleanup.
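
The call order described above can be imitated with a few lines of plain Python. This is only a sketch of the sequence for one parallel instance: RecordingUDF and run_udf are hypothetical names invented for illustration, and the real runtime adds serialization, batching, and process management on top.

```python
class RecordingUDF:
    """Hypothetical UDF that records which lifecycle hooks were called."""

    def __init__(self):
        self.calls = []

    def open(self, context):
        self.calls.append("open")

    def eval(self, value):
        self.calls.append("eval")
        return value * 2

    def close(self):
        self.calls.append("close")


def run_udf(udf_instance, rows, context=None):
    """Hypothetical driver: open once, eval once per row, close once."""
    udf_instance.open(context)
    try:
        return [udf_instance.eval(row) for row in rows]
    finally:
        # close() runs even if eval raises, mirroring cleanup on shutdown.
        udf_instance.close()


udf_instance = RecordingUDF()
results = run_udf(udf_instance, [1, 2, 3])
print(results)             # [2, 4, 6]
print(udf_instance.calls)  # ['open', 'eval', 'eval', 'eval', 'close']
```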

The open(self, context) method is your hook for setting up anything the UDF needs before it starts processing data. This is where you’d typically read job parameters (e.g., the threshold value), initialize connections to external systems (again, use with extreme care), or pre-compute lookup tables. In PyFlink the context object is a FunctionContext: context.get_job_parameter(name, default_value) returns a job parameter as a string, and context.get_metric_group() lets you register metrics. Simple UDFs often don’t need either.
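
As a minimal sketch of reading the threshold in open(), the example below uses a stand-in context object so it runs without Flink. FakeFunctionContext is hypothetical; it only imitates the get_job_parameter(name, default_value) method of PyFlink's FunctionContext. The parameter key sensor.threshold is also an assumption made for illustration.

```python
class FakeFunctionContext:
    """Hypothetical stand-in for the context object passed to open()."""

    def __init__(self, job_parameters):
        self._job_parameters = dict(job_parameters)

    def get_job_parameter(self, name, default_value):
        # Job parameters are strings; fall back to the default when missing.
        return self._job_parameters.get(name, default_value)


class ConfigurableHighTemperature:
    """Sketch of a UDF whose threshold is set in open() instead of hard-coded."""

    def __init__(self):
        self.threshold = 30.0  # Used when no parameter is supplied

    def open(self, context):
        # 'sensor.threshold' is a made-up key for this example.
        raw = context.get_job_parameter("sensor.threshold", str(self.threshold))
        self.threshold = float(raw)

    def eval(self, temperature):
        if temperature is None:
            return False
        return temperature > self.threshold


configured = ConfigurableHighTemperature()
configured.open(FakeFunctionContext({"sensor.threshold": "25"}))

defaulted = ConfigurableHighTemperature()
defaulted.open(FakeFunctionContext({}))

print(configured.eval(27.0))  # True: 27.0 > 25.0
print(defaulted.eval(27.0))   # False: 27.0 is not above the default 30.0
```

Parsing the value once in open() keeps eval() cheap, which matters because eval() runs for every row.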

The most surprising thing for many is that Flink UDFs can be stateful, although not through scalar functions like the one above. A scalar function’s context does not expose Flink’s keyed state, so attributes you keep on a scalar UDF instance are neither checkpointed nor partitioned by key. State that spans multiple input records belongs in user-defined aggregate functions (AggregateFunction and TableAggregateFunction): you define an accumulator, Flink stores it in its state backend for each group key, and checkpointing and fault tolerance for that state are handled automatically. For accumulators that hold large collections, the ListView and MapView data views are backed by Flink state. Direct use of ValueState, ListState, and MapState is a feature of the DataStream API’s process functions rather than of SQL scalar UDFs.
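
In the Table API, state that spans records is typically expressed as an aggregate function with an accumulator. The plain-Python sketch below borrows the method names of an aggregate function (create_accumulator, accumulate, get_value) to show the idea. RunningAverage and aggregate_by_key are hypothetical names, and the dictionary only stands in for the per-key state that Flink would store and checkpoint.

```python
from collections import defaultdict


class RunningAverage:
    """Sketch of an aggregate: the accumulator holds [sum, count]."""

    def create_accumulator(self):
        return [0.0, 0]

    def accumulate(self, accumulator, value):
        # Skip NULLs so they do not affect the average.
        if value is not None:
            accumulator[0] += value
            accumulator[1] += 1

    def get_value(self, accumulator):
        if accumulator[1] == 0:
            return None
        return accumulator[0] / accumulator[1]


def aggregate_by_key(agg, rows):
    """Hypothetical driver: one accumulator per key, as with GROUP BY."""
    state = defaultdict(agg.create_accumulator)  # stand-in for Flink state
    for key, value in rows:
        agg.accumulate(state[key], value)
    return {key: agg.get_value(acc) for key, acc in state.items()}


# Made-up sample rows: (device_id, temperature)
rows = [("dev-1", 20.0), ("dev-2", 30.0), ("dev-1", 24.0), ("dev-2", None)]
averages = aggregate_by_key(RunningAverage(), rows)
print(averages)  # {'dev-1': 22.0, 'dev-2': 30.0}
```

The function object itself holds no data here; everything that must survive between records lives in the accumulator, which is what allows a runtime to persist and restore it.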

The next concept you’ll likely encounter is how to handle complex data types as UDF inputs and outputs, and how to manage dependencies for your UDF code.
