Minion

A Minion is a standby component that leverages the Helix Task Framework to offload computationally intensive tasks from other components.

It can be attached to an existing Pinot cluster, where it executes tasks as provided by the controller. Custom tasks can be plugged into the cluster via annotations. Some typical minion tasks are:

  • Segment creation

  • Segment purge

  • Segment merge

Starting a Minion

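Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a minion, run the StartMinion command and point it at the Zookeeper address with -zkAddress.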

Interfaces

PinotTaskGenerator

PinotTaskGenerator interface defines the APIs for the controller to generate tasks for minions to execute.

PinotTaskExecutorFactory

Factory for PinotTaskExecutor which defines the APIs for Minion to execute the tasks.
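As a rough illustration of how an executor might honor cancel(), the sketch below uses a volatile flag that the work loop checks between units of work. The interface is a simplified local stand-in (it takes a plain int instead of a PinotTaskConfig so the example compiles without Pinot on the classpath), and CountingExecutor is a hypothetical name, not a Pinot class.

```java
import java.util.concurrent.CancellationException;

// Simplified local stand-in for Pinot's PinotTaskExecutor. Assumption: the real
// interface takes a PinotTaskConfig; an int is used here to stay self-contained.
interface SketchTaskExecutor {
  Object executeTask(int numWorkUnits) throws Exception;

  void cancel();
}

// Hypothetical executor that processes work units and stops early once cancelled.
final class CountingExecutor implements SketchTaskExecutor {
  // volatile so that a cancel() issued from another thread is visible to the loop
  private volatile boolean cancelled = false;

  @Override
  public Object executeTask(int numWorkUnits) {
    int processed = 0;
    for (int i = 0; i < numWorkUnits; i++) {
      if (cancelled) {
        throw new CancellationException("Task cancelled after " + processed + " units");
      }
      processed++; // placeholder for real work, e.g. handling one segment
    }
    return processed;
  }

  @Override
  public void cancel() {
    cancelled = true;
  }
}
```

Checking the flag between units keeps cancellation cooperative: the task stops at a safe point rather than being interrupted mid-operation.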

MinionEventObserverFactory

Factory for MinionEventObserver which defines the APIs for task event callbacks on minion.

Built-in Tasks

SegmentGenerationAndPushTask

To be added

RealtimeToOfflineSegmentsTask

See Pinot managed Offline flows for details.

MergeRollupTask

See Minion merge rollup task for details.

ConvertToRawIndexTask

To be added

Enable Tasks

Tasks are enabled on a per-table basis. To enable a certain task type (e.g. myTask) on a table, update the table config to include the task type:

Under each enabled task type, custom properties can be configured for that task type.
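One way to picture the resulting structure: the task section maps each enabled task type to its own property map, so a task type counts as enabled for a table when it appears as a key. The sketch below models this with plain Java maps. TaskConfigSketch and its methods are hypothetical names used for illustration and should not be read as Pinot's API.

```java
import java.util.Map;

// Hypothetical model of the "taskTypeConfigsMap" section of a table config.
final class TaskConfigSketch {
  private final Map<String, Map<String, String>> taskTypeConfigsMap;

  TaskConfigSketch(Map<String, Map<String, String>> taskTypeConfigsMap) {
    this.taskTypeConfigsMap = taskTypeConfigsMap;
  }

  // A task type is treated as enabled when it has an entry in the map.
  boolean isTaskTypeEnabled(String taskType) {
    return taskTypeConfigsMap.containsKey(taskType);
  }

  // Returns the custom properties for a task type, or an empty map if it is not enabled.
  Map<String, String> getConfigsForTaskType(String taskType) {
    return taskTypeConfigsMap.getOrDefault(taskType, Map.of());
  }

  public static void main(String[] args) {
    TaskConfigSketch config = new TaskConfigSketch(
        Map.of("myTask", Map.of("myProperty1", "value1", "myProperty2", "value2")));
    System.out.println(config.isTaskTypeEnabled("myTask"));
    System.out.println(config.getConfigsForTaskType("myTask").get("myProperty1"));
  }
}
```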

Schedule Tasks

Auto-Schedule

Tasks can be scheduled periodically for all task types on all enabled tables. Enable auto task scheduling by configuring the schedule frequency in the controller config with the key controller.task.frequencyInSeconds.

Tasks can also be scheduled based on cron expressions. The cron expression is set in the schedule config for each task type separately. The option controller.task.scheduler.enabled should be set to true to enable cron scheduling.

As shown below, the RealtimeToOfflineSegmentsTask will be scheduled at the first second of every minute (following the syntax defined here).
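To make the fields of such an expression easier to read, the sketch below splits a cron string into named fields. It assumes the Quartz-style field order (seconds, minutes, hours, day of month, month, day of week, optional year), which is an assumption about the referenced syntax. CronFieldsSketch is a hypothetical helper that only labels the fields; it does not validate or evaluate the expression.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CronFieldsSketch {
  // Assumed Quartz-style field order; the seventh field (year) is optional.
  private static final String[] FIELD_NAMES =
      {"seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"};

  // Maps each whitespace-separated field of the expression to its assumed name.
  static Map<String, String> describe(String cronExpression) {
    String[] parts = cronExpression.trim().split("\\s+");
    if (parts.length < 6 || parts.length > 7) {
      throw new IllegalArgumentException("Expected 6 or 7 fields, got " + parts.length);
    }
    Map<String, String> fields = new LinkedHashMap<>();
    for (int i = 0; i < parts.length; i++) {
      fields.put(FIELD_NAMES[i], parts[i]);
    }
    return fields;
  }

  public static void main(String[] args) {
    // "0" in the seconds field with "*" in the minutes field: second 0 of every minute.
    System.out.println(describe("0 * * * * ?"));
  }
}
```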

Manual Schedule

Tasks can be manually scheduled using the following controller REST APIs:

Rest API
Description
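The schedule endpoint is POST /tasks/schedule, optionally narrowed with the taskType and tableName query parameters. A small sketch of how those variants could be built and called from Java follows. The controller address http://localhost:9000 is an assumption for a local setup, and ScheduleTasksSketch with its methods is hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ScheduleTasksSketch {
  // Builds the schedule URI; pass null to leave out taskType or tableName.
  static URI buildScheduleUri(String controllerBaseUrl, String taskType, String tableName) {
    List<String> params = new ArrayList<>();
    if (taskType != null) {
      params.add("taskType=" + URLEncoder.encode(taskType, StandardCharsets.UTF_8));
    }
    if (tableName != null) {
      params.add("tableName=" + URLEncoder.encode(tableName, StandardCharsets.UTF_8));
    }
    String query = params.isEmpty() ? "" : "?" + String.join("&", params);
    return URI.create(controllerBaseUrl + "/tasks/schedule" + query);
  }

  // Sends the POST with an empty body and returns the raw response body.
  static String schedule(URI uri) throws Exception {
    HttpRequest request = HttpRequest.newBuilder(uri)
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    return response.body();
  }

  public static void main(String[] args) throws Exception {
    // Assumed local controller address; adjust for your cluster.
    URI uri = buildScheduleUri("http://localhost:9000", "myTask", "myTable_OFFLINE");
    System.out.println("POST " + uri);
    // Uncomment when a controller is reachable at that address:
    // System.out.println(schedule(uri));
  }
}
```

Passing null for both parameters schedules all task types on all enabled tables; supplying one or both narrows the request accordingly.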

Plug-in Custom Tasks

To plug in a custom task, implement PinotTaskGenerator, PinotTaskExecutorFactory and MinionEventObserverFactory (optional) for the task type (all of them should return the same string for getTaskType()), and annotate them with the following annotations:

Implementation
Annotation

After annotating the classes, put them under a package whose name matches org.apache.pinot.*.plugin.minion.tasks.*; the controller and minion will then auto-register them.
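The shape of such a plug-in can be sketched as follows. To keep the example compilable on its own, the annotations and interfaces are local stand-ins declared in the snippet (reduced to getTaskType() only); in a real plug-in they would be Pinot's own types, and the classes would sit in a package matching the pattern above. MyTaskConstants, MyTaskGenerator and MyTaskExecutorFactory are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-ins for Pinot's annotations (assumption: declared here only so the
// sketch compiles without Pinot on the classpath).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TaskGenerator {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TaskExecutorFactory {}

// Local stand-ins for the Pinot interfaces, reduced to the method relevant here.
interface SketchTaskGenerator {
  String getTaskType();
}

interface SketchTaskExecutorFactory {
  String getTaskType();
}

// Shared constant so that every component returns the same task type string.
final class MyTaskConstants {
  static final String TASK_TYPE = "myTask";

  private MyTaskConstants() {}
}

@TaskGenerator
final class MyTaskGenerator implements SketchTaskGenerator {
  @Override
  public String getTaskType() {
    return MyTaskConstants.TASK_TYPE;
  }
}

@TaskExecutorFactory
final class MyTaskExecutorFactory implements SketchTaskExecutorFactory {
  @Override
  public String getTaskType() {
    return MyTaskConstants.TASK_TYPE;
  }
}
```

Routing every getTaskType() through one constant is a simple way to guarantee the generator and the executor factory agree on the task type.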

Example

See SimpleMinionClusterIntegrationTest, where the TestTask is plugged in.

Task-related metrics

There is a controller job that runs every 5 minutes by default and emits metrics about Minion tasks scheduled in Pinot. The following metrics are emitted for each task type:

  • NumMinionTasksInProgress: Number of running tasks

  • NumMinionSubtasksRunning: Number of running sub-tasks

  • NumMinionSubtasksWaiting: Number of waiting sub-tasks (not yet assigned to a minion)

  • NumMinionSubtasksError: Number of sub-tasks in error (completed with an error or exception)

  • PercentMinionSubtasksInQueue: Percent of sub-tasks in waiting or running states

  • PercentMinionSubtasksInError: Percent of sub-tasks in error

For each task, the Minion will emit these metrics:

  • TASK_QUEUEING: Task queueing time (task_dequeue_time - task_inqueue_time). This assumes the clock drift between the Helix controller and the Pinot minion is minor; otherwise the value may be negative

  • TASK_EXECUTION: Task execution time, i.e. the time spent executing the task

  • NUMBER_OF_TASKS: Number of tasks in progress on that minion. The gauge is increased by 1 whenever a Minion starts a task and decreased by 1 whenever it completes a task (whether it succeeded or failed)
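The per-task Minion metrics can be illustrated with a short sketch: queueing time as a plain difference of two timestamps, and the in-progress gauge incremented at task start and decremented in a finally block so that failures are counted as completions too. MinionTaskMetricsSketch is a hypothetical class and does not reflect how Pinot wires its metrics internally.

```java
import java.util.concurrent.atomic.AtomicLong;

final class MinionTaskMetricsSketch {
  // Gauge of tasks currently in progress on this minion (NUMBER_OF_TASKS).
  private final AtomicLong numberOfTasks = new AtomicLong();

  // TASK_QUEUEING: dequeue time minus enqueue time. The two timestamps come from
  // different machines, so the result may be negative under clock drift.
  static long queueingTimeMs(long taskInqueueTimeMs, long taskDequeueTimeMs) {
    return taskDequeueTimeMs - taskInqueueTimeMs;
  }

  // Runs a task and returns its execution time in milliseconds (TASK_EXECUTION).
  // The gauge is decremented in finally, so it also drops when the task throws.
  long runAndMeasure(Runnable task) {
    numberOfTasks.incrementAndGet();
    long startNs = System.nanoTime();
    try {
      task.run();
    } finally {
      numberOfTasks.decrementAndGet();
    }
    return (System.nanoTime() - startNs) / 1_000_000;
  }

  long tasksInProgress() {
    return numberOfTasks.get();
  }
}
```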

| Rest API | Description |
| --- | --- |
| POST /tasks/schedule | Schedule tasks for all task types on all enabled tables |
| POST /tasks/schedule?taskType=myTask | Schedule tasks for the given task type on all enabled tables |
| POST /tasks/schedule?tableName=myTable_OFFLINE | Schedule tasks for all task types on the given table |
| POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE | Schedule tasks for the given task type on the given table |

| Implementation | Annotation |
| --- | --- |
| PinotTaskGenerator | @TaskGenerator |
| PinotTaskExecutorFactory | @TaskExecutorFactory |
| MinionEventObserverFactory | @EventObserverFactory |

    # Using Docker
    docker run \
        --network=pinot-demo \
        --name pinot-minion \
        -d ${PINOT_IMAGE} StartMinion \
        -zkAddress pinot-zookeeper:2181

    # Using the launcher scripts
    bin/pinot-admin.sh StartMinion \
        -zkAddress localhost:2181
    setup Zookeeper
    pull the pinot docker image
    Pinot managed Offline flows
    Minion merge rollup task
    defined here
    SimpleMinionClusterIntegrationTest
    Usage: StartMinion
        -help                                                   : Print this message. (required=false)
        -minionHost               <String>                      : Host name for minion. (required=false)
        -minionPort               <int>                         : Port number to start the minion at. (required=false)
        -zkAddress                <http>                        : HTTP address of Zookeeper. (required=false)
        -clusterName              <String>                      : Pinot cluster name. (required=false)
        -configFileName           <Config File Name>            : Minion Starter Config file. (required=false)
    public interface PinotTaskGenerator {
    
      /**
       * Initializes the task generator.
       */
      void init(ClusterInfoAccessor clusterInfoAccessor);
    
      /**
       * Returns the task type of the generator.
       */
      String getTaskType();
    
      /**
       * Generates a list of tasks to schedule based on the given table configs.
       */
      List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
    
      /**
       * Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
       */
      default long getTaskTimeoutMs() {
        return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
      }
    
      /**
       * Returns the maximum number of concurrent tasks allowed per instance, 1 by default.
       */
      default int getNumConcurrentTasksPerInstance() {
        return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
      }
    
      /**
       * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
       */
      default void nonLeaderCleanUp() {
      }
    }
    public interface PinotTaskExecutorFactory {
    
      /**
       * Initializes the task executor factory.
       */
      void init(MinionTaskZkMetadataManager zkMetadataManager);
    
      /**
       * Returns the task type of the executor.
       */
      String getTaskType();
    
      /**
       * Creates a new task executor.
       */
      PinotTaskExecutor create();
    }
    public interface PinotTaskExecutor {
    
      /**
       * Executes the task based on the given task config and returns the execution result.
       */
      Object executeTask(PinotTaskConfig pinotTaskConfig)
          throws Exception;
    
      /**
       * Tries to cancel the task.
       */
      void cancel();
    }
    public interface MinionEventObserverFactory {
    
      /**
       * Initializes the event observer factory.
       */
      void init(MinionTaskZkMetadataManager zkMetadataManager);
    
      /**
       * Returns the task type of the event observer.
       */
      String getTaskType();
    
      /**
       * Creates a new task event observer.
       */
      MinionEventObserver create();
    }
    public interface MinionEventObserver {
    
      /**
       * Invoked when a minion task starts.
       *
       * @param pinotTaskConfig Pinot task config
       */
      void notifyTaskStart(PinotTaskConfig pinotTaskConfig);
    
      /**
       * Invoked when a minion task succeeds.
       *
       * @param pinotTaskConfig Pinot task config
       * @param executionResult Execution result
       */
      void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult);
    
      /**
       * Invoked when a minion task gets cancelled.
       *
       * @param pinotTaskConfig Pinot task config
       */
      void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig);
    
      /**
       * Invoked when a minion task encounters exception.
       *
       * @param pinotTaskConfig Pinot task config
       * @param exception Exception encountered during execution
       */
      void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception);
    }
    {
      ...
      "task": {
        "taskTypeConfigsMap": {
          "myTask": {
            "myProperty1": "value1",
            "myProperty2": "value2"
          }
        }
      }
    }
    {
      ...
      "task": {
        "taskTypeConfigsMap": {
          "RealtimeToOfflineSegmentsTask": {
            "bucketTimePeriod": "1h",
            "bufferTimePeriod": "1h",
            "schedule": "0 * * * * ?"
          }
        }
      }
    }