Elasticsearch pipelines can transform documents before they’re indexed, meaning you can clean, enrich, or reshape your data on the fly, saving you from complex client-side processing or post-indexing manipulation.

Let’s see this in action. Imagine you’re ingesting log data that includes a client_ip field, and you want to enrich it with geographical information.

PUT /_ingest/pipeline/geo_enrichment_pipeline
{
  "description": "Enriches documents with GeoIP information based on client_ip",
  "processors": [
    {
      "geoip": {
        "field": "client_ip",
        "target_field": "client_geo",
        "database_file": "GeoLite2-City.mmdb"
      }
    }
  ]
}

Now, when you index a document, you specify this pipeline:

POST /logs/_doc?pipeline=geo_enrichment_pipeline
{
  "client_ip": "8.8.8.8",
  "message": "User logged in"
}

The document that gets indexed into logs will look something like this, with the client_geo field automatically populated (the exact values depend on the version of the GeoIP database):

{
  "client_ip": "8.8.8.8",
  "message": "User logged in",
  "client_geo": {
    "continent_name": "North America",
    "country_iso_code": "US",
    "country_name": "United States",
    "region_name": "California",
    "city_name": "Mountain View",
    "location": {
      "lat": 37.422,
      "lon": -122.084
    }
  }
}

The core problem ingest pipelines solve is decoupling data transformation logic from your application code or from expensive post-processing. Instead of your application needing to call a GeoIP lookup service for every log line before sending it to Elasticsearch, or running a separate batch job to add this data later, Elasticsearch handles it natively. This removes a per-document lookup from your application, simplifies its responsibilities, and applies the same transformation to every document no matter which client sent it.

Internally, an Elasticsearch ingest pipeline is a sequence of processors. Each processor takes the incoming document, performs a specific operation (like geoip, grok for parsing, rename for fields, set for adding values, remove for deleting fields, or script for custom logic), and passes the modified document to the next processor in line. If a processor fails, the entire pipeline execution for that document typically fails, and the document is not indexed (though you can configure failure handling).
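To make the sequential model concrete, here is a toy sketch in Python. It is not how Elasticsearch implements pipelines; the `rename`, `set_value`, and `run_pipeline` helpers are hypothetical stand-ins that only mimic the behavior described above: each processor receives the document, returns a modified copy, and an unhandled error aborts the whole run.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Processor = Callable[[Doc], Doc]


def rename(field: str, target_field: str) -> Processor:
    """Return a toy processor that moves `field` to `target_field`."""
    def run(doc: Doc) -> Doc:
        if field not in doc:
            # Mimics a processor failure on a missing field.
            raise KeyError(f"field [{field}] not present")
        out = dict(doc)
        out[target_field] = out.pop(field)
        return out
    return run


def set_value(field: str, value: Any) -> Processor:
    """Return a toy processor that adds or overwrites `field`."""
    def run(doc: Doc) -> Doc:
        return {**doc, field: value}
    return run


def run_pipeline(processors: List[Processor], doc: Doc) -> Doc:
    """Apply each processor in order; any exception aborts the whole run."""
    for processor in processors:
        doc = processor(doc)
    return doc


pipeline = [rename("ip_address", "client_ip"), set_value("source", "web")]
result = run_pipeline(pipeline, {"ip_address": "8.8.8.8", "message": "User logged in"})
print(result)
```

Passing a document without ip_address to run_pipeline raises an error before set_value ever runs, which mirrors the default "fail the whole document" behavior.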

You control the flow by defining the order of processors in the pipeline’s JSON definition. The description field is purely for human readability. The processors array holds the actual work: each object in it configures a single processor, and processors run in array order.

The geoip processor, as shown, is a powerful built-in tool. It reads from a GeoIP database file (like MaxMind’s GeoLite2 City) that must be available to the Elasticsearch node. The field parameter specifies the field containing the IP address, and target_field is where the enriched information will be stored. By default it writes a standard set of attributes; the properties option restricts the output to the attributes you list.
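As a small illustration of limiting the output, the sketch below builds a pipeline body whose geoip processor uses the properties option to keep only three attributes. The pipeline name geo_minimal_pipeline is made up for this example, and the script only prints a console-style request rather than talking to a cluster.

```python
import json

# Pipeline body; `properties` lists the GeoIP attributes to copy into target_field.
body = {
    "description": "GeoIP enrichment limited to a few attributes",
    "processors": [
        {
            "geoip": {
                "field": "client_ip",
                "target_field": "client_geo",
                "properties": ["country_iso_code", "city_name", "location"],
            }
        }
    ],
}

# Print the request in the console form used elsewhere in this article.
# The pipeline name is a hypothetical placeholder.
print("PUT /_ingest/pipeline/geo_minimal_pipeline")
print(json.dumps(body, indent=2))
```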

Beyond geoip, other common processors include:

  • grok: For parsing unstructured log data into structured fields using regular expressions. This is incredibly useful for making sense of free-form text logs.
  • rename: To change the name of a field, for instance turning ip_address into client_ip for consistency.
  • set: To add a new field or overwrite an existing one with a static value or a value derived from other fields.
  • remove: To delete fields that are not needed for indexing, reducing document size and improving search performance.
  • date: To parse date strings into the standard Elasticsearch date format, essential for time-series analysis.
  • split: To split a string field into an array of strings based on a delimiter.
  • join: The inverse of split, to join an array of strings into a single string.
  • script: For highly custom transformations that cannot be achieved with the built-in processors. This uses the Painless scripting language, offering great flexibility but also potential performance implications if not written carefully.

You can chain these processors together. For example, you might first use grok to extract an IP address from a log message, then use rename to give it a standard name, and finally use geoip to enrich it.
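A minimal sketch of that three-step chain, written as a Python dictionary and printed as a console-style request. It assumes the raw text lives in a message field and that a bare %{IP:...} grok pattern is enough to find the address; real log formats usually need a fuller pattern. The pipeline name grok_rename_geoip is hypothetical.

```python
import json

pipeline = {
    "description": "Extract an IP from the message, normalize its name, then enrich it",
    "processors": [
        # 1. Pull the first IP address out of the free-form message.
        {"grok": {"field": "message", "patterns": ["%{IP:ip_address}"]}},
        # 2. Give the extracted field its standard name.
        {"rename": {"field": "ip_address", "target_field": "client_ip"}},
        # 3. Enrich the standardized field with geographical data.
        {"geoip": {"field": "client_ip", "target_field": "client_geo"}},
    ],
}

# Each processor object has exactly one key: the processor type.
processor_order = [next(iter(p)) for p in pipeline["processors"]]

print("PUT /_ingest/pipeline/grok_rename_geoip")
print(json.dumps(pipeline, indent=2))
```

Order matters here: geoip reads client_ip, which only exists after rename has run, which in turn depends on grok having produced ip_address.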

The ?pipeline=pipeline_id query parameter applies a pipeline ad-hoc to a single document index request or to a bulk request. To make a pipeline the default for an index, set the index.default_pipeline index setting; requests to that index that do not name a pipeline will then use it.
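The two ways of attaching a pipeline can be sketched as follows: a per-request pipeline query parameter, and the index.default_pipeline index setting. The helper names index_request_path and default_pipeline_settings are invented for this example; they only build the request path and body, and sending them is left to whatever HTTP client you use.

```python
import json
from typing import Optional
from urllib.parse import quote, urlencode


def index_request_path(index: str, pipeline: Optional[str] = None) -> str:
    """Build the path for indexing one document, optionally through a pipeline."""
    path = f"/{quote(index, safe='')}/_doc"
    if pipeline is not None:
        path += "?" + urlencode({"pipeline": pipeline})
    return path


def default_pipeline_settings(pipeline: str) -> dict:
    """Build the settings body that makes `pipeline` the index default."""
    return {"index": {"default_pipeline": pipeline}}


# Ad-hoc: this single request goes through the pipeline.
print("POST " + index_request_path("logs", "geo_enrichment_pipeline"))

# Default: after this settings update, requests without ?pipeline= use it.
print("PUT /logs/_settings")
print(json.dumps(default_pipeline_settings("geo_enrichment_pipeline"), indent=2))
```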

A common pitfall is assuming a processor will always succeed. For instance, the geoip processor will fail if the client_ip field is missing (unless ignore_missing is set to true) or does not contain a valid IP address. By default, such a failure halts indexing for that document. An address that is valid but simply not present in the GeoIP database is not an error: the processor just leaves the target field unset, so downstream consumers should not assume client_geo always exists. You can mitigate failures by using the on_failure clause within a processor or at the pipeline level to define alternative actions, such as sending failed documents to a separate index or recording the error without stopping the ingest.

The on_failure clause runs another set of processors when a processor fails (or, when defined at the pipeline level, when any processor in the pipeline fails). This is crucial for robust data ingestion pipelines. For example, you could route a document that failed GeoIP lookup to a separate index such as failed-ingest by setting its _index metadata field, with an added field indicating the failure reason. (Index names cannot begin with an underscore.)

PUT /_ingest/pipeline/geo_enrichment_with_failure_handling
{
  "description": "Enriches documents with GeoIP information, handling failures",
  "processors": [
    {
      "geoip": {
        "field": "client_ip",
        "target_field": "client_geo",
        "database_file": "GeoLite2-City.mmdb",
        "on_failure": [
          {
            "set": {
              "field": "ingest_error",
              "value": "GeoIP lookup failed: {{ _ingest.on_failure_message }}"
            }
          },
          {
            "remove": {
              "field": "client_geo",
              "ignore_missing": true
            }
          }
        ]
      }
    }
  ]
}

This pipeline will attempt the GeoIP lookup. If it fails, the on_failure processors set an ingest_error field containing the error message and make sure no client_geo field is left on the document, allowing the document to be indexed with this error information rather than being dropped entirely.
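Failure handling can also be declared once for the whole pipeline. The sketch below is one possible shape, not a recommended standard: a pipeline-level on_failure block records the error and redirects the document by overwriting its _index metadata field. The pipeline name geo_enrichment_with_redirect and the target index failed-ingest are made up for this example.

```python
import json

pipeline = {
    "description": "GeoIP enrichment; failed documents are redirected",
    "processors": [
        {"geoip": {"field": "client_ip", "target_field": "client_geo"}}
    ],
    # Runs if any processor above fails and has no handler of its own.
    "on_failure": [
        {
            "set": {
                "field": "ingest_error",
                "value": "{{ _ingest.on_failure_message }}",
            }
        },
        # Overwriting _index sends the document to a different index.
        {"set": {"field": "_index", "value": "failed-ingest"}},
    ],
}

print("PUT /_ingest/pipeline/geo_enrichment_with_redirect")
print(json.dumps(pipeline, indent=2))
```

Compared with a per-processor handler, this keeps the main index free of partially processed documents, at the cost of having to monitor and replay the contents of the second index.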

The ability to define and test pipelines in Kibana’s Ingest Pipelines UI or via the simulate API (POST _ingest/pipeline/pipeline_id/_simulate) is invaluable for iterative development and debugging. You can simulate ingesting sample documents and see exactly how the pipeline transforms them before applying it to live data. (GET _ingest/pipeline/pipeline_id only returns the stored pipeline definition.)
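Simulation goes through the _simulate endpoint of a stored pipeline, which takes sample documents wrapped in a docs array. A minimal sketch of such a request body, reusing the geo_enrichment_pipeline defined earlier; the second sample document is an assumed malformed input added to exercise the failure path.

```python
import json

simulate_body = {
    "docs": [
        # Each sample document goes under _source.
        {"_source": {"client_ip": "8.8.8.8", "message": "User logged in"}},
        # A deliberately bad value to see how the pipeline reacts.
        {"_source": {"client_ip": "not-an-ip", "message": "Bad input"}},
    ]
}

print("POST /_ingest/pipeline/geo_enrichment_pipeline/_simulate")
print(json.dumps(simulate_body, indent=2))
```

The response reports the result for each sample document separately, so one request can cover both the happy path and the failure path without writing anything to an index.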

The next step after mastering ingest pipelines is understanding how to manage their lifecycle, including versioning and potential conflicts when multiple pipelines might be applied to the same index.
