Kafka Connect
Integration Details
This plugin extracts the following:
- Source and Sink Connectors in Kafka Connect as Data Pipelines
- For Source connectors - Data Jobs to represent lineage information between source dataset to Kafka topic per {connector_name}:{source_dataset}combination
- For Sink connectors - Data Jobs to represent lineage information between Kafka topic to destination dataset per {connector_name}:{topic}combination
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
| Source Concept | DataHub Concept | Notes | 
|---|---|---|
| "kafka-connect" | Data Platform | |
| Connector | DataFlow | |
| Kafka Topic | Dataset | 
Current limitations
Works only for
- Source connectors: JDBC, Debezium, Mongo and Generic connectors with user-defined lineage graph
- Sink connectors: BigQuery, Confluent, S3, Snowflake
Important Capabilities
| Capability | Status | Notes | 
|---|---|---|
| Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata | 
| Platform Instance | ✅ | Enabled by default | 
| Schema Metadata | ✅ | Enabled by default | 
| Table-Level Lineage | ✅ | Enabled by default | 
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: "kafka-connect"
  config:
    # Coordinates
    connect_uri: "http://localhost:8083"
    # Credentials
    username: admin
    password: password
    # Optional
    # Platform instance mapping to use when constructing URNs.
    # Use if single instance of platform is referred across connectors.
    platform_instance_map:
      mysql: mysql_platform_instance
sink:
  # sink configs
Config Details
- Options
- Schema
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description | 
|---|---|
| cluster_name string | Cluster to ingest from. Default: connect-cluster | 
| connect_to_platform_map map(str,map) | |
| connect_uri string | URI to connect to. Default: http://localhost:8083/ | 
| convert_lineage_urns_to_lowercase boolean | Whether to convert the urns of ingested lineage dataset to lowercase Default: False | 
| password string | Kafka Connect password. | 
| platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. | 
| platform_instance_map map(str,string) | |
| username string | Kafka Connect username. | 
| env string | The environment that all assets produced by this connector belong to Default: PROD | 
| connector_patterns AllowDenyPattern | regex patterns for connectors to filter for ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
| connector_patterns.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| connector_patterns.allow array | List of regex patterns to include in ingestion Default: ['.*'] | 
| connector_patterns.allow.string string | |
| connector_patterns.deny array | List of regex patterns to exclude from ingestion. Default: [] | 
| connector_patterns.deny.string string | |
| generic_connectors array | Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector Default: [] | 
| generic_connectors.GenericConnectorConfig GenericConnectorConfig | |
| generic_connectors.GenericConnectorConfig.connector_name ❓ string | |
| generic_connectors.GenericConnectorConfig.source_dataset ❓ string | |
| generic_connectors.GenericConnectorConfig.source_platform ❓ string | |
| provided_configs array | Provided Configurations | 
| provided_configs.ProvidedConfig ProvidedConfig | |
| provided_configs.ProvidedConfig.path_key ❓ string | |
| provided_configs.ProvidedConfig.provider ❓ string | |
| provided_configs.ProvidedConfig.value ❓ string | |
| stateful_ingestion StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. | 
| stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_apiis specified, otherwise FalseDefault: False | 
| stateful_ingestion.fail_safe_threshold number | Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'. Default: 75.0 | 
| stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True | 
The JSONSchema for this configuration is inlined below.
{
  "title": "KafkaConnectSourceConfig",
  "description": "Any source that connects to a platform should inherit this class",
  "type": "object",
  "properties": {
    "stateful_ingestion": {
      "$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
    },
    "env": {
      "title": "Env",
      "description": "The environment that all assets produced by this connector belong to",
      "default": "PROD",
      "type": "string"
    },
    "platform_instance_map": {
      "title": "Platform Instance Map",
      "description": "Platform instance mapping to use when constructing URNs. e.g.`platform_instance_map: { \"hive\": \"warehouse\" }`",
      "type": "object",
      "additionalProperties": {
        "type": "string"
      }
    },
    "platform_instance": {
      "title": "Platform Instance",
      "description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
      "type": "string"
    },
    "connect_uri": {
      "title": "Connect Uri",
      "description": "URI to connect to.",
      "default": "http://localhost:8083/",
      "type": "string"
    },
    "username": {
      "title": "Username",
      "description": "Kafka Connect username.",
      "type": "string"
    },
    "password": {
      "title": "Password",
      "description": "Kafka Connect password.",
      "type": "string"
    },
    "cluster_name": {
      "title": "Cluster Name",
      "description": "Cluster to ingest from.",
      "default": "connect-cluster",
      "type": "string"
    },
    "convert_lineage_urns_to_lowercase": {
      "title": "Convert Lineage Urns To Lowercase",
      "description": "Whether to convert the urns of ingested lineage dataset to lowercase",
      "default": false,
      "type": "boolean"
    },
    "connector_patterns": {
      "title": "Connector Patterns",
      "description": "regex patterns for connectors to filter for ingestion.",
      "default": {
        "allow": [
          ".*"
        ],
        "deny": [],
        "ignoreCase": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/AllowDenyPattern"
        }
      ]
    },
    "provided_configs": {
      "title": "Provided Configs",
      "description": "Provided Configurations",
      "type": "array",
      "items": {
        "$ref": "#/definitions/ProvidedConfig"
      }
    },
    "connect_to_platform_map": {
      "title": "Connect To Platform Map",
      "description": "Platform instance mapping when multiple instances for a platform is available. Entry for a platform should be in either `platform_instance_map` or `connect_to_platform_map`. e.g.`connect_to_platform_map: { \"postgres-connector-finance-db\": \"postgres\": \"core_finance_instance\" }`",
      "type": "object",
      "additionalProperties": {
        "type": "object",
        "additionalProperties": {
          "type": "string"
        }
      }
    },
    "generic_connectors": {
      "title": "Generic Connectors",
      "description": "Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector",
      "default": [],
      "type": "array",
      "items": {
        "$ref": "#/definitions/GenericConnectorConfig"
      }
    }
  },
  "additionalProperties": false,
  "definitions": {
    "DynamicTypedStateProviderConfig": {
      "title": "DynamicTypedStateProviderConfig",
      "type": "object",
      "properties": {
        "type": {
          "title": "Type",
          "description": "The type of the state provider to use. For DataHub use `datahub`",
          "type": "string"
        },
        "config": {
          "title": "Config",
          "description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
          "default": {},
          "type": "object"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "StatefulStaleMetadataRemovalConfig": {
      "title": "StatefulStaleMetadataRemovalConfig",
      "description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
          "default": false,
          "type": "boolean"
        },
        "remove_stale_metadata": {
          "title": "Remove Stale Metadata",
          "description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
          "default": true,
          "type": "boolean"
        },
        "fail_safe_threshold": {
          "title": "Fail Safe Threshold",
          "description": "Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
          "default": 75.0,
          "minimum": 0.0,
          "maximum": 100.0,
          "type": "number"
        }
      },
      "additionalProperties": false
    },
    "AllowDenyPattern": {
      "title": "AllowDenyPattern",
      "description": "A class to store allow deny regexes",
      "type": "object",
      "properties": {
        "allow": {
          "title": "Allow",
          "description": "List of regex patterns to include in ingestion",
          "default": [
            ".*"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "deny": {
          "title": "Deny",
          "description": "List of regex patterns to exclude from ingestion.",
          "default": [],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "ignoreCase": {
          "title": "Ignorecase",
          "description": "Whether to ignore case sensitivity during pattern matching.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "ProvidedConfig": {
      "title": "ProvidedConfig",
      "type": "object",
      "properties": {
        "provider": {
          "title": "Provider",
          "type": "string"
        },
        "path_key": {
          "title": "Path Key",
          "type": "string"
        },
        "value": {
          "title": "Value",
          "type": "string"
        }
      },
      "required": [
        "provider",
        "path_key",
        "value"
      ],
      "additionalProperties": false
    },
    "GenericConnectorConfig": {
      "title": "GenericConnectorConfig",
      "type": "object",
      "properties": {
        "connector_name": {
          "title": "Connector Name",
          "type": "string"
        },
        "source_dataset": {
          "title": "Source Dataset",
          "type": "string"
        },
        "source_platform": {
          "title": "Source Platform",
          "type": "string"
        }
      },
      "required": [
        "connector_name",
        "source_dataset",
        "source_platform"
      ],
      "additionalProperties": false
    }
  }
}
Advanced Configurations
Working with Platform Instances
If you've multiple instances of kafka OR source/sink systems that are referred in your kafka-connect setup, you'd need to configure platform instance for these systems in kafka-connect recipe to generate correct lineage edges. You must have already set platform_instance in recipes of original source/sink systems. Refer the document Working with Platform Instances to understand more about this.
There are two options available to declare source/sink system's platform_instance in kafka-connect recipe. If single instance of platform is used across all kafka-connect connectors, you can use platform_instance_map to specify platform_instance to use for a platform when constructing URNs for lineage.
Example:
# Map of platform name to platform instance
platform_instance_map:
  snowflake: snowflake_platform_instance
  mysql: mysql_platform_instance
If multiple instances of platform are used across kafka-connect connectors, you'd need to specify platform_instance to use for platform for every connector.
Example - Multiple MySQL Source Connectors each reading from different mysql instance
# Map of platform name to platform instance per connector
connect_to_platform_map:
  mysql_connector1:
    mysql: mysql_instance1
  mysql_connector2:
    mysql: mysql_instance2
Here mysql_connector1 and mysql_connector2 are names of MySQL source connectors as defined in kafka-connect connector config.
Example - Multiple MySQL Source Connectors each reading from difference mysql instance and writing to different kafka cluster
connect_to_platform_map:
  mysql_connector1:
    mysql: mysql_instance1
    kafka: kafka_instance1
  mysql_connector2:
    mysql: mysql_instance2
    kafka: kafka_instance2
You can also use combination of platform_instance_map and connect_to_platform_map in your recipe. Note that, the platform_instance specified for the connector in connect_to_platform_map will always take higher precedance even if platform_instance for same platform is set in platform_instance_map.
If you do not use platform_instance in original source/sink recipes, you do not need to specify them in above configurations.
Note that, you do not need to specify platform_instance for BigQuery.
Example - Multiple BigQuery Sink Connectors each writing to different kafka cluster
connect_to_platform_map:
  bigquery_connector1:
    kafka: kafka_instance1
  bigquery_connector2:
    kafka: kafka_instance2
Provided Configurations from External Sources
Kafka Connect supports pluggable configuration providers which can load configuration data from external sources at runtime. These values are not available to DataHub ingestion source through Kafka Connect APIs. If you are using such provided configurations to specify connection url (database, etc) in Kafka Connect connector configuration then you will need also add these in provided_configs section in recipe for DataHub to generate correct lineage.
# Optional mapping of provider configurations if using
provided_configs:
  - provider: env
    path_key: MYSQL_CONNECTION_URL
    value: jdbc:mysql://test_mysql:3306/librarydb
Code Coordinates
- Class Name: datahub.ingestion.source.kafka_connect.kafka_connect.KafkaConnectSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Kafka Connect, feel free to ping us on our Slack.