# Minion Merge Rollup Task

The Minion merge rollup task merges small segments into larger ones. This improves query performance and reduces disk usage by aggregating data at a coarser granularity, which reduces the amount of data processed during query execution.

This task is supported for the following use cases:

* OFFLINE tables, APPEND only
* REALTIME tables, without upsert or dedup

### Task overview

The Minion merge rollup task merges all segments in K time buckets per run (default 1), working from the oldest records to the newest. After processing, the segments are time aligned by bucket.

For example, if the table has hourly records starting with `11-01-2021T13:56:00`, and is configured to use a bucket time of 1 day, the merge rollup task merges the records for the window `[11-01-2021, 11-02-2021)` in the first run, followed by `[11-02-2021, 11-03-2021)` in the next run, followed by `[11-03-2021, 11-04-2021)` in the next run, and so on.
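The window arithmetic in this example can be sketched in a few lines of Python. This is only an illustration of aligning windows to bucket boundaries, not Pinot's implementation; the function name `bucket_windows` and the use of UTC are assumptions made for the example.

```python
from datetime import datetime, timedelta, timezone

def bucket_windows(first_record: datetime, bucket: timedelta, runs: int):
    """Return `runs` consecutive [start, end) windows aligned to `bucket`."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Floor the first record time to the start of its enclosing bucket.
    start = epoch + ((first_record - epoch) // bucket) * bucket
    return [(start + i * bucket, start + (i + 1) * bucket) for i in range(runs)]

# Hourly records start at 11-01-2021T13:56:00 and the bucket time is 1 day.
first = datetime(2021, 11, 1, 13, 56, tzinfo=timezone.utc)
for start, end in bucket_windows(first, timedelta(days=1), 3):
    print(f"[{start:%m-%d-%Y}, {end:%m-%d-%Y})")
```

Each tuple corresponds to the window handled by one run, so the first run covers the whole day that contains the first record rather than starting at `13:56`.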

Multi-level merge is supported to apply different compressions for different time ranges. For example, you can keep the most recent 24 hours of data at hourly granularity, roll up data between 1 day and 1 week old to daily granularity, and roll up data older than a week to monthly granularity.

This feature uses the following metadata in Zookeeper:

* **CustomMap of SegmentZKMetadata**: Keeps the mapping of `{ "MergeRollupTask.mergeLevel" : {mergeLevel} }`. Indicates that the segment is the result of a merge rollup task. Used to skip time buckets that have all merged segments to avoid reprocessing.
* **MergeRollupTaskMetadata**: Stored in the path `MINION_TASK_METADATA/MergeRollupTask/{tableNameWithType}`. This metadata keeps the mapping from `mergeLevel` to `waterMarkMs`. Used to determine when to schedule the next merge rollup task run. The watermark is the start time of the buckets currently being processed. All data before the watermark is merged and time aligned.
* The merge rollup task uses `SegmentReplacementProtocol` to achieve a broker-level atomic swap between the input segments and the result segments. The broker refers to the `SegmentLineage` metadata to determine which segments should be routed.

This feature uses pinot-minion and the Helix Task Executor framework. It consists of two parts:

* **MergeRollupTaskGenerator**: The minion task scheduler, which schedules tasks of type `MergeRollupTask`. This task is scheduled by the controller periodic task, `PinotTaskManager`. For each `mergeLevel` from the highest to the lowest granularity (hourly -> daily -> monthly):
  * **Time buckets calculation**: Starting from the watermark, calculate up to K time buckets that have unmerged segments, on a best-effort basis. Bump up the watermark if necessary.
  * **Segments scheduling**: For each time bucket, select all overlapping segments and create minion tasks.
* **MergeRollupTaskExecutor**: The minion task executor, which executes the `MergeRollupTask` generated by the task generator. These tasks are run by the pinot-minion component.
  * **Process segments**: Download input segments as indicated in the task config. The segment processor framework partitions the data by time value and rolls it up if configured.
  * **Upload segments**: Upload output segments with the segment replacement protocol. Once completed, the input segments are ready to be deleted and cleaned up by the retention manager.
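The generator's bucket selection can be pictured with the following simplified sketch. It is not Pinot's actual code: the `Segment` class, the `select_buckets` function, and all parameter names are hypothetical, and the sketch assumes that a bucket is eligible only when its end is older than the buffer and that buckets whose segments are all already merged are skipped.

```python
from dataclasses import dataclass

@dataclass
class Segment:
    name: str
    start_ms: int
    end_ms: int           # inclusive end time of the segment
    merged: bool = False  # True if the segment already carries this merge level

def select_buckets(segments, watermark_ms, bucket_ms, buffer_ms, now_ms, max_buckets=1):
    """Return (new_watermark_ms, [(bucket_start, bucket_end, segments), ...])."""
    selected = []
    bucket_start = watermark_ms
    while len(selected) < max_buckets:
        bucket_end = bucket_start + bucket_ms
        # Stop once the bucket is not yet older than the buffer.
        if bucket_end > now_ms - buffer_ms:
            break
        overlapping = [s for s in segments
                       if s.start_ms < bucket_end and s.end_ms >= bucket_start]
        # Skip buckets that are empty or contain only merged segments.
        if any(not s.merged for s in overlapping):
            selected.append((bucket_start, bucket_end, overlapping))
        bucket_start = bucket_end
    # Assumption: the watermark moves to the first bucket that still needs work.
    new_watermark = selected[0][0] if selected else watermark_ms
    return new_watermark, selected

DAY = 86_400_000
segments = [
    Segment("s1", 0, DAY - 1, merged=True),
    Segment("s2", DAY, DAY + 1_000),
    Segment("s3", DAY + 2_000, 2 * DAY - 1),
    Segment("s4", 3 * DAY, 3 * DAY + 5),
]
watermark, buckets = select_buckets(segments, 0, DAY, DAY, 5 * DAY, max_buckets=2)
print(watermark, [(start, end, [s.name for s in segs]) for start, end, segs in buckets])
```

In this run the first bucket is skipped because its only segment is already merged, the watermark moves to the start of the second bucket, and one task input set is produced for each of the two selected buckets.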

### Configure the Minion merge rollup task

1. Start a pinot-minion.
2. Set up your OFFLINE table. Add "MergeRollupTask" in the task configs, like this:

```
"tableName": "myTable_OFFLINE",
"tableType": "OFFLINE",
...
...
"task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1day.mergeType": "concat",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "1d"  
      }
    }
  }
```

3. Enable PinotTaskManager (disabled by default) by adding the `controller.task` properties below to your [controller conf](/reference/configuration-reference/controller.md), and then restart the controller (required).

```
controller.task.scheduler.enabled=true
# Specify the frequency (more frequent is better, as extra tasks aren't scheduled unless required).
controller.task.frequencyPeriod=1h
```

4. (Optional) Add the following advanced configurations as needed:

```
"task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1hour.mergeType": "rollup",
        "1hour.bucketTimePeriod": "1h",
        "1hour.bufferTimePeriod": "3h",
        "1hour.maxNumRecordsPerSegment": "1000000",
        "1hour.maxNumRecordsPerTask": "5000000",
        "1hour.maxNumParallelBuckets": "5",
        "1day.mergeType": "rollup",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "1d",
        "1day.roundBucketTimePeriod": "1d",
        "1day.maxNumRecordsPerSegment": "1000000",
        "1day.maxNumRecordsPerTask": "5000000",
        "1day.eraseDimensionValues": "a,b",
        "1day.aggregationFunctionParameters.metricColC.samplingProbability": "0.9",
        "metricColA.aggregationType": "sum",
        "metricColB.aggregationType": "max",
        "metricColC.aggregationType": "distinctCountThetaSketch",
        "percentilesCol.aggregationType": "percentileTDigest",
        "aggregationFunctionParameters.percentilesCol.compressionFactor": "200"
      }
    }
  }
```

For details about these advanced configurations, see the following table:

| Property                                                       | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    | Default                                         |
| -------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------- |
| mergeType                                                      | Allowed values are **concat** (no aggregations) and **rollup** (perform metrics aggregations across common dimensions + time). | concat                                          |
| bucketTimePeriod                                               | **Time bucket size**. Adjust this to change the time bucket. E.g. if set to 1h, the output segments will have records in 1 hour range.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         | None                                            |
| bufferTimePeriod                                               | **Buffer time**. Tasks are not scheduled unless the time bucket is older than this buffer. Configure this according to how late you expect your data to arrive. E.g. if your system can emit events up to 3 days late, set this to 3d to make sure those are included. Note: Once a given time window has been processed, it will never be processed again. | None                                            |
| roundBucketTimePeriod                                          | **Round the time value before merging the rows**. This is useful if the time column is more granular than needed, because you can roll up the time values (e.g. if the original data has millisecond granularity but minute-level granularity is enough for the application, set this to `1m`). | None                                            |
| {metricName}.aggregationType                                   | **Aggregation function to apply to the metric** for aggregations. Only applicable for `rollup` cases. Allowed values are `sum`, `max`, `min`, `distinctCountHLL`, `distinctCountThetaSketch`, `distinctCountTupleSketch`, `distinctCountCpcSketch`, `distinctCountULL`, `percentileTDigest`, `percentileRawTDigest`                                                                                                                                                                                                                                                                                                                                                                                                                                            | sum                                             |
| maxNumRecordsPerSegment                                        | Control the **number of records you want in a segment generated**. Useful if the time bucket has many records, but you don't want them all in the same segment.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                | 5,000,000                                       |
| maxNumRecordsPerTask                                           | Control **single task workload**. Useful to protect minion from overloading by a single task.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  | 50,000,000                                      |
| maxNumParallelBuckets                                          | Control **number of processing buckets per run**. Useful to speed up the task scheduling for bootstrapping. E.g. if set to 10, the task generator will schedule 10 buckets per run.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            | 1                                               |
| eraseDimensionValues                                           | Erase dimension values from a merged segment before rollup to reduce cardinality and increase the degree to which common dimension coordinates are aggregated. This can result in a space saving for some dimensions which are not important in historic data. Dimensions in this list will use the configured `defaultValue`.                                                                                                                                                                                                                                                                                                                                                                                                                                 | None                                            |
| aggregationFunctionParameters.{metricName}.nominalEntries      | Configure the nominal entries for a `Theta` or `Tuple` DataSketch metric, providing more control over how sketches are merged in different time buckets. This can result in a space saving for use cases where historical data does not require high precision.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                | `16384`                                         |
| aggregationFunctionParameters.{metricName}.samplingProbability | Configure the sampling probability for a `Theta` sketch metric, providing more control over sampling when merging over different time buckets. This can result in a space saving for use cases where historical data does not require high precision.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          | `1.0` (no sampling)                             |
| aggregationFunctionParameters.{metricName}.lgK                 | Configure the `lgK` value for a `CPC` DataSketch metric, providing more control over how sketches are merged in different time buckets. This can result in a space saving for use cases where historical data does not require high precision.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 | `12`                                            |
| aggregationFunctionParameters.{metricName}.compressionFactor   | Configure the compression factor for a `TDigest` sketch metric, providing control over the accuracy-to-size tradeoff when merging TDigest sketches across different time buckets. Lower values result in smaller sketches with lower accuracy.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 | `100`                                           |
| retentionExpiryBufferPeriod                                    | **Buffer period to exclude segments nearing retention expiry**. Prevents task generation for segments that are close to being deleted by the RetentionManager, avoiding a race condition where segments may be deleted between task generation (controller) and download (minion). Format: time duration strings like `"1h"`, `"30m"`, `"2d"`. If this buffer is greater than or equal to the table retention, the filter fails open (returns all segments) with a WARN log. **Important for MergeRollupTask**: If all segments in an early time bucket are filtered out due to retention, the watermark advances past them permanently, and these buckets will never be reprocessed. Misconfigured buffer values cannot recover already-skipped time buckets. | None (no buffer, uses exact retention boundary) |
| push.mode                                                      | Push mode as in [Job Spec](/reference/configuration-reference/job-specification.md#ingestion-job-spec) supports `TAR`, `URI` and `METADATA` ingestion. This config is general to the job and not specific to any aggregation granularity.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      | TAR                                             |
| output.segment.dir.uri                                         | Path where the merged segments will be stored. This config is general to the job and not specific to any aggregation granularity.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              | Controller Config `data.dir`                    |
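To make the effect of `rollup`, `roundBucketTimePeriod`, `eraseDimensionValues`, and `{metricName}.aggregationType` more concrete, here is a minimal Python sketch of the row-level semantics described in the table. It is an illustration under stated assumptions, not the segment processor framework: the `rollup` function, its parameters, the column names, and the `"null"` placeholder used for erased dimensions are all hypothetical, and only `sum`, `max`, and `min` are modeled.

```python
AGGREGATORS = {"sum": lambda a, b: a + b, "max": max, "min": min}

def rollup(rows, time_col, round_ms, dimensions, metric_aggs, erase=(), default_value="null"):
    """Merge rows that share the same rounded time and dimension values."""
    merged = {}
    for row in rows:
        # Round the time value down to the configured granularity.
        rounded = row[time_col] // round_ms * round_ms
        # Erased dimensions collapse to a single default value.
        dims = tuple(default_value if d in erase else row[d] for d in dimensions)
        key = (rounded,) + dims
        if key not in merged:
            merged[key] = {m: row[m] for m in metric_aggs}
        else:
            for metric, agg in metric_aggs.items():
                merged[key][metric] = AGGREGATORS[agg](merged[key][metric], row[metric])
    return merged

rows = [
    {"ts": 1_000, "country": "US", "device": "ios", "clicks": 2, "latency": 30},
    {"ts": 59_000, "country": "US", "device": "android", "clicks": 3, "latency": 45},
    {"ts": 61_000, "country": "US", "device": "ios", "clicks": 1, "latency": 10},
]
result = rollup(rows, "ts", 60_000, ["country", "device"],
                {"clicks": "sum", "latency": "max"}, erase={"device"})
for key, metrics in sorted(result.items()):
    print(key, metrics)
```

With the time rounded to one minute and `device` erased, the first two rows share the same coordinates and are aggregated into one row, while the third row falls into the next minute and stays separate.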

### Watermark Impact of Retention Buffer

When using `retentionExpiryBufferPeriod`, be aware of the following watermark behavior:

* **Permanent watermark advancement**: If all segments in an early time bucket are filtered out due to retention expiry, the merge task watermark still advances past that bucket. This is intentional to prevent the task from repeatedly attempting to process segments that are about to be deleted.
* **Irreversible skipping**: Once the watermark has moved past a time bucket, that bucket is never reprocessed, even if the buffer configuration is later changed. There is no way to recover skipped buckets without manual intervention or table reinitialization.
* **Configuration impact**: Misconfiguring the retention buffer (e.g., setting it too high) can cause the merge task to permanently skip important time buckets. Ensure the buffer is set appropriately for your use case.
* **Recommendation**: Use a conservative buffer value (e.g., `"1h"` or `"30m"`) to balance safety against clock skew with the risk of skipping buckets. Monitor `mergeRollupTaskDelayInNumBuckets` metrics to detect if the task is falling behind.
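The filtering behavior behind these points can be sketched as follows. This is a simplified model rather than Pinot's implementation: the function `filter_near_expiry` and its parameters are hypothetical, and it assumes a segment expires once its end time is older than the table retention.

```python
import logging

def filter_near_expiry(segments, now_ms, retention_ms, buffer_ms):
    """segments: list of (name, end_time_ms). Return the segments that are safe to schedule."""
    if buffer_ms >= retention_ms:
        # Fail open: the buffer covers the whole retention, so keep all segments.
        logging.warning("retention buffer >= table retention, skipping the filter")
        return list(segments)
    # Segments ending before this cutoff expire within the buffer period.
    cutoff_ms = now_ms - (retention_ms - buffer_ms)
    return [(name, end_ms) for name, end_ms in segments if end_ms >= cutoff_ms]

DAY = 86_400_000
segments = [("old", int(70.5 * DAY)), ("recent", 72 * DAY)]
# 30-day retention, 1-day buffer, "now" is day 100: "old" expires within the buffer.
print(filter_near_expiry(segments, 100 * DAY, 30 * DAY, 1 * DAY))
```

If every segment in a bucket is dropped by this kind of filter, the generator has nothing left to merge for that bucket, which is how the watermark can end up advancing past it.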

### Metrics

#### mergeRollupTaskDelayInNumBuckets.{tableNameWithType}.{mergeLevel}

This metric tracks the task delay as a number of time buckets. For example, if the value is 7 and the merge task is configured with `bucketTimePeriod` set to `1d`, the task is 7 days behind. Use it to detect a merge task that is stuck in production.

{% hint style="info" %}
**Original design doc**: <https://docs.google.com/document/d/1-AKCfXNXdoNjFIvJ87wjWwFM_38gS0NCwFrIYjYsqp8/edit?usp=sharing>

**Issue**: <https://github.com/apache/pinot/issues/2715>
{% endhint %}

