
Flink

Overview

Apache Flink is a distributed stream and batch processing framework. Learn more in the official Flink documentation.

The DataHub integration for Flink extracts job metadata, operator topology, and dataset lineage by connecting to the Flink JobManager REST API and optionally the SQL Gateway. It resolves table references to their actual platforms (Kafka, Postgres, Iceberg, etc.) via catalog introspection, and tracks job execution history as DataProcessInstances. Stateful ingestion is supported for stale entity removal.

Concept Mapping

| Source Concept | DataHub Concept | Notes |
|---|---|---|
| Flink Job | DataFlow | One DataFlow per Flink job |
| Flink Operator | DataJob | Granularity depends on operator_granularity |
| Job Execution | DataProcessInstance | When include_run_history is enabled |
| Kafka Topic | Dataset | Resolved via lineage (DataStream or SQL/Table API) |
| JDBC Table | Dataset | Resolved via SQL Gateway catalog introspection |
| Iceberg Table | Dataset | Resolved via SQL Gateway or catalog_platform_map config |

Incubating

Important Capabilities

| Capability | Notes |
|---|---|
| Asset Containers | Catalog databases as containers (requires SQL Gateway). |
| Detect Deleted Entities | Via stateful ingestion. |
| Platform Instance | Enabled by default. |
| Schema Metadata | Catalog table schemas via SQL Gateway (requires include_catalog_metadata). |
| Table-Level Lineage | Table-level lineage from Kafka sources/sinks. |

Overview

The flink module ingests metadata from Apache Flink into DataHub. It connects to the Flink JobManager REST API to extract jobs, execution plans, and run history. When a SQL Gateway URL is provided, it resolves SQL/Table API table references to their actual platforms (Kafka, Postgres, Iceberg, Paimon, etc.) via catalog introspection.

Prerequisites

To ingest metadata from Apache Flink, you will need:

  • Access to a Flink cluster with the JobManager REST API enabled (default port 8081)
  • Flink version >= 1.16 (tested with 1.19; platform resolution via DESCRIBE CATALOG requires Flink 1.20+)
  • For platform-resolved lineage of SQL/Table API jobs: access to a Flink SQL Gateway (default port 8083)

Required Permissions

| Capability | API | Required Access |
|---|---|---|
| Job metadata, run history | JobManager REST API (/v1/jobs) | Read access to the REST API |
| Platform-resolved lineage | SQL Gateway REST API (/v1/sessions) | Session creation and SQL execution |

NOTE: If your Flink cluster uses authentication (bearer token or basic auth), provide credentials in the connection config. The same credentials are used for both the JobManager and SQL Gateway APIs.

Install the Plugin

pip install 'acryl-datahub[flink]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "flink"
  config:
    connection:
      rest_api_url: "http://localhost:8081"

      # SQL Gateway enables platform resolution for SQL/Table API lineage.
      # Without it, only DataStream Kafka lineage is extracted.
      # sql_gateway_url: "http://localhost:8083"

      # Authentication (uncomment ONE method)
      # token: "${FLINK_API_TOKEN}"
      # username: "admin"
      # password: "${FLINK_PASSWORD}"

      # Advanced connection tuning
      # timeout_seconds: 30
      # max_retries: 3
      # verify_ssl: true

    # Filter jobs by name
    # job_name_pattern:
    #   allow:
    #     - "^prod_.*"

    # Filter by job state (defaults to RUNNING, FINISHED, FAILED, CANCELED)
    # include_job_states:
    #   - "RUNNING"
    #   - "FINISHED"

    # Per-catalog platform_instance overrides. Platform is auto-detected via
    # SQL Gateway for most catalog types; specify platform only when
    # auto-detection is unavailable (see documentation for details).
    # catalog_platform_map:
    #   pg_catalog:
    #     platform_instance: "prod-postgres"
    #   kafka_catalog:
    #     platform_instance: "prod-kafka"

    # DataJob granularity - "job" (default) or "vertex" (one per operator)
    # operator_granularity: "job"

    # include_lineage: true
    # include_run_history: true

    # Platform-wide fallback for platform_instance (used when catalog_platform_map
    # does not have an entry for the catalog).
    # platform_instance_map:
    #   kafka: "prod-kafka-cluster"

    # Parallel job processing
    # max_workers: 10

    # Stale entity removal
    # stateful_ingestion:
    #   enabled: true
    #   remove_stale_metadata: true

    env: "PROD"

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

Field / Description
connection 
FlinkConnectionConfig
Connection configuration for Flink REST APIs.
connection.rest_api_url 
string
JobManager REST API endpoint (e.g., http://localhost:8081).
connection.max_retries
integer
Maximum total attempts (initial + retries) for failed HTTP requests with exponential backoff. Default of 3 means 1 initial attempt plus up to 2 retries.
Default: 3
connection.password
One of string(password), null
Password for HTTP Basic authentication. Must be paired with 'username'.
Default: None
connection.sql_gateway_operation_timeout_seconds
integer
Maximum time in seconds to wait for a SQL Gateway operation (SHOW CATALOGS, DESCRIBE CATALOG, etc.) to complete. Increase for slow catalog backends.
Default: 60
connection.sql_gateway_url
One of string, null
SQL Gateway REST API endpoint (e.g., http://localhost:8083). Enables platform resolution for SQL/Table API lineage. When provided, the connector resolves table references to their actual platform (kafka, postgres, iceberg, etc.) via catalog introspection.
Default: None
connection.timeout_seconds
integer
HTTP request timeout in seconds.
Default: 30
connection.token
One of string(password), null
Bearer token for authentication. Mutually exclusive with username/password.
Default: None
connection.username
One of string, null
Username for HTTP Basic authentication. Must be paired with 'password'.
Default: None
connection.verify_ssl
boolean
Verify SSL certificates for HTTPS connections.
Default: True
include_lineage
boolean
Extract source/sink lineage from Flink execution plans.
Default: True
include_run_history
boolean
Emit DataProcessInstance entities for job execution tracking.
Default: True
max_workers
integer
Max parallel threads for fetching job details from the Flink REST API.
Default: 10
operator_granularity
Enum
One of: "job", "vertex"
Granularity of emitted DataJob entities: "job" emits one DataJob per Flink job; "vertex" emits one DataJob per operator/vertex.
Default: job
platform_instance
One of string, null
The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.
Default: None
platform_instance_map
One of string, null
Mapping of platform name to platform_instance, used to generate correct dataset URNs.
Default: None
env
string
The environment that all assets produced by this connector belong to
Default: PROD
catalog_platform_map
map(str,CatalogPlatformDetail)
Platform details for a Flink catalog, used in dataset URN construction.

Provides two pieces of information for a given Flink catalog:

- platform: The DataHub platform name (e.g., "iceberg", "postgres").
On Flink 1.20+, this is auto-detected via DESCRIBE CATALOG and only needs
to be specified for catalogs where auto-detection fails. On Flink < 1.20,
this is required for Iceberg and Paimon catalogs (which don't expose a
connector property in SHOW CREATE TABLE).

- platform_instance: The DataHub platform instance (e.g., "prod-postgres").
Used when a Flink cluster connects to multiple deployments of the same
platform and you need distinct dataset URNs per deployment.

Follows the same pattern as Fivetran's PlatformDetail and Looker's
LookerConnectionDefinition.
catalog_platform_map.key.platform
One of string, null
DataHub platform name for datasets in this catalog (e.g., 'iceberg', 'postgres', 'kafka'). When omitted, the connector auto-detects the platform via SQL Gateway.
Default: None
catalog_platform_map.key.platform_instance
One of string, null
DataHub platform instance for datasets in this catalog (e.g., 'prod-postgres', 'us-east-kafka'). Used to distinguish multiple deployments of the same platform.
Default: None
include_job_states
array
Flink job states to include in ingestion.
Default: ['RUNNING', 'FINISHED', 'FAILED', 'CANCELED']
include_job_states.string
string
job_name_pattern
AllowDenyPattern
Regex patterns (allow/deny) for filtering jobs by name.
job_name_pattern.ignoreCase
One of boolean, null
Whether to ignore case sensitivity during pattern matching.
Default: True
stateful_ingestion
One of StatefulStaleMetadataRemovalConfig, null
Stateful ingestion for soft-deleting stale entities.
Default: None
stateful_ingestion.enabled
boolean
Whether to enable stateful ingestion. Defaults to True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified; otherwise False.
Default: False
stateful_ingestion.fail_safe_threshold
number
Guards against accidental source-configuration changes: if the relative change in entity count compared to the previous state exceeds this percentage, the state is not committed and soft deletes are not issued.
Default: 75.0
stateful_ingestion.remove_stale_metadata
boolean
Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.
Default: True
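The connection.max_retries semantics documented above (total attempts, not retries after the first) can be sketched as a small retry loop with exponential backoff. This is an illustrative helper, not the connector's actual implementation:

```python
import time


def with_retries(fn, max_retries=3, base_delay=0.0):
    """Call fn, retrying on failure with exponential backoff.

    max_retries counts total attempts: the default of 3 means one
    initial attempt plus up to two retries.
    """
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # attempts exhausted; surface the last error
            time.sleep(base_delay * (2 ** attempt))
```

With max_retries: 3, a request that fails twice and succeeds on the third attempt still succeeds overall; a third failure is raised to the caller.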

Capabilities

Lineage Extraction

The connector extracts table-level lineage by analyzing Flink execution plans. It handles two distinct cases:

DataStream API (Kafka only): The connector recognizes KafkaSource-{topic} and KafkaSink-{topic} patterns in operator descriptions. Platform is always kafka, and the topic name is extracted directly from the description. No SQL Gateway needed.
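The DataStream-side matching described above amounts to a simple pattern check on operator descriptions. A minimal sketch, assuming descriptions of exactly the KafkaSource-{topic} / KafkaSink-{topic} form (function name is illustrative):

```python
import re

# Matches the KafkaSource-{topic} / KafkaSink-{topic} operator descriptions
# described above; anything else is left to SQL/Table API resolution.
KAFKA_PATTERN = re.compile(r"^Kafka(Source|Sink)-(?P<topic>.+)$")


def extract_kafka_topic(description: str):
    """Return (role, topic) for a Kafka source/sink operator, else None."""
    m = KAFKA_PATTERN.match(description)
    if not m:
        return None
    role = "source" if m.group(1) == "Source" else "sink"
    return role, m.group("topic")
```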

SQL/Table API (all connectors): The connector parses TableSourceScan(table=[[catalog, db, table]]) and Sink(table=[[catalog, db, table]]) patterns. These are generic Flink plan formats — the same for Kafka, JDBC, Iceberg, Paimon, and every other connector. The connector resolves the actual platform via SQL Gateway catalog introspection:

  1. catalog_platform_map config — user-provided overrides; take priority over all auto-detection
  2. DESCRIBE CATALOG (Flink 1.20+) — determines catalog type (jdbc, iceberg, paimon, hive, etc.)
  3. SHOW CREATE TABLE — reads the connector property from the table DDL (for hive/generic_in_memory catalogs with mixed connector types)
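The three-step precedence above can be sketched as a single lookup function. This is a simplified model (the real connector also maps catalog types like jdbc to a concrete platform using the JDBC base-url); the function and parameter names are illustrative:

```python
def resolve_platform(catalog, catalog_platform_map,
                     describe_catalog_type=None, table_connector=None):
    """Resolve a catalog to a DataHub platform name.

    Precedence, per the steps above:
      1. explicit catalog_platform_map entry (user override);
      2. catalog type from DESCRIBE CATALOG (Flink 1.20+);
      3. the 'connector' property read via SHOW CREATE TABLE.
    Returns None when nothing matches (reported as unclassified).
    """
    entry = catalog_platform_map.get(catalog, {})
    if entry.get("platform"):
        return entry["platform"]
    if describe_catalog_type:
        return describe_catalog_type
    return table_connector
```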

Platform Resolution Examples

A Flink job reads from a Postgres JDBC catalog table pg_catalog.mydb.public.users:

Plan: TableSourceScan(table=[[pg_catalog, mydb, public.users]])
→ SQL Gateway: DESCRIBE CATALOG → type=jdbc, base-url=jdbc:postgresql:// (Flink 1.20+)
→ URN: urn:li:dataset:(urn:li:dataPlatform:postgres, mydb.public.users, PROD)

A Flink job reads from an Iceberg catalog table ice_catalog.lake.events:

Plan: TableSourceScan(table=[[ice_catalog, lake, events]])
→ SQL Gateway: DESCRIBE CATALOG → type=iceberg (Flink 1.20+)
→ URN: urn:li:dataset:(urn:li:dataPlatform:iceberg, lake.events, PROD)
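The URN construction in the examples above can be sketched as follows. Note that the connector itself uses DataHub's URN builder utilities, and that when a platform instance is configured, DataHub conventionally prefixes it to the dataset name; the spaces after commas in the examples above are for readability only:

```python
def dataset_urn(platform, name, env="PROD", platform_instance=None):
    """Build a DataHub dataset URN; a configured platform instance
    is prefixed to the dataset name."""
    if platform_instance:
        name = f"{platform_instance}.{name}"
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"
```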

Platform Instance Mapping

If your datasets belong to specific platform instances (e.g., a particular Kafka cluster or Postgres deployment), use catalog_platform_map for per-catalog mapping or platform_instance_map as a platform-wide fallback:

source:
  type: "flink"
  config:
    connection:
      rest_api_url: "http://localhost:8081"
      sql_gateway_url: "http://localhost:8083"
    # Per-catalog: takes priority
    catalog_platform_map:
      pg_us:
        platform_instance: "us-postgres"
      pg_eu:
        platform_instance: "eu-postgres"
    # Platform-wide fallback
    platform_instance_map:
      kafka: "prod-kafka-cluster"
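The precedence between the two settings can be sketched as a lookup that consults the per-catalog map first and falls back to the platform-wide map (function name is illustrative):

```python
def resolve_platform_instance(catalog, platform,
                              catalog_platform_map, platform_instance_map):
    """Per-catalog mapping wins; the platform-wide map is the fallback."""
    entry = catalog_platform_map.get(catalog, {})
    if entry.get("platform_instance"):
        return entry["platform_instance"]
    return platform_instance_map.get(platform)
```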

On Flink versions before 1.20, DESCRIBE CATALOG is not available. The connector falls back to SHOW CREATE TABLE, but Iceberg and Paimon tables do not have a connector property in their DDL. In this case, provide the platform explicitly via catalog_platform_map:

source:
  type: "flink"
  config:
    connection:
      rest_api_url: "http://localhost:8081"
      sql_gateway_url: "http://localhost:8083"
    catalog_platform_map:
      ice_catalog:
        platform: "iceberg"
      paimon_catalog:
        platform: "paimon"

On Flink 1.20+, this config is not needed — the platform is auto-detected from the catalog type.

Operator Granularity

By default (operator_granularity: job), the connector emits one DataJob per Flink job with all source and sink lineage coalesced into that single DataJob.

Set operator_granularity: vertex to emit one DataJob per operator/vertex in the execution plan. This gives finer-grained lineage at the cost of more entities.

Run History

When include_run_history is enabled (the default), the connector emits DataProcessInstance entities that track individual job executions:

  • Start and end timestamps from the Flink job timeline
  • Run result: FINISHED maps to SUCCESS, FAILED maps to FAILURE, CANCELED maps to SKIPPED
  • Process type: STREAMING or BATCH, based on the Flink job type

Jobs in RUNNING state emit a start event only. Completed jobs emit both start and end events.
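The state-to-result mapping and the start/end event behavior described above can be sketched as (names are illustrative, not the connector's internals):

```python
# Terminal Flink states map to DataProcessInstance run results, per the docs:
RESULT_BY_STATE = {
    "FINISHED": "SUCCESS",
    "FAILED": "FAILURE",
    "CANCELED": "SKIPPED",
}


def run_events(state, start_ms, end_ms=None):
    """Events for one job execution: a start event always, an end event
    (with the mapped result) only for completed jobs."""
    events = [("START", start_ms)]
    result = RESULT_BY_STATE.get(state)
    if result is not None and end_ms is not None:
        events.append(("END", end_ms, result))
    return events
```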

Limitations

  1. SQL Gateway required for SQL/Table API lineage. Without a SQL Gateway URL, the connector cannot resolve TableSourceScan(table=[[catalog, db, table]]) references to their actual platform. DataStream Kafka lineage (KafkaSource-{topic}) works without SQL Gateway.

  2. Catalogs must be visible to the SQL Gateway session. Catalogs registered programmatically in job code, via ephemeral SQL client sessions, or in a separate FileCatalogStore are invisible to the connector. Production deployments should use a persistent catalog (e.g., HiveCatalog backed by Hive Metastore) so that table definitions are visible across sessions.

  3. Iceberg/Paimon on Flink < 1.20 require config. DESCRIBE CATALOG was introduced in Flink 1.20. On earlier versions, Iceberg and Paimon catalogs cannot be auto-detected because their tables don't have a connector property in SHOW CREATE TABLE. Use catalog_platform_map to specify the platform manually.

  4. Operator-chained sinks have no catalog info. The tableName[N]: Writer pattern produced by Flink's operator chaining does not include catalog or database information. Only the bare table name is available. These sinks cannot be resolved to a platform and are reported as unclassified.

  5. Temporary tables are invisible to SQL Gateway. CREATE TEMPORARY TABLE definitions are session-scoped and not persisted in any catalog. The SQL Gateway cannot look up their definitions, so temporary tables cannot be resolved to a platform and are reported as unclassified.

  6. DataStream non-Kafka connectors are not supported. Only KafkaSource-{topic} and KafkaSink-{topic} DataStream patterns are recognized. Other DataStream connectors (Kinesis, Pulsar, RabbitMQ, custom) produce user-provided names with no platform information.

  7. No column-level lineage. Only table-level (coarse) lineage is extracted from execution plans.

Troubleshooting

"Failed to connect to Flink cluster" Verify the rest_api_url is correct and reachable. Test manually: curl http://<host>:8081/v1/config

Jobs appear but no lineage is extracted Check the ingestion report for "unclassified" nodes. Common causes:

  • SQL/Table API jobs without sql_gateway_url configured — add the SQL Gateway URL
  • Tables in default_catalog (GenericInMemoryCatalog) created in ephemeral sessions — use a persistent catalog like HiveCatalog
  • DataStream jobs using non-Kafka connectors — not currently supported

SQL Gateway configured but platform not resolved On Flink < 1.20, DESCRIBE CATALOG is unavailable. Check if the table's SHOW CREATE TABLE output includes a connector property. For Iceberg/Paimon catalogs, add catalog_platform_map config.

Lineage URNs don't match other connectors (e.g., Kafka connector) Ensure platform_instance_map or catalog_platform_map produces the same platform instance as your other ingestion sources. For example, if the Kafka connector uses platform_instance: "prod-cluster", configure:

platform_instance_map:
  kafka: "prod-cluster"

Code Coordinates

  • Class Name: datahub.ingestion.source.flink.source.FlinkSource
  • Browse on GitHub
Questions?

If you've got any questions on configuring ingestion for Flink, feel free to ping us on our Slack.

💡 Contributing to this documentation

This page is auto-generated from the underlying source code. To make changes, please edit the relevant source files in the metadata-ingestion directory.

Tip: For quick typo fixes or documentation updates, you can click the ✏️ Edit icon directly in the GitHub UI to open a Pull Request. For larger changes and PR naming conventions, please refer to our Contributing Guide.