
DataFlow

A DataFlow represents a high-level data processing pipeline or workflow orchestrated by systems like Apache Airflow, Azkaban, Prefect, Dagster, or similar workflow management platforms. DataFlows serve as containers for related DataJobs, representing the overall execution context and organization of data processing tasks.

Identity

DataFlows are uniquely identified by three components:

  • Orchestrator: The workflow management platform that executes the flow (e.g., airflow, azkaban, prefect, dagster)
  • Flow ID: A unique identifier for the workflow within the orchestrator (typically the DAG name, pipeline name, or workflow identifier)
  • Cluster: The execution environment or cluster where the flow runs (e.g., prod, staging, dev)

The URN structure follows this pattern:

urn:li:dataFlow:(<orchestrator>,<flowId>,<cluster>)

URN Examples

Apache Airflow DAG in production:

urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)

Prefect flow in staging:

urn:li:dataFlow:(prefect,customer_analytics,staging)

Azkaban workflow in development:

urn:li:dataFlow:(azkaban,data_quality_checks,dev)
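
These URNs can also be constructed and parsed programmatically. A minimal sketch, assuming the DataFlowUrn class in the Python SDK exposes orchestrator, flow_id, and cluster accessors:

from datahub.metadata.urns import DataFlowUrn

# Build a DataFlow URN from its three identity components.
urn = DataFlowUrn(orchestrator="airflow", flow_id="daily_sales_pipeline", cluster="prod")
print(urn)  # urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)

# Parse an existing URN string back into its components.
parsed = DataFlowUrn.from_string("urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)")
print(parsed.orchestrator, parsed.flow_id, parsed.cluster)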

Important Capabilities

Core DataFlow Information

DataFlows maintain essential metadata about the pipeline through the dataFlowInfo aspect (a low-level emission sketch follows the list):

  • Name: The display name of the flow (may differ from flow_id)
  • Description: Detailed explanation of what the flow does and its purpose
  • Project: Optional namespace or project the flow belongs to
  • Created/Modified Timestamps: When the flow was created and last modified in the source system
  • External URL: Link to the flow in the orchestration platform's UI
  • Custom Properties: Key-value pairs for additional platform-specific metadata
  • Environment: The fabric type (PROD, DEV, STAGING, etc.)
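
Beyond the high-level SDK, this aspect can be written directly with the low-level emitter. A minimal sketch, assuming a DataHub GMS at http://localhost:8080; all metadata values are illustrative:

import time

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataFlowInfoClass, TimeStampClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

now_ms = int(time.time() * 1000)
info = DataFlowInfoClass(
    name="Daily Sales Pipeline",  # display name; may differ from the flowId in the URN
    description="Aggregates daily sales data and updates reporting tables",
    project="analytics",  # optional namespace
    externalUrl="https://airflow.company.com/dags/daily_sales_pipeline",
    customProperties={"team": "analytics"},
    created=TimeStampClass(time=now_ms),
    lastModified=TimeStampClass(time=now_ms),
    env="PROD",
)

emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
        aspect=info,
    )
)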

Editable Properties

The editableDataFlowProperties aspect allows users to modify certain properties through the DataHub UI without interfering with ingestion from source systems:

  • Description: User-edited documentation that takes precedence over ingested descriptions

This separation ensures that edits made in DataHub are preserved and not overwritten by subsequent ingestion runs.
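
For example, the following sketch writes the UI-editable description with the low-level emitter (the GMS address and description text are illustrative; the ingested description in dataFlowInfo remains untouched):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import EditableDataFlowPropertiesClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Writes only the editable description; ingestion pipelines will not overwrite it.
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
        aspect=EditableDataFlowPropertiesClass(
            description="Curated description maintained by the analytics team."
        ),
    )
)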

Version Information

The versionInfo aspect tracks versioning details for the flow:

  • Version: An identifier like a git commit hash or md5 hash
  • Version Type: The type of version identifier being used (e.g., "git", "md5")

This is particularly useful for tracking changes to pipeline code and correlating pipeline versions with their execution history.
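
A sketch of attaching version information to an existing flow via the emitter API; the class name follows the versionInfo aspect, and the commit hash is illustrative:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import VersionInfoClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
        aspect=VersionInfoClass(
            version="3f5a1c9",  # e.g. the git commit hash of the pipeline code
            versionType="git",
        ),
    )
)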

Relationship with DataJobs

DataFlows act as parent entities for DataJobs. Each DataJob's identity includes a reference to its parent DataFlow through the flow field in the DataJobKey. This creates a hierarchical relationship:

DataFlow (Pipeline)
├─ DataJob (Task 1)
├─ DataJob (Task 2)
└─ DataJob (Task 3)

This structure mirrors how workflow orchestrators organize tasks within DAGs or pipelines.
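
The SDK mirrors this hierarchy: each DataJob is constructed with a reference to its parent flow, and the job's URN embeds the flow's URN. A sketch with illustrative names:

from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()

flow = DataFlow(platform="airflow", name="daily_sales_pipeline")
tasks = [DataJob(name=f"task_{i}", flow=flow) for i in (1, 2, 3)]

client.entities.upsert(flow)
for task in tasks:
    client.entities.upsert(task)

print(flow.urn)
for task in tasks:
    print("  └─", task.urn)  # each job URN embeds the parent flow URN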

Incidents Tracking

The incidentsSummary aspect provides visibility into data quality or operational issues:

  • Active Incidents: Currently unresolved incidents affecting this flow
  • Resolved Incidents: Historical record of incidents that have been addressed
  • Incident Details: Type, priority, creation time, and resolution time for each incident

This enables DataHub to serve as a centralized incident management system for data pipelines.
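
To inspect these incidents programmatically, the aspect can be fetched with the graph client. A sketch, assuming a local GMS endpoint:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import IncidentsSummaryClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

summary = graph.get_aspect(
    entity_urn="urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
    aspect_type=IncidentsSummaryClass,
)
if summary and summary.activeIncidentDetails:
    for incident in summary.activeIncidentDetails:
        print(incident.urn, incident.type, incident.priority)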

Code Examples

Creating a DataFlow

Python SDK: Create a basic DataFlow
# Inlined from /metadata-ingestion/examples/library/dataflow_create.py
from datahub.metadata.urns import TagUrn
from datahub.sdk import DataFlow, DataHubClient

client = DataHubClient.from_env()

dataflow = DataFlow(
    name="example_dataflow",
    platform="airflow",
    description="airflow pipeline for production",
    tags=[TagUrn(name="production"), TagUrn(name="data_engineering")],
)

client.entities.upsert(dataflow)

Creating a DataFlow with Comprehensive Metadata

Python SDK: Create a DataFlow with owners, tags, and custom properties
# Inlined from /metadata-ingestion/examples/library/dataflow_comprehensive.py
from datetime import datetime, timezone

from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn, GlossaryTermUrn, TagUrn
from datahub.sdk import DataFlow, DataHubClient

client = DataHubClient.from_env()

# Create a DataFlow with comprehensive metadata
dataflow = DataFlow(
    platform="airflow",
    name="daily_sales_aggregation",
    display_name="Daily Sales Aggregation Pipeline",
    platform_instance="PROD-US-EAST",
    env="PROD",
    description="Aggregates daily sales data from multiple sources and updates reporting tables",
    external_url="https://airflow.company.com/dags/daily_sales_aggregation",
    custom_properties={
        "team": "analytics",
        "schedule": "0 2 * * *",
        "sla_hours": "4",
        "priority": "high",
    },
    created=datetime(2024, 1, 15, tzinfo=timezone.utc),
    last_modified=datetime.now(timezone.utc),
    subtype="ETL",
    owners=[
        CorpUserUrn("jdoe"),
        CorpGroupUrn("data-engineering"),
    ],
    tags=[
        TagUrn(name="production"),
        TagUrn(name="sales"),
        TagUrn(name="critical"),
    ],
    terms=[
        GlossaryTermUrn("Classification.Confidential"),
    ],
    domain="urn:li:domain:sales",
)

# Upsert the DataFlow
client.entities.upsert(dataflow)

print(f"Created DataFlow: {dataflow.urn}")
print(f"Display Name: {dataflow.display_name}")
print(f"Description: {dataflow.description}")
print(f"External URL: {dataflow.external_url}")
print(f"Custom Properties: {dataflow.custom_properties}")

Reading DataFlow Metadata

Python SDK: Read a DataFlow and access its properties
# Inlined from /metadata-ingestion/examples/library/dataflow_read.py
from datahub.sdk import DataFlowUrn, DataHubClient

client = DataHubClient.from_env()

# Or get this from the UI (share -> copy urn) and use DataFlowUrn.from_string(...)
dataflow_urn = DataFlowUrn("airflow", "example_dataflow_id")

dataflow_entity = client.entities.get(dataflow_urn)
print("DataFlow name:", dataflow_entity.name)
print("DataFlow platform:", dataflow_entity.platform)
print("DataFlow description:", dataflow_entity.description)

Adding Tags and Terms to a DataFlow

Python SDK: Add tags and glossary terms to a DataFlow
# Inlined from /metadata-ingestion/examples/library/dataflow_add_tags_terms.py
from datahub.metadata.urns import DataFlowUrn, GlossaryTermUrn, TagUrn
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()

# Get the existing DataFlow
dataflow_urn = DataFlowUrn("airflow", "daily_sales_pipeline", "prod")
dataflow = client.entities.get(dataflow_urn)

# Add tags
dataflow.add_tag(TagUrn(name="pii"))
dataflow.add_tag(TagUrn(name="quarterly-review"))

# Add glossary terms
dataflow.add_term(GlossaryTermUrn("DataQuality.Validated"))
dataflow.add_term(GlossaryTermUrn("BusinessCritical.Revenue"))

# Save changes
client.entities.upsert(dataflow)

print(f"Updated DataFlow: {dataflow.urn}")
print(f"Tags: {[str(tag) for tag in dataflow.tags] if dataflow.tags else []}")
print(f"Terms: {[str(term) for term in dataflow.terms] if dataflow.terms else []}")

Adding Ownership to a DataFlow

Python SDK: Set owners for a DataFlow
# Inlined from /metadata-ingestion/examples/library/dataflow_add_ownership.py
from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn, DataFlowUrn
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()

# Get the existing DataFlow
dataflow_urn = DataFlowUrn("airflow", "daily_sales_pipeline", "prod")
dataflow = client.entities.get(dataflow_urn)

# Add individual owners
dataflow.add_owner((CorpUserUrn("alice"), "DATAOWNER"))
dataflow.add_owner((CorpUserUrn("bob"), "DEVELOPER"))

# Add group owner
dataflow.add_owner((CorpGroupUrn("analytics-team"), "DATAOWNER"))

# Save changes
client.entities.upsert(dataflow)

print(f"Updated DataFlow: {dataflow.urn}")
print(f"Owners: {dataflow.owners}")

Querying DataFlow via REST API

DataFlows can be queried using the standard DataHub REST APIs:

REST API: Fetch a DataFlow entity
# Get a complete DataFlow snapshot
curl 'http://localhost:8080/entities/urn%3Ali%3AdataFlow%3A(airflow,daily_sales_pipeline,prod)'

Response includes all aspects:

{
  "urn": "urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)",
  "aspects": {
    "dataFlowKey": {
      "orchestrator": "airflow",
      "flowId": "daily_sales_pipeline",
      "cluster": "prod"
    },
    "dataFlowInfo": {
      "name": "Daily Sales Pipeline",
      "description": "Processes daily sales data and updates aggregates",
      "project": "analytics",
      "externalUrl": "https://airflow.company.com/dags/daily_sales_pipeline"
    },
    "ownership": { ... },
    "globalTags": { ... }
  }
}
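
The same lookup from Python, assuming an unauthenticated local instance (add an Authorization: Bearer header if your deployment requires a token):

import requests

response = requests.get(
    "http://localhost:8080/entities/urn%3Ali%3AdataFlow%3A(airflow,daily_sales_pipeline,prod)"
)
response.raise_for_status()
print(response.json())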

Creating DataFlow with DataJobs

Python SDK: Create a DataFlow with associated DataJobs
# Inlined from /metadata-ingestion/examples/library/datajob_create_full.py
from datahub.metadata.urns import DatasetUrn, TagUrn
from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()

# datajob will inherit the platform and platform instance from the flow

dataflow = DataFlow(
    platform="airflow",
    name="example_dag",
    platform_instance="PROD",
    description="example dataflow",
    tags=[TagUrn(name="tag1"), TagUrn(name="tag2")],
)

datajob = DataJob(
    name="example_datajob",
    flow=dataflow,
    inlets=[
        DatasetUrn(platform="hdfs", name="dataset1", env="PROD"),
    ],
    outlets=[
        DatasetUrn(platform="hdfs", name="dataset2", env="PROD"),
    ],
)

client.entities.upsert(dataflow)
client.entities.upsert(datajob)

Integration Points

Relationship with DataJobs

DataFlows have a parent-child relationship with DataJobs through the IsPartOf relationship. This is the primary integration point:

from datahub.sdk import DataFlow, DataJob

# DataJob automatically links to its parent DataFlow
flow = DataFlow(platform="airflow", name="my_dag")
job = DataJob(name="extract_data", flow=flow)

Relationship with Datasets

While DataFlows don't directly reference datasets, their child DataJobs establish lineage relationships with datasets through:

  • Inlets: Input datasets consumed by jobs
  • Outlets: Output datasets produced by jobs

This creates indirect lineage from DataFlows to datasets through their constituent jobs.

Orchestrator Platform Integration

Common orchestrators that produce DataFlow entities:

  • Apache Airflow: Each DAG becomes a DataFlow, tasks become DataJobs
  • Prefect: Flows are DataFlows, tasks are DataJobs
  • Dagster: Jobs/Pipelines are DataFlows, ops/solids are DataJobs
  • Azkaban: Flows are DataFlows, jobs are DataJobs
  • Fivetran: Connectors are represented as DataFlows with sync operations as DataJobs

DataProcessInstance Tracking

DataFlow executions can be tracked using DataProcessInstance entities, which record:

  • Start and end times of flow runs
  • Success/failure status
  • Input and output datasets for specific runs

This enables tracking of pipeline run history and troubleshooting failures.
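
A sketch of recording a flow run with the DataProcessInstance helper from the ingestion library; the run id and timestamps are illustrative, and the helper takes the legacy DataFlow class:

import time

from datahub.api.entities.datajob import DataFlow
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

flow = DataFlow(orchestrator="airflow", id="daily_sales_pipeline", cluster="prod")
run = DataProcessInstance.from_dataflow(dataflow=flow, id="daily_sales_pipeline_2024_01_15")

run.emit_process_start(emitter, int(time.time() * 1000))
# ... the orchestrator executes the flow ...
run.emit_process_end(emitter, int(time.time() * 1000), result=InstanceRunResult.SUCCESS)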

GraphQL Integration

The DataFlow entity is exposed through DataHub's GraphQL API with full support for:

  • Querying flow metadata
  • Browsing flows by orchestrator and cluster
  • Searching flows by name, description, and project
  • Updating flow properties (ownership, tags, terms, etc.)

Key GraphQL resolvers (an example query follows the list):

  • dataFlow: Fetch a single DataFlow by URN
  • searchAcrossEntities: Search for DataFlows with filters
  • updateDataFlow: Modify DataFlow properties
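
For example, a single flow can be fetched through the dataFlow resolver. A sketch using the Python graph client against a local GMS; the selected fields follow the public GraphQL schema:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

query = """
query getDataFlow($urn: String!) {
  dataFlow(urn: $urn) {
    urn
    properties {
      name
      description
      project
      externalUrl
    }
  }
}
"""
result = graph.execute_graphql(
    query,
    variables={"urn": "urn:li:dataFlow:(airflow,daily_sales_pipeline,prod)"},
)
print(result["dataFlow"]["properties"])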

Notable Exceptions

Cluster vs Environment

While DataFlows use the cluster field in their URN for identification, they also have an env field in the dataFlowInfo aspect. These serve different purposes:

  • Cluster: Part of the identity, typically matches the environment but can represent specific deployment instances
  • Environment (env): A semantic indicator of the fabric type (PROD, DEV, STAGING) that's searchable and filterable

In most cases, these should align (e.g., cluster="prod" and env="PROD"), but they can diverge when multiple production clusters exist or when representing complex deployment topologies.

Platform vs Orchestrator

Throughout the codebase, you'll see both terms used:

  • In the URN structure and key aspect, the field is called orchestrator
  • In Python SDK and some APIs, it's referred to as platform

These are synonymous and refer to the same concept: the workflow management system executing the flow.

Legacy API vs SDK

Two APIs exist for creating DataFlows:

  1. Legacy API (datahub.api.entities.datajob.DataFlow): Uses the older emitter pattern
  2. Modern SDK (datahub.sdk.DataFlow): Preferred approach with cleaner interfaces

New code should use the modern SDK (imported from datahub.sdk), though both are maintained for backward compatibility.
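
The difference is visible in the import paths:

# Legacy emitter-pattern class (maintained for backward compatibility)
from datahub.api.entities.datajob import DataFlow as LegacyDataFlow

# Modern SDK class (preferred for new code)
from datahub.sdk import DataFlow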

DataFlow vs DataPipeline

In some contexts, you might see references to "data pipelines" in documentation or UI. These are informal terms that refer to DataFlows. The formal entity type in the metadata model is dataFlow, not dataPipeline.

Technical Reference Guide

The sections above provide an overview of how to use this entity. The following sections provide detailed technical information about how metadata is stored and represented in DataHub.

Aspects are the individual pieces of metadata that can be attached to an entity. Each aspect contains specific information (like ownership, tags, or properties) and is stored as a separate record, allowing for flexible and incremental metadata updates.

Relationships show how this entity connects to other entities in the metadata graph. These connections are derived from the fields within each aspect and form the foundation of DataHub's knowledge graph.

Reading the Field Tables

Each aspect's field table includes an Annotations column that provides additional metadata about how fields are used:

  • ⚠️ Deprecated: This field is deprecated and may be removed in a future version. Check the description for the recommended alternative
  • Searchable: This field is indexed and can be searched in DataHub's search interface
  • Searchable (fieldname): When the field name in parentheses is shown, it indicates the field is indexed under a different name in the search index. For example, dashboardTool is indexed as tool
  • → RelationshipName: This field creates a relationship to another entity. The arrow indicates this field contains a reference (URN) to another entity, and the name indicates the type of relationship (e.g., → Contains, → OwnedBy)

Fields with complex types (like Edge, AuditStamp) link to their definitions in the Common Types section below.

Aspects

dataFlowKey

Key for a Data Flow

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| orchestrator | string | Workflow manager like azkaban, airflow which orchestrates the flow | Searchable |
| flowId | string | Unique identifier of the data flow | Searchable |
| cluster | string | Cluster where the flow is executed | Searchable |

dataFlowInfo

Information about a Data processing flow

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| customProperties | map | Custom property bag. | Searchable |
| externalUrl | string | URL where the reference exists | Searchable |
| name | string | Flow name | Searchable |
| description | string | Flow description | Searchable |
| project | string | Optional project/namespace associated with the flow | Searchable |
| created | TimeStamp | A timestamp documenting when the asset was created in the source Data Platform (not on DataHub) | Searchable |
| lastModified | TimeStamp | A timestamp documenting when the asset was last modified in the source Data Platform (not on DataHub) | Searchable |
| env | FabricType | Environment for this flow | Searchable |

editableDataFlowProperties

Stores editable changes made to properties. This separates edits made in the DataHub UI from changes made by ingestion pipelines, so that ingestion does not accidentally overwrite user-provided data.

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| created | AuditStamp | An AuditStamp corresponding to the creation of this resource/association/sub-resource. A value of... | |
| lastModified | AuditStamp | An AuditStamp corresponding to the last modification of this resource/association/sub-resource. I... | |
| deleted | AuditStamp | An AuditStamp corresponding to the deletion of this resource/association/sub-resource. Logically,... | |
| description | string | Edited documentation of the data flow | Searchable (editedDescription) |

ownership

Ownership information of an entity.

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| owners | Owner[] | List of owners of the entity. | |
| ownerTypes | map | Ownership type to Owners map, populated via mutation hook. | Searchable |
| lastModified | AuditStamp | Audit stamp containing who last modified the record and when. A value of 0 in the time field indi... | |

status

The lifecycle status metadata of an entity, e.g. dataset, metric, feature, etc. This aspect is used to represent soft deletes conventionally.

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| removed | boolean | Whether the entity has been removed (soft-deleted). | Searchable |

globalTags

Tag aspect used for applying tags to an entity

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| tags | TagAssociation[] | Tags associated with a given entity | Searchable, → TaggedWith |

browsePaths

Shared aspect containing Browse Paths to be indexed for an entity.

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| paths | string[] | A list of valid browse paths for the entity. Browse paths are expected to be forward slash-separated... | Searchable |

glossaryTerms

Related business terms information

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| terms | GlossaryTermAssociation[] | The related business terms | |
| auditStamp | AuditStamp | Audit stamp containing who reported the related business term | |

institutionalMemory

Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity.

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| elements | InstitutionalMemoryMetadata[] | List of records that represent institutional memory of an entity. Each record consists of a link,... | |

dataPlatformInstance

The specific instance of the data platform that this entity belongs to

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| platform | string | Data Platform | Searchable |
| instance | string | Instance of the data platform (e.g. db instance) | Searchable (platformInstance) |

browsePathsV2

Shared aspect containing a Browse Path to be indexed for an entity.

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| path | BrowsePathEntry[] | A valid browse path for the entity. This field is provided by DataHub by default. This aspect is ... | Searchable |

domains

Links from an Asset to its Domains

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| domains | string[] | The Domains attached to an Asset | Searchable, → AssociatedWith |

applications

Links from an Asset to its Applications

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| applications | string[] | The Applications attached to an Asset | Searchable, → AssociatedWith |

deprecation

Deprecation status of an entity

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| deprecated | boolean | Whether the entity is deprecated. | Searchable |
| decommissionTime | long | The time at which the user plans to decommission this entity. | |
| note | string | Additional information about the entity deprecation plan, such as the wiki, doc, RB. | |
| actor | string | The user URN which will be credited for modifying this deprecation content. | |
| replacement | string | | |

versionInfo

Information about a Data processing job

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| customProperties | map | Custom property bag. | Searchable |
| externalUrl | string | URL where the reference exists | Searchable |
| version | string | The version which can identify a job version, like a commit hash or md5 hash | |
| versionType | string | The type of the version, like git hash or md5 hash | |

container

Link from an asset to its parent container

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| container | string | The parent container of an asset | Searchable, → IsPartOf |

structuredProperties

Properties about an entity governed by StructuredPropertyDefinition

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| properties | StructuredPropertyValueAssignment[] | Custom property bag. | |

incidentsSummary

Summary related incidents on an entity.

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| resolvedIncidents | string[] | Resolved incidents for an asset. Deprecated! Use the richer resolvedIncidentDetails instead. | ⚠️ Deprecated |
| activeIncidents | string[] | Active incidents for an asset. Deprecated! Use the richer activeIncidentDetails instead. | ⚠️ Deprecated |
| resolvedIncidentDetails | IncidentSummaryDetails[] | Summary details about the set of resolved incidents | Searchable, → ResolvedIncidents |
| activeIncidentDetails | IncidentSummaryDetails[] | Summary details about the set of active incidents | Searchable, → ActiveIncidents |

forms

Forms that are assigned to this entity to be filled out

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| incompleteForms | FormAssociation[] | All incomplete forms assigned to the entity. | Searchable |
| completedForms | FormAssociation[] | All complete forms assigned to the entity. | Searchable |
| verifications | FormVerificationAssociation[] | Verifications that have been applied to the entity via completed forms. | Searchable |

subTypes

Sub Types. Use this aspect to specialize a generic Entity e.g. Making a Dataset also be a View or also be a LookerExplore

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| typeNames | string[] | The names of the specific types. | Searchable |

testResults

Information about a Test Result

| Field | Type | Description | Annotations |
|-------|------|-------------|-------------|
| failing | TestResult[] | Results that are failing | Searchable, → IsFailing |
| passing | TestResult[] | Results that are passing | Searchable, → IsPassing |

Common Types

These types are used across multiple aspects in this entity.

AuditStamp

Data captured on a resource/association/sub-resource level giving insight into when that resource/association/sub-resource moved into a particular lifecycle stage, and who acted to move it into that specific lifecycle stage.

Fields:

  • time (long): When did the resource/association/sub-resource move into the specific lifecyc...
  • actor (string): The entity (e.g. a member URN) which will be credited for moving the resource...
  • impersonator (string?): The entity (e.g. a service URN) which performs the change on behalf of the Ac...
  • message (string?): Additional context around how DataHub was informed of the particular change. ...

FormAssociation

Properties of an applied form.

Fields:

  • urn (string): Urn of the applied form
  • incompletePrompts (FormPromptAssociation[]): A list of prompts that are not yet complete for this form.
  • completedPrompts (FormPromptAssociation[]): A list of prompts that have been completed for this form.

IncidentSummaryDetails

Summary statistics about incidents on an entity.

Fields:

  • urn (string): The urn of the incident
  • type (string): The type of an incident
  • createdAt (long): The time at which the incident was raised in milliseconds since epoch.
  • resolvedAt (long?): The time at which the incident was marked as resolved in milliseconds since e...
  • priority (int?): The priority of the incident

TestResult

Information about a Test Result

Fields:

  • test (string): The urn of the test
  • type (TestResultType): The type of the result
  • testDefinitionMd5 (string?): The md5 of the test definition that was used to compute this result. See Test...
  • lastComputed (AuditStamp?): The audit stamp of when the result was computed, including the actor who comp...

TimeStamp

A standard event timestamp

Fields:

  • time (long): When did the event occur
  • actor (string?): Optional: The actor urn involved in the event.

Relationships

Outgoing

These are the relationships stored in this entity's aspects

  • OwnedBy

    • Corpuser via ownership.owners.owner
    • CorpGroup via ownership.owners.owner
  • ownershipType

    • OwnershipType via ownership.owners.typeUrn
  • TaggedWith

    • Tag via globalTags.tags
  • TermedWith

    • GlossaryTerm via glossaryTerms.terms.urn
  • AssociatedWith

    • Domain via domains.domains
    • Application via applications.applications
  • IsPartOf

    • Container via container.container
  • ResolvedIncidents

    • Incident via incidentsSummary.resolvedIncidentDetails
  • ActiveIncidents

    • Incident via incidentsSummary.activeIncidentDetails
  • IsFailing

    • Test via testResults.failing
  • IsPassing

    • Test via testResults.passing

Incoming

These are the relationships stored in other entities' aspects

  • IsPartOf

    • DataJob via dataJobKey.flow

Global Metadata Model

(Figure: DataHub's global metadata model graph, showing how DataFlow fits into the overall entity graph.)