Minion

Explore the minion component in Apache Pinot, empowering efficient data movement and segment generation within Pinot clusters.

A Pinot minion is an optional cluster component that executes background tasks on table data, separately from the query processing performed by brokers and servers. Minions run on independent hardware resources and execute minion tasks as directed by the controller. Examples of minion tasks include converting batch data from a standard format like Avro or JSON into segment files to be loaded into an offline table, and rewriting existing segment files to purge records as required by data privacy laws like GDPR. Minion tasks can run once or be scheduled to run periodically.

Minions isolate the computational burden of out-of-band data processing from the servers. Although a Pinot cluster can function with or without minions, they are typically present to support routine tasks like batch data ingest.

Starting a minion

Make sure you've set up ZooKeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a minion, use the StartMinion command, which accepts the following options:

Usage: StartMinion
    -help                                                   : Print this message. (required=false)
    -minionHost               <String>                      : Host name for minion. (required=false)
    -minionPort               <int>                         : Port number to start the minion at. (required=false)
    -zkAddress                <http>                        : HTTP address of Zookeeper. (required=false)
    -clusterName              <String>                      : Pinot cluster name. (required=false)
    -configFileName           <Config File Name>            : Minion Starter Config file. (required=false)

To start a minion with Docker, run:

docker run \
    --network=pinot-demo \
    --name pinot-minion \
    -d ${PINOT_IMAGE} StartMinion \
    -zkAddress pinot-zookeeper:2181

Interfaces

PinotTaskGenerator

The PinotTaskGenerator interface defines the APIs for the controller to generate tasks for minions to execute.


PinotTaskExecutorFactory

Factory for PinotTaskExecutor, which defines the APIs for minions to execute tasks.

MinionEventObserverFactory

Factory for MinionEventObserver, which defines the APIs for task event callbacks on the minion.

Built-in tasks

Pinot ships with the following built-in Minion tasks:

| Task | Purpose | Table types |
| --- | --- | --- |
| SegmentGenerationAndPushTask | Batch ingestion: reads raw data files and converts them into Pinot segments | OFFLINE |
| RealtimeToOfflineSegmentsTask | Converts completed real-time segments into optimized offline segments | REALTIME to OFFLINE |
| MergeRollupTask | Merges small segments into larger ones and optionally rolls up data at coarser granularity | OFFLINE, REALTIME (without upsert/dedup) |
| PurgeTask | Removes or modifies records for data retention and compliance (e.g., GDPR) | OFFLINE, REALTIME |
| RefreshSegmentTask | Reprocesses segments after table config or schema changes (new indexes, columns, data types) | OFFLINE, REALTIME |
| UpsertCompactionTask | Compacts individual upsert segments by removing invalidated records | REALTIME (upsert only) |
| UpsertCompactMergeTask | Merges multiple small upsert segments into larger ones to reduce segment count | REALTIME (upsert only) |

SegmentGenerationAndPushTask

The SegmentGenerationAndPushTask can fetch files from an input folder (e.g. from an S3 bucket) and convert them into segments. It converts one file into one segment and keeps the file name in segment metadata to avoid duplicate ingestion.

See SegmentGenerationAndPushTask runbook for full configuration details.

To enable this task, add a task config to the TableConfig. For example, a config can schedule the task every 10 minutes to keep ingesting remaining files, with at most 10 parallel tasks and 1 file per task.

NOTE: You may want to omit "tableMaxNumTasks" because of the following caveat. The task generates one segment per file and derives the segment name from the time column of the file. If two files happen to have the same time range and are ingested by tasks from different schedules, their segment names may conflict. To avoid this for now, omit "tableMaxNumTasks". It then defaults to Integer.MAX_VALUE, so Pinot schedules as many tasks as needed to ingest all input files in a single batch. Within one batch, a sequence number suffix prevents segment name conflicts. Because that suffix is scoped to a single batch, tasks from different batches can still run into the conflict described above.
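A minimal sketch of such a config, written as a Python helper that returns the relevant table config sections as a dict. The field names (ingestionConfig.batchIngestionConfig.batchConfigMaps, inputDirURI, inputFormat, task.taskTypeConfigsMap, schedule, tableMaxNumTasks) follow the Pinot table config format. The helper name, the bucket URI, and making tableMaxNumTasks an optional argument are assumptions for illustration. Filesystem-specific properties such as input.fs.className are omitted.

```python
import json
from typing import Optional


def segment_generation_table_config(input_dir_uri: str, input_format: str,
                                    table_max_num_tasks: Optional[int] = None) -> dict:
    """Return the batch-ingestion and task sections of an OFFLINE table config (sketch)."""
    task_config = {
        # Quartz cron: fire at second 0 of every 10th minute.
        "schedule": "0 */10 * * * ?",
    }
    if table_max_num_tasks is not None:
        # Task config values are strings; leave this key out to fall back to Integer.MAX_VALUE.
        task_config["tableMaxNumTasks"] = str(table_max_num_tasks)

    return {
        "ingestionConfig": {
            "batchIngestionConfig": {
                "segmentIngestionType": "APPEND",
                "batchConfigMaps": [
                    {"inputDirURI": input_dir_uri, "inputFormat": input_format}
                ],
            }
        },
        "task": {"taskTypeConfigsMap": {"SegmentGenerationAndPushTask": task_config}},
    }


if __name__ == "__main__":
    # Hypothetical bucket; at most 10 parallel tasks, matching the description above.
    print(json.dumps(segment_generation_table_config("s3://my-bucket/raw/", "json", 10), indent=2))
```

Calling the helper without table_max_num_tasks produces the variant recommended in the note, with no tableMaxNumTasks key.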

When performing ingestion at scale, remember that Pinot lists all of the files contained in the `inputDirURI` every time a `SegmentGenerationAndPushTask` job is scheduled. This can become a bottleneck when fetching files from a cloud bucket like GCS. To mitigate this, point `inputDirURI` at a location that contains as few files as possible.
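The following is a simplified, hypothetical model of one scheduling round, based only on the behavior described above. It lists every file under inputDirURI, skips files whose names are already recorded in segment metadata, and assigns one file per task, capped by tableMaxNumTasks when set. The function name, the sorted ordering, and representing segment metadata as a set of file names are assumptions. This is not Pinot's task generator code.

```python
from typing import Iterable, List, Optional, Set


def plan_ingestion_round(listed_files: Iterable[str],
                         ingested_files: Set[str],
                         table_max_num_tasks: Optional[int] = None) -> List[str]:
    """Return the input files that would each become one task (and one segment)."""
    # The full listing is scanned on every round, including files ingested long ago,
    # so the listing cost grows with the total number of files under inputDirURI.
    pending = sorted(f for f in listed_files if f not in ingested_files)
    if table_max_num_tasks is None:
        # Omitted tableMaxNumTasks: one batch picks up every remaining file.
        return pending
    # Otherwise the remaining files wait for later scheduling rounds.
    return pending[:table_max_num_tasks]


if __name__ == "__main__":
    listing = ["day1.json", "day2.json", "day3.json"]
    print(plan_ingestion_round(listing, {"day1.json"}, table_max_num_tasks=1))  # prints ['day2.json']
```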

RealtimeToOfflineSegmentsTask

See Pinot managed Offline flows for details.

MergeRollupTask

See Minion merge rollup task for details.

PurgeTask

See PurgeTask runbook for details.

RefreshSegmentTask

See RefreshSegmentTask runbook for details.

UpsertCompactionTask

See UpsertCompactionTask runbook for details.

UpsertCompactMergeTask

See UpsertCompactMergeTask runbook for details.

Enable tasks

Tasks are enabled on a per-table basis. To enable a task type (e.g. myTask) on a table, add the task type to the task section of the table config.

Under each enabled task type, you can configure custom properties for that task type.
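A minimal sketch of enabling myTask with two custom properties. It builds the task section of a table config as a Python dict. The property names myProperty1 and myProperty2 and the helper name enable_task are hypothetical placeholders. The task and taskTypeConfigsMap keys follow the Pinot table config format.

```python
import json


def enable_task(table_config: dict, task_type: str, properties: dict) -> dict:
    """Return a copy of table_config with task_type enabled under task.taskTypeConfigsMap."""
    updated = json.loads(json.dumps(table_config))  # deep copy for JSON-compatible data
    task_section = updated.setdefault("task", {})
    type_map = task_section.setdefault("taskTypeConfigsMap", {})
    # Custom properties for this task type, stored as string key/value pairs.
    type_map[task_type] = {str(k): str(v) for k, v in properties.items()}
    return updated


if __name__ == "__main__":
    base = {"tableName": "myTable", "tableType": "OFFLINE"}
    cfg = enable_task(base, "myTask", {"myProperty1": "value1", "myProperty2": "value2"})
    print(json.dumps(cfg, indent=2))
```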

You can also override how Pinot schedules task generation for a table by setting concurrentSchedulingEnabled in the same task block.

Use concurrentSchedulingEnabled as follows:

  • null or omitted: inherit the cluster default from controller.task.concurrentSchedulingEnabled

  • true: opt this table into concurrent task scheduling

  • false: force the legacy serialized scheduling path for this table, even if the cluster default is concurrent
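The three cases above form a tri-state override. The sketch below shows that resolution. It assumes the table-level value has already been read from the task block, with None standing for a missing or null key. The function name is hypothetical, and the snippet only restates the documented rules.

```python
from typing import Optional


def resolve_concurrent_scheduling(table_setting: Optional[bool], cluster_default: bool) -> bool:
    """Effective mode for one table: True means concurrent, False means serialized."""
    if table_setting is None:
        # Null or omitted: inherit controller.task.concurrentSchedulingEnabled.
        return cluster_default
    # An explicit true/false on the table always wins over the cluster default.
    return table_setting
```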

Two more task configs are set as part of the cluster configs. One controls the task's overall timeout (1 hour by default), and the other controls how many tasks run on a single minion worker (1 by default).
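This sketch sets both cluster configs for one task type through the controller's POST /cluster/configs endpoint. The keys follow the <taskType>.timeoutMs and <taskType>.numConcurrentTasksPerInstance pattern. The controller URL (http://localhost:9000), the helper name, and the values (10 minutes, 4 tasks) are assumptions. The function only builds the request. Pass it to urllib.request.urlopen to send it.

```python
import json
import urllib.request


def cluster_task_config_request(controller_url: str, task_type: str,
                                timeout_ms: int, concurrent_tasks_per_instance: int
                                ) -> urllib.request.Request:
    """Build (but do not send) a POST /cluster/configs request for one task type."""
    payload = {
        # Overall task timeout; 1 hour by default when unset.
        f"{task_type}.timeoutMs": str(timeout_ms),
        # How many tasks of this type one minion worker runs; 1 by default when unset.
        f"{task_type}.numConcurrentTasksPerInstance": str(concurrent_tasks_per_instance),
    }
    return urllib.request.Request(
        url=controller_url.rstrip("/") + "/cluster/configs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = cluster_task_config_request("http://localhost:9000", "RealtimeToOfflineSegmentsTask",
                                      timeout_ms=600_000, concurrent_tasks_per_instance=4)
    print(req.get_method(), req.full_url, req.data.decode("utf-8"))
```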

Schedule tasks

Auto-schedule

There are 2 ways to enable task scheduling:

Controller level schedule for all minion tasks

Tasks can be scheduled periodically for all task types on all enabled tables. Enable auto task scheduling by configuring the schedule frequency in the controller config with the key controller.task.frequencyPeriod. This takes period strings as values, e.g. 2h, 30m, 1d.
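The following simplified parser makes the period-string values for controller.task.frequencyPeriod concrete by converting strings such as 2h, 30m, or 1d into seconds. It is a hypothetical helper. It accepts only day, hour, minute, and second units, optionally combined as in 1h30m. It is not the parser the controller uses, and the controller may accept additional forms.

```python
import re

_UNIT_SECONDS = {"d": 86_400, "h": 3_600, "m": 60, "s": 1}
_PERIOD_RE = re.compile(r"(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?")


def period_to_seconds(period: str) -> int:
    """Convert a period string like '2h', '30m', '1d' or '1h30m' into seconds."""
    match = _PERIOD_RE.fullmatch(period.strip().lower())
    if match is None or not any(match.groups()):
        raise ValueError(f"unsupported period string: {period!r}")
    total = 0
    # Groups are ordered days, hours, minutes, seconds; unset groups are None.
    for value, unit in zip(match.groups(), "dhms"):
        if value is not None:
            total += int(value) * _UNIT_SECONDS[unit]
    return total
```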

To let PinotTaskManager generate tasks for different tables in parallel, first enable distributed locking and then enable concurrent scheduling with controller.task.concurrentSchedulingEnabled=true.

If you want to keep the cluster default serialized, leave controller.task.concurrentSchedulingEnabled=false and opt individual tables in with task.concurrentSchedulingEnabled=true. Pinot uses the concurrent path only when every table targeted by a scheduling request resolves to concurrent scheduling.
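The last rule works as an all-or-nothing check over the tables in a scheduling request. The sketch below assumes each table's setting is given as True, False, or None (absent), and that the request targets at least one table. The function name is hypothetical.

```python
from typing import Mapping, Optional


def request_uses_concurrent_path(table_settings: Mapping[str, Optional[bool]],
                                 cluster_default: bool) -> bool:
    """True only if every targeted table resolves to concurrent scheduling.

    table_settings maps table name -> task.concurrentSchedulingEnabled (None if absent)
    and is expected to be non-empty.
    """
    return all(
        cluster_default if setting is None else setting  # per-table override or cluster default
        for setting in table_settings.values()
    )
```

For example, with the cluster default left at false, a request covering one opted-in table and one table without the setting stays on the serialized path.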

Per table and task level schedule

Tasks can also be scheduled with cron expressions. The cron expression is set separately for each task type, in that task type's schedule config. To enable cron scheduling, set controller.task.scheduler.enabled to true in the controller config.

For example, with its schedule set to 0 * * * * ?, the RealtimeToOfflineSegmentsTask is scheduled at the first second of every minute (following the Quartz cron syntax).
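The sketch below pairs the corresponding task config with a helper that labels the fields of a Quartz cron expression. This shows why 0 * * * * ? fires at second 0 of every minute. The helper name is hypothetical. The field order (seconds, minutes, hours, day of month, month, day of week, optional year) is the Quartz convention.

```python
from typing import Dict

QUARTZ_FIELDS = ("seconds", "minutes", "hours", "day_of_month", "month", "day_of_week", "year")


def describe_quartz_cron(expression: str) -> Dict[str, str]:
    """Map each field of a 6- or 7-field Quartz cron expression to its name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"Quartz cron expressions have 6 or 7 fields, got {len(parts)}")
    return dict(zip(QUARTZ_FIELDS, parts))


# Task config for cron scheduling; the controller also needs controller.task.scheduler.enabled=true.
realtime_to_offline_task = {
    "task": {
        "taskTypeConfigsMap": {
            "RealtimeToOfflineSegmentsTask": {"schedule": "0 * * * * ?"}
        }
    }
}

if __name__ == "__main__":
    task_cfg = realtime_to_offline_task["task"]["taskTypeConfigsMap"]["RealtimeToOfflineSegmentsTask"]
    print(describe_quartz_cron(task_cfg["schedule"]))
```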

Manual schedule

Tasks can be manually scheduled using the following controller rest APIs:

| REST API | Description |
| --- | --- |
| POST /tasks/schedule | Schedule tasks for all task types on all enabled tables |
| POST /tasks/schedule?taskType=myTask | Schedule tasks for the given task type on all enabled tables |
| POST /tasks/schedule?tableName=myTable_OFFLINE | Schedule tasks for all task types on the given table |
| POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE | Schedule tasks for the given task type on the given table |
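The sketch below calls these endpoints from Python with the standard library. The controller URL (http://localhost:9000) and the helper name are assumptions. The query parameter names taskType, tableName, and minionInstanceTag are the ones used by the documented endpoints. The helper only builds the request. Send it with urllib.request.urlopen.

```python
import urllib.parse
import urllib.request
from typing import Optional


def schedule_tasks_request(controller_url: str,
                           task_type: Optional[str] = None,
                           table_name: Optional[str] = None,
                           minion_instance_tag: Optional[str] = None) -> urllib.request.Request:
    """Build (but do not send) a POST /tasks/schedule request; omitted filters mean 'all'."""
    params = {}
    if task_type:
        params["taskType"] = task_type
    if table_name:
        params["tableName"] = table_name  # include the type suffix, e.g. myTable_OFFLINE
    if minion_instance_tag:
        params["minionInstanceTag"] = minion_instance_tag
    url = controller_url.rstrip("/") + "/tasks/schedule"
    if params:
        url += "?" + urllib.parse.urlencode(params)
    return urllib.request.Request(url, method="POST")


if __name__ == "__main__":
    req = schedule_tasks_request("http://localhost:9000", "myTask", "myTable_OFFLINE")
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req) would send it to a running controller.
```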

Schedule task on specific instances

Tasks can be scheduled on specific instances by setting minionInstanceTag in the task-level config.

By default, the value is minion_untagged for backward compatibility. Setting a tag lets you schedule tasks on specific nodes and isolate tasks across tables and task types.

| REST API | Description |
| --- | --- |
| POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE&minionInstanceTag=tag1_MINION | Schedule tasks for the given task type of the given table on the minion nodes tagged as tag1_MINION. |
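The sketch below shows the task-level tag setting and its documented default. The table config shape (task.taskTypeConfigsMap.<taskType>.minionInstanceTag) and the tag tag1_MINION follow the examples in this section. The helper name is hypothetical. The sketch does not model how a minionInstanceTag passed on a manual schedule request interacts with the table config.

```python
DEFAULT_MINION_TAG = "minion_untagged"


def effective_minion_tag(table_config: dict, task_type: str) -> str:
    """Return the minion tag that a task type's tasks target for this table."""
    task_config = table_config.get("task", {}).get("taskTypeConfigsMap", {}).get(task_type, {})
    # Without an explicit tag, tasks go to untagged minions (backward-compatible default).
    return task_config.get("minionInstanceTag", DEFAULT_MINION_TAG)


if __name__ == "__main__":
    cfg = {"task": {"taskTypeConfigsMap": {"MergeRollupTask": {"minionInstanceTag": "tag1_MINION"}}}}
    print(effective_minion_tag(cfg, "MergeRollupTask"))  # prints tag1_MINION
    print(effective_minion_tag(cfg, "PurgeTask"))        # prints minion_untagged
```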

Task level advanced configs

allowDownloadFromServer

When a task is executed on a segment, the minion node fetches the segment from deep store. If deep store is not accessible, the minion node can download the segment from the server node instead. This is controlled by the allowDownloadFromServer config in the task config, which defaults to false.

You can also set this config at the minion instance level with pinot.minion.task.allow.download.from.server (default false). The instance-level config is useful for enabling this behavior everywhere when the number of tables or tasks is high. Note: the task-level config overrides the instance-level value.
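The precedence rule can be sketched as follows. The sketch assumes the task-level value is read from the task config map as a string (or is absent) and the instance-level value comes from pinot.minion.task.allow.download.from.server. The function is a hypothetical illustration, not Pinot's code.

```python
from typing import Optional


def allow_download_from_server(task_level: Optional[str], instance_level: Optional[str]) -> bool:
    """Resolve allowDownloadFromServer: task-level value, else instance-level, else false."""
    for value in (task_level, instance_level):
        if value is not None:
            # The first explicitly set value wins; the task-level value is checked first.
            return value.strip().lower() == "true"
    return False  # both settings default to false
```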

Plug-in custom tasks

To plug in a custom task, implement PinotTaskGenerator, PinotTaskExecutorFactory and MinionEventObserverFactory (optional) for the task type (all of them should return the same string for getTaskType()), and annotate them with the following annotations:

| Implementation | Annotation |
| --- | --- |
| PinotTaskGenerator | @TaskGenerator |
| PinotTaskExecutorFactory | @TaskExecutorFactory |
| MinionEventObserverFactory | @EventObserverFactory |

After annotating the classes, put them in a package whose name matches org.apache.pinot.*.plugin.minion.tasks.*. The controller and minion then register them automatically.

Example

See SimpleMinionClusterIntegrationTest, where the TestTask is plugged in.

Task Manager UI

In the Pinot Data Explorer, select Minion Tasks from the left navigation to open the Minion Task Manager page. This page focuses on minion queue troubleshooting and task drill-downs. Controller-wide scheduler details live on the Cluster Manager page, where Cron Scheduler Information and Periodic Tasks are shown separately.

The Minion Task Manager landing page shows four summary tiles:

  • Task Types

  • Minion Instances

  • Running Tasks

  • Waiting Tasks

Below the summary tiles is the task-queue table. It shows which task types have been used, that is, which task types have created their task queues in Helix, and lets you drill into each queue.


Clicking into a task type shows the tables using that task type, along with queue-management actions such as stopping or cleaning up the queue.


Clicking into any table in this list shows how the task is configured for that table, along with the task metadata stored in ZK, if any. For example, MergeRollupTask tracks a watermark in ZK. If the task is cron scheduled, the page also shows the current and next schedules.


At the bottom of this page is the list of tasks generated for this table and task type. In this example, one MergeRollup task has been generated and completed. The task list also includes a Status Filter control so you can focus on a single task state, and a Sub Tasks (Total/Completed/Running/Waiting/Error/Other) column that summarizes the subtasks for each task. The Other bucket combines UNKNOWN, DROPPED, TIMED_OUT, and ABORTED subtasks.
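A small counter can reproduce the Sub Tasks column from a list of subtask states. The sketch assumes the state names COMPLETED, RUNNING, WAITING, and ERROR for the named buckets. UNKNOWN, DROPPED, TIMED_OUT, and ABORTED count under Other, as documented. Counting any other unrecognized state under Other is an additional assumption, as is the function name.

```python
from typing import Dict, Iterable

NAMED_BUCKETS = {"COMPLETED": "Completed", "RUNNING": "Running", "WAITING": "Waiting", "ERROR": "Error"}


def summarize_subtasks(states: Iterable[str]) -> Dict[str, int]:
    """Return Total/Completed/Running/Waiting/Error/Other counts for one task's subtasks."""
    summary = {"Total": 0, "Completed": 0, "Running": 0, "Waiting": 0, "Error": 0, "Other": 0}
    for state in states:
        summary["Total"] += 1
        # UNKNOWN, DROPPED, TIMED_OUT, ABORTED (and, by assumption, anything else) go to Other.
        summary[NAMED_BUCKETS.get(state.upper(), "Other")] += 1
    return summary
```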

Clicking into a task opens the task details, including start and finish times, runtime configuration, and an Operations accordion with a Delete Task action that removes the task and its subtasks from the queue. The task detail page also lists the subtasks generated for that task (one minion task can have multiple subtasks to process data in parallel). The subtask table has its own Status Filter control, which is useful when a task fans out into many subtasks across multiple minion workers. In this example, the task has a single subtask, and the table shows when it started and finished and which minion worker ran it.


Clicking into this subtask shows more details such as the input task config, progress, and error information if the task failed. If the subtask has already been assigned to a minion worker, the page also includes a Minion Log Files panel so you can refresh the file list and download logs from that minion directly in the UI.


There is a controller job that runs every 5 minutes by default, controlled by controller.minion.task.metrics.emitter.frequencyPeriod, and emits metrics about Minion tasks scheduled in Pinot. The following metrics are emitted for each task type:

  • NumMinionTasksInProgress: Number of running tasks

  • NumMinionSubtasksRunning: Number of running sub-tasks

  • NumMinionSubtasksWaiting: Number of waiting sub-tasks (not yet assigned to a minion)

  • NumMinionSubtasksError: Number of error sub-tasks (completed with an error/exception)

  • PercentMinionSubtasksInQueue: Percent of sub-tasks in waiting or running states

  • PercentMinionSubtasksInError: Percent of sub-tasks in error

  • MaxSubtaskWaitTimeMs: Per-table, per-task-type controller gauge for the longest current wait time across subtasks in WAITING. Pinot emits 0 when no subtasks are waiting, so alerts can self-resolve after the queue drains.

  • MaxSubtaskRunningTimeMs: Per-table, per-task-type controller gauge for the longest current runtime across subtasks in RUNNING. Pinot emits 0 when no subtasks are running.
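The sketch below relates the per-task-type gauges above to a snapshot of subtask states. It assumes each subtask is given as a (state, elapsed milliseconds) pair and reuses the WAITING, RUNNING, and ERROR state names from the list. It also uses the total subtask count as the denominator for the percentages. These choices and the function name are assumptions, not the controller's implementation.

```python
from typing import Dict, Iterable, Tuple


def subtask_gauges(subtasks: Iterable[Tuple[str, int]]) -> Dict[str, float]:
    """Compute gauge-style values from (state, elapsed_ms) pairs for one task type."""
    snapshot = list(subtasks)
    total = len(snapshot)
    waiting = [ms for state, ms in snapshot if state == "WAITING"]
    running = [ms for state, ms in snapshot if state == "RUNNING"]
    errors = sum(1 for state, _ in snapshot if state == "ERROR")
    return {
        "NumMinionSubtasksRunning": len(running),
        "NumMinionSubtasksWaiting": len(waiting),
        "NumMinionSubtasksError": errors,
        # Assumed denominator: all subtasks in the snapshot (guarded against division by zero).
        "PercentMinionSubtasksInQueue": 100.0 * (len(waiting) + len(running)) / total if total else 0.0,
        "PercentMinionSubtasksInError": 100.0 * errors / total if total else 0.0,
        # Documented behavior: emit 0 when nothing is waiting or running, so alerts can clear.
        "MaxSubtaskWaitTimeMs": max(waiting, default=0),
        "MaxSubtaskRunningTimeMs": max(running, default=0),
    }
```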

The controller also emits metrics about how tasks are cron scheduled:

  • cronSchedulerJobScheduled: Number of cron schedules currently registered to be triggered regularly according to their cron expressions, as a Gauge.

  • cronSchedulerJobTrigger: Number of cron schedules triggered, as a Meter.

  • cronSchedulerJobSkipped: Number of late cron schedules skipped, as a Meter.

  • cronSchedulerJobExecutionTimeMs: Time used to complete task generation, as a Timer.

For each task, the minion will emit these metrics:

  • TASK_QUEUEING: Task queueing time (task_dequeue_time - task_inqueue_time). This assumes the clock drift between the Helix controller and the Pinot minion is minor; otherwise the value may be negative.

  • TASK_EXECUTION: Task execution time, which is the time spent on executing the task

  • NUMBER_OF_TASKS: Number of tasks in progress on that minion, as a Gauge. The gauge increases by 1 whenever the minion starts a task and decreases by 1 whenever the minion completes a task, whether it succeeded or failed.

  • NUMBER_TASKS_EXECUTED: Number of tasks executed, as a Meter.

  • NUMBER_TASKS_COMPLETED: Number of tasks completed, as a Meter.

  • NUMBER_TASKS_CANCELLED: Number of tasks cancelled, as a Meter.

  • NUMBER_TASKS_FAILED: Number of tasks failed, as a Meter. Unlike a fatal failure, the task encountered an error that cannot be recovered within this run, but the task may still succeed if retried.

  • NUMBER_TASKS_FATAL_FAILED: Number of tasks that failed fatally, as a Meter. Unlike a regular failure, the task encountered an error that will not be recoverable even if the task is retried.
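A small in-memory tracker can model the gauge and meter semantics above. This is a hypothetical sketch for illustration. The class and method names are invented, and times are plain millisecond integers. The text only mentions success and failure, so decrementing NUMBER_OF_TASKS when a task is cancelled is an assumption. NUMBER_TASKS_EXECUTED is not modeled.

```python
from collections import Counter
from typing import List


class MinionTaskMetricsModel:
    """Toy model of per-minion task metrics: one gauge, outcome meters, queueing samples."""

    OUTCOMES = {
        "completed": "NUMBER_TASKS_COMPLETED",
        "cancelled": "NUMBER_TASKS_CANCELLED",
        "failed": "NUMBER_TASKS_FAILED",              # may still succeed if retried
        "fatal_failed": "NUMBER_TASKS_FATAL_FAILED",  # will not succeed on retry
    }

    def __init__(self) -> None:
        self.number_of_tasks = 0  # NUMBER_OF_TASKS gauge
        self.meters: Counter = Counter()
        self.queueing_times_ms: List[int] = []  # TASK_QUEUEING samples

    def start(self, inqueue_time_ms: int, dequeue_time_ms: int) -> None:
        # The two timestamps come from different hosts, so clock drift can make this negative.
        self.queueing_times_ms.append(dequeue_time_ms - inqueue_time_ms)
        self.number_of_tasks += 1

    def finish(self, outcome: str) -> None:
        # Any terminal outcome ends the task, so the in-progress gauge goes back down.
        self.meters[self.OUTCOMES[outcome]] += 1
        self.number_of_tasks -= 1
```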
