DynamoDB
Important Capabilities
| Capability | Status | Notes | 
|---|---|---|
| Classification | ✅ | Optionally enabled via classification.enabled | 
| Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata | 
| Platform Instance | ✅ | By default, platform_instance will use the AWS account id | 
This plugin extracts the following:
AWS DynamoDB table names with their region, and infer schema of attribute names and types by scanning the table
Prerequisities
Notice of breaking change: Starting v0.13.3, aws_region is now a required configuration for DynamoDB Connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
In order to execute this source, you need to attach the AmazonDynamoDBReadOnlyAccess policy to a user in your AWS account. Then create an API access key and secret for the user. This future proofs it in case we need to make further changes. But you can use these privileges to run this source for now
dynamodb:ListTables
dynamodb:DescribeTable
dynamodb:Scan
We need dynamodb:Scan because Dynamodb does not return the schema in dynamodb:DescribeTable and thus we sample few values to understand the schema.
Concept Mapping
| Source Concept | DataHub Concept | Notes | 
|---|---|---|
| "dynamodb" | Data Platform | |
| DynamoDB Table | Dataset | 
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: dynamodb
  config:
    platform_instance: "AWS_ACCOUNT_ID"
    aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
    aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    aws_session_token: "${AWS_SESSION_TOKEN}"
    aws_region: "${AWS_REGION}"
    #
    # If there are items that have most representative fields of the table, users could use the
    # `include_table_item` option to provide a list of primary keys of the table in dynamodb format.
    # For each `region.table`, the list of primary keys can be at most 100.
    # We include these items in addition to the first 100 items in the table when we scan it.
    #
    # include_table_item:
    #   region.table_name:
    #     [
    #       {
    #         "partition_key_name": { "attribute_type": "attribute_value" },
    #         "sort_key_name": { "attribute_type": "attribute_value" },
    #       },
    #     ]
sink:
  # sink configs
Config Details
- Options
- Schema
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description | 
|---|---|
| aws_access_key_id string | AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details. | 
| aws_advanced_config object | Advanced AWS configuration options. These are passed directly to botocore.config.Config. | 
| aws_endpoint_url string | The AWS service endpoint. This is normally constructed automatically, but can be overridden here. | 
| aws_profile string | The named profile to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config. | 
| aws_proxy map(str,string) | |
| aws_region string | AWS region code. | 
| aws_retry_mode Enum | One of: "legacy", "standard", "adaptive" Default: standard | 
| aws_retry_num integer | Number of times to retry failed AWS requests. See the botocore.retry docs for details. Default: 5 | 
| aws_secret_access_key string | AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details. | 
| aws_session_token string | AWS session token. Can be auto-detected, see the AWS boto3 docs for details. | 
| platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. | 
| read_timeout number | The timeout for reading from the connection (in seconds). Default: 60 | 
| env string | The environment that all assets produced by this connector belong to Default: PROD | 
| aws_role One of string, array | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as boto3's STS.Client.assume_role. | 
| aws_role.union One of string, AwsAssumeRoleConfig | |
| aws_role.union.RoleArn ❓ string | ARN of the role to assume. | 
| aws_role.union.ExternalId string | External ID to use when assuming the role. | 
| database_pattern AllowDenyPattern | regex patterns for databases to filter in ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
| database_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| database_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] | 
| database_pattern.allow.string string | |
| database_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] | 
| database_pattern.deny.string string | |
| domain map(str,AllowDenyPattern) | A class to store allow deny regexes | 
| domain. key.allowarray | List of regex patterns to include in ingestion Default: ['.*'] | 
| domain. key.allow.stringstring | |
| domain. key.ignoreCaseboolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| domain. key.denyarray | List of regex patterns to exclude from ingestion. Default: [] | 
| domain. key.deny.stringstring | |
| include_table_item map(str,array) | |
| include_table_item. key.objectobject | |
| table_pattern AllowDenyPattern | Regex patterns for tables to filter in ingestion. The table name format is 'region.table' Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
| table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] | 
| table_pattern.allow.string string | |
| table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] | 
| table_pattern.deny.string string | |
| classification ClassificationConfig | For details, refer to Classification. Default: {'enabled': False, 'sample_size': 100, 'max_worker... | 
| classification.enabled boolean | Whether classification should be used to auto-detect glossary terms Default: False | 
| classification.info_type_to_term map(str,string) | |
| classification.max_workers integer | Number of worker processes to use for classification. Set to 1 to disable. Default: 14 | 
| classification.sample_size integer | Number of sample values used for classification. Default: 100 | 
| classification.classifiers array | Classifiers to use to auto-detect glossary terms. If more than one classifier, infotype predictions from the classifier defined later in sequence take precedance. Default: [{'type': 'datahub', 'config': None}] | 
| classification.classifiers.DynamicTypedClassifierConfig DynamicTypedClassifierConfig | |
| classification.classifiers.DynamicTypedClassifierConfig.type ❓ string | The type of the classifier to use. For DataHub,  use datahub | 
| classification.classifiers.DynamicTypedClassifierConfig.config object | The configuration required for initializing the classifier. If not specified, uses defaults for classifer type. | 
| classification.column_pattern AllowDenyPattern | Regex patterns to filter columns for classification. This is used in combination with other patterns in parent config. Specify regex to match the column name in database.schema.table.columnformat.Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
| classification.column_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| classification.column_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] | 
| classification.column_pattern.allow.string string | |
| classification.column_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] | 
| classification.column_pattern.deny.string string | |
| classification.table_pattern AllowDenyPattern | Regex patterns to filter tables for classification. This is used in combination with other patterns in parent config. Specify regex to match the entire table name in database.schema.tableformat. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
| classification.table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| classification.table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] | 
| classification.table_pattern.allow.string string | |
| classification.table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] | 
| classification.table_pattern.deny.string string | |
| stateful_ingestion StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. | 
| stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_apiis specified, otherwise FalseDefault: False | 
| stateful_ingestion.fail_safe_threshold number | Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'. Default: 75.0 | 
| stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True | 
The JSONSchema for this configuration is inlined below.
{
  "title": "DynamoDBConfig",
  "description": "Any source that is a primary producer of Dataset metadata should inherit this class",
  "type": "object",
  "properties": {
    "aws_access_key_id": {
      "title": "Aws Access Key Id",
      "description": "AWS access key ID. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
      "type": "string"
    },
    "aws_secret_access_key": {
      "title": "Aws Secret Access Key",
      "description": "AWS secret access key. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
      "type": "string"
    },
    "aws_session_token": {
      "title": "Aws Session Token",
      "description": "AWS session token. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
      "type": "string"
    },
    "aws_role": {
      "title": "Aws Role",
      "description": "AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as [boto3's STS.Client.assume_role](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role).",
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "array",
          "items": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "$ref": "#/definitions/AwsAssumeRoleConfig"
              }
            ]
          }
        }
      ]
    },
    "aws_profile": {
      "title": "Aws Profile",
      "description": "The [named profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config.",
      "type": "string"
    },
    "aws_region": {
      "title": "Aws Region",
      "description": "AWS region code.",
      "type": "string"
    },
    "aws_endpoint_url": {
      "title": "Aws Endpoint Url",
      "description": "The AWS service endpoint. This is normally [constructed automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html), but can be overridden here.",
      "type": "string"
    },
    "aws_proxy": {
      "title": "Aws Proxy",
      "description": "A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.",
      "type": "object",
      "additionalProperties": {
        "type": "string"
      }
    },
    "aws_retry_num": {
      "title": "Aws Retry Num",
      "description": "Number of times to retry failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
      "default": 5,
      "type": "integer"
    },
    "aws_retry_mode": {
      "title": "Aws Retry Mode",
      "description": "Retry mode to use for failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
      "default": "standard",
      "enum": [
        "legacy",
        "standard",
        "adaptive"
      ],
      "type": "string"
    },
    "read_timeout": {
      "title": "Read Timeout",
      "description": "The timeout for reading from the connection (in seconds).",
      "default": 60,
      "type": "number"
    },
    "aws_advanced_config": {
      "title": "Aws Advanced Config",
      "description": "Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
      "type": "object"
    },
    "env": {
      "title": "Env",
      "description": "The environment that all assets produced by this connector belong to",
      "default": "PROD",
      "type": "string"
    },
    "database_pattern": {
      "title": "Database Pattern",
      "description": "regex patterns for databases to filter in ingestion.",
      "default": {
        "allow": [
          ".*"
        ],
        "deny": [],
        "ignoreCase": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/AllowDenyPattern"
        }
      ]
    },
    "table_pattern": {
      "title": "Table Pattern",
      "description": "Regex patterns for tables to filter in ingestion. The table name format is 'region.table'",
      "default": {
        "allow": [
          ".*"
        ],
        "deny": [],
        "ignoreCase": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/AllowDenyPattern"
        }
      ]
    },
    "classification": {
      "title": "Classification",
      "description": "For details, refer to [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
      "default": {
        "enabled": false,
        "sample_size": 100,
        "max_workers": 14,
        "table_pattern": {
          "allow": [
            ".*"
          ],
          "deny": [],
          "ignoreCase": true
        },
        "column_pattern": {
          "allow": [
            ".*"
          ],
          "deny": [],
          "ignoreCase": true
        },
        "info_type_to_term": {},
        "classifiers": [
          {
            "type": "datahub",
            "config": null
          }
        ]
      },
      "allOf": [
        {
          "$ref": "#/definitions/ClassificationConfig"
        }
      ]
    },
    "stateful_ingestion": {
      "$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
    },
    "platform_instance": {
      "title": "Platform Instance",
      "description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
      "type": "string"
    },
    "domain": {
      "title": "Domain",
      "description": "regex patterns for tables to filter to assign domain_key. ",
      "default": {},
      "type": "object",
      "additionalProperties": {
        "$ref": "#/definitions/AllowDenyPattern"
      }
    },
    "include_table_item": {
      "title": "Include Table Item",
      "description": "[Advanced] The primary keys of items of a table in dynamodb format the user would like to include in schema. Refer \"Advanced Configurations\" section for more details",
      "type": "object",
      "additionalProperties": {
        "type": "array",
        "items": {
          "type": "object"
        }
      }
    }
  },
  "additionalProperties": false,
  "definitions": {
    "AwsAssumeRoleConfig": {
      "title": "AwsAssumeRoleConfig",
      "type": "object",
      "properties": {
        "RoleArn": {
          "title": "Rolearn",
          "description": "ARN of the role to assume.",
          "type": "string"
        },
        "ExternalId": {
          "title": "Externalid",
          "description": "External ID to use when assuming the role.",
          "type": "string"
        }
      },
      "required": [
        "RoleArn"
      ]
    },
    "AllowDenyPattern": {
      "title": "AllowDenyPattern",
      "description": "A class to store allow deny regexes",
      "type": "object",
      "properties": {
        "allow": {
          "title": "Allow",
          "description": "List of regex patterns to include in ingestion",
          "default": [
            ".*"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "deny": {
          "title": "Deny",
          "description": "List of regex patterns to exclude from ingestion.",
          "default": [],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "ignoreCase": {
          "title": "Ignorecase",
          "description": "Whether to ignore case sensitivity during pattern matching.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "DynamicTypedClassifierConfig": {
      "title": "DynamicTypedClassifierConfig",
      "type": "object",
      "properties": {
        "type": {
          "title": "Type",
          "description": "The type of the classifier to use. For DataHub,  use `datahub`",
          "type": "string"
        },
        "config": {
          "title": "Config",
          "description": "The configuration required for initializing the classifier. If not specified, uses defaults for classifer type."
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "ClassificationConfig": {
      "title": "ClassificationConfig",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether classification should be used to auto-detect glossary terms",
          "default": false,
          "type": "boolean"
        },
        "sample_size": {
          "title": "Sample Size",
          "description": "Number of sample values used for classification.",
          "default": 100,
          "type": "integer"
        },
        "max_workers": {
          "title": "Max Workers",
          "description": "Number of worker processes to use for classification. Set to 1 to disable.",
          "default": 14,
          "type": "integer"
        },
        "table_pattern": {
          "title": "Table Pattern",
          "description": "Regex patterns to filter tables for classification. This is used in combination with other patterns in parent config. Specify regex to match the entire table name in `database.schema.table` format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
          "default": {
            "allow": [
              ".*"
            ],
            "deny": [],
            "ignoreCase": true
          },
          "allOf": [
            {
              "$ref": "#/definitions/AllowDenyPattern"
            }
          ]
        },
        "column_pattern": {
          "title": "Column Pattern",
          "description": "Regex patterns to filter columns for classification. This is used in combination with other patterns in parent config. Specify regex to match the column name in `database.schema.table.column` format.",
          "default": {
            "allow": [
              ".*"
            ],
            "deny": [],
            "ignoreCase": true
          },
          "allOf": [
            {
              "$ref": "#/definitions/AllowDenyPattern"
            }
          ]
        },
        "info_type_to_term": {
          "title": "Info Type To Term",
          "description": "Optional mapping to provide glossary term identifier for info type",
          "default": {},
          "type": "object",
          "additionalProperties": {
            "type": "string"
          }
        },
        "classifiers": {
          "title": "Classifiers",
          "description": "Classifiers to use to auto-detect glossary terms. If more than one classifier, infotype predictions from the classifier defined later in sequence take precedance.",
          "default": [
            {
              "type": "datahub",
              "config": null
            }
          ],
          "type": "array",
          "items": {
            "$ref": "#/definitions/DynamicTypedClassifierConfig"
          }
        }
      },
      "additionalProperties": false
    },
    "DynamicTypedStateProviderConfig": {
      "title": "DynamicTypedStateProviderConfig",
      "type": "object",
      "properties": {
        "type": {
          "title": "Type",
          "description": "The type of the state provider to use. For DataHub use `datahub`",
          "type": "string"
        },
        "config": {
          "title": "Config",
          "description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
          "default": {},
          "type": "object"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "StatefulStaleMetadataRemovalConfig": {
      "title": "StatefulStaleMetadataRemovalConfig",
      "description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
          "default": false,
          "type": "boolean"
        },
        "remove_stale_metadata": {
          "title": "Remove Stale Metadata",
          "description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
          "default": true,
          "type": "boolean"
        },
        "fail_safe_threshold": {
          "title": "Fail Safe Threshold",
          "description": "Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
          "default": 75.0,
          "minimum": 0.0,
          "maximum": 100.0,
          "type": "number"
        }
      },
      "additionalProperties": false
    }
  }
}
Advanced Configurations
Using include_table_item config
If there are items that have most representative fields of the table, users could use the include_table_item option to provide a list of primary keys of the table in dynamodb format. We include these items in addition to the first 100 items in the table when we scan it.
Take AWS DynamoDB Developer Guide Example tables and data as an example, if a account has a table Reply in the us-west-2 region with composite primary key Id and ReplyDateTime, users can use include_table_item to include 2 items as following:
Example:
# The table name should be in the format of region.table_name
# The primary keys should be in the DynamoDB format
include_table_item:
  us-west-2.Reply:
    [
      {
        "ReplyDateTime": { "S": "2015-09-22T19:58:22.947Z" },
        "Id": { "S": "Amazon DynamoDB#DynamoDB Thread 1" },
      },
      {
        "ReplyDateTime": { "S": "2015-10-05T19:58:22.947Z" },
        "Id": { "S": "Amazon DynamoDB#DynamoDB Thread 2" },
      },
    ]
Code Coordinates
- Class Name: datahub.ingestion.source.dynamodb.dynamodb.DynamoDBSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for DynamoDB, feel free to ping us on our Slack.