Load Indices: High-Performance Bulk Index Loading
LoadIndices is a high-performance upgrade task designed for bulk loading metadata aspects directly from the database into Elasticsearch/OpenSearch indices. Unlike RestoreIndices, which focuses on correctness and consistency, LoadIndices is optimized for speed and throughput during initial deployments or large-scale data migrations.
Overview
LoadIndices bypasses the standard event-driven processing pipeline to directly stream data from the `metadata_aspect_v2` table into search indices using optimized bulk operations. This approach provides significant performance improvements for large installations while making specific architectural trade-offs that prioritize speed over consistency.
🚨 CRITICAL WARNING: LoadIndices is designed for specific use cases only and should NEVER be used in production environments with active concurrent writes, MCL-dependent systems, or real-time consistency requirements. See Performance Trade-offs & Implications for complete details.
Key Features
- 🚀 High Performance: Direct streaming from database with optimized bulk operations
- ⚡ Fast Bulk Loading: Optimized for speed over consistency during initial loads
- 🔧 Refresh Management: Automatically disables refresh intervals during loading for optimal performance
- 📊 Comprehensive Monitoring: Real-time progress reporting and performance metrics
- ⚙️ Configurable Isolation: Utilizes READ_UNCOMMITTED transactions for faster scanning
Performance Trade-offs & Implications
⚠️ Critical Understanding: LoadIndices prioritizes performance over consistency by making several architectural trade-offs. Understanding these implications is crucial before using LoadIndices in production environments.
🚨 Key Trade-offs Made
1. BYPASS Kafka/MCL Event Pipeline
- What: LoadIndices completely bypasses Kafka MCL (Metadata Change Log) topics that normally propagate all metadata changes
- Architecture:
Database → LoadIndices → Elasticsearch
vs. the normal flow: Database → Kafka MCL → Multiple Consumers → Elasticsearch/Graph/etc.
- Impact: No MCL events published - downstream systems lose visibility into metadata changes
- Critical Implication:
- MCL-Dependent Analytics: Won't have audit trail of metadata changes
- Integrations: External systems won't be notified of changes
- Custom MCL Consumers: Any custom consumers will miss these events entirely
- ✅ Graph Service: WILL be updated (UpdateIndicesService handles graph indices) ⚠️ Only when Elasticsearch is used for graph storage
2. BROKEN DataHub Event Architecture
- What: Violates DataHub's core design principle that "all metadata changes flow through Kafka MCL"
- Normal Flow:
Metadata Change → MCL Event → Kafka → Multiple Consumers → Various Stores
- LoadIndices Flow:
Metadata Change → LoadIndices → Direct ES Write
(Skips Kafka entirely)
3. READ_UNCOMMITTED Isolation
- What: Uses `TxIsolation.READ_UNCOMMITTED` for faster database scanning
- Impact: May read uncommitted (dirty) data from concurrent transactions
- Implication: Data consistency is not guaranteed during active writes to the database
4. Refresh Interval Manipulation
- What: Automatically disables refresh intervals during bulk operations
- Impact: Recent updates may not be immediately searchable
- Implication: Users won't see real-time updates in search until refresh intervals are restored
5. No Write Concurrency Controls
- What: No coordination with concurrent Elasticsearch writes from live ingestion
- Impact: Potential conflicts with active ingestion pipelines
- Implication: Concurrent writes may cause data inconsistency or operation failures
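The refresh-interval manipulation in trade-off 4 boils down to two index-settings updates against the cluster. A minimal sketch of the payloads involved (Python; the helper names are illustrative, not the actual LoadIndices API — the settings bodies themselves are the standard Elasticsearch `_settings` format):

```python
# Illustrative sketch of the refresh-interval toggling described above.
# The helper names are hypothetical; a client would PUT these bodies to
# PUT /<index>/_settings before and after the bulk load.

def disable_refresh_settings() -> dict:
    """Settings body that turns off periodic refresh during bulk loading."""
    return {"index": {"refresh_interval": "-1"}}

def restore_refresh_settings(interval: str = "1s") -> dict:
    """Settings body that restores the refresh interval after loading."""
    return {"index": {"refresh_interval": interval}}

print(disable_refresh_settings())
print(restore_refresh_settings("5s"))
```

While refresh is disabled, newly written documents are not visible to search; restoring the interval (and optionally forcing a refresh) makes them searchable again.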
⚠️ When NOT to Use LoadIndices
❌ DO NOT use LoadIndices if you have:
- Active ingestion pipelines writing to Elasticsearch simultaneously
- MCL-dependent systems that need event notifications
- Neo4j-based graph storage (graph updates will be missing)
- Real-time search requirements during the loading process
- Production traffic that requires immediate search consistency
✅ When LoadIndices is Appropriate
✅ Safe to use LoadIndices when:
- Fresh deployment with empty Elasticsearch cluster
- Offline migration with no concurrent users
- Standalone indexing without DataHub services running
- Read-only replica environments with no active writes
- Development/testing environments
- Disaster recovery scenarios where faster restoration is prioritized
- Independent cluster setup where you need to populate indices before services start
- Elasticsearch-based graph storage (graph gets updated automatically)
🔒 Safety Requirements
Before using LoadIndices in any environment:
Verify Minimal Infrastructure:
- Database: MySQL/PostgreSQL with `metadata_aspect_v2` table accessible (via Ebean ORM)
- Elasticsearch: Running cluster accessible via HTTP/HTTPS
- DataHub Services: ✅ NOT required - LoadIndices can run independently
- ⚠️ Check Graph Storage: Verify whether Elasticsearch-based graph storage is in use
- ⚠️ Check Database Type: Confirm NOT using Cassandra (not supported)
Stop All Ingestion (if DataHub is running):

```shell
# Disable all Kafka consumers
kubectl scale deployment --replicas=0 datahub-mae-consumer
kubectl scale deployment --replicas=0 datahub-mce-consumer
kubectl scale deployment --replicas=0 datahub-gms
```

Check Database Configuration:

```shell
# Check if using Cassandra (LoadIndices NOT supported)
grep -i cassandra /path/to/datahub/docker/docker-compose.yml
# Verify a MySQL/PostgreSQL database is configured
grep -E "mysql|postgres" /path/to/datahub/docker/docker-compose.yml
# ⚠️ If Cassandra is detected, LoadIndices is NOT available
# and RestoreIndices must be used instead
```

Check Graph Storage Configuration:

```shell
# Check if using Neo4j (graph updates will be MISSING)
grep -r "neo4j" /path/to/datahub/docker/docker-compose.yml
# Check the DataHub configuration for graph service selection
grep -iE "graph.*elasticsearch|neo4j" /path/to/datahub/conf/application.yml
# ⚠️ If Neo4j is detected, LoadIndices will NOT update the graph
```

Verify No Concurrent Writes:

```shell
# Check for active Elasticsearch indexing
curl -s "localhost:9200/_nodes/stats" | grep "index_current"
# Should show "index_current": 0
```

Index Clean State:

```shell
# Ensure a clean indexing state
curl -s "localhost:9200/_nodes/stats" | grep -E "refresh.*active"
```

Coordinate with Operations:
- Maintenance window scheduling
- User notification of search unavailability
- Monitoring of downstream system dependencies
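The "Verify No Concurrent Writes" check above can also be automated. A hedged sketch in Python (the function name is hypothetical; the JSON shape is the standard `GET /_nodes/stats` response, where each node reports in-flight index operations under `indices.indexing.index_current`):

```python
# Hypothetical preflight helper: given the parsed JSON from
# GET /_nodes/stats, confirm no indexing is currently in flight.

def has_active_indexing(nodes_stats: dict) -> bool:
    """Return True if any node reports in-flight index operations."""
    for node in nodes_stats.get("nodes", {}).values():
        indexing = node.get("indices", {}).get("indexing", {})
        if indexing.get("index_current", 0) > 0:
            return True
    return False

stats = {"nodes": {"n1": {"indices": {"indexing": {"index_current": 0}}}}}
print(has_active_indexing(stats))  # False -> safe to run LoadIndices
```

Running such a check immediately before launching LoadIndices reduces the risk of colliding with a straggling ingestion pipeline.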
📊 Consistency Guarantees
Level | LoadIndices | RestoreIndices |
---|---|---|
URN-level Ordering | ✅ Guaranteed | ✅ Guaranteed |
Real-time Searchability | ❌ Delayed | ✅ Immediate |
Graph Service Updates (ES-based) | ✅ Updated | ✅ Updated |
Graph Service Updates (Neo4j-based) | ❌ Missing | ✅ Updated |
MCL Event Propagation | ❌ Bypassed | ✅ Full propagation |
Concurrent Write Safety | ❌ Not safe | ✅ Safe |
2. Restore Normal Operations
- Re-enable ingestion pipelines gradually
- Monitor Elasticsearch for conflicts
- Validate downstream systems are synchronized
3. Emergency Rollback Plan
```shell
# If issues arise, prepare to roll back:
# 1. Stop LoadIndices immediately
# 2. Restore from backup indices
# 3. Re-run with RestoreIndices for correctness
```
How LoadIndices Works
LoadIndices operates as an upgrade task that can run independently without requiring DataHub services to be running. It consists of two main steps:
- BuildIndicesStep: Creates and configures Elasticsearch indices (creates indices if they don't exist)
- LoadIndicesStep: Streams aspects from database and bulk loads them into indices
🔧 Independent Operation Mode
Key Advantage: LoadIndices only requires:
- ✅ MySQL/PostgreSQL source database (via Ebean ORM)
- ✅ Elasticsearch/OpenSearch destination cluster
- ❌ No DataHub services (GMS, frontend, etc.) required
- ❌ Cassandra: ⚠️ NOT supported (Ebean doesn't support Cassandra)
This enables offline bulk operations during maintenance windows or initial deployments where DataHub infrastructure is being set up incrementally.
Index Creation: The BuildIndicesStep automatically creates all required Elasticsearch indices based on `IndexConvention` patterns, so empty Elasticsearch clusters are fully supported.
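The create-if-missing behavior can be illustrated with a tiny sketch (Python; the helper name and set-based registry are hypothetical — the real step talks to the cluster's index APIs, and the index name is just an example):

```python
# Hypothetical sketch of BuildIndicesStep's idempotent index creation.
# A real implementation would call the Elasticsearch indices API rather
# than tracking names in a set.

def ensure_index(existing: set, name: str) -> bool:
    """Create the index only if it does not exist; return True if created."""
    if name in existing:
        return False          # already present: leave settings/mappings alone
    existing.add(name)        # stand-in for a create-index call
    return True

cluster: set = set()
print(ensure_index(cluster, "datasetindex_v2"))  # True  (created)
print(ensure_index(cluster, "datasetindex_v2"))  # False (already exists)
```

Because the step is idempotent, re-running LoadIndices against a partially built cluster is safe from the index-creation side.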
Architecture Flow
```mermaid
graph TD
    A[LoadIndices Upgrade] --> B[BuildIndicesStep]
    B --> C[Create/Configure Indices]
    C --> D[LoadIndicesStep]
    D --> E[Disable Refresh Intervals]
    E --> F[Stream Aspects from DB]
    F --> G[Batch Processing]
    G --> H[Convert to MCL Events]
    H --> I[Bulk Write to ES]
    I --> J[Restore Refresh Intervals]
```
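The stream-batch-write portion of the flow above can be sketched as a simple loop: group the row stream into fixed-size batches and issue one bulk request per batch. All names in this Python sketch are hypothetical (the real logic lives in LoadIndicesStep), and refresh handling is only noted in a comment:

```python
from typing import Callable, Iterable, Iterator, List

def batched(rows: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group a row stream into fixed-size batches."""
    batch: List[dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

def load_indices(rows: Iterable[dict], batch_size: int,
                 bulk_write: Callable[[List[dict]], None]) -> int:
    """Stream aspects and bulk-write them; return the number of rows loaded."""
    total = 0
    # (refresh intervals would be disabled here, restored in a finally block)
    for batch in batched(rows, batch_size):
        bulk_write(batch)          # one bulk request per batch
        total += len(batch)
    return total

written: List[List[dict]] = []
n = load_indices(({"urn": f"urn:li:dataset:d{i}"} for i in range(7)),
                 batch_size=3, bulk_write=written.append)
print(n, [len(b) for b in written])  # 7 [3, 3, 1]
```

Streaming plus batching keeps memory bounded regardless of table size, which is what allows LoadIndices to scale to very large `metadata_aspect_v2` tables.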
Key Differences from RestoreIndices
Aspect | RestoreIndices | LoadIndices |
---|---|---|
Purpose | Correctness & consistency | Speed & throughput |
Processing | Event-driven via MCL events | Direct bulk operations |
Isolation | READ_COMMITTED | READ_UNCOMMITTED |
Refresh Management | Static configuration | Dynamic disable/restore |
Performance Focus | Accurate replay | Maximal speed |
Use Case | Recovery from inconsistencies | Initial loads & migrations |
Deployment & Execution
🚀 Standalone Deployment Advantage
Key Benefit: LoadIndices can run with minimal infrastructure without requiring DataHub services to be running:
```
# Minimal requirements
✅ MySQL/PostgreSQL database (with metadata_aspect_v2 table)
✅ Elasticsearch/OpenSearch cluster
❌ DataHub GMS services - NOT needed
❌ Kafka cluster - NOT needed
❌ Frontend services - NOT needed
```
🔧 Execution Methods
LoadIndices can be executed via:
- Gradle Task (Recommended):

```shell
# From the datahub-upgrade directory
./gradlew runLoadIndices
# With a custom thread count
./gradlew runLoadIndices -PesThreadCount=6
```

- IDE Execution: Run `UpgradeTask.main()` with LoadIndices arguments
- Standalone JAR: Build and run the datahub-upgrade JAR independently
LoadIndices Configuration Options
🔄 Performance & Throttling
Argument | Description | Default | Example |
---|---|---|---|
batchSize | Number of aspects per batch for processing | 10000 | -a batchSize=5000 |
limit | Maximum number of aspects to process | Integer.MAX_VALUE (no limit) | -a limit=50000 |
📅 Time Filtering
Argument | Description | Example |
---|---|---|
gePitEpochMs | Only process aspects created at or after this timestamp (epoch milliseconds) | -a gePitEpochMs=1609459200000 |
lePitEpochMs | Only process aspects created at or before this timestamp (epoch milliseconds) | -a lePitEpochMs=1640995200000 |
🔍 Content Filtering
Argument | Description | Example |
---|---|---|
urnLike | SQL LIKE pattern to filter URNs | -a urnLike=urn:li:dataset:% |
aspectNames | Comma-separated list of aspect names to process | -a aspectNames=ownership,schemaMetadata |
lastUrn | Resume processing from this URN (inclusive) | -a lastUrn=urn:li:dataset:my-dataset |
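The filtering arguments above effectively translate into WHERE-clause predicates on `metadata_aspect_v2`. A hedged sketch of how such predicates might be assembled (illustrative only — the actual query construction is done through Ebean, and the function here is hypothetical; the column names `urn`, `aspect`, and `createdon` match the `metadata_aspect_v2` schema):

```python
# Hypothetical sketch mapping LoadIndices arguments to SQL predicates.

def build_filters(urn_like=None, aspect_names=None, last_urn=None,
                  ge_pit_epoch_ms=None, le_pit_epoch_ms=None):
    """Assemble (clause, params) pairs mirroring the LoadIndices arguments."""
    filters = []
    if urn_like:
        filters.append(("urn LIKE ?", urn_like))
    if aspect_names:
        placeholders = ", ".join("?" for _ in aspect_names)
        filters.append((f"aspect IN ({placeholders})", aspect_names))
    if last_urn:
        filters.append(("urn >= ?", last_urn))   # inclusive resume
    if ge_pit_epoch_ms is not None:
        filters.append(("createdon >= ?", ge_pit_epoch_ms))
    if le_pit_epoch_ms is not None:
        filters.append(("createdon <= ?", le_pit_epoch_ms))
    return filters

clauses = build_filters(urn_like="urn:li:dataset:%",
                        aspect_names=["ownership", "schemaMetadata"])
print([c for c, _ in clauses])  # ['urn LIKE ?', 'aspect IN (?, ?)']
```

Combining `urnLike` with `aspectNames` is the usual way to shrink a run to a specific entity type and a handful of aspects.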
⚙️ System Configuration
Environment Variable | Description | Default | Example |
---|---|---|---|
ELASTICSEARCH_THREAD_COUNT | Number of I/O threads for BulkProcessor | 2 (app config), 4 (Gradle task) | ELASTICSEARCH_THREAD_COUNT=4 |
ES_BULK_ASYNC | Enable asynchronous bulk operations | true | ES_BULK_ASYNC=true |
ES_BULK_REQUESTS_LIMIT | Maximum bulk requests per buffer | 10000 | ES_BULK_REQUESTS_LIMIT=15000 |
ES_BULK_FLUSH_PERIOD | Bulk flush interval in seconds | 300 (5 minutes) | ES_BULK_FLUSH_PERIOD=300 |
Running LoadIndices
🐳 Docker Compose
If you're using Docker Compose with the DataHub source repository:
```shell
# Basic LoadIndices execution
./docker/datahub-upgrade/datahub-upgrade.sh -u LoadIndices

# LoadIndices with performance tuning
./docker/datahub-upgrade/datahub-upgrade.sh -u LoadIndices \
  -a batchSize=15000 \
  -a limit=100000
```
🎯 Gradle Task (Development)
For development and testing environments:
```shell
# Run LoadIndices with default settings
./gradlew :datahub-upgrade:runLoadIndices

# Run with a custom thread count and batch size
./gradlew :datahub-upgrade:runLoadIndices \
  -PesThreadCount=4 \
  -PbatchSize=15000 \
  -Plimit=50000
```
The Gradle task supports these parameters:
- `esThreadCount`: Set `ELASTICSEARCH_THREAD_COUNT` (default: `4`)
- `batchSize`: Override the batch size (default: `10000`)
- `limit`: Set the processing limit
- `urnLike`: Filter by URN pattern
- `aspectNames`: Filter by aspect names
- `lePitEpochMs`: Process records created before this timestamp
- `gePitEpochMs`: Process records created after this timestamp
- `lastUrn`: Resume processing from this URN (inclusive)
🐳 Docker Execution Examples
Configure LoadIndices arguments when running the upgrade container:

```shell
# Target specific entity types
docker run --rm datahub-upgrade \
  -u LoadIndices \
  -a urnLike=urn:li:dataset:% \
  -a batchSize=20000

# Process specific aspects only
docker run --rm datahub-upgrade \
  -u LoadIndices \
  -a aspectNames=ownership,status,schemaMetadata \
  -a batchSize=15000

# Time-based filtering
docker run --rm datahub-upgrade \
  -u LoadIndices \
  -a gePitEpochMs=1640995200000 \
  -a limit=50000

# Resume from a specific URN
docker run --rm datahub-upgrade \
  -u LoadIndices \
  -a lastUrn=urn:li:dataset:my-dataset \
  -a batchSize=10000
```
🔄 Resume Functionality
LoadIndices supports resuming from a specific URN when processing is interrupted:
Resume from Last Processed URN
When LoadIndices runs, it logs the last URN processed in each batch:
```
Batch completed - Last URN processed: urn:li:dataset:my-dataset
Processed 10000 aspects - 150.2 aspects/sec - Last URN: urn:li:dataset:my-dataset
```
To resume from where you left off:
```shell
# Resume from the last URN that was successfully processed
./gradlew :datahub-upgrade:runLoadIndices \
  -a lastUrn=urn:li:dataset:my-dataset \
  -a batchSize=10000
```
Resume Best Practices
- Use the exact URN: Copy the URN exactly as logged (including any URL encoding)
- Inclusive processing: The `lastUrn` parameter processes from the specified URN onwards (inclusive)
- Monitor progress: Watch the logs for "Last URN processed" messages to track progress
- Batch boundaries: Resume works at the URN level, not the batch level - some aspects may be reprocessed
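The inclusive semantics above can be pictured as a filter over the URN-ordered stream; re-processing a few aspects at the boundary is harmless because index writes are keyed upserts, so a replayed aspect simply overwrites itself. A hedged sketch (the function is hypothetical, not part of LoadIndices):

```python
# Hypothetical illustration of inclusive resume over a URN-ordered stream.

def resume_slice(urns: list, last_urn: str) -> list:
    """Inclusive resume: keep everything at or after last_urn in sorted order."""
    return [u for u in sorted(urns) if u >= last_urn]

urns = ["urn:li:dataset:a", "urn:li:dataset:b", "urn:li:dataset:c"]
print(resume_slice(urns, "urn:li:dataset:b"))
# ['urn:li:dataset:b', 'urn:li:dataset:c']
```

This is why the docs say resume is inclusive: the URN you pass is processed again, guaranteeing no gap even if the previous run died mid-batch.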
Example Resume Workflow
```shell
# 1. Start initial processing
./gradlew :datahub-upgrade:runLoadIndices -a batchSize=5000

# 2. If interrupted, check the logs for the last URN:
#    "Batch completed - Last URN processed: urn:li:dataset:my-dataset"

# 3. Resume from that URN
./gradlew :datahub-upgrade:runLoadIndices \
  -a lastUrn=urn:li:dataset:my-dataset \
  -a batchSize=5000
```
Performance Optimization
🚀 Elasticsearch/OpenSearch Configuration
Bulk Processing Tuning
```shell
# Optimize bulk settings for LoadIndices
export ES_BULK_REQUESTS_LIMIT=15000
export ES_BULK_FLUSH_PERIOD=10
export ES_BULK_ASYNC=true
export ELASTICSEARCH_THREAD_COUNT=4
```
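`ES_BULK_REQUESTS_LIMIT` and `ES_BULK_FLUSH_PERIOD` describe the two triggers a bulk buffer typically flushes on: a request-count limit and a time limit. A hedged Python sketch of that policy (the class is hypothetical — the real behavior comes from the Elasticsearch client's BulkProcessor):

```python
import time

class BulkBuffer:
    """Sketch of the two flush triggers: request-count limit and flush period."""
    def __init__(self, requests_limit: int, flush_period_s: float):
        self.requests_limit = requests_limit
        self.flush_period_s = flush_period_s
        self.pending: list = []
        self.last_flush = time.monotonic()
        self.flushes = 0

    def add(self, doc: dict) -> None:
        self.pending.append(doc)
        now = time.monotonic()
        if (len(self.pending) >= self.requests_limit
                or now - self.last_flush >= self.flush_period_s):
            self.flush()

    def flush(self) -> None:
        if self.pending:
            self.flushes += 1      # a real client would send the bulk request here
            self.pending.clear()
        self.last_flush = time.monotonic()

buf = BulkBuffer(requests_limit=3, flush_period_s=300)
for i in range(7):
    buf.add({"id": i})
buf.flush()                        # flush the final partial batch
print(buf.flushes)  # 3
```

Raising the requests limit trades memory for fewer, larger bulk requests; shortening the flush period bounds how long documents can sit unsent.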
Connection Pool Optimization
LoadIndices automatically configures connection pooling based on thread count:
```groovy
// datahub-upgrade/build.gradle configuration
environment "ELASTICSEARCH_THREAD_COUNT", "4"  // Auto-adjusts maxConnectionsPerRoute
```
Comparison with RestoreIndices
Understanding when to use LoadIndices vs RestoreIndices is crucial for optimal performance and data consistency.
🎯 Purpose & Design Philosophy
Aspect | RestoreIndices | LoadIndices |
---|---|---|
Primary Purpose | Data consistency & correctness | Speed & throughput |
Design Philosophy | Event-driven precision | Performance optimization |
Consistency Model | Full consistency guarantee | Speed-optimized trade-offs |
Use Case | Production recovery | Bulk migrations & initial loads |
📊 Technical Comparison
Feature | RestoreIndices | LoadIndices |
---|---|---|
Database Isolation | READ_COMMITTED | READ_UNCOMMITTED |
MCL Events | ✅ Full MCL pipeline | ❌ Bypasses MCL entirely |
Graph Updates (Elasticsearch) | ✅ Updated | ✅ Updated |
Graph Updates (Neo4j) | ✅ Updated | ❌ Missing |
Database Support | MySQL, PostgreSQL, Cassandra | MySQL, PostgreSQL only |
Performance | Slower, safer | Faster, optimized |
Real-time Consistency | ✅ Immediate | ❌ Delayed until refresh |
Concurrency Safety | ✅ Safe | ❌ Not safe |
🚀 When to Use Each Tool
✅ Use RestoreIndices For:
- Production recovery from inconsistencies
- Neo4j-based graph storage deployments
- Cassandra-based metadata storage
- Active ingestion pipelines running
- MCL-dependent systems requiring event notifications
- Precise event replay scenarios
✅ Use LoadIndices For:
- Fresh deployments with empty clusters
- Bulk migrations during maintenance windows
- MySQL/PostgreSQL + Elasticsearch configurations
- Offline scenarios with no concurrent writes
- Development/testing environments
- Performance-critical initial data loads