DataJob
Data jobs represent individual units of data processing work within a data pipeline or workflow. They are the tasks, steps, or operations that transform, move, or process data as part of a larger data flow. Examples include Airflow tasks, dbt models, Spark jobs, Databricks notebooks, and similar processing units in orchestration systems.
Identity
Data jobs are identified by two pieces of information:
- The data flow (pipeline/workflow) that they belong to: this is represented as a URN pointing to the parent dataFlow entity. The data flow defines the orchestrator (e.g., airflow, spark, dbt), the flow ID (e.g., the DAG name or pipeline name), and the cluster where it runs.
- The unique job identifier within that flow: this is a string that uniquely identifies the task within its parent flow (e.g., task name, step name, model name).
The URN structure for a data job is: urn:li:dataJob:(urn:li:dataFlow:(<orchestrator>,<flow_id>,<cluster>),<job_id>)
Examples
Airflow task:
urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)
dbt model:
urn:li:dataJob:(urn:li:dataFlow:(dbt,analytics_project,prod),staging.stg_customers)
Spark job:
urn:li:dataJob:(urn:li:dataFlow:(spark,data_processing_pipeline,PROD),aggregate_sales_task)
Databricks notebook:
urn:li:dataJob:(urn:li:dataFlow:(databricks,etl_workflow,production),process_events_notebook)
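These URNs do not need to be assembled by hand. Below is a minimal sketch of building one programmatically with the URN helpers from the Python SDK; the flow and job IDs are illustrative:
Python SDK: Construct a data job URN
from datahub.metadata.urns import DataFlowUrn, DataJobUrn

# Build the parent flow URN, then the job URN within that flow (illustrative IDs).
flow_urn = DataFlowUrn(orchestrator="airflow", flow_id="daily_etl_dag", cluster="prod")
datajob_urn = DataJobUrn(flow=flow_urn, job_id="transform_customer_data")

print(str(datajob_urn))
# urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)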
Important Capabilities
Job Information (dataJobInfo)
The dataJobInfo aspect captures the core properties of a data job:
- Name: Human-readable name of the job (searchable with autocomplete)
- Description: Detailed description of what the job does
- Type: The type of job (e.g., SQL, Python, Spark)
- Flow URN: Reference to the parent data flow
- Created/Modified timestamps: When the job was created or last modified in the source system
- Environment: The fabric/environment where the job runs (PROD, DEV, QA, etc.)
- Custom properties: Additional key-value properties specific to the source system
- External references: Links to external documentation or definitions (e.g., GitHub links)
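When working with aspects directly rather than the high-level SDK, the same information can be written by emitting a dataJobInfo aspect. A minimal sketch, assuming a GMS instance at localhost:8080; the job name, type, and custom properties are illustrative:
Python SDK: Emit a dataJobInfo aspect directly
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataJobInfoClass

datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)"

# Core properties of the job; "SQL" is one of several common type values.
job_info = DataJobInfoClass(
    name="transform_customer_data",
    type="SQL",
    description="Transforms raw customer data into analytics-ready format",
    customProperties={"team": "data-platform"},
)

emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=job_info))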
Input/Output Lineage (dataJobInputOutput)
The dataJobInputOutput aspect defines the data lineage relationships for the job:
- Input datasets: Datasets consumed by the job during processing (via inputDatasetEdges)
- Output datasets: Datasets produced by the job (via outputDatasetEdges)
- Input data jobs: Other data jobs that this job depends on (via inputDatajobEdges)
- Input dataset fields: Specific schema fields consumed from input datasets
- Output dataset fields: Specific schema fields produced in output datasets
- Fine-grained lineage: Column-level lineage mappings showing which upstream fields contribute to downstream fields
This aspect establishes the critical relationships that enable DataHub to build and visualize data lineage graphs across your entire data ecosystem.
Editable Properties (editableDataJobProperties)
The editableDataJobProperties aspect stores documentation edits made through the DataHub UI:
- Description: User-edited documentation that complements or overrides the ingested description
- Change audit stamps: Tracks who made edits and when
This separation ensures that manual edits in the UI are preserved and not overwritten by ingestion pipelines.
Ownership
Like other entities, data jobs support ownership through the ownership aspect. Owners can be users or groups with various ownership types (DATAOWNER, PRODUCER, DEVELOPER, etc.). This helps identify who is responsible for maintaining and troubleshooting the job.
Tags and Glossary Terms
Data jobs can be tagged and associated with glossary terms:
- Tags (globalTags aspect): Used for categorization, classification, or operational purposes (e.g., PII, critical, deprecated)
- Glossary terms (glossaryTerms aspect): Link jobs to business terminology and concepts from your glossary
Domains and Applications
Data jobs can be organized into:
- Domains (domains aspect): Business domains or data domains for organizational structure (see the domain assignment sketch below)
- Applications (applications aspect): Associated with specific applications or systems
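For example, a domain can be assigned by emitting the domains aspect. A minimal sketch, assuming the domain urn:li:domain:marketing already exists and a GMS instance at localhost:8080:
Python SDK: Assign a data job to a domain
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DomainsClass

datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)"

# Assign the job to an existing domain (domain URN is illustrative).
domains_aspect = DomainsClass(domains=["urn:li:domain:marketing"])

emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=domains_aspect))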
Structured Properties and Forms
Data jobs support:
- Structured properties: Custom typed properties defined by your organization
- Forms: Structured documentation forms for consistency
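A structured property value can be assigned by emitting the structuredProperties aspect. A minimal sketch, assuming a structured property with URN urn:li:structuredProperty:io.acryl.dataManagement.retentionTime has already been defined in your instance:
Python SDK: Assign a structured property to a data job
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    StructuredPropertiesClass,
    StructuredPropertyValueAssignmentClass,
)

datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)"

# The property URN and value are illustrative; the property must already be defined.
structured_props = StructuredPropertiesClass(
    properties=[
        StructuredPropertyValueAssignmentClass(
            propertyUrn="urn:li:structuredProperty:io.acryl.dataManagement.retentionTime",
            values=[90.0],
        )
    ]
)

emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=structured_props))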
Code Examples
Creating a Data Job
The simplest way to create a data job is using the Python SDK v2:
Python SDK: Create a basic data job
# metadata-ingestion/examples/library/datajob_create_basic.py
from datahub.metadata.urns import DataFlowUrn, DatasetUrn
from datahub.sdk import DataHubClient, DataJob
client = DataHubClient.from_env()
datajob = DataJob(
name="transform_customer_data",
flow_urn=DataFlowUrn(
orchestrator="airflow",
flow_id="daily_etl_pipeline",
cluster="prod",
),
description="Transforms raw customer data into analytics-ready format",
inlets=[
DatasetUrn(platform="postgres", name="raw.customers", env="PROD"),
DatasetUrn(platform="postgres", name="raw.addresses", env="PROD"),
],
outlets=[
DatasetUrn(platform="snowflake", name="analytics.dim_customers", env="PROD"),
],
)
client.entities.upsert(datajob)
print(f"Created data job: {datajob.urn}")
Adding Tags, Terms, and Ownership
Common metadata can be added to data jobs to enhance discoverability and governance:
Python SDK: Add tags, terms, and ownership to a data job
# metadata-ingestion/examples/library/datajob_add_tags_terms_ownership.py
from datahub.metadata.urns import (
CorpUserUrn,
DataFlowUrn,
DataJobUrn,
GlossaryTermUrn,
TagUrn,
)
from datahub.sdk import DataHubClient
client = DataHubClient.from_env()
datajob_urn = DataJobUrn(
job_id="transform_customer_data",
flow=DataFlowUrn(
orchestrator="airflow", flow_id="daily_etl_pipeline", cluster="prod"
),
)
datajob = client.entities.get(datajob_urn)
datajob.add_tag(TagUrn("Critical"))
datajob.add_tag(TagUrn("ETL"))
datajob.add_term(GlossaryTermUrn("CustomerData"))
datajob.add_term(GlossaryTermUrn("DataTransformation"))
datajob.add_owner(CorpUserUrn("data_engineering_team"))
datajob.add_owner(CorpUserUrn("john.doe"))
client.entities.update(datajob)
print(f"Added tags, terms, and ownership to {datajob_urn}")
Updating Job Properties
You can update job properties like descriptions using the Python SDK:
Python SDK: Update data job description
# metadata-ingestion/examples/library/datajob_update_description.py
from datahub.sdk import DataFlowUrn, DataHubClient, DataJobUrn
client = DataHubClient.from_env()
dataflow_urn = DataFlowUrn(
orchestrator="airflow", flow_id="daily_etl_pipeline", cluster="prod"
)
datajob_urn = DataJobUrn(flow=dataflow_urn, job_id="transform_customer_data")
datajob = client.entities.get(datajob_urn)
datajob.set_description(
"This job performs critical customer data transformation. "
"It joins raw customer records with address information and applies "
"data quality rules before loading into the analytics warehouse."
)
client.entities.update(datajob)
print(f"Updated description for {datajob_urn}")
Querying Data Job Information
Retrieve data job information via the REST API:
REST API: Query a data job
# metadata-ingestion/examples/library/datajob_query_rest.py
import json
from urllib.parse import quote
import requests
datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_pipeline,prod),transform_customer_data)"
gms_server = "http://localhost:8080"
url = f"{gms_server}/entities/{quote(datajob_urn, safe='')}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
print(json.dumps(data, indent=2))
if "aspects" in data:
aspects = data["aspects"]
if "dataJobInfo" in aspects:
job_info = aspects["dataJobInfo"]["value"]
print(f"\nJob Name: {job_info.get('name')}")
print(f"Description: {job_info.get('description')}")
print(f"Type: {job_info.get('type')}")
if "dataJobInputOutput" in aspects:
lineage = aspects["dataJobInputOutput"]["value"]
print(f"\nInput Datasets: {len(lineage.get('inputDatasetEdges', []))}")
print(f"Output Datasets: {len(lineage.get('outputDatasetEdges', []))}")
if "ownership" in aspects:
ownership = aspects["ownership"]["value"]
print(f"\nOwners: {len(ownership.get('owners', []))}")
for owner in ownership.get("owners", []):
print(f" - {owner.get('owner')} ({owner.get('type')})")
if "globalTags" in aspects:
tags = aspects["globalTags"]["value"]
print("\nTags:")
for tag in tags.get("tags", []):
print(f" - {tag.get('tag')}")
else:
print(f"Failed to retrieve data job: {response.status_code}")
print(response.text)
Adding Lineage to Data Jobs
Data jobs are often used to define lineage relationships. The following examples show how to add lineage through a data job:
Python SDK: Add lineage using DataJobPatchBuilder
from datahub.emitter.mce_builder import (
make_data_job_urn,
make_dataset_urn,
make_schema_field_urn,
)
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.metadata.schema_classes import (
FineGrainedLineageClass as FineGrainedLineage,
FineGrainedLineageDownstreamTypeClass as FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamTypeClass as FineGrainedLineageUpstreamType,
)
from datahub.specific.datajob import DataJobPatchBuilder
# Create DataHub Client
datahub_client = DataHubGraph(DataHubGraphConfig(server="http://localhost:8080"))
# Create DataJob URN
datajob_urn = make_data_job_urn(
orchestrator="airflow", flow_id="dag_abc", job_id="task_456"
)
# Create DataJob Patch to Add Lineage
patch_builder = DataJobPatchBuilder(datajob_urn)
patch_builder.add_input_dataset(
make_dataset_urn(platform="kafka", name="SampleKafkaDataset", env="PROD")
)
patch_builder.add_output_dataset(
make_dataset_urn(platform="hive", name="SampleHiveDataset", env="PROD")
)
patch_builder.add_input_datajob(
make_data_job_urn(orchestrator="airflow", flow_id="dag_abc", job_id="task_123")
)
patch_builder.add_input_dataset_field(
make_schema_field_urn(
parent_urn=make_dataset_urn(
platform="hive", name="fct_users_deleted", env="PROD"
),
field_path="user_id",
)
)
patch_builder.add_output_dataset_field(
make_schema_field_urn(
parent_urn=make_dataset_urn(
platform="hive", name="fct_users_created", env="PROD"
),
field_path="user_id",
)
)
# Update column-level lineage through the Data Job
lineage1 = FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
make_schema_field_urn(make_dataset_urn("postgres", "raw_data.users"), "user_id")
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
make_schema_field_urn(
make_dataset_urn("postgres", "analytics.user_metrics"),
"user_id",
)
],
transformOperation="IDENTITY",
confidenceScore=1.0,
)
patch_builder.add_fine_grained_lineage(lineage1)
patch_builder.remove_fine_grained_lineage(lineage1)
# Replaces all existing fine-grained lineages
patch_builder.set_fine_grained_lineages([lineage1])
patch_mcps = patch_builder.build()
# Emit DataJob Patch
for patch_mcp in patch_mcps:
datahub_client.emit(patch_mcp)
Python SDK: Define fine-grained lineage through a data job
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
)
from datahub.metadata.schema_classes import DataJobInputOutputClass
def datasetUrn(tbl):
return builder.make_dataset_urn("postgres", tbl)
def fldUrn(tbl, fld):
return builder.make_schema_field_urn(datasetUrn(tbl), fld)
# Lineage of fields output by a job
# bar.c1 <-- unknownFunc(bar2.c1, bar4.c1)
# bar.c2 <-- myfunc(bar3.c2)
# {bar.c3,bar.c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
# bar.c5 <-- unknownFunc(bar3)
# {bar.c6,bar.c7} <-- unknownFunc(bar4)
# bar2.c9 has no upstream i.e. its values are somehow created independently within this job.
# Note that the semantic of the "transformOperation" value is contextual.
# In above example, it is regarded as some kind of UDF; but it could also be an expression etc.
fineGrainedLineages = [
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c1")],
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar3", "c2")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c2")],
confidenceScore=0.8,
transformOperation="myfunc",
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar2", "c2"), fldUrn("bar2", "c3"), fldUrn("bar3", "c1")],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[fldUrn("bar", "c3"), fldUrn("bar", "c4")],
confidenceScore=0.7,
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[datasetUrn("bar3")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c5")],
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[datasetUrn("bar4")],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")],
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.NONE,
upstreams=[],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar2", "c9")],
),
]
# The lineage of output col bar.c9 is unknown. So there is no lineage for it above.
# Note that bar2 is an input as well as an output dataset, but some fields are inputs while other fields are outputs.
dataJobInputOutput = DataJobInputOutputClass(
inputDatasets=[datasetUrn("bar2"), datasetUrn("bar3"), datasetUrn("bar4")],
outputDatasets=[datasetUrn("bar"), datasetUrn("bar2")],
inputDatajobs=None,
inputDatasetFields=[
fldUrn("bar2", "c1"),
fldUrn("bar2", "c2"),
fldUrn("bar2", "c3"),
fldUrn("bar3", "c1"),
fldUrn("bar3", "c2"),
fldUrn("bar4", "c1"),
],
outputDatasetFields=[
fldUrn("bar", "c1"),
fldUrn("bar", "c2"),
fldUrn("bar", "c3"),
fldUrn("bar", "c4"),
fldUrn("bar", "c5"),
fldUrn("bar", "c6"),
fldUrn("bar", "c7"),
fldUrn("bar", "c9"),
fldUrn("bar2", "c9"),
],
fineGrainedLineages=fineGrainedLineages,
)
dataJobLineageMcp = MetadataChangeProposalWrapper(
entityUrn=builder.make_data_job_urn("spark", "Flow1", "Task1"),
aspect=dataJobInputOutput,
)
# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")
# Emit metadata!
emitter.emit_mcp(dataJobLineageMcp)
Integration Points
Relationship with DataFlow
Every data job belongs to exactly one dataFlow entity, which represents the parent pipeline or workflow. The data flow captures:
- The orchestrator/platform (Airflow, Spark, dbt, etc.)
- The flow/pipeline/DAG identifier
- The cluster or environment where it executes
This hierarchical relationship allows DataHub to organize jobs within their workflows and understand the execution context.
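Below is a minimal sketch of creating the parent flow and a child job together with the SDK; the names are illustrative, and the exact DataFlow constructor arguments may vary slightly across SDK versions:
Python SDK: Create a data flow and a data job within it
from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()

# Parent pipeline/workflow (illustrative names).
dataflow = DataFlow(
    platform="airflow",
    name="daily_etl_pipeline",
    description="Daily ETL pipeline for customer data",
)

# Child job tied to the parent flow via its URN.
datajob = DataJob(
    name="transform_customer_data",
    flow_urn=dataflow.urn,
)

client.entities.upsert(dataflow)
client.entities.upsert(datajob)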
Relationship with Datasets
Data jobs establish lineage by defining:
- Consumes relationships with input datasets
- Produces relationships with output datasets
These relationships are the foundation of DataHub's lineage graph. When a job processes data, it creates a connection between upstream sources and downstream outputs, enabling impact analysis and data discovery.
Relationship with DataProcessInstance
While dataJob represents the definition of a processing task, dataProcessInstance represents a specific execution or run of that job. Process instances capture:
- Runtime information (start time, end time, duration)
- Status (success, failure, running)
- Input/output datasets for that specific run
- Error messages and logs
This separation allows you to track both the static definition of a job and its dynamic runtime behavior.
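Below is a minimal sketch of reporting one run of a job with the DataProcessInstance helper; the flow and job IDs are illustrative, and timestamps are in epoch milliseconds:
Python SDK: Report a run of a data job as a DataProcessInstance
import time
import uuid

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")

# Job definition (illustrative IDs).
flow = DataFlow(orchestrator="airflow", id="daily_etl_pipeline", cluster="prod")
job = DataJob(id="transform_customer_data", flow_urn=flow.urn, name="Transform customer data")
flow.emit(emitter)
job.emit(emitter)

# Each execution of the job gets its own process instance.
run = DataProcessInstance.from_datajob(datajob=job, id=f"{job.id}-{uuid.uuid4()}")
run.emit_process_start(emitter, int(time.time() * 1000))
# ... the job does its work ...
run.emit_process_end(emitter, int(time.time() * 1000), result=InstanceRunResult.SUCCESS)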
GraphQL Resolvers
The DataHub GraphQL API provides rich query capabilities for data jobs:
- DataJobType: Main type for querying data job information
- DataJobRunsResolver: Resolves execution history and run information
- DataFlowDataJobsRelationshipsMapper: Maps relationships between flows and jobs
- UpdateLineageResolver: Handles lineage updates for jobs
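Below is a minimal sketch of calling the GraphQL endpoint from Python; the URN and server address are illustrative, and an Authorization header may be required depending on your deployment:
GraphQL API: Query a data job
import requests

graphql_url = "http://localhost:8080/api/graphql"
datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_pipeline,prod),transform_customer_data)"

# Fetch the job's core properties and owners.
query = """
query getDataJob($urn: String!) {
  dataJob(urn: $urn) {
    urn
    properties {
      name
      description
    }
    ownership {
      owners {
        owner {
          ... on CorpUser { username }
        }
      }
    }
  }
}
"""

response = requests.post(
    graphql_url,
    json={"query": query, "variables": {"urn": datajob_urn}},
)
response.raise_for_status()
print(response.json()["data"]["dataJob"])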
Ingestion Sources
Data jobs are commonly ingested from:
- Airflow: Tasks and DAGs with lineage extraction
- dbt: Models as data jobs with SQL-based lineage
- Spark: Job definitions with dataset dependencies
- Databricks: Notebooks and workflows
- Dagster: Ops and assets as processing units
- Prefect: Tasks and flows
- AWS Glue: ETL jobs
- Azure Data Factory: Pipeline activities
- Looker: LookML models and derived tables
These connectors automatically extract job definitions, lineage, and metadata from the source systems.
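Ingestion is usually configured through a YAML recipe and the DataHub CLI, but a recipe can also be executed programmatically. A minimal sketch for a dbt source, assuming local manifest and catalog artifacts; the file paths and target platform are illustrative:
Python SDK: Run a dbt ingestion recipe programmatically
from datahub.ingestion.run.pipeline import Pipeline

# Recipe equivalent: dbt source writing to a local DataHub instance (illustrative paths).
pipeline = Pipeline.create(
    {
        "source": {
            "type": "dbt",
            "config": {
                "manifest_path": "./target/manifest.json",
                "catalog_path": "./target/catalog.json",
                "target_platform": "snowflake",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()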
Notable Exceptions
DataHub Ingestion Jobs
DataHub's own ingestion pipelines are represented as data jobs with special aspects:
- datahubIngestionRunSummary: Tracks ingestion run statistics, entities processed, warnings, and errors
- datahubIngestionCheckpoint: Maintains state for incremental ingestion
These aspects are specific to DataHub's internal ingestion framework and are not used for general-purpose data jobs.
Job Status Deprecation
The status field in dataJobInfo is deprecated in favor of the dataProcessInstance model. Instead of storing job status on the job definition itself, create separate process instance entities for each execution with their own status information. This provides a cleaner separation between job definitions and runtime execution history.
Subtype Usage
The subTypes aspect allows you to classify jobs into categories:
- SQL jobs
- Python jobs
- Notebook jobs
- Container jobs
- Custom job types
This helps with filtering and organizing jobs in the UI and API queries.
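Below is a minimal sketch of setting a subtype by emitting the subTypes aspect; the type name is illustrative:
Python SDK: Set a subtype on a data job
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import SubTypesClass

datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)"

emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=datajob_urn,
        aspect=SubTypesClass(typeNames=["SQL"]),
    )
)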
Technical Reference
For technical details about fields, searchability, and relationships, view the Columns tab in DataHub.