Subscriptions
Why Would You Use Subscriptions on Datasets?
Subscriptions are a way to receive notifications when entity changes occur (e.g. deprecations, schema changes, ownership changes, etc.) or when assertions change state (pass, fail, or error). Subscriptions can be created at the dataset level (affecting any changes on the dataset, as well as all assertions on the dataset) or at the assertion level (affecting only specific assertions).
Goal Of This Guide
This guide specifically covers how to use the DataHub Cloud Python SDK for managing Subscriptions:
- Create: create a subscription to a dataset or assertion.
- Remove: remove a subscription.
Prerequisites
- DataHub Cloud Python SDK installed (pip install acryl-datahub-cloud)
- The actor making API calls must have the Manage User Subscriptions privilege for the datasets at hand.
- If subscribing to a group, the actor should also be a member of the group.
Before creating subscriptions, you need to ensure the target datasets and groups are already present in your DataHub instance. If you attempt to create subscriptions for entities that do not exist, GMS will continuously report errors to the logs.
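One way to avoid those log errors is to check that every URN resolves before subscribing. The sketch below is deliberately client-agnostic: find_missing is a hypothetical helper, not part of the DataHub SDK, and exists stands for any callable you supply that returns True when an entity is present. Here a fixed set of URNs plays that role so the example runs without a DataHub instance.

```python
from typing import Callable, Iterable, List


def find_missing(urns: Iterable[str], exists: Callable[[str], bool]) -> List[str]:
    """Return the URNs for which `exists` reports False, preserving input order."""
    return [urn for urn in urns if not exists(urn)]


# Stand-in for a real lookup: URNs assumed to be present in the instance.
known_urns = {
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,purchases,PROD)",
    "urn:li:corpGroup:data-team",
}

missing = find_missing(
    [
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,purchases,PROD)",
        "urn:li:corpGroup:data-team",
        "urn:li:corpGroup:not-created-yet",
    ],
    exists=known_urns.__contains__,
)
print(missing)  # the one URN that is not in known_urns
```

In practice you would replace the set lookup with a call against your own instance and skip or create the missing entities before subscribing.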
Create Subscription
You can create subscriptions to receive notifications when assertions change state (pass, fail, or error) or when other entity changes occur. A dataset-level subscription covers any change on the dataset as well as all assertions on it; an assertion-level subscription covers only the specified assertion.
- Python
# Inlined from /metadata-ingestion/examples/library/create_subscription.py
import logging

from datahub.sdk import DataHubClient

log = logging.getLogger(__name__)

# Initialize the client
client = DataHubClient(
    server="https://your-datahub-cloud-instance.com", token="your-token"
)

# Subscribe to all change types for a dataset (including its assertions)
client.subscriptions.subscribe(
    urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,purchases,PROD)",
    subscriber_urn="urn:li:corpuser:john.doe",
    # entity_change_types defaults to all available change types for datasets
)
log.info("Successfully subscribed to dataset notifications")

# Subscribe to specific assertion changes
client.subscriptions.subscribe(
    urn="urn:li:assertion:your-assertion-id",
    subscriber_urn="urn:li:corpuser:john.doe",
    entity_change_types=["ASSERTION_PASSED", "ASSERTION_FAILED"],
)
log.info("Successfully subscribed to specific assertion changes")

# Subscribe a group to assertion changes
client.subscriptions.subscribe(
    urn="urn:li:assertion:your-assertion-id",
    subscriber_urn="urn:li:corpGroup:data-team",
    entity_change_types=["ASSERTION_FAILED", "ASSERTION_ERROR"],
)
log.info("Successfully subscribed group to assertion failures and errors")
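The three calls above differ only in their URN prefixes: the target is a urn:li:dataset: or urn:li:assertion: URN, and the subscriber is a urn:li:corpuser: or urn:li:corpGroup: URN. As a rough sketch, a local check like the one below could catch a swapped or mistyped URN before any request is made. The helper names are hypothetical and not part of the SDK, and the prefix tables cover only the URN kinds that appear in this guide.

```python
from typing import Dict, Tuple

# Prefixes taken from the URNs used in this guide (an assumption, not an SDK list).
TARGET_PREFIXES: Dict[str, str] = {
    "urn:li:dataset:": "dataset",
    "urn:li:assertion:": "assertion",
}
SUBSCRIBER_PREFIXES: Dict[str, str] = {
    "urn:li:corpuser:": "user",
    "urn:li:corpGroup:": "group",
}


def _kind(urn: str, prefixes: Dict[str, str], role: str) -> str:
    """Map a URN to its kind by prefix, or raise ValueError if none matches."""
    for prefix, kind in prefixes.items():
        if urn.startswith(prefix):
            return kind
    raise ValueError(f"unsupported {role} URN: {urn!r}")


def describe_subscription(urn: str, subscriber_urn: str) -> Tuple[str, str]:
    """Return (subscription level, subscriber kind) for a subscribe call."""
    level = _kind(urn, TARGET_PREFIXES, "target")
    subscriber = _kind(subscriber_urn, SUBSCRIBER_PREFIXES, "subscriber")
    return level, subscriber


print(
    describe_subscription(
        "urn:li:assertion:your-assertion-id", "urn:li:corpGroup:data-team"
    )
)
```

Passing the arguments in the wrong order, for example a user URN as the target, raises ValueError instead of reaching the server.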
Remove Subscription
You can remove existing subscriptions to stop receiving notifications. The unsubscribe method supports selective removal of specific change types or complete removal of subscriptions.
- Python
# Inlined from /metadata-ingestion/examples/library/remove_subscription.py
import logging

from datahub.sdk import DataHubClient

log = logging.getLogger(__name__)

# Initialize the client
client = DataHubClient(
    server="https://your-datahub-cloud-instance.com", token="your-token"
)

# Unsubscribe from all changes for a dataset
client.subscriptions.unsubscribe(
    urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,purchases,PROD)",
    subscriber_urn="urn:li:corpuser:john.doe",
    # entity_change_types defaults to all existing change types
)
log.info("Successfully unsubscribed from all dataset notifications")

# Unsubscribe from specific assertion change types
client.subscriptions.unsubscribe(
    urn="urn:li:assertion:your-assertion-id",
    subscriber_urn="urn:li:corpuser:john.doe",
    # Keep ASSERTION_FAILED and ASSERTION_ERROR
    entity_change_types=["ASSERTION_PASSED"],
)
log.info("Successfully unsubscribed from specific assertion change types")

# Unsubscribe a group from assertion changes
client.subscriptions.unsubscribe(
    urn="urn:li:assertion:your-assertion-id",
    subscriber_urn="urn:li:corpGroup:data-team",
    entity_change_types=["ASSERTION_FAILED", "ASSERTION_ERROR"],
)
log.info("Successfully unsubscribed group from assertion notifications")
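Selective removal can be pictured as a set difference over the change types a subscriber currently holds. The model below is purely illustrative and runs without a DataHub instance. remaining_change_types is a hypothetical name, and treating an omitted entity_change_types as "remove everything" is an interpretation of the comments in the example above, not a statement about the SDK's internals.

```python
from typing import Iterable, Optional, Set


def remaining_change_types(
    current: Iterable[str], to_remove: Optional[Iterable[str]] = None
) -> Set[str]:
    """Model which change types stay subscribed after an unsubscribe call.

    to_remove=None mirrors omitting entity_change_types: nothing remains.
    """
    if to_remove is None:
        return set()
    return set(current) - set(to_remove)


before = {"ASSERTION_PASSED", "ASSERTION_FAILED", "ASSERTION_ERROR"}

# Selective removal: only ASSERTION_PASSED is dropped.
after = remaining_change_types(before, ["ASSERTION_PASSED"])
print(sorted(after))  # ['ASSERTION_ERROR', 'ASSERTION_FAILED']

# Complete removal: no change types are left.
print(remaining_change_types(before))  # set()
```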
Available Change Types
The following change types are available for subscriptions:
Schema Changes
- OPERATION_COLUMN_ADDED - When a new column is added to a dataset
- OPERATION_COLUMN_REMOVED - When a column is removed from a dataset
- OPERATION_COLUMN_MODIFIED - When an existing column is modified
Operational Metadata Changes
- OPERATION_ROWS_INSERTED - When rows are inserted into a dataset
- OPERATION_ROWS_UPDATED - When rows are updated in a dataset
- OPERATION_ROWS_REMOVED - When rows are removed from a dataset
Assertion Events
- ASSERTION_PASSED - When an assertion run passes
- ASSERTION_FAILED - When an assertion run fails
- ASSERTION_ERROR - When an assertion run encounters an error
Incident Status Changes
- INCIDENT_RAISED - When a new incident is raised
- INCIDENT_RESOLVED - When an incident is resolved
Test Status Changes
- TEST_PASSED - When a test passes
- TEST_FAILED - When a test fails
Deprecation Status Changes
- DEPRECATED - When an entity is marked as deprecated
- UNDEPRECATED - When an entity's deprecation status is removed
Ingestion Status Changes
- INGESTION_SUCCEEDED - When ingestion completes successfully
- INGESTION_FAILED - When ingestion fails
Documentation Changes
- DOCUMENTATION_CHANGE - When documentation is modified
Ownership Changes
- OWNER_ADDED - When an owner is added to an entity
- OWNER_REMOVED - When an owner is removed from an entity
Glossary Term Changes
- GLOSSARY_TERM_ADDED - When a glossary term is added to an entity
- GLOSSARY_TERM_REMOVED - When a glossary term is removed from an entity
- GLOSSARY_TERM_PROPOSED - When a glossary term is proposed for an entity
Tag Changes
- TAG_ADDED - When a tag is added to an entity
- TAG_REMOVED - When a tag is removed from an entity
- TAG_PROPOSED - When a tag is proposed for an entity
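
Because change types are passed as plain strings, a typo is easy to make. The following sketch collects the names listed above into a constant and checks a request against it before it is sent. KNOWN_CHANGE_TYPES and validate_change_types are hypothetical names used for illustration, and the set reflects only this page, so it may lag behind what the SDK actually accepts.

```python
from typing import Iterable, List

# Change types copied from the list in this guide (not read from the SDK).
KNOWN_CHANGE_TYPES = frozenset(
    {
        "OPERATION_COLUMN_ADDED", "OPERATION_COLUMN_REMOVED", "OPERATION_COLUMN_MODIFIED",
        "OPERATION_ROWS_INSERTED", "OPERATION_ROWS_UPDATED", "OPERATION_ROWS_REMOVED",
        "ASSERTION_PASSED", "ASSERTION_FAILED", "ASSERTION_ERROR",
        "INCIDENT_RAISED", "INCIDENT_RESOLVED",
        "TEST_PASSED", "TEST_FAILED",
        "DEPRECATED", "UNDEPRECATED",
        "INGESTION_SUCCEEDED", "INGESTION_FAILED",
        "DOCUMENTATION_CHANGE",
        "OWNER_ADDED", "OWNER_REMOVED",
        "GLOSSARY_TERM_ADDED", "GLOSSARY_TERM_REMOVED", "GLOSSARY_TERM_PROPOSED",
        "TAG_ADDED", "TAG_REMOVED", "TAG_PROPOSED",
    }
)


def validate_change_types(requested: Iterable[str]) -> List[str]:
    """Return the requested change types unchanged, or raise on unknown names."""
    requested_list = list(requested)
    unknown = sorted(set(requested_list) - KNOWN_CHANGE_TYPES)
    if unknown:
        raise ValueError(f"unknown change types: {', '.join(unknown)}")
    return requested_list


checked = validate_change_types(["ASSERTION_FAILED", "ASSERTION_ERROR"])
print(checked)
```

The returned list can then be passed as entity_change_types; a misspelling such as ASSERTION_FAIL raises ValueError locally instead of producing a failed or silently ignored request.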