Pulsar

Overview

Apache Pulsar is a distributed messaging and streaming platform. Learn more in the official Pulsar documentation.

The DataHub integration for Pulsar covers topics and their schemas. It also supports platform instances, domains, schema metadata, and stateful deletion detection; see the Important Capabilities table below for details.

Concept Mapping

| Source Concept | DataHub Concept | Notes |
| --- | --- | --- |
| pulsar | Data Platform | |
| Pulsar Topic | Dataset | subType: topic |
| Pulsar Schema | SchemaField | Maps to the fields defined within the Avro or JSON schema definition. |
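To make the topic-to-Dataset mapping concrete, a Pulsar topic ends up as a DataHub Dataset URN. The sketch below is illustrative only: DataHub URNs follow the `urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)` shape, but the exact dataset-name convention used by this connector is an assumption here.

```python
# Sketch of how a Pulsar topic could map to a DataHub Dataset URN.
# The tenant/namespace/topic naming convention below is an assumption
# for illustration, not the connector's guaranteed output.
def topic_to_dataset_urn(tenant: str, namespace: str, topic: str, env: str = "PROD") -> str:
    name = f"{tenant}/{namespace}/{topic}"  # assumed naming convention
    return f"urn:li:dataset:(urn:li:dataPlatform:pulsar,{name},{env})"

print(topic_to_dataset_urn("public", "default", "sales-orders"))
```

The `env` component corresponds to the env config field documented below (default PROD).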

Module pulsar

Incubating

Important Capabilities

| Capability | Notes |
| --- | --- |
| Detect Deleted Entities | Enabled by default via stateful ingestion. |
| Domains | Supported via the domain config field. |
| Platform Instance | Enabled by default. |
| Schema Metadata | Enabled by default. |

Overview

The pulsar module ingests metadata from Pulsar into DataHub. It is intended for production ingestion workflows, and module-specific capabilities are documented below.

The DataHub Pulsar source plugin extracts topic and schema metadata from an Apache Pulsar instance and ingests the information into DataHub. The plugin uses the Pulsar admin REST API to interact with the Pulsar instance.

Data is extracted on a per-tenant and per-namespace basis: topics, together with their schemas (if available), are ingested as Datasets into DataHub. Additional values such as the schema description, schema_version, schema_type, and partitioned flag are included as DatasetProperties.
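The tenant → namespace → topic traversal described above can be sketched against the Pulsar admin REST API v2 endpoints. The endpoint paths below match the Pulsar admin API; the helper function names are hypothetical and only illustrate which URLs the plugin would query.

```python
# Illustrative helpers for the Pulsar admin REST (v2) endpoints the
# plugin walks: tenants -> namespaces -> topics.
# Function names are hypothetical; endpoint paths follow the Pulsar admin API.
BASE = "http://localhost:8080"

def tenants_url(base: str = BASE) -> str:
    # GET returns the list of tenants (requires superUser role)
    return f"{base}/admin/v2/tenants"

def namespaces_url(tenant: str, base: str = BASE) -> str:
    # GET returns the namespaces of a tenant
    return f"{base}/admin/v2/namespaces/{tenant}"

def topics_url(tenant: str, namespace: str, base: str = BASE) -> str:
    # GET returns persistent topics; non-persistent topics live
    # under /admin/v2/non-persistent/...
    return f"{base}/admin/v2/persistent/{tenant}/{namespace}"

print(tenants_url())
```

An actual run would issue a GET against each URL (sending a bearer token when authentication is enabled) and recurse into the returned JSON lists.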

Prerequisites

  • Pulsar Instance: Access with valid access token (if authentication enabled)
  • Version: Pulsar 2.7.0 or later
  • Role: superUser required to list all tenants

Install the Plugin

pip install 'acryl-datahub[pulsar]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "pulsar"
  config:
    env: "TEST"
    platform_instance: "local"

    ## Pulsar client connection config ##
    web_service_url: "https://localhost:8443"
    verify_ssl: "/opt/certs/ca.cert.pem"

    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
    issuer_url: <issuer_url>

    client_id: ${CLIENT_ID}
    client_secret: ${CLIENT_SECRET}

    # Tenant list to scrape
    tenants:
      - tenant_1
      - tenant_2

    # Topic filter pattern
    topic_patterns:
      allow:
        - ".*sales.*"

sink:
  # sink configs

Config Details

Note that a . is used to denote nested fields in the YAML recipe.
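For example, topic_patterns.allow in the table corresponds to a nested structure in the recipe. A minimal sketch of that mapping (the helper name is hypothetical):

```python
# Illustrative helper: expand a dotted field path from the table below
# into the nested structure used in a YAML recipe.
def dotted_to_nested(path: str, value):
    out = value
    for key in reversed(path.split(".")):
        out = {key: out}
    return out

print(dotted_to_nested("topic_patterns.allow", [".*sales.*"]))
# i.e. in YAML:
# topic_patterns:
#   allow:
#     - ".*sales.*"
```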

FieldDescription
client_id
One of string, null
The application's client ID
Default: None
client_secret
One of string(password), null
The application's client secret
Default: None
exclude_individual_partitions
boolean
Whether to exclude the individual partitions of partitioned topics. When True (the default), a partitioned topic is ingested as a single Dataset; when False, each partition is ingested as its own Dataset, so a topic with 100 partitions results in 100 Datasets.
Default: True
issuer_url
One of string, null
The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication.
Default: None
oid_config
object
Placeholder for OpenId discovery document
platform_instance
One of string, null
The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.
Default: None
timeout
integer
Timeout setting: how long to wait for the Pulsar REST API to send data before giving up
Default: 5
token
One of string(password), null
The access token for the application. Mandatory for token based authentication.
Default: None
verify_ssl
One of boolean, string
Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.
Default: True
web_service_url
string
The web URL for the cluster.
Default: http://localhost:8080
env
string
The environment that all assets produced by this connector belong to
Default: PROD
domain
map(str,AllowDenyPattern)
A class to store allow deny regexes
domain.key.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
domain.key.allow.string
string
domain.key.ignoreCase
One of boolean, null
Whether to ignore case sensitivity during pattern matching.
Default: True
domain.key.deny
array
List of regex patterns to exclude from ingestion.
Default: []
domain.key.deny.string
string
namespace_patterns
AllowDenyPattern
A class to store allow deny regexes
namespace_patterns.ignoreCase
One of boolean, null
Whether to ignore case sensitivity during pattern matching.
Default: True
namespace_patterns.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
namespace_patterns.allow.string
string
namespace_patterns.deny
array
List of regex patterns to exclude from ingestion.
Default: []
namespace_patterns.deny.string
string
tenant_patterns
AllowDenyPattern
A class to store allow deny regexes
tenant_patterns.ignoreCase
One of boolean, null
Whether to ignore case sensitivity during pattern matching.
Default: True
tenant_patterns.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
tenant_patterns.allow.string
string
tenant_patterns.deny
array
List of regex patterns to exclude from ingestion.
Default: []
tenant_patterns.deny.string
string
tenants
array
Listing all tenants requires the superUser role; alternatively, you can set a list of tenants to scrape using the tenant admin role
Default: []
tenants.string
string
topic_patterns
AllowDenyPattern
A class to store allow deny regexes
topic_patterns.ignoreCase
One of boolean, null
Whether to ignore case sensitivity during pattern matching.
Default: True
topic_patterns.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
topic_patterns.allow.string
string
topic_patterns.deny
array
List of regex patterns to exclude from ingestion.
Default: []
topic_patterns.deny.string
string
stateful_ingestion
One of StatefulStaleMetadataRemovalConfig, null
see Stateful Ingestion
Default: None
stateful_ingestion.enabled
boolean
Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False
Default: False
stateful_ingestion.fail_safe_threshold
number
Prevents a large number of soft deletes (and prevents the state from committing) after accidental changes to the source configuration, by aborting when the relative change in entity count compared to the previous state exceeds the fail_safe_threshold percentage.
Default: 75.0
stateful_ingestion.remove_stale_metadata
boolean
Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.
Default: True
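The allow/deny pattern fields above (topic_patterns, namespace_patterns, tenant_patterns) share the same AllowDenyPattern semantics: a name is kept if it matches at least one allow regex and no deny regex. A minimal sketch, assuming re.match-style (start-anchored) matching, which is an assumption for illustration:

```python
import re

# Sketch of AllowDenyPattern semantics: deny wins over allow.
# Anchoring behavior (re.match vs re.search) is an assumption here.
def is_allowed(name, allow=(".*",), deny=(), ignore_case=True):
    flags = re.IGNORECASE if ignore_case else 0
    if any(re.match(p, name, flags) for p in deny):
        return False
    return any(re.match(p, name, flags) for p in allow)

print(is_allowed("persistent://public/default/sales-orders", allow=[".*sales.*"]))  # True
print(is_allowed("persistent://public/default/audit-log", allow=[".*sales.*"]))     # False
```

With the defaults (allow ['.*'], deny []), everything is ingested; patterns only narrow the scope.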

Capabilities

Use the Important Capabilities table above as the source of truth for supported features and whether additional configuration is required.
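For instance, the Detect Deleted Entities capability is driven by the stateful_ingestion block together with a recipe-level pipeline_name. A minimal recipe fragment, assuming a datahub-rest sink is configured elsewhere in the recipe (the pipeline name below is an example value):

```yaml
pipeline_name: "pulsar_ingestion_pipeline"  # example name; required for state tracking
source:
  type: "pulsar"
  config:
    web_service_url: "https://localhost:8443"
    stateful_ingestion:
      enabled: true
      remove_stale_metadata: true
```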

danger

Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).

Limitations

Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform. Refer to capability notes for unsupported or conditional features.

Troubleshooting

If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly.

Code Coordinates

  • Class Name: datahub.ingestion.source.pulsar.PulsarSource
  • Browse on GitHub
Questions?

If you've got any questions on configuring ingestion for Pulsar, feel free to ping us on our Slack.

💡 Contributing to this documentation

This page is auto-generated from the underlying source code. To make changes, please edit the relevant source files in the metadata-ingestion directory.

Tip: For quick typo fixes or documentation updates, you can click the ✏️ Edit icon directly in the GitHub UI to open a Pull Request. For larger changes and PR naming conventions, please refer to our Contributing Guide.