
Load Indices: High-Performance Bulk Index Loading

LoadIndices is a high-performance upgrade task designed for bulk loading metadata aspects directly from the database into Elasticsearch/OpenSearch indices. Unlike RestoreIndices which focuses on correctness and consistency, LoadIndices is optimized for speed and throughput during initial deployments or large-scale data migrations.

Overview

LoadIndices bypasses the standard event-driven processing pipeline to directly stream data from the metadata_aspect_v2 table into search indices using optimized bulk operations. This approach provides significant performance improvements for large installations while making specific architectural trade-offs that prioritize speed over consistency.

🚨 CRITICAL WARNING: LoadIndices is designed for specific use cases only and should NEVER be used in production environments with active concurrent writes, MCL-dependent systems, or real-time consistency requirements. See Performance Trade-offs & Implications for complete details.

Key Features

  • 🚀 High Performance: Direct streaming from database with optimized bulk operations
  • ⚡ Fast Bulk Loading: Optimized for speed over consistency during initial loads
  • 🔧 Refresh Management: Automatically disables refresh intervals during loading for optimal performance
  • 📊 Comprehensive Monitoring: Real-time progress reporting and performance metrics
  • ⚙️ Relaxed Isolation: Uses READ_UNCOMMITTED transactions for faster database scanning

Performance Trade-offs & Implications

⚠️ Critical Understanding: LoadIndices prioritizes performance over consistency by making several architectural trade-offs. Understanding these implications is crucial before using LoadIndices in production environments.

🚨 Key Trade-offs Made

1. BYPASS Kafka/MCL Event Pipeline

  • What: LoadIndices completely bypasses Kafka MCL (Metadata Change Log) topics that normally propagate all metadata changes
  • Architecture: Database → LoadIndices → Elasticsearch vs normal flow of Database → Kafka MCL → Multiple Consumers → Elasticsearch/Graph/etc
  • Impact: No MCL events published - downstream systems lose visibility into metadata changes
  • Critical Implication:
    • MCL-Dependent Analytics: Won't have audit trail of metadata changes
    • Integrations: External systems won't be notified of changes
    • Custom MCL Consumers: Any custom consumers will miss these events entirely
    • ✅ Graph Service: WILL be updated (UpdateIndicesService handles graph indices), ⚠️ but only when Elasticsearch is used for graph storage

2. BROKEN DataHub Event Architecture

  • What: Violates DataHub's core design principle that "all metadata changes flow through Kafka MCL"
  • Normal Flow: Metadata Change → MCL Event → Kafka → Multiple Consumers → Various Stores
  • LoadIndices Flow: Metadata Change → LoadIndices → Direct ES Write (Skips Kafka entirely)

3. READ_UNCOMMITTED Isolation

  • What: Uses TxIsolation.READ_UNCOMMITTED for faster database scanning
  • Impact: May see dirty reads (uncommitted changes) from concurrent transactions
  • Implication: Data consistency not guaranteed during active writes to database

4. Refresh Interval Manipulation

  • What: Automatically disables refresh intervals during bulk operations
  • Impact: Recent updates may not be immediately searchable
  • Implication: Users won't see real-time updates in search until refresh intervals are restored

5. No Write Concurrency Controls

  • What: No coordination with concurrent Elasticsearch writes from live ingestion
  • Impact: Potential conflicts with active ingestion pipelines
  • Implication: Concurrent writes may cause data inconsistency or operation failures

⚠️ When NOT to Use LoadIndices

❌ DO NOT use LoadIndices if you have:

  • Active ingestion pipelines writing to Elasticsearch simultaneously
  • MCL-dependent systems that need event notifications
  • Neo4j-based graph storage (graph updates will be missing)
  • Real-time search requirements during the loading process
  • Production traffic that requires immediate search consistency

✅ When LoadIndices is Appropriate

✅ Safe to use LoadIndices when:

  • Fresh deployment with empty Elasticsearch cluster
  • Offline migration with no concurrent users
  • Standalone indexing without DataHub services running
  • Read-only replica environments with no active writes
  • Development/testing environments
  • Disaster recovery scenarios where faster restoration is prioritized
  • Independent cluster setup where you need to populate indices before services start
  • Elasticsearch-based graph storage (graph gets updated automatically)

🔒 Safety Requirements

Before using LoadIndices in any environment:

  1. Verify Minimal Infrastructure:

    • Database: MySQL/PostgreSQL with metadata_aspect_v2 table accessible (via Ebean ORM)
    • Elasticsearch: Running cluster accessible via HTTP/HTTPS
    • DataHub Services: ✅ NOT required - LoadIndices can run independently
    • ⚠️ Check Graph Storage: Verify if using Elasticsearch-based graph storage
    • ⚠️ Check Database Type: Confirm NOT using Cassandra (not supported)
  2. Stop All Ingestion (if DataHub is running):

    # Disable all Kafka consumers
    kubectl scale deployment --replicas=0 datahub-mae-consumer
    kubectl scale deployment --replicas=0 datahub-mce-consumer
    kubectl scale deployment --replicas=0 datahub-gms
  3. Check Database Configuration:

    # Check if using Cassandra (LoadIndices NOT supported)
    grep -i cassandra /path/to/datahub/docker/docker-compose.yml

    # Verify MySQL/PostgreSQL database is configured
    grep -E "mysql|postgres" /path/to/datahub/docker/docker-compose.yml

    # ⚠️ If Cassandra detected, LoadIndices is NOT available
    # Must use RestoreIndices instead
  4. Check Graph Storage Configuration:

    # Check if using Neo4j (graph updates will be MISSING)
    grep -r "neo4j" /path/to/datahub/docker/docker-compose.yml

    # Check DataHub configuration for graph service selection
    grep -i "graph.*elasticsearch\|neo4j" /path/to/datahub/conf/application.yml

    # ⚠️ If Neo4j is detected, LoadIndices will NOT update graph
  5. Verify No Concurrent Writes:

    # Check for active Elasticsearch indexing
    curl -s "localhost:9200/_nodes/stats" | grep "index_current"
    # Should show "index_current": 0
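A stricter version of this check sums `index_current` across every node instead of grepping the raw JSON. The sketch below runs against a canned sample response (it assumes `python3` is available); in practice you would pipe in the output of `curl -s "localhost:9200/_nodes/stats/indices/indexing"`:

```shell
# Canned node-stats sample standing in for the live curl response
sample='{"nodes":{"node1":{"indices":{"indexing":{"index_current":0}}},"node2":{"indices":{"indexing":{"index_current":0}}}}}'

# Sum in-flight indexing operations across all nodes; must be 0 before LoadIndices
echo "$sample" | python3 -c '
import json, sys
stats = json.load(sys.stdin)
print(sum(n["indices"]["indexing"]["index_current"] for n in stats["nodes"].values()))
'
```

A non-zero total means some pipeline is still writing; stop it before proceeding.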
  6. Index Clean State:

    # Ensure clean indexing state
    curl -s "localhost:9200/_nodes/stats" | grep -E "refresh.*active"
  7. Coordinate with Operations:

    • Maintenance window scheduling
    • User notification of search unavailability
    • Monitoring of downstream system dependencies

📊 Consistency Guarantees

| Level | LoadIndices | RestoreIndices |
| --- | --- | --- |
| URN-level Ordering | ✅ Guaranteed | ✅ Guaranteed |
| Real-time Searchability | ❌ Delayed | ✅ Immediate |
| Graph Service Updates (ES-based) | ✅ Updated | ✅ Updated |
| Graph Service Updates (Neo4j-based) | ❌ Missing | ✅ Updated |
| MCL Event Propagation | ❌ Bypassed | ✅ Full propagation |
| Concurrent Write Safety | ❌ Not safe | ✅ Safe |

Restore Normal Operations

  • Re-enable ingestion pipelines gradually
  • Monitor Elasticsearch for conflicts
  • Validate downstream systems are synchronized

Emergency Rollback Plan

# If issues arise, prepare rollback:
# 1. Stop LoadIndices immediately
# 2. Restore from backup indices
# 3. Re-run with RestoreIndices for correctness

How LoadIndices Works

LoadIndices operates as an upgrade task that can run independently without requiring DataHub services to be running. It consists of two main steps:

  1. BuildIndicesStep: Creates and configures Elasticsearch indices (creates indices if they don't exist)
  2. LoadIndicesStep: Streams aspects from database and bulk loads them into indices

🔧 Independent Operation Mode

Key Advantage: LoadIndices only requires:

  • MySQL/PostgreSQL source database (via Ebean ORM)
  • Elasticsearch/OpenSearch destination cluster
  • No DataHub services (GMS, MAE/MCE consumers, frontend) required
  • Cassandra: ⚠️ NOT supported (Ebean doesn't support Cassandra)

This enables offline bulk operations during maintenance windows or initial deployments where DataHub infrastructure is being set up incrementally.

Index Creation: The BuildIndicesStep automatically creates all required Elasticsearch indices based on IndexConvention patterns, so empty Elasticsearch clusters are fully supported.

Architecture Flow

graph TD
A[LoadIndices Upgrade] --> B[BuildIndicesStep]
B --> C[Create/Configure Indices]
C --> D[LoadIndicesStep]
D --> E[Disable Refresh Intervals]
E --> F[Stream Aspects from DB]
F --> G[Batch Processing]
G --> H[Convert to MCL Events]
H --> I[Bulk Write to ES]
I --> J[Restore Refresh Intervals]

Key Differences from RestoreIndices

| Aspect | RestoreIndices | LoadIndices |
| --- | --- | --- |
| Purpose | Correctness & consistency | Speed & throughput |
| Processing | Event-driven via MCL events | Direct bulk operations |
| Isolation | READ_COMMITTED | READ_UNCOMMITTED |
| Refresh Management | Static configuration | Dynamic disable/restore |
| Performance Focus | Accurate replay | Maximal speed |
| Use Case | Recovery from inconsistencies | Initial loads & migrations |

Deployment & Execution

🚀 Standalone Deployment Advantage

Key Benefit: LoadIndices can run with minimal infrastructure without requiring DataHub services to be running:

# Minimal requirements
✅ MySQL/PostgreSQL database (with metadata_aspect_v2 table)
✅ Elasticsearch/OpenSearch cluster
❌ DataHub GMS/MAE/MCE services - NOT needed
❌ Kafka cluster - NOT needed
❌ Frontend services - NOT needed

🔧 Execution Methods

LoadIndices can be executed via:

  1. Gradle Task (Recommended)
# From datahub-upgrade directory
./gradlew runLoadIndices

# With custom thread count
./gradlew runLoadIndices -PesThreadCount=6
  2. IDE Execution: Run UpgradeTask.main() with LoadIndices arguments

  3. Standalone JAR: Build and run datahub-upgrade JAR independently


LoadIndices Configuration Options

🔄 Performance & Throttling

| Argument | Description | Default | Example |
| --- | --- | --- | --- |
| batchSize | Number of aspects per batch for processing | 10000 | -a batchSize=5000 |
| limit | Maximum number of aspects to process | Integer.MAX_VALUE (no limit) | -a limit=50000 |
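A quick back-of-envelope check helps size a run: the number of batches is the row count divided by batchSize, rounded up. The row count below is an illustrative assumption, not a real figure:

```shell
# Estimate batches for a hypothetical 1.25M-aspect table at the default batchSize
total_aspects=1250000
batchSize=10000

# Ceiling division: (total + batchSize - 1) / batchSize
echo $(( (total_aspects + batchSize - 1) / batchSize ))   # 125
```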

📅 Time Filtering

| Argument | Description | Example |
| --- | --- | --- |
| gePitEpochMs | Only process aspects created after this timestamp (milliseconds) | -a gePitEpochMs=1609459200000 |
| lePitEpochMs | Only process aspects created before this timestamp (milliseconds) | -a lePitEpochMs=1640995200000 |
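The epoch-millisecond values can be produced from a human-readable date; a sketch assuming GNU `date` (seconds from `+%s`, with three zeros appended to get milliseconds):

```shell
# 2021-01-01T00:00:00Z → 1609459200000, suitable for -a gePitEpochMs=...
ge_ms="$(date -u -d '2021-01-01T00:00:00Z' +%s)000"
echo "$ge_ms"   # 1609459200000
```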

🔍 Content Filtering

| Argument | Description | Example |
| --- | --- | --- |
| urnLike | SQL LIKE pattern to filter URNs | -a urnLike=urn:li:dataset:% |
| aspectNames | Comma-separated list of aspect names to process | -a aspectNames=ownership,schemaMetadata |
| lastUrn | Resume processing from this URN (inclusive) | -a lastUrn=urn:li:dataset:my-dataset |

⚙️ System Configuration

| Environment Variable | Description | Default | Example |
| --- | --- | --- | --- |
| ELASTICSEARCH_THREAD_COUNT | Number of I/O threads for BulkProcessor | 2 (app config), 4 (Gradle task) | ELASTICSEARCH_THREAD_COUNT=4 |
| ES_BULK_ASYNC | Enable asynchronous bulk operations | true | ES_BULK_ASYNC=true |
| ES_BULK_REQUESTS_LIMIT | Maximum bulk requests per buffer | 10000 | ES_BULK_REQUESTS_LIMIT=15000 |
| ES_BULK_FLUSH_PERIOD | Bulk flush interval in seconds | 300 (5 minutes) | ES_BULK_FLUSH_PERIOD=300 |

Running LoadIndices

🐳 Docker Compose

If you're using Docker Compose with the DataHub source repository:

# Basic LoadIndices execution
./docker/datahub-upgrade/datahub-upgrade.sh -u LoadIndices

# LoadIndices with performance tuning
./docker/datahub-upgrade/datahub-upgrade.sh -u LoadIndices \
-a batchSize=15000 \
-a limit=100000

🎯 Gradle Task (Development)

For development and testing environments:

# Run LoadIndices with default settings
./gradlew :datahub-upgrade:runLoadIndices

# Run with custom thread count and batch size
./gradlew :datahub-upgrade:runLoadIndices \
-PesThreadCount=4 \
-PbatchSize=15000 \
-Plimit=50000

The Gradle task supports these parameters:

  • esThreadCount: Set ELASTICSEARCH_THREAD_COUNT (default: 4)
  • batchSize: Override batch size (default: 10000)
  • limit: Set processing limit
  • urnLike: Filter by URN pattern
  • aspectNames: Filter by aspect names
  • lePitEpochMs: Process records created before this timestamp
  • gePitEpochMs: Process records created after this timestamp
  • lastUrn: Resume processing from this URN (inclusive)

🐳 Docker Environment Variables

Configure LoadIndices through Docker environment:

# Target specific entity types
docker run --rm datahub-upgrade \
-u LoadIndices \
-a urnLike=urn:li:dataset:% \
-a batchSize=20000

# Process specific aspects only
docker run --rm datahub-upgrade \
-u LoadIndices \
-a aspectNames=ownership,status,schemaMetadata \
-a batchSize=15000

# Time-based filtering
docker run --rm datahub-upgrade \
-u LoadIndices \
-a gePitEpochMs=1640995200000 \
-a limit=50000

# Resume from a specific URN
docker run --rm datahub-upgrade \
-u LoadIndices \
-a lastUrn=urn:li:dataset:my-dataset \
-a batchSize=10000

🔄 Resume Functionality

LoadIndices supports resuming from a specific URN when processing is interrupted:

Resume from Last Processed URN

When LoadIndices runs, it logs the last URN processed in each batch:

Batch completed - Last URN processed: urn:li:dataset:my-dataset
Processed 10000 aspects - 150.2 aspects/sec - Last URN: urn:li:dataset:my-dataset
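If you capture the upgrade output to a file, the most recent URN can be pulled out with standard text tools. The log wording below matches the lines shown above; the filename `loadindices.log` is a hypothetical capture, here seeded with sample content so the sketch is self-contained:

```shell
# Sample captured output (wording matches the LoadIndices log lines above)
cat > loadindices.log <<'EOF'
Processed 10000 aspects - 150.2 aspects/sec - Last URN: urn:li:dataset:first
Batch completed - Last URN processed: urn:li:dataset:my-dataset
EOF

# Extract the most recent "Last URN processed" value for use with -a lastUrn=...
lastUrn=$(grep 'Last URN processed:' loadindices.log | tail -1 | sed 's/.*Last URN processed: //')
echo "$lastUrn"   # urn:li:dataset:my-dataset
```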

To resume from where you left off:

# Resume from the last URN that was successfully processed
./gradlew :datahub-upgrade:runLoadIndices \
-a lastUrn=urn:li:dataset:my-dataset \
-a batchSize=10000

Resume Best Practices

  • Use the exact URN: Copy the URN exactly as logged (including any URL encoding)
  • Inclusive processing: The lastUrn parameter processes from the specified URN onwards (inclusive)
  • Monitor progress: Watch the logs for the "Last URN processed" messages to track progress
  • Batch boundaries: Resume works at the URN level, not batch level - some aspects may be reprocessed
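The "inclusive" semantics can be pictured as a lexicographic cut over the URN-ordered scan. This is a toy sketch of that cut (made-up URN values), not the actual implementation, which filters in SQL:

```shell
# URNs in the order the scan would visit them
urns='urn:li:dataset:alpha
urn:li:dataset:beta
urn:li:dataset:gamma'

# Resuming with lastUrn=...beta re-emits beta itself, then everything after it
echo "$urns" | awk -v last='urn:li:dataset:beta' '$0 >= last'
```

Because the cut is at the URN level, any aspects of the lastUrn entity that were already written are simply written again, which is why some reprocessing is harmless.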

Example Resume Workflow

# 1. Start initial processing
./gradlew :datahub-upgrade:runLoadIndices -a batchSize=5000

# 2. If interrupted, check logs for last URN:
# "Batch completed - Last URN processed: urn:li:dataset:my-dataset"

# 3. Resume from that URN
./gradlew :datahub-upgrade:runLoadIndices \
-a lastUrn=urn:li:dataset:my-dataset \
-a batchSize=5000

Performance Optimization

🚀 Elasticsearch/OpenSearch Configuration

Bulk Processing Tuning

# Optimize bulk settings for LoadIndices
export ES_BULK_REQUESTS_LIMIT=15000
export ES_BULK_FLUSH_PERIOD=10
export ES_BULK_ASYNC=true
export ELASTICSEARCH_THREAD_COUNT=4

Connection Pool Optimization

LoadIndices automatically configures connection pooling based on thread count:

# datahub-upgrade/build.gradle configuration
environment "ELASTICSEARCH_THREAD_COUNT", "4" # Auto-adjusts maxConnectionsPerRoute

Comparison with RestoreIndices

Understanding when to use LoadIndices vs RestoreIndices is crucial for optimal performance and data consistency.

🎯 Purpose & Design Philosophy

| Aspect | RestoreIndices | LoadIndices |
| --- | --- | --- |
| Primary Purpose | Data consistency & correctness | Speed & throughput |
| Design Philosophy | Event-driven precision | Performance optimization |
| Consistency Model | Full consistency guarantee | Speed-optimized trade-offs |
| Use Case | Production recovery | Bulk migrations & initial loads |

📊 Technical Comparison

| Feature | RestoreIndices | LoadIndices |
| --- | --- | --- |
| Database Isolation | READ_COMMITTED | READ_UNCOMMITTED |
| MCL Events | ✅ Full MCL pipeline | ❌ Bypasses MCL entirely |
| Graph Updates (Elasticsearch) | ✅ Updated | ✅ Updated |
| Graph Updates (Neo4j) | ✅ Updated | ❌ Missing |
| Database Support | MySQL, PostgreSQL, Cassandra | MySQL, PostgreSQL only |
| Performance | Slower, safer | Faster, optimized |
| Real-time Consistency | ✅ Immediate | ❌ Delayed until refresh |
| Concurrency Safety | ✅ Safe | ❌ Not safe |

🚀 When to Use Each Tool

Use RestoreIndices For:

  • Production recovery from inconsistencies
  • Neo4j-based graph storage deployments
  • Cassandra-based metadata storage
  • Active ingestion pipelines running
  • MCL-dependent systems requiring event notifications
  • Precise event replay scenarios

Use LoadIndices For:

  • Fresh deployments with empty clusters
  • Bulk migrations during maintenance windows
  • MySQL/PostgreSQL + Elasticsearch configurations
  • Offline scenarios with no concurrent writes
  • Development/testing environments
  • Performance-critical initial data loads