
DataJob

Data jobs represent individual units of data processing work within a data pipeline or workflow. They are the tasks, steps, or operations that transform, move, or process data as part of a larger data flow. Examples include Airflow tasks, dbt models, Spark jobs, Databricks notebooks, and similar processing units in orchestration systems.

Identity

Data jobs are identified by two pieces of information:

  • The data flow (pipeline/workflow) that they belong to: this is represented as a URN pointing to the parent dataFlow entity. The data flow defines the orchestrator (e.g., airflow, spark, dbt), the flow ID (e.g., the DAG name or pipeline name), and the cluster where it runs.
  • The unique job identifier within that flow: this is a string that uniquely identifies the task within its parent flow (e.g., task name, step name, model name).

The URN structure for a data job is: urn:li:dataJob:(urn:li:dataFlow:(<orchestrator>,<flow_id>,<cluster>),<job_id>)

Examples

Airflow task:

urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)

dbt model:

urn:li:dataJob:(urn:li:dataFlow:(dbt,analytics_project,prod),staging.stg_customers)

Spark job:

urn:li:dataJob:(urn:li:dataFlow:(spark,data_processing_pipeline,PROD),aggregate_sales_task)

Databricks notebook:

urn:li:dataJob:(urn:li:dataFlow:(databricks,etl_workflow,production),process_events_notebook)
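
A small sketch showing how these URNs can be constructed programmatically with the SDK's urn classes (the same classes used in the code examples later on this page); the orchestrator, flow, and job names are illustrative:

Python (sketch): Construct a data job URN
from datahub.metadata.urns import DataFlowUrn, DataJobUrn

flow_urn = DataFlowUrn(orchestrator="airflow", flow_id="daily_etl_dag", cluster="prod")
job_urn = DataJobUrn(flow=flow_urn, job_id="transform_customer_data")

# Prints: urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_dag,prod),transform_customer_data)
print(job_urn)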

Important Capabilities

Job Information (dataJobInfo)

The dataJobInfo aspect captures the core properties of a data job:

  • Name: Human-readable name of the job (searchable with autocomplete)
  • Description: Detailed description of what the job does
  • Type: The type of job (e.g., SQL, Python, Spark, etc.)
  • Flow URN: Reference to the parent data flow
  • Created/Modified timestamps: When the job was created or last modified in the source system
  • Environment: The fabric/environment where the job runs (PROD, DEV, QA, etc.)
  • Custom properties: Additional key-value properties specific to the source system
  • External references: Links to external documentation or definitions (e.g., GitHub links)
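
For ingestion code built on the lower-level emitter APIs, the properties above map onto the DataJobInfoClass aspect. A minimal, hedged sketch (the job type, custom properties, external link, and server address are assumptions):

Python (sketch): Emit a dataJobInfo aspect
from datahub.emitter.mce_builder import make_data_job_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataJobInfoClass

job_info = DataJobInfoClass(
    name="transform_customer_data",
    type="SQL",  # free-form job type; illustrative value
    description="Transforms raw customer data into analytics-ready format",
    customProperties={"team": "data-platform"},  # assumed key/value pair
    externalUrl="https://github.com/example-org/etl/blob/main/transform_customer_data.sql",  # placeholder link
)

mcp = MetadataChangeProposalWrapper(
    entityUrn=make_data_job_urn(
        orchestrator="airflow", flow_id="daily_etl_pipeline", job_id="transform_customer_data"
    ),
    aspect=job_info,
)

DatahubRestEmitter("http://localhost:8080").emit_mcp(mcp)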

Input/Output Lineage (dataJobInputOutput)

The dataJobInputOutput aspect defines the data lineage relationships for the job:

  • Input datasets: Datasets consumed by the job during processing (via inputDatasetEdges)
  • Output datasets: Datasets produced by the job (via outputDatasetEdges)
  • Input data jobs: Other data jobs that this job depends on (via inputDatajobEdges)
  • Input dataset fields: Specific schema fields consumed from input datasets
  • Output dataset fields: Specific schema fields produced in output datasets
  • Fine-grained lineage: Column-level lineage mappings showing which upstream fields contribute to downstream fields

This aspect establishes the relationships that DataHub uses to build and visualize data lineage graphs across your data ecosystem; the lineage code examples later on this page show how to populate it.

Editable Properties (editableDataJobProperties)

The editableDataJobProperties aspect stores documentation edits made through the DataHub UI:

  • Description: User-edited documentation that complements or overrides the ingested description
  • Change audit stamps: Tracks who made edits and when

This separation ensures that manual edits in the UI are preserved and not overwritten by ingestion pipelines.
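
A hedged sketch of writing this aspect directly with the emitter API; in practice these edits usually come from the UI, and the description text and server address here are placeholders:

Python (sketch): Emit an editableDataJobProperties aspect
from datahub.emitter.mce_builder import make_data_job_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import EditableDataJobPropertiesClass

mcp = MetadataChangeProposalWrapper(
    entityUrn=make_data_job_urn(
        orchestrator="airflow", flow_id="daily_etl_pipeline", job_id="transform_customer_data"
    ),
    # Only the edited description is set here; the audit stamp fields keep their defaults.
    aspect=EditableDataJobPropertiesClass(
        description="Curated description maintained by the data platform team."
    ),
)

DatahubRestEmitter("http://localhost:8080").emit_mcp(mcp)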

Ownership

Like other entities, data jobs support ownership through the ownership aspect. Owners can be users or groups with various ownership types (DATAOWNER, PRODUCER, DEVELOPER, etc.). This helps identify who is responsible for maintaining and troubleshooting the job.

Tags and Glossary Terms

Data jobs can be tagged and associated with glossary terms:

  • Tags (globalTags aspect): Used for categorization, classification, or operational purposes (e.g., PII, critical, deprecated)
  • Glossary terms (glossaryTerms aspect): Link jobs to business terminology and concepts from your glossary

Domains and Applications

Data jobs can be organized into:

  • Domains (domains aspect): Business domains or data domains for organizational structure
  • Applications (applications aspect): Associated with specific applications or systems
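
As a hedged sketch, the domains side of this can be set by emitting the domains aspect directly; the domain id and server address are assumptions:

Python (sketch): Assign a data job to a domain
from datahub.emitter.mce_builder import make_data_job_urn, make_domain_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DomainsClass

mcp = MetadataChangeProposalWrapper(
    entityUrn=make_data_job_urn(
        orchestrator="airflow", flow_id="daily_etl_pipeline", job_id="transform_customer_data"
    ),
    aspect=DomainsClass(domains=[make_domain_urn("marketing")]),  # assumed domain id
)

DatahubRestEmitter("http://localhost:8080").emit_mcp(mcp)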

Structured Properties and Forms

Data jobs support:

  • Structured properties: Custom typed properties defined by your organization
  • Forms: Structured documentation forms for consistency

Code Examples

Creating a Data Job

The simplest way to create a data job is using the Python SDK v2:

Python SDK: Create a basic data job
# metadata-ingestion/examples/library/datajob_create_basic.py
from datahub.metadata.urns import DataFlowUrn, DatasetUrn
from datahub.sdk import DataHubClient, DataJob

client = DataHubClient.from_env()

datajob = DataJob(
    name="transform_customer_data",
    flow_urn=DataFlowUrn(
        orchestrator="airflow",
        flow_id="daily_etl_pipeline",
        cluster="prod",
    ),
    description="Transforms raw customer data into analytics-ready format",
    inlets=[
        DatasetUrn(platform="postgres", name="raw.customers", env="PROD"),
        DatasetUrn(platform="postgres", name="raw.addresses", env="PROD"),
    ],
    outlets=[
        DatasetUrn(platform="snowflake", name="analytics.dim_customers", env="PROD"),
    ],
)

client.entities.upsert(datajob)
print(f"Created data job: {datajob.urn}")

Adding Tags, Terms, and Ownership

Common metadata can be added to data jobs to enhance discoverability and governance:

Python SDK: Add tags, terms, and ownership to a data job
# metadata-ingestion/examples/library/datajob_add_tags_terms_ownership.py
from datahub.metadata.urns import (
    CorpUserUrn,
    DataFlowUrn,
    DataJobUrn,
    GlossaryTermUrn,
    TagUrn,
)
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()

datajob_urn = DataJobUrn(
    job_id="transform_customer_data",
    flow=DataFlowUrn(
        orchestrator="airflow", flow_id="daily_etl_pipeline", cluster="prod"
    ),
)

datajob = client.entities.get(datajob_urn)

datajob.add_tag(TagUrn("Critical"))
datajob.add_tag(TagUrn("ETL"))

datajob.add_term(GlossaryTermUrn("CustomerData"))
datajob.add_term(GlossaryTermUrn("DataTransformation"))

datajob.add_owner(CorpUserUrn("data_engineering_team"))
datajob.add_owner(CorpUserUrn("john.doe"))

client.entities.update(datajob)

print(f"Added tags, terms, and ownership to {datajob_urn}")

Updating Job Properties

You can update job properties, such as the description, using the Python SDK:

Python SDK: Update data job description
# metadata-ingestion/examples/library/datajob_update_description.py
from datahub.sdk import DataFlowUrn, DataHubClient, DataJobUrn

client = DataHubClient.from_env()

dataflow_urn = DataFlowUrn(
    orchestrator="airflow", flow_id="daily_etl_pipeline", cluster="prod"
)
datajob_urn = DataJobUrn(flow=dataflow_urn, job_id="transform_customer_data")

datajob = client.entities.get(datajob_urn)
datajob.set_description(
    "This job performs critical customer data transformation. "
    "It joins raw customer records with address information and applies "
    "data quality rules before loading into the analytics warehouse."
)

client.entities.update(datajob)

print(f"Updated description for {datajob_urn}")

Querying Data Job Information

Retrieve data job information via the REST API:

REST API: Query a data job
# metadata-ingestion/examples/library/datajob_query_rest.py
import json
from urllib.parse import quote

import requests

datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_pipeline,prod),transform_customer_data)"

gms_server = "http://localhost:8080"
url = f"{gms_server}/entities/{quote(datajob_urn, safe='')}"

response = requests.get(url)

if response.status_code == 200:
    data = response.json()
    print(json.dumps(data, indent=2))

    if "aspects" in data:
        aspects = data["aspects"]

        if "dataJobInfo" in aspects:
            job_info = aspects["dataJobInfo"]["value"]
            print(f"\nJob Name: {job_info.get('name')}")
            print(f"Description: {job_info.get('description')}")
            print(f"Type: {job_info.get('type')}")

        if "dataJobInputOutput" in aspects:
            lineage = aspects["dataJobInputOutput"]["value"]
            print(f"\nInput Datasets: {len(lineage.get('inputDatasetEdges', []))}")
            print(f"Output Datasets: {len(lineage.get('outputDatasetEdges', []))}")

        if "ownership" in aspects:
            ownership = aspects["ownership"]["value"]
            print(f"\nOwners: {len(ownership.get('owners', []))}")
            for owner in ownership.get("owners", []):
                print(f"  - {owner.get('owner')} ({owner.get('type')})")

        if "globalTags" in aspects:
            tags = aspects["globalTags"]["value"]
            print("\nTags:")
            for tag in tags.get("tags", []):
                print(f"  - {tag.get('tag')}")
else:
    print(f"Failed to retrieve data job: {response.status_code}")
    print(response.text)

Adding Lineage to Data Jobs

Data jobs are often the anchor point for lineage. The following examples show how to add dataset, data job, and column-level lineage through a data job:

Python SDK: Add lineage using DataJobPatchBuilder
from datahub.emitter.mce_builder import (
    make_data_job_urn,
    make_dataset_urn,
    make_schema_field_urn,
)
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.metadata.schema_classes import (
    FineGrainedLineageClass as FineGrainedLineage,
    FineGrainedLineageDownstreamTypeClass as FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamTypeClass as FineGrainedLineageUpstreamType,
)
from datahub.specific.datajob import DataJobPatchBuilder

# Create DataHub Client
datahub_client = DataHubGraph(DataHubGraphConfig(server="http://localhost:8080"))

# Create DataJob URN
datajob_urn = make_data_job_urn(
    orchestrator="airflow", flow_id="dag_abc", job_id="task_456"
)

# Create DataJob Patch to Add Lineage
patch_builder = DataJobPatchBuilder(datajob_urn)
patch_builder.add_input_dataset(
    make_dataset_urn(platform="kafka", name="SampleKafkaDataset", env="PROD")
)
patch_builder.add_output_dataset(
    make_dataset_urn(platform="hive", name="SampleHiveDataset", env="PROD")
)
patch_builder.add_input_datajob(
    make_data_job_urn(orchestrator="airflow", flow_id="dag_abc", job_id="task_123")
)
patch_builder.add_input_dataset_field(
    make_schema_field_urn(
        parent_urn=make_dataset_urn(
            platform="hive", name="fct_users_deleted", env="PROD"
        ),
        field_path="user_id",
    )
)
patch_builder.add_output_dataset_field(
    make_schema_field_urn(
        parent_urn=make_dataset_urn(
            platform="hive", name="fct_users_created", env="PROD"
        ),
        field_path="user_id",
    )
)

# Update column-level lineage through the Data Job
lineage1 = FineGrainedLineage(
    upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
    upstreams=[
        make_schema_field_urn(make_dataset_urn("postgres", "raw_data.users"), "user_id")
    ],
    downstreamType=FineGrainedLineageDownstreamType.FIELD,
    downstreams=[
        make_schema_field_urn(
            make_dataset_urn("postgres", "analytics.user_metrics"),
            "user_id",
        )
    ],
    transformOperation="IDENTITY",
    confidenceScore=1.0,
)
patch_builder.add_fine_grained_lineage(lineage1)
patch_builder.remove_fine_grained_lineage(lineage1)
# Replaces all existing fine-grained lineages
patch_builder.set_fine_grained_lineages([lineage1])

patch_mcps = patch_builder.build()

# Emit DataJob Patch
for patch_mcp in patch_mcps:
    datahub_client.emit(patch_mcp)

Python SDK: Define fine-grained lineage through a data job
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
)
from datahub.metadata.schema_classes import DataJobInputOutputClass


def datasetUrn(tbl):
    return builder.make_dataset_urn("postgres", tbl)


def fldUrn(tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)


# Lineage of fields output by a job
# bar.c1 <-- unknownFunc(bar2.c1, bar4.c1)
# bar.c2 <-- myfunc(bar3.c2)
# {bar.c3,bar.c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
# bar.c5 <-- unknownFunc(bar3)
# {bar.c6,bar.c7} <-- unknownFunc(bar4)
# bar2.c9 has no upstream, i.e. its values are created independently within this job.

# Note that the semantics of the "transformOperation" value are contextual.
# In the above example it is treated as a kind of UDF, but it could also be an expression, etc.

fineGrainedLineages = [
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("bar", "c1")],
    ),
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("bar3", "c2")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("bar", "c2")],
        confidenceScore=0.8,
        transformOperation="myfunc",
    ),
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("bar2", "c2"), fldUrn("bar2", "c3"), fldUrn("bar3", "c1")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=[fldUrn("bar", "c3"), fldUrn("bar", "c4")],
        confidenceScore=0.7,
    ),
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.DATASET,
        upstreams=[datasetUrn("bar3")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("bar", "c5")],
    ),
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.DATASET,
        upstreams=[datasetUrn("bar4")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")],
    ),
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.NONE,
        upstreams=[],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("bar2", "c9")],
    ),
]

# The lineage of output col bar.c9 is unknown, so there is no lineage entry for it above.
# Note that bar2 is both an input and an output dataset: some of its fields are inputs while others are outputs.

dataJobInputOutput = DataJobInputOutputClass(
    inputDatasets=[datasetUrn("bar2"), datasetUrn("bar3"), datasetUrn("bar4")],
    outputDatasets=[datasetUrn("bar"), datasetUrn("bar2")],
    inputDatajobs=None,
    inputDatasetFields=[
        fldUrn("bar2", "c1"),
        fldUrn("bar2", "c2"),
        fldUrn("bar2", "c3"),
        fldUrn("bar3", "c1"),
        fldUrn("bar3", "c2"),
        fldUrn("bar4", "c1"),
    ],
    outputDatasetFields=[
        fldUrn("bar", "c1"),
        fldUrn("bar", "c2"),
        fldUrn("bar", "c3"),
        fldUrn("bar", "c4"),
        fldUrn("bar", "c5"),
        fldUrn("bar", "c6"),
        fldUrn("bar", "c7"),
        fldUrn("bar", "c9"),
        fldUrn("bar2", "c9"),
    ],
    fineGrainedLineages=fineGrainedLineages,
)

dataJobLineageMcp = MetadataChangeProposalWrapper(
    entityUrn=builder.make_data_job_urn("spark", "Flow1", "Task1"),
    aspect=dataJobInputOutput,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(dataJobLineageMcp)

Integration Points

Relationship with DataFlow

Every data job belongs to exactly one dataFlow entity, which represents the parent pipeline or workflow. The data flow captures:

  • The orchestrator/platform (Airflow, Spark, dbt, etc.)
  • The flow/pipeline/DAG identifier
  • The cluster or environment where it executes

This hierarchical relationship allows DataHub to organize jobs within their workflows and understand the execution context.
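
A hedged sketch of registering the parent flow via its dataFlowInfo aspect with the lower-level emitter API; the flow name, description, and server address are placeholders:

Python (sketch): Register the parent data flow
from datahub.emitter.mce_builder import make_data_flow_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataFlowInfoClass

flow_urn = make_data_flow_urn(
    orchestrator="airflow", flow_id="daily_etl_pipeline", cluster="prod"
)

mcp = MetadataChangeProposalWrapper(
    entityUrn=flow_urn,
    aspect=DataFlowInfoClass(
        name="daily_etl_pipeline",
        description="Daily ETL DAG that contains the customer transformation tasks",
    ),
)

DatahubRestEmitter("http://localhost:8080").emit_mcp(mcp)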

Relationship with Datasets

Data jobs establish lineage by defining:

  • Consumes relationships with input datasets
  • Produces relationships with output datasets

These relationships are the foundation of DataHub's lineage graph. When a job processes data, it creates a connection between upstream sources and downstream outputs, enabling impact analysis and data discovery.

Relationship with DataProcessInstance

While dataJob represents the definition of a processing task, dataProcessInstance represents a specific execution or run of that job. Process instances capture:

  • Runtime information (start time, end time, duration)
  • Status (success, failure, running)
  • Input/output datasets for that specific run
  • Error messages and logs

This separation allows you to track both the static definition of a job and its dynamic runtime behavior.
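
A hedged sketch of recording one run of a job as a dataProcessInstance run event; the instance id, result labels, and server address are assumptions:

Python (sketch): Record a job run as a process instance event
import time

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DataProcessInstanceRunEventClass,
    DataProcessInstanceRunResultClass,
    DataProcessRunStatusClass,
    RunResultTypeClass,
)

# Assumed instance id; process instance URNs follow urn:li:dataProcessInstance:<id>
run_urn = "urn:li:dataProcessInstance:daily_etl_pipeline_transform_customer_data_2024_01_01"

run_event = DataProcessInstanceRunEventClass(
    timestampMillis=int(time.time() * 1000),
    status=DataProcessRunStatusClass.COMPLETE,
    result=DataProcessInstanceRunResultClass(
        type=RunResultTypeClass.SUCCESS,
        nativeResultType="airflow",  # assumed orchestrator-native result label
    ),
)

DatahubRestEmitter("http://localhost:8080").emit_mcp(
    MetadataChangeProposalWrapper(entityUrn=run_urn, aspect=run_event)
)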

GraphQL Resolvers

The DataHub GraphQL API provides rich query capabilities for data jobs:

  • DataJobType: Main type for querying data job information
  • DataJobRunsResolver: Resolves execution history and run information
  • DataFlowDataJobsRelationshipsMapper: Maps relationships between flows and jobs
  • UpdateLineageResolver: Handles lineage updates for jobs
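
A hedged sketch of fetching a data job through the GraphQL endpoint with plain HTTP; the endpoint path, selected fields, and lack of authentication are assumptions that depend on your deployment:

Python (sketch): Query a data job via GraphQL
import requests

query = """
query getDataJob($urn: String!) {
  dataJob(urn: $urn) {
    urn
    properties { name description }
  }
}
"""

variables = {
    "urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_etl_pipeline,prod),transform_customer_data)"
}

response = requests.post(
    "http://localhost:8080/api/graphql",
    json={"query": query, "variables": variables},
)
print(response.json())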

Ingestion Sources

Data jobs are commonly ingested from:

  • Airflow: Tasks and DAGs with lineage extraction
  • dbt: Models as data jobs with SQL-based lineage
  • Spark: Job definitions with dataset dependencies
  • Databricks: Notebooks and workflows
  • Dagster: Ops and assets as processing units
  • Prefect: Tasks and flows
  • AWS Glue: ETL jobs
  • Azure Data Factory: Pipeline activities
  • Looker: LookML models and derived tables

These connectors automatically extract job definitions, lineage, and metadata from the source systems.
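
Connectors are normally configured with YAML recipes, but they can also be run programmatically. A hedged sketch using the dbt source (the artifact paths and sink address are placeholders):

Python (sketch): Run a dbt ingestion programmatically
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "dbt",
            "config": {
                "manifest_path": "./target/manifest.json",  # placeholder dbt artifacts
                "catalog_path": "./target/catalog.json",
                "target_platform": "snowflake",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()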

Notable Exceptions

DataHub Ingestion Jobs

DataHub's own ingestion pipelines are represented as data jobs with special aspects:

  • datahubIngestionRunSummary: Tracks ingestion run statistics, entities processed, warnings, and errors
  • datahubIngestionCheckpoint: Maintains state for incremental ingestion

These aspects are specific to DataHub's internal ingestion framework and are not used for general-purpose data jobs.

Job Status Deprecation

The status field in dataJobInfo is deprecated in favor of the dataProcessInstance model. Instead of storing job status on the job definition itself, create separate process instance entities for each execution with their own status information. This provides a cleaner separation between job definitions and runtime execution history.

Subtype Usage

The subTypes aspect allows you to classify jobs into categories:

  • SQL jobs
  • Python jobs
  • Notebook jobs
  • Container jobs
  • Custom job types

This helps with filtering and organizing jobs in the UI and API queries.
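
A hedged sketch of setting a subtype by emitting the subTypes aspect; the subtype label and server address are assumptions:

Python (sketch): Set a data job subtype
from datahub.emitter.mce_builder import make_data_job_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import SubTypesClass

mcp = MetadataChangeProposalWrapper(
    entityUrn=make_data_job_urn(
        orchestrator="airflow", flow_id="daily_etl_pipeline", job_id="transform_customer_data"
    ),
    aspect=SubTypesClass(typeNames=["SQL"]),  # assumed subtype label
)

DatahubRestEmitter("http://localhost:8080").emit_mcp(mcp)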

Technical Reference

For technical details about fields, searchability, and relationships, view the Columns tab in DataHub.