Skip to main content
Version: Next

DataFlow & DataJob

Why Would You Use DataFlow and DataJob?

The DataFlow and DataJob entities are used to represent data processing pipelines and jobs within a data ecosystem. They allow users to define, manage, and monitor the flow of data through various stages of processing, from ingestion to transformation and storage.

Goal Of This Guide

This guide will show you how to:

  • Create a DataFlow.
  • Create a DataJob with a DataFlow.
  • Read a DataFlow.
  • Read a DataJob.

Prerequisites

For this tutorial, you need to deploy DataHub Quickstart and ingest sample data. For detailed steps, please refer to the DataHub Quickstart Guide.

Create DataFlow

# Inlined from /metadata-ingestion/examples/library/create_dataflow.py
from datahub.metadata.urns import TagUrn
from datahub.sdk import DataFlow, DataHubClient

client = DataHubClient.from_env()

dataflow = DataFlow(
name="example_dataflow",
platform="airflow",
description="airflow pipeline for production",
tags=[TagUrn(name="production"), TagUrn(name="data_engineering")],
)

client.entities.upsert(dataflow)

Create DataJob

A DataJob must be associated with a DataFlow. You can create a DataJob by providing either the DataFlow object, or the DataFlow URN together with its platform instance.

```python
# Inlined from /metadata-ingestion/examples/library/create_datajob.py
from datahub.metadata.urns import DatasetUrn, TagUrn
from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()

# datajob will inherit the platform and platform instance from the flow

dataflow = DataFlow(
    platform="airflow",
    name="example_dag",
    platform_instance="PROD",
    description="example dataflow",
    tags=[TagUrn(name="tag1"), TagUrn(name="tag2")],
)

datajob = DataJob(
    name="example_datajob",
    flow=dataflow,
    inlets=[
        DatasetUrn(platform="hdfs", name="dataset1", env="PROD"),
    ],
    outlets=[
        DatasetUrn(platform="hdfs", name="dataset2", env="PROD"),
    ],
)

client.entities.upsert(dataflow)
client.entities.upsert(datajob)
```


</TabItem>
<TabItem value="python" label="Create DataJob with DataFlow URN">
```python
# Inlined from /metadata-ingestion/examples/library/create_datajob_with_flow_urn.py
from datahub.metadata.urns import DataFlowUrn, DatasetUrn
from datahub.sdk import DataHubClient, DataJob

client = DataHubClient.from_env()

# datajob will inherit the platform and platform instance from the flow

datajob = DataJob(
name="example_datajob",
flow_urn=DataFlowUrn(
orchestrator="airflow",
flow_id="example_dag",
cluster="PROD",
),
platform_instance="PROD",
inlets=[
DatasetUrn(platform="hdfs", name="dataset1", env="PROD"),
],
outlets=[
DatasetUrn(platform="hdfs", name="dataset2", env="PROD"),
],
)

client.entities.upsert(datajob)
```
</TabItem>

Read DataFlow

# Inlined from /metadata-ingestion/examples/library/read_dataflow.py
from datahub.metadata.urns import TagUrn
from datahub.sdk import DataFlow, DataHubClient

client = DataHubClient.from_env()

dataflow = DataFlow(
name="example_dataflow",
platform="airflow",
description="airflow pipeline for production",
tags=[TagUrn(name="production"), TagUrn(name="data_engineering")],
)

client.entities.upsert(dataflow)

dataflow_entity = client.entities.get(dataflow.urn)
print("DataFlow name:", dataflow_entity.name)
print("DataFlow platform:", dataflow_entity.platform)
print("DataFlow description:", dataflow_entity.description)

Example Output

>> DataFlow name: example_dataflow
>> DataFlow platform: urn:li:dataPlatform:airflow
>> DataFlow description: airflow pipeline for production

Read DataJob

# Inlined from /metadata-ingestion/examples/library/read_datajob.py
from datahub.sdk import DataFlow, DataHubClient, DataJob

client = DataHubClient.from_env()

dataflow = DataFlow(
platform="airflow",
name="example_dag",
platform_instance="PROD",
)

# datajob will inherit the platform and platform instance from the flow
datajob = DataJob(
name="example_datajob",
description="example datajob",
flow=dataflow,
)

client.entities.upsert(dataflow)
client.entities.upsert(datajob)

datajob_entity = client.entities.get(datajob.urn)

print("DataJob name:", datajob_entity.name)
print("DataJob Flow URN:", datajob_entity.flow_urn)
print("DataJob description:", datajob_entity.description)

Example Output

>> DataJob name: example_datajob
>> DataJob Flow URN: urn:li:dataFlow:(airflow,PROD.example_dag,PROD)
>> DataJob description: example datajob