Minion merge rollup task

The Minion merge rollup task lets you merge small segments into larger ones. This helps to improve query performance and disk storage by aggregating data at a courser granularity to reduce the data processed during query execution.

This task is supported for the following use cases:

  • OFFLINE tables, APPEND only

  • REALTIME tables, without upsert or dedup

Task overview

The Minion merge rollup task merges all segments of segment K time buckets (default 1) from the oldest to the newest records. After processing, the segments are time aligned by bucket.

For example, if the table has hourly records starting with 11-01-2021T13:56:00, and is configured to use bucket time of 1 day, the Merge rollup task merges the records for the window \[11-01-2021, 11-02-2021) in the first run, followed by \[11-02-2021, 11-03-2021) in the next run, followed by \[11-03-2021, 11-04-2021) in the next run, and so on.

Multi-level merge is supported to apply different compressions for different time ranges. For example, for 24 hours you can retain hourly records of data, rollup data from 1 week ago to 1 day ago into daily granularity, and rollup data older than a week to monthly granularity.

This feature uses the following metadata in Zookeeper:

  • CustomMap of SegmentZKMetadata: Keeps the mapping of { "MergeRollupTask.mergeLevel" : {mergeLevel} }. Indicates that the segment is the result of a merge rollup task. Used to skip time buckets that have all merged segments to avoid reprocessing.

  • MergeRollupTaskMetadata: Stored in the path: MINION\_TASK\_METADATA/MergeRollupTask/{tableNameWithType}. This metadata keeps the mapping from mergeLevel to waterMarkMs. Used to determine when to schedule the next merge rollup task run. The watermark is the start time of current processing buckets. All data before the watermark is merged and time aligned.

  • Merge rollup task uses SegmentReplacementProtocol to achieve broker-level atomic swap between the input segments and result segments. Broker refers to the SegmentLineage metadata to determine which segments should be routed.

This feature uses the pinot-minions and the Helix Task Executor framework, which consists of 2 parts:

  • MergeRollupTaskGenerator: The minion task scheduler, which schedules tasks of type MergeRollupTask. This task is scheduled by the controller periodic task, PinotTaskManager. For each mergeLevel from the highest to the lowest granularity (hourly -> daily -> monthly):

    • Time buckets calculation: Starting from the watermark, calculate up to k time buckets that has un-merged segments at best effort. Bump up the watermark if necessary.

    • Segments scheduling: For each time bucket, select all overlapping segments and create minion tasks.

  • MergeRollupTaskExecutor: The minion task executor, which executes the MergeRollupTask generated by the task generator. These tasks are run by the pinot-minion component.

    • Process segments: Download input segments as indicated in the task config. The segment processor framework partitions the data based on time value and rollup if configured.

    • Upload segments: Upload output segments with the segment replacement protocol. Once completed, the input segments are ready to be deleted and cleaned up by the retention manager.

Configure the Minion merge rollup task

  1. Start a pinot-minion.

  2. Set up your OFFLINE table. Add "MergeRollupTask" in the task configs, like this:

"tableName": "myTable_OFFLINE",
"tableType": "OFFLINE",
...
...
"task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1day.mergeType": "concat",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "1d"  
      }
    }
  }
  1. Enable PinotTaskManager (disabled by default) by adding the controller.task properties below to your controller conf, and then restart the controller (required).

controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=1h  #Specify the frequency (more frequent is better, as extra tasks aren't scheduled unless required).
  1. (Optional) Add the following advanced configurations as needed:

"task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1hour.mergeType": "rollup",
        "1hour.bucketTimePeriod": "1h",
        "1hour.bufferTimePeriod": "3h",
        "1hour.maxNumRecordsPerSegment": "1000000",
        "1hour.maxNumRecordsPerTask": "5000000",
        "1hour.maxNumParallelBuckets": "5",
        "1day.mergeType": "rollup",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "1d",
        "1day.roundBucketTimePeriod": "1d",
        "1day.maxNumRecordsPerSegment": "1000000",
        "1day.maxNumRecordsPerTask": "5000000",
        "metricColA.aggregationType": "sum",
        "metricColB.aggregationType": "max"
      }
    }
  }

For detail about these advanced configurations, see the following table:

Metrics

mergeRollupTaskDelayInNumBuckets.{tableNameWithType}.{mergeLevel}

This metric keeps track of the task delay in the number of time buckets. For example, if we see this number is 7, and the merge task is configured with "bucketTimePeriod = 1d", this means that we have 7 days of delay. Useful to monitor if the merge task is stuck in production.