Minion merge rollup task


Last updated 3 years ago


Original design doc: https://docs.google.com/document/d/1-AKCfXNXdoNjFIvJ87wjWwFM_38gS0NCwFrIYjYsqp8/edit?usp=sharing

Issue: https://github.com/apache/pinot/issues/2715

The Minion merge/rollup task merges small segments into larger ones, which can reduce disk usage and improve query performance. For the complete motivation and reasoning, refer to the design doc above. Currently, only the APPEND use case for OFFLINE tables is supported.

How this works

The Pinot merge/rollup task merges segments from the oldest to the newest records, processing up to k time buckets (configurable, default 1) at a time on a best-effort basis. After processing, the segments are time-aligned to the bucket boundaries. For example, if the table has hourly records starting at 11-01-2021T13:56:00 and is configured with a bucket time of 1 day, the task merges the records in the window [11-01-2021, 11-02-2021) in the first run, [11-02-2021, 11-03-2021) in the next run, [11-03-2021, 11-04-2021) in the run after that, and so on.
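The bucketing described above can be sketched in a few lines, assuming epoch-millisecond timestamps and a fixed bucket size (the function and variable names here are illustrative, not Pinot's actual code):

```python
import datetime as dt

DAY_MS = 24 * 60 * 60 * 1000

def merge_windows(watermark_ms, bucket_ms, k):
    """Compute up to k time-aligned [start, end) merge windows in epoch millis,
    starting from the watermark (a sketch of the generator's bucketing)."""
    start = (watermark_ms // bucket_ms) * bucket_ms  # align down to the bucket boundary
    return [(start + i * bucket_ms, start + (i + 1) * bucket_ms) for i in range(k)]

# Records starting at 11-01-2021T13:56:00 (UTC) fall into the day-aligned
# window [11-01-2021, 11-02-2021), which is the first window scheduled.
wm = int(dt.datetime(2021, 11, 1, 13, 56, tzinfo=dt.timezone.utc).timestamp() * 1000)
print(merge_windows(wm, DAY_MS, 2))
```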

Multi-level merge is also supported, to achieve different compression for different time ranges. For example, if the table has hourly records, we can keep them as-is for the last day, roll the data up to daily granularity for the range from 1 week ago to 1 day ago, and roll up data older than 1 week to monthly granularity.

This feature uses the following metadata in ZooKeeper:

  • The CustomMap of SegmentZKMetadata keeps the mapping { "MergeRollupTask.mergeLevel" : {mergeLevel} }. This field indicates that the segment is the output of a merge/rollup task, and is used to skip time buckets whose segments are all already merged, to avoid reprocessing.

  • MergeRollupTaskMetadata, stored at the path MINION_TASK_METADATA/MergeRollupTask/{tableNameWithType}, keeps the mapping from mergeLevel to waterMarkMs. The watermark is the start time of the buckets currently being processed. All data before the watermark has already been merged and time-aligned, and would need a new backfill approach to modify (not supported yet). This metadata is used to determine the next buckets to schedule.

  • The merge/rollup task uses the segment replacement protocol to atomically swap the input segments for the result segments at the Broker level. The Broker consults the SegmentLineage metadata to determine which segments should be routed.
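As a concrete illustration of how the CustomMap marker lets the generator skip fully merged buckets (the field name follows the description above; the dict shapes are illustrative, not the actual ZK records):

```python
def is_merged_at_level(segment_custom_map, merge_level):
    """True if the segment's ZK custom map marks it as output of the given merge level."""
    return segment_custom_map.get("MergeRollupTask.mergeLevel") == merge_level

def can_skip_bucket(segments_custom_maps, merge_level):
    """A time bucket can be skipped only when every segment in it is already merged."""
    return all(is_merged_at_level(m, merge_level) for m in segments_custom_maps)

# Example: one un-merged segment left in the bucket, so the bucket is reprocessed.
bucket = [{"MergeRollupTask.mergeLevel": "1day"}, {}]
print(can_skip_bucket(bucket, "1day"))  # → False
```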

This feature uses the pinot-minion component and the Helix Task Executor framework. It consists of two parts:

  1. MergeRollupTaskGenerator - the minion task scheduler, which schedules tasks of type "MergeRollupTask". It is triggered by the controller periodic task PinotTaskManager. For each mergeLevel, from the highest to the lowest granularity (hourly -> daily -> monthly):

    • Time bucket calculation - Starting from the watermark, calculate up to k time buckets that have un-merged segments, on a best-effort basis. Bump up the watermark if necessary.

    • Segment scheduling - For each time bucket, select all overlapping segments and create minion tasks.

  2. MergeRollupTaskExecutor - the minion task executor that executes the MergeRollupTasks generated above. These tasks run on the pinot-minion component.

    • Process segments - Download the input segments as indicated in the task config. The segment processor framework partitions the data based on the time value and rolls it up if configured.

    • Upload segments - Upload the output segments using the segment replacement protocol. Once completed, the input segments are ready to be deleted and will be cleaned up by the retention manager.
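The generator's segment selection for a bucket reduces to an interval-overlap check; a sketch, with segment time ranges as illustrative dicts rather than Pinot's metadata classes:

```python
def overlapping_segments(segments, window_start_ms, window_end_ms):
    """Select segments whose [startTimeMs, endTimeMs] range overlaps the
    half-open bucket window [window_start_ms, window_end_ms)."""
    return [
        s for s in segments
        if s["startTimeMs"] < window_end_ms and s["endTimeMs"] >= window_start_ms
    ]

segments = [
    {"name": "seg_0", "startTimeMs": 0, "endTimeMs": 999},
    {"name": "seg_1", "startTimeMs": 500, "endTimeMs": 1500},
    {"name": "seg_2", "startTimeMs": 2000, "endTimeMs": 2500},
]
print([s["name"] for s in overlapping_segments(segments, 1000, 2000)])  # → ['seg_1']
```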

Config

Step 0: Start a pinot-minion

Step 1: Set up your OFFLINE table. Add "MergeRollupTask" in the task configs:

"tableName": "myTable_OFFLINE",
"tableType": "OFFLINE",
...
...
"task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1day.mergeType": "concat",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "1d"  
      }
    }
  }

Step 2: Enable PinotTaskManager

The PinotTaskManager periodic task is disabled by default. Enable it by adding the following properties to your controller conf. Set the frequency to a reasonable value (more frequent is fine, since extra tasks are not scheduled unless required). The controller needs a restart after setting these configs.

controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=3600

Step 3: Advanced configs

If needed, add more configs, such as:

"task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1hour.mergeType": "rollup",
        "1hour.bucketTimePeriod": "1h",
        "1hour.bufferTimePeriod": "3h",
        "1hour.maxNumRecordsPerSegment": "1000000",
        "1hour.maxNumRecordsPerTask": "5000000",
        "1hour.maxNumParallelBuckets": "5",
        "1day.mergeType": "rollup",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "1d",
        "1day.roundBucketTimePeriod": "1d",
        "1day.maxNumRecordsPerSegment": "1000000",
        "1day.maxNumRecordsPerTask": "5000000",
        "metricColA.aggregationType": "sum",
        "metricColB.aggregationType": "max"
      }
    }
  }

where:

| Property | Description | Default |
| --- | --- | --- |
| mergeType | Allowed values: concat - no aggregations; rollup - aggregate metrics across rows with common dimension + time values | concat |
| bucketTimePeriod | Time bucket size. E.g. if set to 1h, the output segments will contain records within 1-hour ranges. | None |
| bufferTimePeriod | Buffer time. Tasks are not scheduled for a time bucket until it is older than this buffer. Configure it according to how late your data can arrive; e.g. if events can be emitted up to 3d late, set it to 3d so they are included. Note: once a given time window has been processed, it will never be processed again. | None |
| roundBucketTimePeriod | Round the time value before merging rows. Useful if the time column is more granular than needed; e.g. with millisecond granularity in the original data but minute-level granularity sufficient for the application, set this to 1m. | None |
| {metricName}.aggregationType | Aggregation function to apply to the metric during rollup. Only applicable to the rollup mergeType. Allowed values: sum, max, min. | sum |
| maxNumRecordsPerSegment | Number of records per generated segment. Useful if a time bucket has many records but you don't want them all in the same segment. | 5,000,000 |
| maxNumRecordsPerTask | Caps the workload of a single task, to protect a minion from being overloaded by one task. | 50,000,000 |
| maxNumParallelBuckets | Number of buckets processed per run. Useful to speed up task scheduling when bootstrapping; e.g. if set to 10, the generator schedules 10 buckets per run. | 1 |
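To make the rollup mergeType and the per-metric aggregationType settings concrete, here is a minimal sketch of aggregating metric columns across rows that share the same dimension values (pure illustration; Pinot's segment processor framework does this internally, not user code):

```python
from collections import defaultdict

def rollup(rows, dim_cols, agg_by_metric):
    """Aggregate metric columns across rows sharing the same dimension values,
    mirroring mergeType=rollup with {metricName}.aggregationType settings."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[d] for d in dim_cols)].append(row)
    out = []
    for key, grp in groups.items():
        merged = dict(zip(dim_cols, key))
        for metric, agg in agg_by_metric.items():
            merged[metric] = agg(r[metric] for r in grp)
        out.append(merged)
    return out

rows = [
    {"country": "US", "metricColA": 1, "metricColB": 5},
    {"country": "US", "metricColA": 2, "metricColB": 3},
    {"country": "CA", "metricColA": 4, "metricColB": 1},
]
# Equivalent of "metricColA.aggregationType": "sum", "metricColB.aggregationType": "max"
print(rollup(rows, ["country"], {"metricColA": sum, "metricColB": max}))
```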

Metrics

mergeRollupTaskDelayInNumBuckets.{tableNameWithType}.{mergeLevel}

This metric tracks the task delay measured in time buckets. For example, a value of 7 with "bucketTimePeriod = 1d" means the task is 7 days behind. It is useful for monitoring whether the merge task is stuck in production.
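For alerting, the gauge can be converted into wall-clock delay by multiplying by the level's bucketTimePeriod; a small monitoring-helper sketch (the period-string parser here is a simplification handling only "<int><unit>" values):

```python
UNIT_MS = {"m": 60_000, "h": 3_600_000, "d": 86_400_000}

def task_delay_ms(num_buckets, bucket_time_period):
    """Convert a mergeRollupTaskDelayInNumBuckets reading into milliseconds,
    e.g. 7 buckets at '1d' means ~7 days of delay."""
    value, unit = int(bucket_time_period[:-1]), bucket_time_period[-1]
    return num_buckets * value * UNIT_MS[unit]

print(task_delay_ms(7, "1d") // 86_400_000)  # → 7 (days of delay)
```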

Future works

Realtime support

If this feature is extended to REALTIME tables, users could potentially use long-retention REALTIME tables instead of HYBRID tables for convenience. To add the support, we need to allow segment uploads for realtime tables and handle potential corner cases.

Backfill support

Currently, Pinot backfills data at the segment level (replacing segments with the same names), but the merge task's output segments have different names from the original segments. We need to introduce a new way to backfill the processed data. One potential approach:

  • Introduce a new API to get the list of segments for a given time window.

  • Use segment replacement protocol to swap the group of segments with the backfill ones.
