Upsert Compact Merge Task
Describes the Minion Task that can remove upserted/tombstoned records from Upsert tables, and can also merge compacted segments.
The Upsert Compact Merge Task, also known as the small segment merger, allows you to merge small segments into larger ones for upsert-enabled real-time tables, reducing the total segment count while maintaining storage efficiency.
This task is only supported for REALTIME tables with upsert enabled.
ℹ️ New in Pinot 1.3.0: This task was introduced in version 1.3.0 to address segment proliferation in upsert tables. It works alongside the existing UpsertCompactionTask but focuses on merging multiple small segments rather than compacting individual segments.
⚠️ Important Bug Fix: An issue was identified (#15846) in the initial implementation, which has been fixed via PR #16034 and PR #16086. These fixes are not present in Pinot 1.3.0. If you plan to use this task, make sure to cherry-pick these PRs or wait for Pinot 1.4.0 release.
Task overview
The Upsert Compact Merge Task (small segment merger) addresses the challenge of segment proliferation in upsert tables, where the number of segments can grow continuously over time, leading to operational challenges and performance degradation. While individual segments may be efficiently compacted by the UpsertCompactionTask, the overall segment count continues to increase.
Relationship to UpsertCompactionTask
The Upsert Compact Merge Task complements the existing UpsertCompactionTask:
UpsertCompactionTask: Compacts individual segments by removing invalid records within each segment
UpsertCompactMergeTask: Merges multiple small segments into larger ones while simultaneously compacting each segment during the merge process, reducing both total segment count and invalid records
Both tasks can run independently and are designed to work together for optimal upsert table management.
Over time, upsert tables can accumulate thousands of segments, which creates several challenges:
ZooKeeper limitations: Each segment stores metadata in ZooKeeper, and high segment counts can cause significant ZK latencies
Server restart/replacement risks: More segments mean longer restart times and higher probability of loading failures
Operational complexity: Table rebalancing and metadata operations become increasingly strained
Query performance degradation: Higher disk I/O demands due to processing more segments simultaneously
How it works
The Upsert Compact Merge Task uses the Minion Task Framework and consists of Generator and Executor classes:
UpsertCompactMergeTaskGenerator: Invoked by the Pinot Controller according to the specified schedule. It:
Retrieves segment metadata for the table's completed segments
Retrieves validDocIds from the servers hosting the completed segments
Selects segments for merging based on the configured strategy (continuous time-based selection)
Generates tasks to merge multiple segments into fewer, larger segments
UpsertCompactMergeTaskExecutor: Invoked by a Pinot Minion. It:
Downloads the segments specified in the task config from deep store or servers
Retrieves validDocIds snapshots from the servers
Uses a CompactedRecordReader to merge records from multiple segments
Creates new merged segments using UploadedRealtimeSegmentNameGenerator with "compacted" prefix
Uploads the new segments back to the controller
Segment selection strategy
The task uses a continuous time-based selection approach:
Starts with the oldest segment and processes segments in chronological order
Groups continuous segments based on their invalid document ratios and merging feasibility
Continues selecting segments as long as the total valid records in chosen segments plus the next segment ≤
maxNumRecordsPerSegmentRespects the
maxNumSegmentsPerTasklimit to ensure reasonable task sizesMaintains time-order continuity in segment selection
Once we have created multiple groups of segments that can be merged, we select the one with highest gain for each partitions for merging.
Size-based selection (when outputSegmentMaxSize is configured):
When
outputSegmentMaxSizeis specified, the task also considers segment size limitsSegments are selected to fit within both record count and size constraints
Segment cleanup approach
The task uses a safe two-phase approach for segment management:
Phase 1: Upload new merged segments without immediately deleting the original segments
Phase 2: Post one segment commit cycle when validDocIDSnapshot is updated, identify original segments with
validDocIDCount = 0and delete them
This approach leverages ZooKeeper metadata and enableSnapshot runs to safely clean up old segments while avoiding data inconsistencies.
Implementation details
Segment naming: New merged segments are created with the "compacted" prefix using the UploadedRealtimeSegmentNameGenerator.
Size-based compaction: When outputSegmentMaxSize is configured, the task performs both record-count-based and size-based segment selection.
Batch processing: To optimize performance, the task fetches validDocIds from servers in batches controlled by the numSegmentsBatchPerServerRequest parameter.
CRC validation: The task performs CRC (Cyclic Redundancy Check) validation to ensure data integrity during segment processing and merging operations.
Partition constraints: All segments selected for merging must belong to the same partition. The task validates this constraint before proceeding with the merge operation.
validDocIds snapshot requirement: The task requires validDocIds to be available from the servers hosting the segments. This depends on the enableSnapshot configuration being active.
Configuration
Start a Pinot Minion.
Set up your REALTIME table with upsert enabled. Add "UpsertCompactMergeTask" in the task configs:
"tableName": "upsert_enabled_REALTIME",
"tableType": "REALTIME",
"task": {
"taskTypeConfigsMap": {
"UpsertCompactMergeTask": {
"schedule": "0 */6 * ? * *",
"bufferTimePeriod": "2d",
"maxNumRecordsPerSegment": "5000000",
"maxNumRecordsPerTask": "50000000",
"maxNumSegmentsPerTask": "10",
"outputSegmentMaxSize": "200MB",
"numSegmentsBatchPerServerRequest": "500"
}
}
}Ensure your upsert table has snapshot enabled:
"upsertConfig": {
"mode": "FULL", // or "PARTIAL"
"enableSnapshot": true
}Enable PinotTaskManager (disabled by default) by adding the
controller.taskproperties below to your controller conf, and then restart the controller (required).
controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=1hConfiguration Properties
schedule
Cron expression for task scheduling. Less frequent scheduling (e.g., every 6 hours) is recommended as this is a resource-intensive operation
None (required)
bufferTimePeriod
Minimum time that must elapse since segment completion (endTime) before it becomes eligible for merging
2d
maxNumRecordsPerSegment
Maximum number of valid rows to include in a merged segment. Controls the size of output segments
5000000
maxNumRecordsPerTask
Maximum number of records to process in a single task. Prevents overly large tasks
50000000
maxNumSegmentsPerTask
Maximum number of segments to merge in a single task. Prevents overly large tasks
10
outputSegmentMaxSize
Maximum size of output segments in bytes. When specified, enables size-based segment merging in addition to record-count-based merging. Accepts formats like '200MB', '1GB', etc.
None (size-based disabled by default)
numSegmentsBatchPerServerRequest
Number of segments to query in one batch when fetching validDocIds from servers
500
Prerequisites
Pinot version: Requires Pinot version 1.3.0 or later
Upsert enabled: Table must have upsert configuration with
enableSnapshot: trueCompleted segments: Task only operates on completed (non-consuming) segments with validDocIds available
Same partition: All segments being merged must belong to the same partition to maintain upsert consistency
Minion cluster: At least one Pinot Minion must be running and accessible to the controller
Table validation: The task validates table configuration during execution, ensuring upsert is enabled and snapshot functionality is available
Example
Consider a table with the following characteristics:
16 Kafka partitions producing 4000 messages/second
Upsert compaction running every 5 minutes with aggressive settings
Segments are typically small (< 500K valid records) after compaction
Before UpsertCompactMergeTask:
Table accumulates ~175 new segments per day
After 6 months: ~31,500 segments total
High ZooKeeper load and slow server restarts
After enabling UpsertCompactMergeTask with the above configuration:
Task runs every 6 hours, merging 5-10 small segments into 1 larger segment
Segment count growth rate reduced by 80-90%
Improved server restart times and operational stability
Maintained query performance while reducing segment management overhead
Monitoring and Troubleshooting
Task Progress and Monitoring
Monitor task execution through the following methods:
Controller API: Use the Pinot Controller API to check task status:
GET /tasks/{taskType}- List all tasks for UpsertCompactMergeTaskGET /tasks/{taskType}/{taskId}- Get specific task details and status
Task State Tracking: Tasks progress through these states:
IN_PROGRESS- Task is currently executing on a MinionCOMPLETED- Task finished successfullyFAILED- Task encountered an errorTIMEOUT- Task exceeded configured timeout
Progress Notifications: The task provides progress updates during execution showing:
Number of segments being processed
Current processing stage (downloading, merging, uploading)
Estimated completion time
Common Issues
Task not generating:
Verify
enableSnapshot: truein upsert configCheck that segments meet the selection criteria (small enough and old enough based on
bufferTimePeriod)Ensure completed segments exist for the table with validDocIds available
Verify table configuration passes validation (upsert enabled, correct table type)
Segments not being cleaned up:
Verify
enableSnapshotis running between task executionsCheck ZooKeeper metadata for merge timestamps
Original segments should be cleaned up in subsequent task runs when validDocIDCount = 0
Ensure the two-phase cleanup process is completing properly
Task failures:
Check Minion logs for CRC validation errors
Verify all segments belong to the same partition
Ensure sufficient resources on Minion nodes for segment processing
Check deep store connectivity for segment download/upload operations
Performance impact:
Consider reducing
maxNumSegmentsPerTaskormaxNumRecordsPerTaskif tasks are taking too longAdjust
bufferTimePeriodto balance segment count reduction with task frequencyUse
outputSegmentMaxSizeto control output segment sizes when neededTune
numSegmentsBatchPerServerRequestfor optimal validDocIds fetching performanceMonitor server resources during task execution
Configuration issues:
Ensure all numeric values are properly formatted (some accept string representations)
Verify cron schedule format is correct
Check that outputSegmentMaxSize uses proper size format (e.g., "200MB", "1GB")
Limitations
Only supports upsert-enabled REALTIME tables
Requires
enableSnapshot: truein upsert configurationCannot merge segments across different partitions
Original segments are not immediately deleted (cleaned up in subsequent runs)
Resource-intensive operation that should be scheduled appropriately
Best Practices
Scheduling: Use less frequent scheduling (6-12 hours) to balance effectiveness with resource usage
Sizing: Configure
maxNumRecordsPerSegmentand optionallyoutputSegmentMaxSizebased on your query patterns and server resourcesMonitoring: Track segment count trends and task execution metrics
Coordination: Coordinate with UpsertCompactionTask scheduling to avoid resource conflicts
Testing: Test the configuration on a smaller table first to understand the impact
ℹ️ Design doc: The detailed technical design for this feature is available in the RFC document provided with this implementation.
Last updated
Was this helpful?

