DataProcess

DEPRECATED: This entity is deprecated and should not be used for new implementations.

Use dataFlow and dataJob instead.

The dataProcess entity was an early attempt to model data processing tasks. It has been superseded by the more robust and flexible dataFlow and dataJob entities, which better represent the hierarchical nature of modern data pipelines.

Deprecation Notice

The dataProcess entity was deprecated to provide a clearer separation between:

  • DataFlow: Represents the overall pipeline/workflow (e.g., an Airflow DAG, dbt project, Spark application)
  • DataJob: Represents individual tasks within a pipeline (e.g., an Airflow task, dbt model, Spark job)

This two-level hierarchy better matches how modern orchestration systems organize data processing work and provides more flexibility for lineage tracking, ownership assignment, and operational monitoring.

Why was it deprecated?

The original dataProcess entity had several limitations:

  1. No hierarchical structure: It couldn't represent the relationship between a pipeline and its constituent tasks
  2. Limited orchestrator support: The flat structure didn't map well to DAG-based orchestration platforms like Airflow, Prefect, or Dagster
  3. Unclear semantics: It was ambiguous whether a dataProcess represented a whole pipeline or a single task
  4. Poor lineage modeling: Without task-level granularity, lineage relationships were less precise

The new dataFlow and dataJob model addresses these limitations by providing a clear parent-child relationship that mirrors real-world data processing architectures.

Identity (Historical Reference)

DataProcess entities were identified by three components:

  • Name: The process name (typically an ETL job name)
  • Orchestrator: The workflow management platform (e.g., airflow, azkaban)
  • Origin (Fabric): The environment where the process runs (PROD, DEV, etc.)

The URN structure was:

urn:li:dataProcess:(<name>,<orchestrator>,<origin>)

Example URNs

urn:li:dataProcess:(customer_etl_job,airflow,PROD)
urn:li:dataProcess:(sales_aggregation,azkaban,DEV)
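
When scripting a migration, these legacy URNs can be generated programmatically from the three identity components. A minimal sketch; the helper function below is illustrative and is not part of any DataHub API:

# Build a legacy dataProcess URN from its three identity components.
# Illustrative helper only -- do not use it to create new dataProcess entities.
def make_dataprocess_urn(name: str, orchestrator: str, origin: str) -> str:
    return f"urn:li:dataProcess:({name},{orchestrator},{origin})"


print(make_dataprocess_urn("customer_etl_job", "airflow", "PROD"))
# urn:li:dataProcess:(customer_etl_job,airflow,PROD)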

Important Capabilities (Historical Reference)

DataProcessInfo Aspect

The dataProcessInfo aspect captured inputs and outputs of the process:

  • Inputs: Array of dataset URNs consumed by the process
  • Outputs: Array of dataset URNs produced by the process

This established basic lineage through "Consumes" relationships with datasets.
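
For reference, the aspect payload returned by the read APIs is simply two arrays of dataset URNs. A representative example, sketched as a Python dict with illustrative dataset names:

# Representative dataProcessInfo payload (dataset URNs are illustrative)
data_process_info = {
    "inputs": [
        "urn:li:dataset:(urn:li:dataPlatform:mysql,raw_db.customers,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:mysql,raw_db.orders,PROD)",
    ],
    "outputs": [
        "urn:li:dataset:(urn:li:dataPlatform:postgres,analytics.customer_summary,PROD)",
    ],
}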

Common Aspects

Like other entities, dataProcess supported:

  • Ownership: Assigning owners to processes
  • Status: Marking processes as removed
  • Global Tags: Categorization and classification
  • Institutional Memory: Links to documentation

Migration Guide

When to use DataFlow vs DataJob

Use DataFlow when representing:

  • Airflow DAGs
  • dbt projects
  • Prefect flows
  • Dagster pipelines
  • Azkaban workflows
  • Any container of related data processing tasks

Use DataJob when representing:

  • Airflow tasks within a DAG
  • dbt models within a project
  • Prefect tasks within a flow
  • Dagster ops/assets within a pipeline
  • Individual processing steps

Use both together (see the sketch after this list):

  • Create a DataFlow for the pipeline
  • Create DataJobs for each task within that pipeline
  • Link DataJobs to their parent DataFlow
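
A minimal sketch of that pattern using the Python SDK shown later on this page (pipeline and task names are illustrative):

from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()

# One DataFlow for the pipeline...
dataflow = DataFlow(
    platform="airflow",
    name="customer_pipeline",
    platform_instance="prod",
)

# ...and one DataJob per task, each linked to its parent flow.
jobs = [
    DataJob(name="extract_customers", flow=dataflow),
    DataJob(name="transform_customers", flow=dataflow),
    DataJob(name="load_customers", flow=dataflow),
]

client.entities.upsert(dataflow)
for job in jobs:
    client.entities.upsert(job)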

Conceptual Mapping

DataProcess Concept    New Model Equivalent         Notes
Process with tasks     DataFlow + DataJobs          Split into two entities
Process name           DataFlow flowId              Becomes the parent identifier
Single-step process    DataFlow + 1 DataJob         Still requires both entities
Orchestrator           DataFlow orchestrator        Same concept, better modeling
Origin/Fabric          DataFlow cluster             Often matches environment
Inputs/Outputs         DataJob dataJobInputOutput   Moved to job level for precision
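
When scripting a bulk migration, the mapping above can be expressed as a small helper that turns the old identity into new URN strings; the function and the task-naming convention below are illustrative:

# Map a legacy dataProcess identity onto the new URN scheme.
# The task name is chosen by you; the naming below is illustrative.
def map_dataprocess(name: str, orchestrator: str, origin: str, task: str) -> dict:
    dataflow_urn = f"urn:li:dataFlow:({orchestrator},{name},{origin.lower()})"
    datajob_urn = f"urn:li:dataJob:({dataflow_urn},{task})"
    return {"dataFlow": dataflow_urn, "dataJob": datajob_urn}


print(map_dataprocess("daily_report", "airflow", "PROD", "daily_report_task"))
# {'dataFlow': 'urn:li:dataFlow:(airflow,daily_report,prod)',
#  'dataJob': 'urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_report,prod),daily_report_task)'}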

Migration Steps

To migrate from dataProcess to dataFlow/dataJob:

  1. Identify your process structure: Determine if your dataProcess represents a pipeline (has multiple steps) or a single task

  2. Create a DataFlow: This represents the overall pipeline/workflow

    • Use the same orchestrator value
    • Use the process name as the flow ID
    • Use a cluster identifier (often matches the origin/fabric)
  3. Create DataJob(s): Create one or more jobs within the flow

    • For single-step processes: create one job named after the process
    • For multi-step processes: create a job for each step
    • Link each job to its parent DataFlow
  4. Migrate lineage: Move input/output dataset relationships from the process level to the job level

  5. Migrate metadata: Transfer ownership, tags, and documentation to the appropriate entity (typically the DataFlow for pipeline-level metadata, or specific DataJobs for task-level metadata)

Migration Examples

Example 1: Simple single-task process

Old dataProcess:

urn:li:dataProcess:(daily_report,airflow,PROD)

New structure:

DataFlow: urn:li:dataFlow:(airflow,daily_report,prod)
DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_report,prod),daily_report_task)

Example 2: Multi-step ETL pipeline

Old dataProcess:

urn:li:dataProcess:(customer_pipeline,airflow,PROD)

New structure:

DataFlow: urn:li:dataFlow:(airflow,customer_pipeline,prod)
DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),extract_customers)
DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),transform_customers)
DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),load_customers)

Code Examples

Querying Existing DataProcess Entities

If you need to query existing dataProcess entities for migration purposes:

Python SDK: Query a dataProcess entity
"""
Example: Query an existing (deprecated) dataProcess entity for migration purposes.

This example shows how to read a deprecated dataProcess entity from DataHub
to understand its structure before migrating it to dataFlow and dataJob entities.

Note: This is only for reading existing data. Do NOT create new dataProcess entities.
Use dataFlow and dataJob instead for all new implementations.
"""

from datahub.emitter.rest_emitter import DatahubRestEmitter

# Create emitter to read from DataHub
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# URN of the deprecated dataProcess entity to query
dataprocess_urn = "urn:li:dataProcess:(customer_etl_job,airflow,PROD)"

# Fetch the entity using the REST API
try:
    entity = emitter._session.get(
        f"{emitter._gms_server}/entities/{dataprocess_urn}"
    ).json()

    print(f"Found dataProcess: {dataprocess_urn}")
    print("\n=== Entity Aspects ===")

    # Extract key information for migration
    if "aspects" in entity:
        aspects = entity["aspects"]

        # Key aspect (identity); default to an empty dict so the
        # migration recommendation below never hits a NameError
        key = aspects.get("dataProcessKey", {})
        if key:
            print("\nIdentity:")
            print(f"  Name: {key.get('name')}")
            print(f"  Orchestrator: {key.get('orchestrator')}")
            print(f"  Origin (Fabric): {key.get('origin')}")

        # Core process information
        if "dataProcessInfo" in aspects:
            info = aspects["dataProcessInfo"]
            print("\nProcess Info:")
            if "inputs" in info:
                print(f"  Input Datasets: {len(info['inputs'])}")
                for inp in info["inputs"]:
                    print(f"    - {inp}")
            if "outputs" in info:
                print(f"  Output Datasets: {len(info['outputs'])}")
                for out in info["outputs"]:
                    print(f"    - {out}")

        # Ownership information
        if "ownership" in aspects:
            ownership = aspects["ownership"]
            print("\nOwnership:")
            for owner in ownership.get("owners", []):
                print(f"  - {owner['owner']} (type: {owner.get('type', 'UNKNOWN')})")

        # Tags
        if "globalTags" in aspects:
            tags = aspects["globalTags"]
            print("\nTags:")
            for tag in tags.get("tags", []):
                print(f"  - {tag['tag']}")

        # Status (soft-delete flag)
        if "status" in aspects:
            status = aspects["status"]
            print(f"\nRemoved: {status.get('removed', False)}")

        print("\n=== Migration Recommendation ===")
        print("Replace this dataProcess with:")
        print(
            f"  DataFlow URN: urn:li:dataFlow:({key.get('orchestrator')},{key.get('name')},{key.get('origin', 'PROD').lower()})"
        )
        print(
            f"  DataJob URN: urn:li:dataJob:(urn:li:dataFlow:({key.get('orchestrator')},{key.get('name')},{key.get('origin', 'PROD').lower()}),main_task)"
        )
        print("\nSee dataprocess_migrate_to_flow_job.py for migration code examples.")

except Exception as e:
    print(f"Error querying dataProcess: {e}")
    print("\nThis is expected if the entity doesn't exist.")
    print("DataProcess is deprecated - use dataFlow and dataJob instead.")

Instead of using dataProcess, create the modern equivalent:

Python SDK: Create DataFlow and DataJob to replace dataProcess
"""
Example: Migrate from deprecated dataProcess to modern dataFlow and dataJob entities.

This example shows how to create the modern dataFlow and dataJob entities
that replace the deprecated dataProcess entity.

The dataProcess entity used a flat structure:
dataProcess -> inputs/outputs

The new model uses a hierarchical structure:
dataFlow (pipeline) -> dataJob (task) -> inputs/outputs

This provides better organization and more precise lineage tracking.
"""

from datahub.metadata.urns import DatasetUrn
from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()

# Old dataProcess would have been:
# urn:li:dataProcess:(customer_etl_job,airflow,PROD)
# with inputs and outputs at the process level

# New approach: Create a DataFlow for the pipeline
dataflow = DataFlow(
    platform="airflow",  # Same as the old 'orchestrator' field
    name="customer_etl_job",  # Same as the old 'name' field
    platform_instance="prod",  # Based on the old 'origin' field
    description="ETL pipeline for customer data processing",
)

# Create DataJob(s) within the flow
# For a simple single-task process, create one job
# For complex multi-step processes, create multiple jobs
datajob = DataJob(
    name="customer_etl_task",  # Task name within the flow
    flow=dataflow,  # Link to parent flow
    description="Main ETL task for customer data",
    # Inputs and outputs now live at the job level for precise lineage
    inlets=[
        DatasetUrn(platform="mysql", name="raw_db.customers", env="PROD"),
        DatasetUrn(platform="mysql", name="raw_db.orders", env="PROD"),
    ],
    outlets=[
        DatasetUrn(platform="postgres", name="analytics.customer_summary", env="PROD"),
    ],
)

# Upsert both entities
client.entities.upsert(dataflow)
client.entities.upsert(datajob)

print("Successfully migrated from dataProcess to dataFlow + dataJob!")
print(f"DataFlow URN: {dataflow.urn}")
print(f"DataJob URN: {datajob.urn}")
print("\nKey improvements over dataProcess:")
print("- Clear separation between pipeline (DataFlow) and task (DataJob)")
print("- Support for multi-step pipelines with multiple DataJobs")
print("- More precise lineage at the task level")
print("- Better integration with modern orchestration platforms")

Complete Migration Example

Python SDK: Full migration from dataProcess to dataFlow/dataJob
"""
Example: Complete migration from dataProcess to dataFlow/dataJob with metadata preservation.

This example demonstrates a full migration path that:
1. Reads an existing deprecated dataProcess entity
2. Extracts all its metadata (inputs, outputs, ownership, tags)
3. Creates equivalent dataFlow and dataJob entities
4. Preserves all metadata relationships

Use this as a template for migrating multiple dataProcess entities in bulk.
"""

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    GlobalTagsClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
    TagAssociationClass,
)
from datahub.sdk import DataFlow, DataHubClient, DataJob

# Initialize clients
rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
client = DataHubClient.from_env()

# Step 1: Define the dataProcess to migrate
old_dataprocess_urn = "urn:li:dataProcess:(sales_pipeline,airflow,PROD)"

print(f"Migrating: {old_dataprocess_urn}")

try:
    # Step 2: Fetch the existing dataProcess entity
    entity = rest_emitter._session.get(
        f"{rest_emitter._gms_server}/entities/{old_dataprocess_urn}"
    ).json()

    aspects = entity.get("aspects", {})

    # Extract identity information
    key = aspects.get("dataProcessKey", {})
    name = key.get("name", "unknown_process")
    orchestrator = key.get("orchestrator", "unknown")
    origin = key.get("origin", "PROD")

    # Extract process info
    process_info = aspects.get("dataProcessInfo", {})
    input_datasets = process_info.get("inputs", [])
    output_datasets = process_info.get("outputs", [])

    # Extract ownership
    ownership_aspect = aspects.get("ownership", {})
    owners = ownership_aspect.get("owners", [])

    # Extract tags
    tags_aspect = aspects.get("globalTags", {})
    tags = tags_aspect.get("tags", [])

    print("\n=== Extracted Metadata ===")
    print(f"Name: {name}")
    print(f"Orchestrator: {orchestrator}")
    print(f"Environment: {origin}")
    print(f"Inputs: {len(input_datasets)} datasets")
    print(f"Outputs: {len(output_datasets)} datasets")
    print(f"Owners: {len(owners)}")
    print(f"Tags: {len(tags)}")

    # Step 3: Create the new DataFlow
    dataflow = DataFlow(
        platform=orchestrator,
        name=name,
        platform_instance=origin.lower(),
        description=f"Migrated from dataProcess {name}",
    )

    # Step 4: Create the DataJob(s)
    # For simplicity, creating one job. In practice, you might split into multiple jobs.
    datajob = DataJob(
        name=f"{name}_main",
        flow=dataflow,
        description=f"Main task for {name}",
        inlets=list(input_datasets),  # These should be dataset URNs
        outlets=list(output_datasets),  # These should be dataset URNs
    )

    # Step 5: Upsert the entities
    client.entities.upsert(dataflow)
    client.entities.upsert(datajob)

    print("\n=== Created New Entities ===")
    print(f"DataFlow: {dataflow.urn}")
    print(f"DataJob: {datajob.urn}")

    # Step 6: Migrate ownership to the DataFlow
    if owners:
        ownership_to_add = OwnershipClass(
            owners=[
                OwnerClass(
                    owner=owner.get("owner"),
                    # Fall back to DATAOWNER if the legacy type doesn't map cleanly
                    type=getattr(
                        OwnershipTypeClass,
                        owner.get("type", "DATAOWNER"),
                        OwnershipTypeClass.DATAOWNER,
                    ),
                )
                for owner in owners
            ]
        )
        rest_emitter.emit_mcp(
            MetadataChangeProposalWrapper(
                entityUrn=str(dataflow.urn),
                aspect=ownership_to_add,
            )
        )
        print(f"Migrated {len(owners)} owner(s) to DataFlow")

    # Step 7: Migrate tags to the DataFlow
    if tags:
        tags_to_add = GlobalTagsClass(
            tags=[TagAssociationClass(tag=tag.get("tag")) for tag in tags]
        )
        rest_emitter.emit_mcp(
            MetadataChangeProposalWrapper(
                entityUrn=str(dataflow.urn),
                aspect=tags_to_add,
            )
        )
        print(f"Migrated {len(tags)} tag(s) to DataFlow")

    print("\n=== Migration Complete ===")
    print("Next steps:")
    print("1. Verify the new entities in the DataHub UI")
    print("2. Update any downstream systems to reference the new URNs")
    print("3. Consider soft-deleting the old dataProcess entity")

except Exception as e:
    print(f"Error during migration: {e}")
    print("\nCommon issues:")
    print("- DataProcess entity doesn't exist (already migrated or never created)")
    print("- Network connectivity to DataHub GMS")
    print("- Permission issues writing to DataHub")

Integration Points

Historical Usage

The dataProcess entity was previously used by:

  1. Early ingestion connectors: The original Airflow and Azkaban connectors, before they migrated to dataFlow/dataJob
  2. Custom integrations: User-built integrations that haven't been updated
  3. Legacy metadata: Historical data in existing DataHub instances

Modern Replacements

All modern DataHub connectors use dataFlow and dataJob:

  • Airflow: DAGs → DataFlow, Tasks → DataJob
  • dbt: Projects → DataFlow, Models → DataJob
  • Prefect: Flows → DataFlow, Tasks → DataJob
  • Dagster: Pipelines → DataFlow, Ops/Assets → DataJob
  • Fivetran: Connectors → DataFlow, Sync operations → DataJob
  • AWS Glue: Jobs → DataFlow, Steps → DataJob
  • Azure Data Factory: Pipelines → DataFlow, Activities → DataJob

DataProcessInstance

Note that dataProcessInstance is NOT deprecated. It represents a specific execution/run of either:

  • A dataJob (recommended)
  • A legacy dataProcess (for backward compatibility)

DataProcessInstance continues to be used for tracking pipeline run history, status, and runtime information.
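
A minimal sketch of run tracking against a dataJob, assuming the helper classes in datahub.api.entities (DataFlow, DataJob, DataProcessInstance, InstanceRunResult) are present in your installed SDK; the constructor arguments and names below are assumptions to verify against your datahub version:

import time

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# The pipeline and task this run belongs to (modern entities, not dataProcess);
# names and the env/cluster argument are assumptions for this sketch.
flow = DataFlow(orchestrator="airflow", id="customer_pipeline", env="prod")
job = DataJob(flow_urn=flow.urn, id="load_customers")

# One execution of that job; the run id is illustrative.
run = DataProcessInstance.from_datajob(
    datajob=job,
    id="load_customers_2024_01_01",
    clone_inlets=True,   # copy the job's inlets onto the run
    clone_outlets=True,  # copy the job's outlets onto the run
)

run.emit_process_start(emitter, int(time.time() * 1000))
# ... the actual work happens here ...
run.emit_process_end(emitter, int(time.time() * 1000), result=InstanceRunResult.SUCCESS)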

Notable Exceptions

Timeline for Removal

  • Deprecated: Early 2021 (with the introduction of dataFlow/dataJob)
  • Status: Still exists in the entity registry for backward compatibility
  • Current State: No active ingestion sources create dataProcess entities
  • Removal: No specific timeline, maintained for existing data

Reading Existing Data

The dataProcess entity remains readable through all DataHub APIs for backward compatibility. Existing dataProcess entities in your instance will continue to function and display in the UI.

While it is technically possible to create new dataProcess entities, doing so is strongly discouraged. All new integrations should use dataFlow and dataJob.

Upgrade Path

There is no automatic migration tool. Organizations with significant dataProcess data should:

  1. Use the Python SDK to query existing dataProcess entities
  2. Create equivalent dataFlow and dataJob entities
  3. Preserve URN mappings for lineage continuity
  4. Consider soft-deleting old dataProcess entities once migration is verified (see the sketch below)
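
For step 4, a soft delete is just a status aspect with removed set to true. A minimal sketch; the URN and server address are illustrative:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import StatusClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Soft-delete a dataProcess entity that has already been migrated (URN is illustrative)
old_urn = "urn:li:dataProcess:(customer_etl_job,airflow,PROD)"
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=old_urn,
        aspect=StatusClass(removed=True),
    )
)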

GraphQL API

The dataProcess entity is minimally exposed in the GraphQL API. Modern GraphQL queries and mutations focus on dataFlow and dataJob entities.
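
For example, a dataJob can be read through the GraphQL endpoint. The sketch below assumes a standard /api/graphql deployment, a personal access token, and a deliberately minimal field selection; adjust all three for your environment:

import requests

# Assumed local deployment and token; replace with your own values.
DATAHUB_GRAPHQL = "http://localhost:8080/api/graphql"
TOKEN = "<personal-access-token>"

query = """
query getDataJob($urn: String!) {
  dataJob(urn: $urn) {
    urn
    jobId
    dataFlow {
      urn
    }
  }
}
"""

response = requests.post(
    DATAHUB_GRAPHQL,
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "query": query,
        "variables": {
            "urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,daily_report,prod),daily_report_task)"
        },
    },
)
print(response.json())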

Additional Resources

Technical Reference

For technical details about fields, searchability, and relationships, view the Columns tab in DataHub.