DataFlow
A DataFlow represents a high-level data processing pipeline or workflow orchestrated by systems like Apache Airflow, Azkaban, Prefect, Dagster, or similar workflow management platforms. DataFlows serve as containers for related DataJobs, representing the overall execution context and organization of data processing tasks.
Identity
DataFlows are uniquely identified by three components:
- Orchestrator: The workflow management platform that executes the flow (e.g., airflow, azkaban, prefect, dagster)
- Flow ID: A unique identifier for the workflow within the orchestrator (typically the DAG name, pipeline name, or workflow identifier)
- Cluster: The execution environment or cluster where the flow runs (e.g., prod, staging, dev)
The URN structure follows this pattern:
urn:li:dataFlow:(<orchestrator>,<flowId>,<cluster>)
URN Examples
Apache Airflow DAG in production:
urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)
Prefect flow in staging:
urn:li:dataFlow:(prefect,customer_analytics,staging)
Azkaban workflow in development:
urn:li:dataFlow:(azkaban,data_quality_checks,dev)
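The same three identity components can be constructed and parsed programmatically. The following is a minimal sketch using the SDK's DataFlowUrn helper; the identifier values are illustrative, and the attribute names follow the key aspect fields.
from datahub.metadata.urns import DataFlowUrn
# Build a URN from its identity components: orchestrator, flow ID, cluster
flow_urn = DataFlowUrn("airflow", "daily_sales_pipeline", "prod")
print(flow_urn)  # urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)
# Parse an existing URN string back into its components
parsed = DataFlowUrn.from_string("urn:li:dataFlow:(prefect,customer_analytics,staging)")
print(parsed.orchestrator, parsed.flow_id, parsed.cluster)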
Important Capabilities
Core DataFlow Information
DataFlows maintain essential metadata about the pipeline through the dataFlowInfo aspect:
- Name: The display name of the flow (may differ from flow_id)
- Description: Detailed explanation of what the flow does and its purpose
- Project: Optional namespace or project the flow belongs to
- Created/Modified Timestamps: When the flow was created and last modified in the source system
- External URL: Link to the flow in the orchestration platform's UI
- Custom Properties: Key-value pairs for additional platform-specific metadata
- Environment: The fabric type (PROD, DEV, STAGING, etc.)
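These fields map to the generated DataFlowInfoClass aspect. As a minimal sketch (the endpoint URL and field values are placeholders), the aspect can also be written directly with the low-level REST emitter:
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataFlowInfoClass
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
flow_urn = "urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)"
# Populate the core dataFlowInfo fields described above
info = DataFlowInfoClass(
    name="Daily Sales Pipeline",
    description="Processes daily sales data and updates aggregates",
    project="analytics",
    externalUrl="https://airflow.company.com/dags/daily_sales_pipeline",
    customProperties={"team": "analytics", "schedule": "0 2 * * *"},
)
emitter.emit(MetadataChangeProposalWrapper(entityUrn=flow_urn, aspect=info))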
Editable Properties
The editableDataFlowProperties aspect allows users to modify certain properties through the DataHub UI without interfering with ingestion from source systems:
- Description: User-edited documentation that takes precedence over ingested descriptions
This separation ensures that edits made in DataHub are preserved and not overwritten by subsequent ingestion runs.
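As a sketch, the same kind of UI-style edit can be made programmatically by emitting the editable aspect; this assumes the generated EditableDataFlowPropertiesClass with a description field.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import EditableDataFlowPropertiesClass
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
# The editable description is stored separately from the ingested description,
# so subsequent ingestion runs will not overwrite it.
aspect = EditableDataFlowPropertiesClass(
    description="Curated documentation written by the data platform team."
)
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
        aspect=aspect,
    )
)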
Version Information
The versionInfo aspect tracks versioning details for the flow:
- Version: An identifier like a git commit hash or md5 hash
- Version Type: The type of version identifier being used (e.g., "git", "md5")
This is particularly useful for tracking changes to pipeline code and correlating pipeline versions with their execution history.
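A sketch of recording version information, assuming the generated VersionInfoClass exposes version and versionType fields (the commit hash is illustrative):
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import VersionInfoClass
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
        # Correlate the flow with the git commit that produced the pipeline code
        aspect=VersionInfoClass(version="9c0f6a3", versionType="git"),
    )
)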
Relationship with DataJobs
DataFlows act as parent entities for DataJobs. Each DataJob's identity includes a reference to its parent DataFlow through the flow field in the DataJobKey. This creates a hierarchical relationship:
DataFlow (Pipeline)
└─ DataJob (Task 1)
└─ DataJob (Task 2)
└─ DataJob (Task 3)
This structure mirrors how workflow orchestrators organize tasks within DAGs or pipelines.
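Because a DataJob's key embeds its parent DataFlow URN, the hierarchy is visible in the identifiers themselves. A minimal sketch with the SDK (task names are illustrative):
from datahub.sdk import DataFlow, DataJob
flow = DataFlow(platform="airflow", name="daily_sales_pipeline")
# Each task in the pipeline becomes a DataJob parented by the flow
jobs = [DataJob(name=task, flow=flow) for task in ("extract", "transform", "load")]
print(flow.urn)  # the flow URN
for job in jobs:
    print(job.urn)  # each job URN embeds the parent flow URN in its key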
Incidents Tracking
The incidentsSummary aspect provides visibility into data quality or operational issues:
- Active Incidents: Currently unresolved incidents affecting this flow
- Resolved Incidents: Historical record of incidents that have been addressed
- Incident Details: Type, priority, creation time, and resolution time for each incident
This enables DataHub to serve as a centralized incident management system for data pipelines.
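The raw incidentsSummary aspect can be inspected programmatically. Below is a sketch using DataHubGraph.get_aspect, assuming the generated IncidentsSummaryClass for this aspect:
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import IncidentsSummaryClass
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
summary = graph.get_aspect(
    entity_urn="urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
    aspect_type=IncidentsSummaryClass,
)
if summary is None:
    print("No incidents recorded for this flow")
else:
    # The aspect carries the active and resolved incident references
    print(summary)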
Code Examples
Creating a DataFlow
Python SDK: Create a basic DataFlow
from datahub.metadata.urns import TagUrn
from datahub.sdk import DataFlow, DataHubClient
client = DataHubClient.from_env()
dataflow = DataFlow(
name="example_dataflow",
platform="airflow",
description="airflow pipeline for production",
tags=[TagUrn(name="production"), TagUrn(name="data_engineering")],
)
client.entities.upsert(dataflow)
Creating a DataFlow with Comprehensive Metadata
Python SDK: Create a DataFlow with owners, tags, and custom properties
# metadata-ingestion/examples/library/dataflow_comprehensive.py
from datetime import datetime, timezone
from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn, GlossaryTermUrn, TagUrn
from datahub.sdk import DataFlow, DataHubClient
client = DataHubClient.from_env()
# Create a DataFlow with comprehensive metadata
dataflow = DataFlow(
platform="airflow",
name="daily_sales_aggregation",
display_name="Daily Sales Aggregation Pipeline",
platform_instance="PROD-US-EAST",
env="PROD",
description="Aggregates daily sales data from multiple sources and updates reporting tables",
external_url="https://airflow.company.com/dags/daily_sales_aggregation",
custom_properties={
"team": "analytics",
"schedule": "0 2 * * *",
"sla_hours": "4",
"priority": "high",
},
created=datetime(2024, 1, 15, tzinfo=timezone.utc),
last_modified=datetime.now(timezone.utc),
subtype="ETL",
owners=[
CorpUserUrn("jdoe"),
CorpGroupUrn("data-engineering"),
],
tags=[
TagUrn(name="production"),
TagUrn(name="sales"),
TagUrn(name="critical"),
],
terms=[
GlossaryTermUrn("Classification.Confidential"),
],
domain="urn:li:domain:sales",
)
# Upsert the DataFlow
client.entities.upsert(dataflow)
print(f"Created DataFlow: {dataflow.urn}")
print(f"Display Name: {dataflow.display_name}")
print(f"Description: {dataflow.description}")
print(f"External URL: {dataflow.external_url}")
print(f"Custom Properties: {dataflow.custom_properties}")
Reading DataFlow Metadata
Python SDK: Read a DataFlow and access its properties
from datahub.sdk import DataFlowUrn, DataHubClient
client = DataHubClient.from_env()
# Or get this from the UI (share -> copy urn) and use DataFlowUrn.from_string(...)
dataflow_urn = DataFlowUrn("airflow", "example_dataflow_id")
dataflow_entity = client.entities.get(dataflow_urn)
print("DataFlow name:", dataflow_entity.name)
print("DataFlow platform:", dataflow_entity.platform)
print("DataFlow description:", dataflow_entity.description)
Adding Tags and Terms to a DataFlow
Python SDK: Add tags and glossary terms to a DataFlow
# metadata-ingestion/examples/library/dataflow_add_tags_terms.py
from datahub.metadata.urns import DataFlowUrn, GlossaryTermUrn, TagUrn
from datahub.sdk import DataHubClient
client = DataHubClient.from_env()
# Get the existing DataFlow
dataflow_urn = DataFlowUrn("airflow", "daily_sales_pipeline", "prod")
dataflow = client.entities.get(dataflow_urn)
# Add tags
dataflow.add_tag(TagUrn(name="pii"))
dataflow.add_tag(TagUrn(name="quarterly-review"))
# Add glossary terms
dataflow.add_term(GlossaryTermUrn("DataQuality.Validated"))
dataflow.add_term(GlossaryTermUrn("BusinessCritical.Revenue"))
# Save changes
client.entities.upsert(dataflow)
print(f"Updated DataFlow: {dataflow.urn}")
print(f"Tags: {[str(tag) for tag in dataflow.tags] if dataflow.tags else []}")
print(f"Terms: {[str(term) for term in dataflow.terms] if dataflow.terms else []}")
Adding Ownership to a DataFlow
Python SDK: Set owners for a DataFlow
# metadata-ingestion/examples/library/dataflow_add_ownership.py
from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn, DataFlowUrn
from datahub.sdk import DataHubClient
client = DataHubClient.from_env()
# Get the existing DataFlow
dataflow_urn = DataFlowUrn("airflow", "daily_sales_pipeline", "prod")
dataflow = client.entities.get(dataflow_urn)
# Add individual owners
dataflow.add_owner((CorpUserUrn("alice"), "DATAOWNER"))
dataflow.add_owner((CorpUserUrn("bob"), "DEVELOPER"))
# Add group owner
dataflow.add_owner((CorpGroupUrn("analytics-team"), "DATAOWNER"))
# Save changes
client.entities.upsert(dataflow)
print(f"Updated DataFlow: {dataflow.urn}")
print(f"Owners: {dataflow.owners}")
Querying DataFlow via REST API
DataFlows can be queried using the standard DataHub REST APIs:
REST API: Fetch a DataFlow entity
# Get a complete DataFlow snapshot
curl 'http://localhost:8080/entities/urn%3Ali%3AdataFlow%3A(airflow,daily_sales_pipeline,prod)'
Response includes all aspects:
{
"urn": "urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
"aspects": {
"dataFlowKey": {
"orchestrator": "airflow",
"flowId": "daily_sales_pipeline",
"cluster": "prod"
},
"dataFlowInfo": {
"name": "Daily Sales Pipeline",
"description": "Processes daily sales data and updates aggregates",
"project": "analytics",
"externalUrl": "https://airflow.company.com/dags/daily_sales_pipeline"
},
"ownership": { ... },
"globalTags": { ... }
}
}
Creating DataFlow with DataJobs
Python SDK: Create a DataFlow with associated DataJobs
from datahub.metadata.urns import DatasetUrn, TagUrn
from datahub.sdk import DataFlow, DataHubClient, DataJob
client = DataHubClient.from_env()
# datajob will inherit the platform and platform instance from the flow
dataflow = DataFlow(
platform="airflow",
name="example_dag",
platform_instance="PROD",
description="example dataflow",
tags=[TagUrn(name="tag1"), TagUrn(name="tag2")],
)
datajob = DataJob(
name="example_datajob",
flow=dataflow,
inlets=[
DatasetUrn(platform="hdfs", name="dataset1", env="PROD"),
],
outlets=[
DatasetUrn(platform="hdfs", name="dataset2", env="PROD"),
],
)
client.entities.upsert(dataflow)
client.entities.upsert(datajob)
Integration Points
Relationship with DataJobs
DataFlows have a parent-child relationship with DataJobs through the IsPartOf relationship. This is the primary integration point:
from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()
# A DataJob automatically links to its parent DataFlow through its flow reference
flow = DataFlow(platform="airflow", name="my_dag")
job = DataJob(name="extract_data", flow=flow)
client.entities.upsert(flow)
client.entities.upsert(job)
Relationship with Datasets
While DataFlows don't directly reference datasets, their child DataJobs establish lineage relationships with datasets through:
- Inlets: Input datasets consumed by jobs
- Outlets: Output datasets produced by jobs
This creates indirect lineage from DataFlows to datasets through their constituent jobs.
Orchestrator Platform Integration
Common orchestrators that produce DataFlow entities:
- Apache Airflow: Each DAG becomes a DataFlow, tasks become DataJobs
- Prefect: Flows are DataFlows, tasks are DataJobs
- Dagster: Jobs/Pipelines are DataFlows, ops/solids are DataJobs
- Azkaban: Flows are DataFlows, jobs are DataJobs
- Fivetran: Connectors are represented as DataFlows with sync operations as DataJobs
DataProcessInstance Tracking
DataFlow executions can be tracked using DataProcessInstance entities, which record:
- Start and end times of flow runs
- Success/failure status
- Input and output datasets for specific runs
This enables tracking of pipeline run history and troubleshooting failures.
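A rough sketch of recording a single run follows, using the DataProcessInstance helper from the legacy API; treat the exact constructor arguments and method signatures as assumptions to verify against your SDK version.
import time
from datahub.api.entities.datajob import DataFlow
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
# Legacy-API flow object, used only to derive the run's parent flow
flow = DataFlow(orchestrator="airflow", cluster="prod", id="daily_sales_pipeline")
run = DataProcessInstance.from_dataflow(dataflow=flow, id="daily_sales_pipeline_2024_01_15")
# Report the start and the outcome of this execution
run.emit_process_start(emitter, int(time.time() * 1000))
run.emit_process_end(emitter, int(time.time() * 1000), result=InstanceRunResult.SUCCESS)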
GraphQL Integration
The DataFlow entity is exposed through DataHub's GraphQL API with full support for:
- Querying flow metadata
- Browsing flows by orchestrator and cluster
- Searching flows by name, description, and project
- Updating flow properties (ownership, tags, terms, etc.)
Key GraphQL resolvers:
- dataFlow: Fetch a single DataFlow by URN
- searchAcrossEntities: Search for DataFlows with filters
- updateDataFlow: Modify DataFlow properties
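For example, a flow can be fetched through the GraphQL endpoint from Python. A sketch using DataHubGraph.execute_graphql, with the field selection kept minimal:
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
query = """
query getDataFlow($urn: String!) {
  dataFlow(urn: $urn) {
    urn
    properties {
      name
      description
      project
      externalUrl
    }
  }
}
"""
result = graph.execute_graphql(
    query,
    variables={"urn": "urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)"},
)
print(result)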
Notable Exceptions
Cluster vs Environment
While DataFlows use the cluster field in their URN for identification, they also have an env field in the dataFlowInfo aspect. These serve different purposes:
- Cluster: Part of the identity, typically matches the environment but can represent specific deployment instances
- Environment (env): A semantic indicator of the fabric type (PROD, DEV, STAGING) that's searchable and filterable
In most cases, these should align (e.g., cluster="prod" and env="PROD"), but they can diverge when multiple production clusters exist or when representing complex deployment topologies.
Platform vs Orchestrator
Throughout the codebase, you'll see both terms used:
- In the URN structure and key aspect, the field is called orchestrator
- In the Python SDK and some APIs, it is referred to as platform
These are synonymous and refer to the same concept: the workflow management system executing the flow.
Legacy API vs SDK
Two APIs exist for creating DataFlows:
- Legacy API (datahub.api.entities.datajob.DataFlow): Uses the older emitter pattern
- Modern SDK (datahub.sdk.DataFlow): Preferred approach with cleaner interfaces
New code should use the modern SDK (imported from datahub.sdk), though both are maintained for backward compatibility.
DataFlow vs DataPipeline
In some contexts, documentation or the UI may refer to "data pipelines." This is an informal term for the same concept; the formal entity type in the metadata model is dataFlow, not dataPipeline.
Technical Reference
For technical details about fields, searchability, and relationships, view the Columns tab in DataHub.