Dataset
The dataset entity is one of the most important entities in the metadata model. Datasets represent collections of data that typically appear as Tables or Views in a database (e.g. BigQuery, Snowflake, Redshift), Streams in a stream-processing environment (Kafka, Pulsar), or bundles of data found as Files or Folders in data lake systems (S3, ADLS, etc.).
Identity
Datasets are identified by three pieces of information:
- The platform that they belong to: this is the specific data technology that hosts this dataset. Examples are hive, bigquery, redshift, etc. See dataplatform for more details.
- The name of the dataset in the specific platform. Each platform has a unique way of naming assets within its system. Usually, names are composed by combining the structural elements of the name and separating them by a period (.), e.g. relational datasets are usually named <db>.<schema>.<table>, except for platforms like MySQL which do not have the concept of a schema; as a result MySQL datasets are named <db>.<table>. In cases where the specific platform can have multiple instances (e.g. there are multiple different instances of MySQL databases that hold different data assets), names can also include an instance id, making the general pattern for a name <platform_instance>.<db>.<schema>.<table>.
- The environment or fabric that the dataset belongs to: this is an additional qualifier on the identifier that allows disambiguating datasets that live in Production environments from datasets that live in Non-production environments, such as Staging, QA, etc. The full list of supported environments / fabrics is available in FabricType.pdl.
An example of a dataset identifier is urn:li:dataset:(urn:li:dataPlatform:redshift,userdb.public.customer_table,PROD).
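When constructing these identifiers programmatically, the Python SDK's URN helpers encode the same three pieces of information. Below is a minimal sketch; the dataset names and instance id are illustrative.
Python SDK: Construct dataset URNs
from datahub.emitter.mce_builder import (
    make_dataset_urn,
    make_dataset_urn_with_platform_instance,
)
from datahub.metadata.urns import DatasetUrn

# platform + name + environment/fabric
urn = make_dataset_urn(platform="redshift", name="userdb.public.customer_table", env="PROD")
# -> urn:li:dataset:(urn:li:dataPlatform:redshift,userdb.public.customer_table,PROD)

# the same identity expressed with the urn class used elsewhere on this page
same_urn = DatasetUrn(platform="redshift", name="userdb.public.customer_table", env="PROD")

# when a platform has multiple instances, the instance id becomes part of the name
instance_urn = make_dataset_urn_with_platform_instance(
    platform="mysql",
    name="db.table",
    platform_instance="cluster-1",  # illustrative instance id
    env="PROD",
)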
Important Capabilities
Schemas
Datasets support flat and nested schemas. Metadata about schemas is contained in the schemaMetadata aspect. A schema is represented as an array of fields, each identified by a specific field path.
Field Paths explained
Fields that are either top-level or expressible unambiguously using a dot-separated notation can be identified via a v1 path name, whereas fields that are part of a union need further disambiguation using [type=X] markers.
Consider the simple nested schema described below:
{
  "type": "record",
  "name": "Customer",
  "fields": [
    {
      "type": "record",
      "name": "address",
      "fields": [
        { "name": "zipcode", "type": "string" },
        { "name": "street", "type": "string" }
      ]
    }
  ]
}
- v1 field path: address.zipcode
- v2 field path: [version=2.0].[type=struct].address.[type=string].zipcode

More examples and a formal specification of a v2 fieldPath can be found here.
Understanding field paths is important because they are the identifiers through which tags, terms, and documentation on fields are expressed. Besides the type and name of each field, schemas also contain descriptions attached to the individual fields, as well as information about primary and foreign keys (a sketch showing how to express a primary key follows the schema example below).
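For example, combining a dataset URN with a field path yields the schemaField URN that such field-level annotations are attached to. A minimal sketch, using the same helper that appears in the lineage example further down this page:
Python SDK: Build a schemaField URN from a field path
import datahub.emitter.mce_builder as builder

dataset_urn = builder.make_dataset_urn("hive", "realestate_db.sales", "PROD")
field_urn = builder.make_schema_field_urn(dataset_urn, "address.zipcode")
# -> urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,realestate_db.sales,PROD),address.zipcode)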
The following code snippet shows you how to add a Schema containing 3 fields to a dataset.
Python SDK: Add a schema to a dataset
from datahub.sdk import DataHubClient, Dataset

client = DataHubClient.from_env()

dataset = Dataset(
    platform="hive",
    name="realestate_db.sales",
    schema=[
        # tuples of (field name / field path, data type, description)
        (
            "address.zipcode",
            "varchar(50)",
            "This is the zipcode of the address. Specified using extended form and limited to addresses in the United States",
        ),
        ("address.street", "varchar(100)", "Street corresponding to the address"),
        ("last_sold_date", "date", "Date of the last sale date for this property"),
    ],
)

client.entities.upsert(dataset)
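The high-level example above covers field names, types, and descriptions. Primary-key (and foreign-key) information lives on the same schemaMetadata aspect; the following is a minimal low-level sketch, assuming the same illustrative dataset and a locally running GMS.
Python SDK: Emit schemaMetadata with a primary key (low-level)
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

schema = SchemaMetadataClass(
    schemaName="sales",  # usually the table name
    platform=builder.make_data_platform_urn("hive"),
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema=""),
    fields=[
        SchemaFieldClass(
            fieldPath="address.zipcode",
            type=SchemaFieldDataTypeClass(type=StringTypeClass()),
            nativeDataType="varchar(50)",
            description="Zipcode of the address",
        ),
    ],
    primaryKeys=["address.zipcode"],  # field paths that make up the primary key
)

emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=builder.make_dataset_urn("hive", "realestate_db.sales", "PROD"),
        aspect=schema,
    )
)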
Tags and Glossary Terms
Datasets can have Tags or Terms attached to them. Read this blog to understand the difference between tags and terms, and when you should use each.
Adding Tags or Glossary Terms at the top-level to a dataset
At the top-level, tags are added to datasets using the globalTags aspect, while terms are added using the glossaryTerms aspect.
Here is an example of how to add a tag to a dataset. Note that this involves reading the tags currently set on the dataset and then adding a new one if needed.
Python SDK: Add a tag to a dataset at the top-level
from datahub.sdk import DataHubClient, DatasetUrn, TagUrn
client = DataHubClient.from_env()
dataset = client.entities.get(DatasetUrn(platform="hive", name="realestate_db.sales"))
dataset.add_tag(TagUrn("purchase"))
client.entities.update(dataset)
Here is an example of adding a term to a dataset. Note that this involves reading the currently set terms on the dataset and then adding a new one if needed.
Python SDK: Add a term to a dataset at the top-level
from typing import List, Optional, Union

from datahub.sdk import DataHubClient, DatasetUrn, GlossaryTermUrn


def add_terms_to_dataset(
    client: DataHubClient,
    dataset_urn: DatasetUrn,
    term_urns: List[Union[GlossaryTermUrn, str]],
) -> None:
    """
    Add glossary terms to a dataset.

    Args:
        client: DataHub client to use
        dataset_urn: URN of the dataset to update
        term_urns: List of term URNs or term names to add
    """
    dataset = client.entities.get(dataset_urn)

    for term in term_urns:
        if isinstance(term, str):
            resolved_term_urn = client.resolve.term(name=term)
            dataset.add_term(resolved_term_urn)
        else:
            dataset.add_term(term)

    client.entities.update(dataset)


def main(client: Optional[DataHubClient] = None) -> None:
    """
    Main function to add terms to dataset example.

    Args:
        client: Optional DataHub client (for testing). If not provided, creates one from env.
    """
    client = client or DataHubClient.from_env()
    dataset_urn = DatasetUrn(platform="hive", name="realestate_db.sales", env="PROD")

    # Add terms using both URN and name resolution
    add_terms_to_dataset(
        client=client,
        dataset_urn=dataset_urn,
        term_urns=[
            GlossaryTermUrn("Classification.HighlyConfidential"),
            "PII",  # Will be resolved by name
        ],
    )


if __name__ == "__main__":
    main()
Adding Tags or Glossary Terms to columns / fields of a dataset
Tags and Terms can also be attached to an individual column (field) of a dataset. These attachments are done via the schemaMetadata aspect by ingestion connectors / transformers and via the editableSchemaMetadata aspect by the UI.
This separation keeps the writes coming from replicating metadata out of the source system isolated from the edits made in the UI.
Here is an example of how you can add a tag to a field in a dataset using the Python SDK.
Python SDK: Add a tag to a column (field) of a dataset
from datahub.sdk import DataHubClient, DatasetUrn, TagUrn
client = DataHubClient.from_env()
dataset = client.entities.get(
DatasetUrn(platform="hive", name="fct_users_created", env="PROD")
)
dataset["user_name"].add_tag(TagUrn("deprecated"))
client.entities.update(dataset)
Similarly, here is an example of how you would add a term to a field in a dataset using the Python SDK.
Python SDK: Add a term to a column (field) of a dataset
from datahub.sdk import DataHubClient, DatasetUrn, GlossaryTermUrn
client = DataHubClient.from_env()
dataset = client.entities.get(
DatasetUrn(platform="hive", name="realestate_db.sales", env="PROD")
)
dataset["address.zipcode"].add_term(GlossaryTermUrn("Classification.Location"))
client.entities.update(dataset)
Ownership
Ownership is associated with a dataset using the ownership aspect. Owners can be of a few different types: DATAOWNER, PRODUCER, DEVELOPER, CONSUMER, etc. See OwnershipType.pdl for the full list of ownership types and their meanings. Ownership can be inherited from source systems or additionally added in DataHub using the UI. Ingestion connectors for sources will automatically set owners when the source system supports it.
Adding Owners
The following script shows you how to add an owner to a dataset using the Python SDK.
Python SDK: Add an owner to a dataset
from datahub.sdk import CorpUserUrn, DataHubClient, DatasetUrn
client = DataHubClient.from_env()
dataset = client.entities.get(DatasetUrn(platform="hive", name="realestate_db.sales"))
# Add owner with the TECHNICAL_OWNER type
dataset.add_owner(CorpUserUrn("jdoe"))
client.entities.update(dataset)
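If you want to set a specific ownership type (e.g. DATAOWNER), one option is to write the ownership aspect directly with the low-level SDK. A minimal sketch, assuming the same illustrative dataset and a locally running GMS; note that emitting the full aspect replaces any owners already set on the dataset.
Python SDK: Set an owner with an explicit ownership type (low-level)
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)

ownership = OwnershipClass(
    owners=[
        OwnerClass(
            owner=builder.make_user_urn("jdoe"),
            type=OwnershipTypeClass.DATAOWNER,  # any type from OwnershipType.pdl
        )
    ]
)

emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=builder.make_dataset_urn("hive", "realestate_db.sales", "PROD"),
        aspect=ownership,  # note: this replaces any existing ownership on the dataset
    )
)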
Fine-grained lineage
Fine-grained (field-level) lineage can be associated with a dataset in two ways: either directly attached to the upstreamLineage aspect of a dataset, or captured as part of the dataJobInputOutput aspect of a dataJob.
Python SDK: Add fine-grained lineage to a dataset
from datahub.metadata.urns import DatasetUrn
from datahub.sdk.main_client import DataHubClient
client = DataHubClient.from_env()
upstream_urn = DatasetUrn(platform="snowflake", name="upstream_table")
downstream_urn = DatasetUrn(platform="snowflake", name="downstream_table")
transformation_text = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HighValueFilter").getOrCreate()
df = spark.read.table("customers")
high_value = df.filter("lifetime_value > 10000")
high_value.write.saveAsTable("high_value_customers")
"""
client.lineage.add_lineage(
upstream=upstream_urn,
downstream=downstream_urn,
transformation_text=transformation_text,
column_lineage={"id": ["id", "customer_id"]},
)
# By passing transformation_text, a query node will be created along with the table-level lineage.
# transformation_text can be any transformation logic, e.g. a Spark job, an Airflow DAG, a Python script, etc.
# If you have a SQL query, we recommend using add_dataset_lineage_from_sql instead.
# Note that transformation_text by itself will not create column-level lineage.
Python SDK: Add fine-grained lineage to a datajob
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
)
from datahub.metadata.schema_classes import DataJobInputOutputClass
def datasetUrn(tbl):
    return builder.make_dataset_urn("postgres", tbl)


def fldUrn(tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)
# Lineage of fields output by a job
# bar.c1 <-- unknownFunc(bar2.c1, bar4.c1)
# bar.c2 <-- myfunc(bar3.c2)
# {bar.c3,bar.c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
# bar.c5 <-- unknownFunc(bar3)
# {bar.c6,bar.c7} <-- unknownFunc(bar4)
# bar2.c9 has no upstream i.e. its values are somehow created independently within this job.
# Note that the semantics of the "transformOperation" value are contextual.
# In the above example it is regarded as some kind of UDF, but it could also be an expression, etc.
fineGrainedLineages = [
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c1")],
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar3", "c2")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c2")],
confidenceScore=0.8,
transformOperation="myfunc",
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar2", "c2"), fldUrn("bar2", "c3"), fldUrn("bar3", "c1")],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[fldUrn("bar", "c3"), fldUrn("bar", "c4")],
confidenceScore=0.7,
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[datasetUrn("bar3")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c5")],
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[datasetUrn("bar4")],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")],
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.NONE,
upstreams=[],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar2", "c9")],
),
]
# The lineage of output col bar.c9 is unknown. So there is no lineage for it above.
# Note that bar2 is an input as well as an output dataset, but some fields are inputs while other fields are outputs.
dataJobInputOutput = DataJobInputOutputClass(
inputDatasets=[datasetUrn("bar2"), datasetUrn("bar3"), datasetUrn("bar4")],
outputDatasets=[datasetUrn("bar"), datasetUrn("bar2")],
inputDatajobs=None,
inputDatasetFields=[
fldUrn("bar2", "c1"),
fldUrn("bar2", "c2"),
fldUrn("bar2", "c3"),
fldUrn("bar3", "c1"),
fldUrn("bar3", "c2"),
fldUrn("bar4", "c1"),
],
outputDatasetFields=[
fldUrn("bar", "c1"),
fldUrn("bar", "c2"),
fldUrn("bar", "c3"),
fldUrn("bar", "c4"),
fldUrn("bar", "c5"),
fldUrn("bar", "c6"),
fldUrn("bar", "c7"),
fldUrn("bar", "c9"),
fldUrn("bar2", "c9"),
],
fineGrainedLineages=fineGrainedLineages,
)
dataJobLineageMcp = MetadataChangeProposalWrapper(
entityUrn=builder.make_data_job_urn("spark", "Flow1", "Task1"),
aspect=dataJobInputOutput,
)
# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")
# Emit metadata!
emitter.emit_mcp(dataJobLineageMcp)
Querying lineage information
The standard GET APIs to retrieve entities can be used to fetch the dataset/datajob created by the above example. The response will include the fine-grained lineage information as well.
Fetch entity snapshot, including fine-grained lineages
curl 'http://localhost:8080/entities/urn%3Ali%3Adataset%3A(urn%3Ali%3AdataPlatform%3Apostgres,bar,PROD)'
curl 'http://localhost:8080/entities/urn%3Ali%3AdataJob%3A(urn%3Ali%3AdataFlow%3A(spark,Flow1,prod),Task1)'
The below queries can be used to find the upstream/downstream datasets/fields of a dataset/datajob.
Find upstream datasets and fields of a dataset
curl 'http://localhost:8080/relationships?direction=OUTGOING&urn=urn%3Ali%3Adataset%3A(urn%3Ali%3AdataPlatform%3Apostgres,bar,PROD)&types=DownstreamOf'
{
"start": 0,
"count": 9,
"relationships": [
{
"type": "DownstreamOf",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar4,PROD)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar3,PROD)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar3,PROD),c1)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c3)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c2)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar3,PROD),c2)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar4,PROD),c1)"
},
{
"type": "DownstreamOf",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c1)"
}
],
"total": 9
}
Find the datasets and fields consumed by a datajob i.e. inputs to a datajob
curl 'http://localhost:8080/relationships?direction=OUTGOING&urn=urn%3Ali%3AdataJob%3A(urn%3Ali%3AdataFlow%3A(spark,Flow1,prod),Task1)&types=Consumes'
{
"start": 0,
"count": 9,
"relationships": [
{
"type": "Consumes",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar4,PROD)"
},
{
"type": "Consumes",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar3,PROD)"
},
{
"type": "Consumes",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD)"
},
{
"type": "Consumes",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar4,PROD),c1)"
},
{
"type": "Consumes",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar3,PROD),c2)"
},
{
"type": "Consumes",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar3,PROD),c1)"
},
{
"type": "Consumes",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c3)"
},
{
"type": "Consumes",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c2)"
},
{
"type": "Consumes",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c1)"
}
],
"total": 9
}
Find the datasets and fields produced by a datajob i.e. outputs of a datajob
curl 'http://localhost:8080/relationships?direction=OUTGOING&urn=urn%3Ali%3AdataJob%3A(urn%3Ali%3AdataFlow%3A(spark,Flow1,prod),Task1)&types=Produces'
{
"start": 0,
"count": 11,
"relationships": [
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c9)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c9)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c7)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c6)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c5)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c4)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c3)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c2)"
},
{
"type": "Produces",
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD),c1)"
},
{
"type": "Produces",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD)"
},
{
"type": "Produces",
"entity": "urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD)"
}
],
"total": 11
}
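The same relationships endpoint can also be called programmatically. Here is a minimal sketch using the requests library, pointed at the same local GMS and URN as the curl examples above; requests takes care of the URL-encoding.
Python: Query the relationships endpoint
import requests

GMS_SERVER = "http://localhost:8080"
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,bar,PROD)"

response = requests.get(
    f"{GMS_SERVER}/relationships",
    params={
        "direction": "OUTGOING",
        "urn": dataset_urn,
        "types": "DownstreamOf",
    },
)
response.raise_for_status()

for relationship in response.json()["relationships"]:
    print(relationship["type"], relationship["entity"])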
Documentation, Links etc.
Documentation for Datasets is available via the datasetProperties aspect (typically filled out via ingestion connectors when information is already present in the source system) and via the editableDatasetProperties aspect (typically filled out via the UI).
Links that contain more knowledge about the dataset (e.g. links to Confluence pages) can be added via the institutionalMemory aspect.
Here is a simple script that shows you how to add documentation for a dataset, including some links to pages, using the Python SDK.
Python SDK: Add documentation, links to a dataset
from datahub.sdk import DataHubClient, DatasetUrn
client = DataHubClient.from_env()
dataset = client.entities.get(DatasetUrn(platform="hive", name="realestate_db.sales"))
# Add dataset documentation
documentation = """## The Real Estate Sales Dataset
This is a really important Dataset that contains all the relevant information about sales that have happened organized by address.
"""
dataset.set_description(documentation)
# Add link to institutional memory
dataset.add_link(
(
"https://wikipedia.com/real_estate",
"This is the definition of what real estate means", # link description
)
)
client.entities.update(dataset)
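If you need to write the datasetProperties aspect directly, for example to attach custom properties harvested from a source system, the low-level SDK can emit the aspect as-is. A minimal sketch; the property keys and values are illustrative, and emitting the full aspect replaces any existing datasetProperties.
Python SDK: Set description and custom properties via datasetProperties (low-level)
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetPropertiesClass

properties = DatasetPropertiesClass(
    description="All the relevant information about sales, organized by address.",
    customProperties={
        "governance_tier": "gold",  # illustrative key/value pairs
        "retention_days": "365",
    },
)

emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=builder.make_dataset_urn("hive", "realestate_db.sales", "PROD"),
        aspect=properties,
    )
)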
Notable Exceptions
The following overloaded uses of the Dataset entity exist for convenience, but will likely move to fully modeled entity types in the future.
- OpenAPI endpoints: the GET APIs of OpenAPI endpoints are currently modeled as Datasets, but should really be modeled as a Service/API entity once that is created in the metadata model.
- DataHub's Logical Entities (e.g. Dataset, Chart, Dashboard) are represented as Datasets, with sub-type Entity. These should really be modeled as Entities in a logical ER model once that is created in the metadata model.
Technical Reference
For technical details about fields, searchability, and relationships, view the Columns tab in DataHub.