Discover how Apache Pinot's broker component optimizes query processing, data retrieval, and enhances data-driven applications.
Pinot brokers take query requests from client processes, scatter them to applicable servers, gather the results, and return results to the client. The controller shares cluster metadata with the brokers, which allows the brokers to create a plan for executing the query involving a minimal subset of servers with the source data and, when required, other servers to shuffle and consolidate results.
A production Pinot cluster contains many brokers. In general, the more brokers, the more concurrent queries a cluster can process, and the lower latency it can deliver on queries.
Pinot brokers are modeled as Helix spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried.
The broker ensures that all the rows of the table are queried exactly once so as to return correct, consistent results for a query. The brokers may optimize to prune some of the segments as long as accuracy is not sacrificed.
Helix provides the framework by which spectators can learn the location in which each partition of a resource (i.e. participant) resides. The brokers use this mechanism to learn the servers that host specific segments of a table.
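The routing idea above can be sketched in a few lines. This is a minimal illustration, not Pinot's actual routing code: `route_segments` and the segment and server names are hypothetical, and the replica choice (least-loaded, ties broken by name) is an assumption made for the example.

```python
def route_segments(segment_replicas):
    """Pick exactly one hosting server per segment and group segments by server.

    segment_replicas maps a segment name to the servers that host a replica.
    """
    plan = {}
    for segment in sorted(segment_replicas):
        servers = segment_replicas[segment]
        if not servers:
            raise ValueError(f"no replica available for segment {segment}")
        # Assumption: prefer the server with the fewest segments assigned so far.
        chosen = min(servers, key=lambda s: (len(plan.get(s, [])), s))
        plan.setdefault(chosen, []).append(segment)
    return plan


replicas = {
    "seg_0": ["server1", "server2"],
    "seg_1": ["server2", "server3"],
    "seg_2": ["server1", "server3"],
}
plan = route_segments(replicas)
```

Each segment lands in exactly one server's list, so every row is read once regardless of how many replicas exist.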
In the case of hybrid tables, the brokers ensure that the overlap between real-time and offline segment data is queried exactly once, by performing offline and real-time federation.
Consider an example: real-time data covers five days, March 23 to March 27, and offline data has been pushed until March 25, which is two days behind real-time. The brokers maintain this time boundary.
Suppose the table receives the query `select sum(metric) from table`. The broker splits it into two queries at this time boundary, one for offline and one for real-time: `select sum(metric) from table_REALTIME where date >= Mar 25` and `select sum(metric) from table_OFFLINE where date < Mar 25`.
The broker merges the results of both queries before returning the result to the client.
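The split and merge can be simulated with a small sketch. The helper names are hypothetical, days are represented as plain integers (23 stands for March 23), and the metric values are made up for illustration. The comparison operators follow the example queries above.

```python
def split_hybrid_query(table, select_clause, time_column, boundary):
    """Return (offline_query, realtime_query) split at the time boundary."""
    offline = f"select {select_clause} from {table}_OFFLINE where {time_column} < {boundary}"
    realtime = f"select {select_clause} from {table}_REALTIME where {time_column} >= {boundary}"
    return offline, realtime


def federated_sum(offline_rows, realtime_rows, boundary):
    """Sum a metric so that each day is counted exactly once."""
    offline_part = sum(v for day, v in offline_rows.items() if day < boundary)
    realtime_part = sum(v for day, v in realtime_rows.items() if day >= boundary)
    return offline_part + realtime_part


# Offline holds days 23-25, real-time holds days 23-27, so days 23-25 overlap.
offline_rows = {23: 10, 24: 20, 25: 30}
realtime_rows = {23: 10, 24: 20, 25: 30, 26: 40, 27: 50}
total = federated_sum(offline_rows, realtime_rows, boundary=25)
offline_query, realtime_query = split_hybrid_query("table", "sum(metric)", "date", 25)
```

Summing both tables naively would double count the overlapping days; filtering each side at the boundary yields the same total as summing the five distinct days once.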
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a broker:
Explore the minion component in Apache Pinot, empowering efficient data movement and segment generation within Pinot clusters.
A Pinot minion is an optional cluster component that executes background tasks on table data apart from the query processes performed by brokers and servers. Minions run on independent hardware resources and execute minion tasks as directed by the controller. Examples of minion tasks include converting batch data from a standard format like Avro or JSON into segment files to be loaded into an offline table, and rewriting existing segment files to purge records as required by data privacy laws like GDPR. Minion tasks can run once or be scheduled to run periodically.
Minions isolate the computational burden of out-of-band data processing from the servers. Although a Pinot cluster can function with or without minions, they are typically present to support routine tasks like batch data ingest.
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a minion:
The Pinot task generator interface defines the APIs for the controller to generate tasks for minions to execute.
Factory for `PinotTaskExecutor`, which defines the APIs for the minion to execute the tasks.
Factory for `MinionEventObserver`, which defines the APIs for task event callbacks on the minion.
The PushTask can fetch files from an input folder, e.g. an S3 bucket, and convert them into segments. The PushTask converts one file into one segment and keeps the file name in the segment metadata to avoid duplicate ingestion. Below is an example task config to put in TableConfig to enable this task. The task is scheduled every 10 minutes to keep ingesting remaining files, with at most 10 parallel tasks and 1 file per task.
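A sketch of what such a table config fragment might look like, built as a Python dictionary and printed as JSON. The key names (`taskTypeConfigsMap`, `schedule`, `tableMaxNumTasks`, `batchConfigMaps`, `inputFormat`) and the six-field cron string are assumptions based on common Pinot configurations, and the bucket path is hypothetical; check them against the configuration reference for your Pinot version.

```python
import json

# Assumed shape of the relevant table config sections; not a complete table config.
table_config_fragment = {
    "ingestionConfig": {
        "batchIngestionConfig": {
            "batchConfigMaps": [
                {
                    # Hypothetical input location and format.
                    "inputDirURI": "s3://my-bucket/rawdata/",
                    "inputFormat": "avro",
                }
            ]
        }
    },
    "task": {
        "taskTypeConfigsMap": {
            "SegmentGenerationAndPushTask": {
                # Every 10 minutes, at second 0.
                "schedule": "0 */10 * * * ?",
                # At most 10 tasks per scheduling round.
                "tableMaxNumTasks": "10",
            }
        }
    },
}

print(json.dumps(table_config_fragment, indent=2))
```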
NOTE: You may want to omit `tableMaxNumTasks` because of the following caveat. The task generates one segment per file and derives the segment name from the time column of the file. If two files have the same time range and are ingested by tasks from different schedules, their segment names may conflict. To work around this for now, omit `tableMaxNumTasks`; it defaults to Integer.MAX_VALUE, which schedules as many tasks as needed to ingest all input files in a single batch. Within one batch, a sequence number suffix ensures that segment names do not conflict. Because the sequence number suffix is scoped to one batch, tasks from different batches can still run into the segment name conflict described above.
When performing ingestion at scale, remember that Pinot lists all of the files contained in the `inputDirURI` every time a `SegmentGenerationAndPushTask` job is scheduled. This can become a bottleneck when fetching files from a cloud bucket like GCS. To prevent this, point `inputDirURI` at the smallest possible set of files.
See Pinot managed Offline flows for details.
See Minion merge rollup task for details.
Tasks are enabled on a per-table basis. To enable a certain task type (e.g. `myTask`) on a table, update the table config to include the task type:
Under each enabled task type, custom properties can be configured for the task type.
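As a rough illustration, the update can be expressed as a small helper that adds a task type and its custom properties to a table config. The helper `enable_task` and the property names are hypothetical; the `task` / `taskTypeConfigsMap` nesting is an assumption about the table config layout.

```python
import copy


def enable_task(table_config, task_type, properties=None):
    """Return a copy of table_config with task_type enabled and its properties set."""
    updated = copy.deepcopy(table_config)
    task_map = updated.setdefault("task", {}).setdefault("taskTypeConfigsMap", {})
    task_map[task_type] = dict(properties or {})
    return updated


base = {"tableName": "myTable_OFFLINE", "tableType": "OFFLINE"}
updated = enable_task(base, "myTask", {"myProperty1": "value1"})
```

The helper returns a new dictionary rather than mutating the input, which makes it easy to diff the config before submitting it to the controller.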
There are also two task configs to be set as part of the cluster configs, like below. One controls the task's overall timeout (1 hour by default), and the other controls how many tasks can run on a single minion worker (1 by default).
There are 2 ways to enable task scheduling:
Tasks can be scheduled periodically for all task types on all enabled tables. Enable auto task scheduling by configuring the schedule frequency in the controller config with the key `controller.task.frequencyPeriod`. This takes period strings as values, e.g. 2h, 30m, 1d.
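To make the period string format concrete, here is a small sketch that converts such strings to seconds. `period_to_seconds` is a hypothetical helper, not part of Pinot, and it only handles the day, hour, minute, and second units.

```python
import re

_UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}


def period_to_seconds(period):
    """Convert a period string such as '2h', '30m' or '1d' to seconds."""
    parts = re.findall(r"(\d+)([dhms])", period)
    # Reject strings with leftover characters, e.g. '2x' or an empty string.
    if not parts or "".join(number + unit for number, unit in parts) != period:
        raise ValueError(f"unsupported period string: {period!r}")
    return sum(int(number) * _UNIT_SECONDS[unit] for number, unit in parts)
```

For example, `period_to_seconds("2h")` evaluates to 7200 and `period_to_seconds("30m")` to 1800.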
Tasks can also be scheduled based on cron expressions. The cron expression is set in the `schedule` config for each task type separately. The controller config `controller.task.scheduler.enabled` should be set to `true` to enable cron scheduling.
As shown below, the RealtimeToOfflineSegmentsTask will be scheduled at the first second of every minute (following the syntax defined here).
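A sketch of such a task config, with the cron expression broken into named fields. This assumes a Quartz-style expression (seconds first, six or seven fields); the expression `0 * * * * ?` and the helper `describe_cron` are illustrative assumptions, not taken from the Pinot source.

```python
QUARTZ_FIELDS = ("seconds", "minutes", "hours", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Map each field of a Quartz-style cron expression to its name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    fields = dict(zip(QUARTZ_FIELDS, parts))
    if len(parts) == 7:
        fields["year"] = parts[6]
    return fields


# Assumed task config: fire at second 0 of every minute.
task_config = {"RealtimeToOfflineSegmentsTask": {"schedule": "0 * * * * ?"}}
fields = describe_cron(task_config["RealtimeToOfflineSegmentsTask"]["schedule"])
```

Here the `seconds` field is `0` and every other time field is a wildcard, which corresponds to the first second of every minute.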
Tasks can be manually scheduled using the following controller REST APIs:
`POST /tasks/schedule`: Schedule tasks for all task types on all enabled tables
`POST /tasks/schedule?taskType=myTask`: Schedule tasks for the given task type on all enabled tables
`POST /tasks/schedule?tableName=myTable_OFFLINE`: Schedule tasks for all task types on the given table
`POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE`: Schedule tasks for the given task type on the given table
Tasks can be scheduled on specific instances using the following config at the task level:
By default, the value is `minion_untagged`, for backward compatibility. This allows users to schedule tasks on specific nodes and to isolate tasks among tables and task types.
`POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE&minionInstanceTag=tag1_MINION`: Schedule tasks for the given task type of the given table on the minion nodes tagged as tag1_MINION.
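The scheduling endpoints differ only in their query parameters, so a client can build them with one helper. This sketch only constructs the URL and does not send a request; `schedule_tasks_url` is a hypothetical helper and the controller address is an assumption.

```python
from urllib.parse import urlencode


def schedule_tasks_url(controller, task_type=None, table_name=None, minion_instance_tag=None):
    """Build the URL for a POST /tasks/schedule request with optional filters."""
    params = {}
    if task_type is not None:
        params["taskType"] = task_type
    if table_name is not None:
        params["tableName"] = table_name
    if minion_instance_tag is not None:
        params["minionInstanceTag"] = minion_instance_tag
    query = urlencode(params)
    return f"{controller}/tasks/schedule" + (f"?{query}" if query else "")


# Assumed local controller address.
url = schedule_tasks_url("http://localhost:9000", task_type="myTask", table_name="myTable_OFFLINE")
```

The resulting URL can then be sent as a POST with any HTTP client.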
When a task is executed on a segment, the minion node fetches the segment from deepstore. If the deepstore is not accessible, the minion node can download the segment from the server node. This is controlled by the `allowDownloadFromServer` config in the task config. By default, this is set to `false`.
This config can also be set at the minion instance level with `pinot.minion.task.allow.download.from.server` (default is `false`). The instance-level config helps enforce this behavior when the number of tables or tasks is high and the download should be enabled for all of them. Note: the task-level config overrides the instance-level config value.
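The precedence between the two settings can be sketched as follows. The function is a hypothetical illustration of the override rule described above, not the minion's actual code, and it assumes config values arrive as strings or booleans.

```python
TASK_KEY = "allowDownloadFromServer"
INSTANCE_KEY = "pinot.minion.task.allow.download.from.server"


def allow_download_from_server(task_config, instance_config):
    """Resolve the effective setting: task level wins, then instance level, then false."""
    if TASK_KEY in task_config:
        value = task_config[TASK_KEY]
    else:
        value = instance_config.get(INSTANCE_KEY, "false")
    return str(value).lower() == "true"
```

With this rule, a table that sets the task-level value to `false` keeps downloads from servers disabled even on a minion whose instance-level value is `true`.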
To plug in a custom task, implement `PinotTaskGenerator`, `PinotTaskExecutorFactory` and `MinionEventObserverFactory` (optional) for the task type (all of them should return the same string for `getTaskType()`), and annotate them with the following annotations:
`PinotTaskGenerator`: `@TaskGenerator`
`PinotTaskExecutorFactory`: `@TaskExecutorFactory`
`MinionEventObserverFactory`: `@EventObserverFactory`
After annotating the classes, put them under a package named `org.apache.pinot.*.plugin.minion.tasks.*`, and they will be auto-registered by the controller and minion.
See SimpleMinionClusterIntegrationTest, where the `TestTask` is plugged in.
In the Pinot UI, there is a Minion Task Manager tab under the Cluster Manager page. This tab exposes a lot of task-related information for troubleshooting, mainly collected from the Pinot controller, which schedules tasks, and from Helix, which tracks task runtime status. There are also buttons to schedule tasks in an ad hoc way. Below are brief introductions to some of the pages under the Minion Task Manager tab.
The first page shows which types of minion task have been used, that is, which task types have created their task queues in Helix.
Clicking into a task type shows the tables using that task, along with a few buttons to stop the task queue, clean up ended tasks, and so on.
Clicking into any table in this list shows how the task is configured for that table, and the task metadata if there is any in ZK. For example, MergeRollupTask tracks a watermark in ZK. If the task is cron scheduled, the current and next schedules are also shown on this page.
At the bottom of this page is a list of tasks generated for this table for this specific task type. In this example, one MergeRollup task has been generated and completed.
Clicking into a task from that list shows its start and end time and the subtasks generated for that task (as context, one minion task can have multiple subtasks to process data in parallel). In this example, the task has one subtask, and the page shows when it started and stopped and which minion worker it ran on.
Clicking into this subtask shows more details about it, such as the input task configs and error information if the task failed.
There is a controller job that runs every 5 minutes by default and emits metrics about Minion tasks scheduled in Pinot. The following metrics are emitted for each task type:
NumMinionTasksInProgress: Number of running tasks
NumMinionSubtasksRunning: Number of running sub-tasks
NumMinionSubtasksWaiting: Number of waiting sub-tasks (unassigned to a minion as yet)
NumMinionSubtasksError: Number of error sub-tasks (completed with an error/exception)
PercentMinionSubtasksInQueue: Percent of sub-tasks in waiting or running states
PercentMinionSubtasksInError: Percent of sub-tasks in error
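To show how the percentage metrics relate to the counts, here is a hedged sketch. It assumes the percentages are taken over all sub-tasks of a task type (waiting, running, errored, and completed) and uses integer division; the controller's actual computation may differ.

```python
def subtask_percentages(waiting, running, error, completed):
    """Derive the two percentage metrics from per-state sub-task counts."""
    total = waiting + running + error + completed
    if total == 0:
        return {"PercentMinionSubtasksInQueue": 0, "PercentMinionSubtasksInError": 0}
    return {
        # Sub-tasks still waiting for a minion or currently running.
        "PercentMinionSubtasksInQueue": 100 * (waiting + running) // total,
        # Sub-tasks that completed with an error or exception.
        "PercentMinionSubtasksInError": 100 * error // total,
    }


metrics = subtask_percentages(waiting=2, running=3, error=1, completed=4)
```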
The controller also emits metrics about how tasks are cron scheduled:
cronSchedulerJobScheduled: Number of current cron schedules registered to be triggered regularly according to their cron expressions. It's a Gauge.
cronSchedulerJobTrigger: Number of cron schedules triggered, as a Meter.
cronSchedulerJobSkipped: Number of late cron schedules skipped, as a Meter.
cronSchedulerJobExecutionTimeMs: Time used to complete task generation, as a Timer.
For each task, the minion will emit these metrics:
TASK_QUEUEING: Task queueing time (task_dequeue_time - task_inqueue_time), assuming the time drift between helix controller and pinot minion is minor, otherwise the value may be negative
TASK_EXECUTION: Task execution time, which is the time spent on executing the task
NUMBER_OF_TASKS: Number of tasks in progress on that minion, as a Gauge. Whenever a minion starts a task, the Gauge increases by 1; whenever a minion completes a task (whether it succeeded or failed), the Gauge decreases by 1.
NUMBER_TASKS_EXECUTED: Number of tasks executed, as a Meter.
NUMBER_TASKS_COMPLETED: Number of tasks completed, as a Meter.
NUMBER_TASKS_CANCELLED: Number of tasks cancelled, as a Meter.
NUMBER_TASKS_FAILED: Number of tasks failed, as a Meter. Different from fatal failure, the task encountered an error which can not be recovered from this run, but it may still succeed by retrying the task.
NUMBER_TASKS_FATAL_FAILED: Number of tasks fatal failed, as a Meter. Different from failure, the task encountered an error, which will not be recoverable even with retrying the task.
Learn to build and manage Apache Pinot clusters, uncovering key components for efficient data processing and optimized analysis.
A Pinot cluster is a collection of the software processes and hardware resources required to ingest, store, and process data. For details about Pinot cluster components, see Physical architecture.
A Pinot cluster consists of the following processes, which are typically deployed on separate hardware resources in production. In development, they can fit comfortably into Docker containers on a typical laptop:
Controller: Maintains cluster metadata and manages cluster resources.
Zookeeper: Manages the Pinot cluster on behalf of the controller. Provides fault-tolerant, persistent storage of metadata, including table configurations, schemas, segment metadata, and cluster state.
Broker: Accepts queries from client processes and forwards them to servers for processing.
Server: Provides storage for segment files and compute for query processing.
(Optional) Minion: Computes background tasks other than query processing, minimizing impact on query latency. Optimizes segments, and builds additional indexes to ensure performance (even if data is deleted).
The simplest possible Pinot cluster consists of four components: a server, a broker, a controller, and a Zookeeper node. In production environments, these components typically run on separate server instances, and scale out as needed for data volume, load, availability, and latency. Pinot clusters in production range from fewer than ten total instances to more than 1,000.
Pinot uses Apache Zookeeper as a distributed metadata store and Apache Helix for cluster management.
Helix is a cluster management solution that maintains a persistent, fault-tolerant map of the intended state of the Pinot cluster. Helix constantly monitors the cluster to ensure that the right hardware resources are allocated for the present configuration. When the configuration changes, Helix schedules or decommissions hardware resources to reflect the new configuration. When elements of the cluster change state catastrophically, Helix schedules hardware resources to keep the actual cluster consistent with the ideal represented in the metadata. From a physical perspective, Helix takes the form of a controller process plus agents running on servers and brokers.
For details of cluster configuration settings, see Cluster configuration reference.
Helix divides nodes into logical components based on their responsibilities:
Participants are the nodes that host distributed, partitioned resources
Pinot servers are modeled as participants. For details about server nodes, see Server.
Spectators are the nodes that observe the current state of each participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).
Pinot brokers are modeled as spectators. For details about broker nodes, see Broker.
Controllers are the nodes that observe and control the participant nodes. A controller is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.
Pinot controllers are modeled as controllers. For details about controller nodes, see Controller.
Another way to visualize the cluster is a logical view, where:
Typically, there is only one cluster per environment/data center. There is no need to create multiple Pinot clusters because Pinot supports tenants.
To set up a cluster, see one of the following guides:
Discover the tenant component of Apache Pinot, which facilitates efficient data isolation and resource management within Pinot clusters.
Every table is associated with a tenant, or a logical namespace that restricts where the cluster processes queries on the table. A Pinot tenant takes the form of a text tag in the logical tenant namespace. Physical cluster hardware resources (i.e., brokers and servers) are also associated with a tenant tag in the common tenant namespace. Tables of a particular tenant tag will only be scheduled for storage and query processing on hardware resources that belong to the same tenant tag. This lets Pinot cluster operators assign specified workloads to certain hardware resources, preventing data in separate workloads from being stored or processed on the same physical hardware.
By default, all tables, brokers, and servers belong to a tenant called DefaultTenant, but you can configure multiple tenants in a Pinot cluster.
To support multi-tenancy, Pinot has first-class support for tenants. Every table is associated with a server tenant and a broker tenant, which controls the nodes used by the table as servers and brokers. Multi-tenancy lets Pinot group all tables belonging to a particular use case under a single tenant name.
The concept of tenants is important when multiple use cases share a Pinot cluster and there is a need to provide quotas or some form of isolation across tenants. For example, consider two tables, `Table A` and `Table B`, in the same Pinot cluster.
We can configure `Table A` with server tenant `Tenant A` and `Table B` with server tenant `Tenant B`. We can tag some of the server nodes for `Tenant A` and some for `Tenant B`. This ensures that segments of `Table A` only reside on servers tagged with `Tenant A`, and segments of `Table B` only reside on servers tagged with `Tenant B`. The same isolation can be achieved at the broker level by configuring broker tenants for the tables.
No need to create separate clusters for every table or use case!
This section contains two main fields, `broker` and `server`, which decide the tenants used for the broker and server components of this table.
In the above example:
The table will be served by brokers that have been tagged as `brokerTenantName_BROKER` in Helix.
If this were an offline table, the offline segments for the table would be hosted on Pinot servers tagged in Helix as `serverTenantName_OFFLINE`.
If this were a real-time table, the real-time segments (both consuming and completed ones) would be hosted on Pinot servers tagged in Helix as `serverTenantName_REALTIME`.
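The tag naming rules above can be captured in a short sketch. The `tenants` dictionary mirrors the `broker` and `server` fields described earlier; the helper `helix_tags` is hypothetical and only illustrates how the Helix tags are derived from the tenant names.

```python
def helix_tags(tenants, table_type):
    """Derive the Helix tags that brokers and servers must carry to serve a table."""
    if table_type not in ("OFFLINE", "REALTIME"):
        raise ValueError(f"unknown table type: {table_type}")
    return {
        "broker": f"{tenants['broker']}_BROKER",
        "server": f"{tenants['server']}_{table_type}",
    }


tenants = {"broker": "brokerTenantName", "server": "serverTenantName"}
offline_tags = helix_tags(tenants, "OFFLINE")
realtime_tags = helix_tags(tenants, "REALTIME")
```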
Here's a sample broker tenant config. This will create a broker tenant `sampleBrokerTenant` by tagging three untagged broker nodes as `sampleBrokerTenant_BROKER`.
To create this tenant, use the following command. The creation will fail if the number of untagged broker nodes is less than `numberOfInstances`.
Here's a sample server tenant config. This will create a server tenant `sampleServerTenant` by tagging 1 untagged server node as `sampleServerTenant_OFFLINE` and 1 untagged server node as `sampleServerTenant_REALTIME`.
To create this tenant, use the following command. The creation will fail if the number of untagged server nodes is less than `offlineInstances` + `realtimeInstances`.
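The two tenant configs and the failure condition can be sketched together. The field names `tenantRole` and `tenantName` are assumptions about the tenant config format (only `numberOfInstances`, `offlineInstances`, and `realtimeInstances` appear in the text above), and `tags_for_tenant` is a hypothetical helper that mimics the check, not the controller's implementation.

```python
def tags_for_tenant(tenant, untagged_instances):
    """Return how many untagged nodes receive each tag, or fail if too few exist."""
    name = tenant["tenantName"]
    if tenant["tenantRole"] == "BROKER":
        wanted = {f"{name}_BROKER": tenant["numberOfInstances"]}
    elif tenant["tenantRole"] == "SERVER":
        wanted = {
            f"{name}_OFFLINE": tenant["offlineInstances"],
            f"{name}_REALTIME": tenant["realtimeInstances"],
        }
    else:
        raise ValueError(f"unknown tenant role: {tenant['tenantRole']}")
    needed = sum(wanted.values())
    if untagged_instances < needed:
        raise ValueError(f"need {needed} untagged instances, found {untagged_instances}")
    return wanted


broker_tenant = {"tenantRole": "BROKER", "tenantName": "sampleBrokerTenant", "numberOfInstances": 3}
server_tenant = {
    "tenantRole": "SERVER",
    "tenantName": "sampleServerTenant",
    "offlineInstances": 1,
    "realtimeInstances": 1,
}
broker_tags = tags_for_tenant(broker_tenant, untagged_instances=3)
server_tags = tags_for_tenant(server_tenant, untagged_instances=2)
```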
Uncover the efficient data processing and storage capabilities of Apache Pinot's server component, optimizing performance for data-driven applications.
Pinot servers provide the primary storage for segments and perform the computation required to execute queries. A production Pinot cluster contains many servers. In general, the more servers, the more data the cluster can retain in tables, the lower latency the cluster can deliver on queries, and the more concurrent queries the cluster can process.
Servers are typically segregated into real-time and offline workloads, with "real-time" servers hosting only real-time tables, and "offline" servers hosting only offline tables. This is a ubiquitous operational convention, not a difference or an explicit configuration in the server process itself. There are two types of servers:
Offline servers are responsible for downloading segments from the segment store and for hosting them and serving queries from them. When a new segment is uploaded to the controller, the controller decides which servers (as many as the replication factor) will host the new segment and notifies them to download the segment from the segment store. On receiving this notification, the servers download the segment file and load the segment, so that they can serve queries from it.
Real-time servers directly ingest from a real-time stream (such as Kafka or EventHubs). Periodically, they make segments of the in-memory ingested data, based on certain thresholds. This segment is then persisted onto the segment store.
Pinot servers are modeled as Helix participants, hosting Pinot tables (referred to as resources in Helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more Helix partitions of one or more Helix resources (i.e. one or more segments of one or more tables).
Discover the controller component of Apache Pinot, enabling efficient data and query management.
The Pinot controller schedules and reschedules resources in a Pinot cluster when metadata changes or a node fails. As an Apache Helix Controller, the Pinot controller schedules the resources that comprise the cluster and orchestrates connections between certain external processes and cluster components (for example, ingest of real-time tables and offline tables). The Pinot controller can be deployed as a single process on its own server or as a group of redundant servers in an active/passive configuration.
The controller exposes a REST API endpoint for cluster-wide administrative operations as well as a web-based query console to execute interactive SQL queries and perform simple administrative tasks.
The Pinot controller is responsible for the following:
Maintaining global metadata (e.g., configs and schemas) of the system with the help of Zookeeper which is used as the persistent metadata store.
Hosting the Helix Controller and managing other Pinot components (brokers, servers, minions)
Maintaining the mapping of which servers are responsible for which segments. This mapping is used by the servers to download the portion of the segments that they are responsible for. This mapping is also used by the broker to decide which servers to route the queries to.
Serving admin endpoints for viewing, creating, updating, and deleting configs, which are used to manage and operate the cluster.
Serving endpoints for segment uploads, which are used in offline data pushes. The controller is also responsible for initializing real-time consumption and coordinating the periodic persistence of real-time segments into the segment store.
Undertaking other management activities, such as managing retention of segments and running validations.
For redundancy, there can be multiple instances of Pinot controllers. Pinot expects that all controllers are configured with the same back-end storage system (e.g. NFS) so that they have a common view of the segments. Pinot can also use other storage systems, such as HDFS.
The controller runs several periodic tasks in the background, to perform activities such as management and validation. Each periodic task has to define the run frequency and default frequency. Each task runs at its own schedule or can also be triggered manually if needed. The task runs on the lead controller for each table.
For period task configuration details, see .
Use the `GET /periodictask/names` API to fetch the names of all the periodic tasks running on your Pinot cluster.
To manually run a named periodic task, use the `GET /periodictask/run` API:
The `Log Request Id` (`api-09630c07`) can be used to search the pinot-controller log file for entries related to the execution of the periodic task that was manually run.
If `tableName` (and its type, `OFFLINE` or `REALTIME`) is not provided, the task will run against all tables.
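A sketch of how a client might build the request URL for a manual run. The query parameter names (`taskname`, `tableName`, `type`), the task name, and the controller address are assumptions for illustration; confirm them against the controller's API documentation. The code only builds the URL and does not send a request.

```python
from urllib.parse import urlencode


def periodic_task_run_url(controller, task_name, table_name=None, table_type=None):
    """Build the URL for GET /periodictask/run, optionally scoped to one table."""
    params = {"taskname": task_name}
    if table_name is not None:
        params["tableName"] = table_name
    if table_type is not None:
        if table_type not in ("OFFLINE", "REALTIME"):
            raise ValueError(f"unknown table type: {table_type}")
        params["type"] = table_type
    return f"{controller}/periodictask/run?{urlencode(params)}"


# Without a table name, the task runs against all tables.
all_tables_url = periodic_task_run_url("http://localhost:9000", "SegmentStatusChecker")
one_table_url = periodic_task_run_url("http://localhost:9000", "SegmentStatusChecker", "myTable", "OFFLINE")
```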
This tenant is defined in the tenants section of the table config.
Follow instructions in to get Pinot locally, and then
Check out the table config in the to make sure it was successfully uploaded.
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a server:
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a controller: