DataHubGc
DataHub Garbage Collection Source Documentation
Overview
The DataHub Garbage Collection (GC) source is a maintenance component responsible for cleaning up various types of metadata to maintain system performance and data quality. It performs multiple cleanup tasks, each focusing on different aspects of DataHub's metadata.
Cleanup Tasks
1. Index Cleanup
Manages Elasticsearch indices in DataHub, particularly focusing on time-series data.
Configuration
source:
  type: datahub-gc
  config:
    truncate_indices: true
    truncate_index_older_than_days: 30
    truncation_watch_until: 10000
    truncation_sleep_between_seconds: 30
Features
- Truncates old Elasticsearch indices for the following timeseries aspects:- DatasetOperations
- DatasetUsageStatistics
- ChartUsageStatistics
- DashboardUsageStatistics
- QueryUsageStatistics
- Timeseries Aspects
 
- Monitors truncation progress
- Implements safe deletion with monitoring thresholds
- Supports gradual truncation with sleep intervals
2. Expired Token Cleanup
Manages access tokens in DataHub to maintain security and prevent token accumulation.
Configuration
source:
  type: datahub-gc
  config:
    cleanup_expired_tokens: true
Features
- Automatically identifies and revokes expired access tokens
- Processes tokens in batches for efficiency
- Maintains system security by removing outdated credentials
- Reports number of tokens revoked
- Uses GraphQL API for token management
3. Data Process Cleanup
Manages the lifecycle of data processes, jobs, and their instances (DPIs) within DataHub.
Features
- Cleans up Data Process Instances (DPIs) based on age and count
- Can remove empty DataJobs and DataFlows
- Supports both soft and hard deletion
- Uses parallel processing for efficient cleanup
- Maintains configurable retention policies
Configuration
source:
  type: datahub-gc
  config:
    dataprocess_cleanup:
      enabled: true
      retention_days: 10
      keep_last_n: 5
      delete_empty_data_jobs: false
      delete_empty_data_flows: false
      hard_delete_entities: false
      batch_size: 500
      max_workers: 10
      delay: 0.25
Limitations
- Maximum 9000 DPIs per job for performance
4. Execution Request Cleanup
Manages DataHub execution request records to prevent accumulation of historical execution data.
Features
- Maintains execution history per ingestion source
- Preserves minimum number of recent requests
- Removes old requests beyond retention period
- Special handling for running/pending requests
- Automatic cleanup of corrupted records
Configuration
source:
  type: datahub-gc
  config:
    execution_request_cleanup:
      enabled: true
      keep_history_min_count: 10
      keep_history_max_count: 1000
      keep_history_max_days: 30
      batch_read_size: 100
      runtime_limit_seconds: 3600
      max_read_errors: 10
5. Soft-Deleted Entities Cleanup
Manages the permanent removal of soft-deleted entities after a retention period.
Features
- Permanently removes soft-deleted entities after retention period
- Handles entity references cleanup
- Special handling for query entities
- Supports filtering by entity type, platform, or environment
- Concurrent processing with safety limits
Configuration
source:
  type: datahub-gc
  config:
    soft_deleted_entities_cleanup:
      enabled: true
      retention_days: 10
      batch_size: 500
      max_workers: 10
      delay: 0.25
      entity_types: null # Optional list of entity types to clean
      platform: null # Optional platform filter
      env: null # Optional environment filter
      query: null # Optional custom query filter
      limit_entities_delete: 25000
      futures_max_at_time: 1000
      runtime_limit_seconds: 7200
Performance Considerations
- Concurrent processing using thread pools
- Configurable batch sizes for optimal performance
- Rate limiting through configurable delays
- Maximum limits on concurrent operations
Reporting
Each cleanup task maintains detailed reports including:
- Number of entities processed
- Number of entities removed
- Errors encountered
- Sample of affected entities
- Runtime statistics
- Task-specific metrics
DataHubGcSource is responsible for performing garbage collection tasks on DataHub.
This source performs the following tasks:
- Cleans up expired tokens.
- Truncates Elasticsearch indices based on configuration.
- Cleans up data processes and soft-deleted entities if configured.
CLI based Ingestion
Config Details
- Options
- Schema
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description | 
|---|---|
| cleanup_expired_tokens boolean | Whether to clean up expired tokens or not Default: True | 
| dry_run boolean | Whether to perform a dry run or not. This is only supported for dataprocess cleanup and soft deleted entities cleanup. Default: False | 
| truncate_index_older_than_days integer | Indices older than this number of days will be truncated Default: 30 | 
| truncate_indices boolean | Whether to truncate elasticsearch indices or not which can be safely truncated Default: True | 
| truncation_sleep_between_seconds integer | Sleep between truncation monitoring. Default: 30 | 
| truncation_watch_until integer | Wait for truncation of indices until this number of documents are left Default: 10000 | 
| dataprocess_cleanup DataProcessCleanupConfig | |
| dataprocess_cleanup.batch_size integer | The number of entities to get in a batch from API Default: 500 | 
| dataprocess_cleanup.delay One of number, null | Delay between each batch Default: 0.25 | 
| dataprocess_cleanup.delete_empty_data_flows boolean | Whether to delete Data Flows without runs Default: False | 
| dataprocess_cleanup.delete_empty_data_jobs boolean | Whether to delete Data Jobs without runs Default: False | 
| dataprocess_cleanup.enabled boolean | Whether to do data process cleanup. Default: True | 
| dataprocess_cleanup.hard_delete_entities boolean | Whether to hard delete entities Default: False | 
| dataprocess_cleanup.keep_last_n One of integer, null | Number of latest aspects to keep Default: 5 | 
| dataprocess_cleanup.max_workers integer | The number of workers to use for deletion Default: 10 | 
| dataprocess_cleanup.retention_days One of integer, null | Number of days to retain metadata in DataHub Default: 10 | 
| dataprocess_cleanup.aspects_to_clean array | List of aspect names to clean up Default: ['DataprocessInstance'] | 
| dataprocess_cleanup.aspects_to_clean.string string | |
| execution_request_cleanup DatahubExecutionRequestCleanupConfig | |
| execution_request_cleanup.batch_read_size integer | Number of records per read operation Default: 100 | 
| execution_request_cleanup.enabled boolean | Global switch for this cleanup task Default: True | 
| execution_request_cleanup.keep_history_max_count integer | Maximum number of execution requests to keep, per ingestion source Default: 1000 | 
| execution_request_cleanup.keep_history_max_days integer | Maximum number of days to keep execution requests for, per ingestion source Default: 90 | 
| execution_request_cleanup.keep_history_min_count integer | Minimum number of execution requests to keep, per ingestion source Default: 10 | 
| execution_request_cleanup.limit_entities_delete One of integer, null | Max number of execution requests to hard delete. Default: 10000 | 
| execution_request_cleanup.max_read_errors integer | Maximum number of read errors before aborting Default: 10 | 
| execution_request_cleanup.runtime_limit_seconds integer | Maximum runtime in seconds for the cleanup task Default: 3600 | 
| soft_deleted_entities_cleanup SoftDeletedEntitiesCleanupConfig | |
| soft_deleted_entities_cleanup.batch_size integer | The number of entities to get in a batch from GraphQL Default: 500 | 
| soft_deleted_entities_cleanup.delay One of number, null | Delay between each batch Default: 0.25 | 
| soft_deleted_entities_cleanup.enabled boolean | Whether to do soft deletion cleanup. Default: True | 
| soft_deleted_entities_cleanup.futures_max_at_time integer | Max number of futures to have at a time. Default: 1000 | 
| soft_deleted_entities_cleanup.limit_entities_delete One of integer, null | Max number of entities to delete. Default: 25000 | 
| soft_deleted_entities_cleanup.max_workers integer | The number of workers to use for deletion Default: 10 | 
| soft_deleted_entities_cleanup.platform One of string, null | Platform to cleanup Default: None | 
| soft_deleted_entities_cleanup.query One of string, null | Query to filter entities Default: None | 
| soft_deleted_entities_cleanup.retention_days integer | Number of days to retain metadata in DataHub Default: 10 | 
| soft_deleted_entities_cleanup.runtime_limit_seconds integer | Runtime limit in seconds Default: 7200 | 
| soft_deleted_entities_cleanup.env One of string, null | Environment to cleanup Default: None | 
| soft_deleted_entities_cleanup.entity_types One of array, null | List of entity types to cleanup Default: ['dataset', 'dashboard', 'chart', 'mlmodel', 'mlmo... | 
| soft_deleted_entities_cleanup.entity_types.string string | 
The JSONSchema for this configuration is inlined below.
{
  "$defs": {
    "DataProcessCleanupConfig": {
      "additionalProperties": false,
      "properties": {
        "enabled": {
          "default": true,
          "description": "Whether to do data process cleanup.",
          "title": "Enabled",
          "type": "boolean"
        },
        "retention_days": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": 10,
          "description": "Number of days to retain metadata in DataHub",
          "title": "Retention Days"
        },
        "aspects_to_clean": {
          "default": [
            "DataprocessInstance"
          ],
          "description": "List of aspect names to clean up",
          "items": {
            "type": "string"
          },
          "title": "Aspects To Clean",
          "type": "array"
        },
        "keep_last_n": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": 5,
          "description": "Number of latest aspects to keep",
          "title": "Keep Last N"
        },
        "delete_empty_data_jobs": {
          "default": false,
          "description": "Whether to delete Data Jobs without runs",
          "title": "Delete Empty Data Jobs",
          "type": "boolean"
        },
        "delete_empty_data_flows": {
          "default": false,
          "description": "Whether to delete Data Flows without runs",
          "title": "Delete Empty Data Flows",
          "type": "boolean"
        },
        "hard_delete_entities": {
          "default": false,
          "description": "Whether to hard delete entities",
          "title": "Hard Delete Entities",
          "type": "boolean"
        },
        "batch_size": {
          "default": 500,
          "description": "The number of entities to get in a batch from API",
          "title": "Batch Size",
          "type": "integer"
        },
        "max_workers": {
          "default": 10,
          "description": "The number of workers to use for deletion",
          "title": "Max Workers",
          "type": "integer"
        },
        "delay": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": 0.25,
          "description": "Delay between each batch",
          "title": "Delay"
        }
      },
      "title": "DataProcessCleanupConfig",
      "type": "object"
    },
    "DatahubExecutionRequestCleanupConfig": {
      "additionalProperties": false,
      "properties": {
        "keep_history_min_count": {
          "default": 10,
          "description": "Minimum number of execution requests to keep, per ingestion source",
          "title": "Keep History Min Count",
          "type": "integer"
        },
        "keep_history_max_count": {
          "default": 1000,
          "description": "Maximum number of execution requests to keep, per ingestion source",
          "title": "Keep History Max Count",
          "type": "integer"
        },
        "keep_history_max_days": {
          "default": 90,
          "description": "Maximum number of days to keep execution requests for, per ingestion source",
          "title": "Keep History Max Days",
          "type": "integer"
        },
        "batch_read_size": {
          "default": 100,
          "description": "Number of records per read operation",
          "title": "Batch Read Size",
          "type": "integer"
        },
        "enabled": {
          "default": true,
          "description": "Global switch for this cleanup task",
          "title": "Enabled",
          "type": "boolean"
        },
        "runtime_limit_seconds": {
          "default": 3600,
          "description": "Maximum runtime in seconds for the cleanup task",
          "title": "Runtime Limit Seconds",
          "type": "integer"
        },
        "limit_entities_delete": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": 10000,
          "description": "Max number of execution requests to hard delete.",
          "title": "Limit Entities Delete"
        },
        "max_read_errors": {
          "default": 10,
          "description": "Maximum number of read errors before aborting",
          "title": "Max Read Errors",
          "type": "integer"
        }
      },
      "title": "DatahubExecutionRequestCleanupConfig",
      "type": "object"
    },
    "SoftDeletedEntitiesCleanupConfig": {
      "additionalProperties": false,
      "properties": {
        "enabled": {
          "default": true,
          "description": "Whether to do soft deletion cleanup.",
          "title": "Enabled",
          "type": "boolean"
        },
        "retention_days": {
          "default": 10,
          "description": "Number of days to retain metadata in DataHub",
          "title": "Retention Days",
          "type": "integer"
        },
        "batch_size": {
          "default": 500,
          "description": "The number of entities to get in a batch from GraphQL",
          "title": "Batch Size",
          "type": "integer"
        },
        "delay": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": 0.25,
          "description": "Delay between each batch",
          "title": "Delay"
        },
        "max_workers": {
          "default": 10,
          "description": "The number of workers to use for deletion",
          "title": "Max Workers",
          "type": "integer"
        },
        "entity_types": {
          "anyOf": [
            {
              "items": {
                "type": "string"
              },
              "type": "array"
            },
            {
              "type": "null"
            }
          ],
          "default": [
            "dataset",
            "dashboard",
            "chart",
            "mlmodel",
            "mlmodelGroup",
            "mlfeatureTable",
            "mlfeature",
            "mlprimaryKey",
            "dataFlow",
            "dataJob",
            "glossaryTerm",
            "glossaryNode",
            "tag",
            "role",
            "corpuser",
            "corpGroup",
            "container",
            "domain",
            "dataProduct",
            "notebook",
            "businessAttribute",
            "schemaField",
            "query",
            "dataProcessInstance"
          ],
          "description": "List of entity types to cleanup",
          "title": "Entity Types"
        },
        "platform": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Platform to cleanup",
          "title": "Platform"
        },
        "env": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Environment to cleanup",
          "title": "Env"
        },
        "query": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Query to filter entities",
          "title": "Query"
        },
        "limit_entities_delete": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": 25000,
          "description": "Max number of entities to delete.",
          "title": "Limit Entities Delete"
        },
        "futures_max_at_time": {
          "default": 1000,
          "description": "Max number of futures to have at a time.",
          "title": "Futures Max At Time",
          "type": "integer"
        },
        "runtime_limit_seconds": {
          "default": 7200,
          "description": "Runtime limit in seconds",
          "title": "Runtime Limit Seconds",
          "type": "integer"
        }
      },
      "title": "SoftDeletedEntitiesCleanupConfig",
      "type": "object"
    }
  },
  "additionalProperties": false,
  "properties": {
    "dry_run": {
      "default": false,
      "description": "Whether to perform a dry run or not. This is only supported for dataprocess cleanup and soft deleted entities cleanup.",
      "title": "Dry Run",
      "type": "boolean"
    },
    "cleanup_expired_tokens": {
      "default": true,
      "description": "Whether to clean up expired tokens or not",
      "title": "Cleanup Expired Tokens",
      "type": "boolean"
    },
    "truncate_indices": {
      "default": true,
      "description": "Whether to truncate elasticsearch indices or not which can be safely truncated",
      "title": "Truncate Indices",
      "type": "boolean"
    },
    "truncate_index_older_than_days": {
      "default": 30,
      "description": "Indices older than this number of days will be truncated",
      "title": "Truncate Index Older Than Days",
      "type": "integer"
    },
    "truncation_watch_until": {
      "default": 10000,
      "description": "Wait for truncation of indices until this number of documents are left",
      "title": "Truncation Watch Until",
      "type": "integer"
    },
    "truncation_sleep_between_seconds": {
      "default": 30,
      "description": "Sleep between truncation monitoring.",
      "title": "Truncation Sleep Between Seconds",
      "type": "integer"
    },
    "dataprocess_cleanup": {
      "$ref": "#/$defs/DataProcessCleanupConfig",
      "description": "Configuration for data process cleanup"
    },
    "soft_deleted_entities_cleanup": {
      "$ref": "#/$defs/SoftDeletedEntitiesCleanupConfig",
      "description": "Configuration for soft deleted entities cleanup"
    },
    "execution_request_cleanup": {
      "$ref": "#/$defs/DatahubExecutionRequestCleanupConfig",
      "description": "Configuration for execution request cleanup"
    }
  },
  "title": "DataHubGcSourceConfig",
  "type": "object"
}
Code Coordinates
- Class Name: datahub.ingestion.source.gc.datahub_gc.DataHubGcSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for DataHubGc, feel free to ping us on our Slack.