For details on how to set up a table, refer to Creating a table.
This page introduces all the instance assignment strategies, when to use them, and how to configure them.
Instance assignment is the strategy of assigning the servers to host a table. Each instance assignment strategy is associated with one segment assignment strategy (read more about Segment Assignment).
Instance assignment is configured via the InstanceAssignmentConfig. Based on the config, Pinot can assign servers to a table, then assign segments to servers using the segment assignment strategy associated with the instance assignment strategy.
There are 3 types of instances in the InstanceAssignmentConfig: OFFLINE, CONSUMING and COMPLETED. OFFLINE represents the instances hosting the segments of the offline table; CONSUMING represents the instances hosting the consuming segments of the real-time table; COMPLETED represents the instances hosting the completed segments of the real-time table. For a real-time table, if COMPLETED instances are not configured, completed segments will use the same instance assignment strategy as the consuming segments. If they are configured, completed segments will be periodically moved to the COMPLETED instances.
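As a rough sketch of how these instance types fit together, the fragment below keeps consuming and completed segments of a real-time table on servers with different tags. The tag names are illustrative placeholders, not values from this page, and the snippet is a sketch rather than a fully validated table config:

```json
{
  "instanceAssignmentConfigMap": {
    "CONSUMING": {
      "tagPoolConfig": {
        "tag": "hotTenant_REALTIME"
      },
      "replicaGroupPartitionConfig": {}
    },
    "COMPLETED": {
      "tagPoolConfig": {
        "tag": "coldTenant_REALTIME"
      },
      "replicaGroupPartitionConfig": {}
    }
  }
}
```

With a config like this, completed segments are periodically relocated from the consuming servers to the servers carrying the COMPLETED tag.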
The default instance assignment strategy simply assigns all the servers in the cluster to each table, and uses the Balanced Segment Assignment for the table. This strategy requires no extra configurations for the cluster, and it works well for small clusters with few tables where all the resources can be shared among all the tables.
For performance-critical use cases, we might not want to share server resources across use cases, so that one use case is not impacted by the others hosted on the same set of servers. We can use the Tag-Based Instance Assignment to achieve this isolation for tables.
(Note: Logically, the Tag-Based Instance Assignment is identical to the Tenant concept in Pinot; it is just a different way of configuring the table. We recommend using instance assignment over the tenant config because it can achieve more complex assignment strategies, as described below.)
In order to use the Tag-Based Instance Assignment, the servers should be tagged via the Helix InstanceConfig, where the tag suffix (_OFFLINE or _REALTIME) denotes the type of table the server is going to serve. Each server can have multiple tags if necessary.
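The snippet below is a minimal sketch of a tagged server's Helix InstanceConfig; the instance name and tags are illustrative placeholders:

```json
{
  "id": "Server_pinot-server-0.example.com_8098",
  "simpleFields": {},
  "listFields": {
    "TAG_LIST": [
      "Tag1_OFFLINE",
      "Tag1_REALTIME"
    ]
  },
  "mapFields": {}
}
```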
After configuring the server tags, the Tag-Based Instance Assignment can be enabled by setting the tag within the InstanceAssignmentConfig for the table as shown below. Only the servers with this tag will be assigned to host this table, and the table will use the Balanced Segment Assignment.
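A minimal sketch of such a table config fragment, assuming the servers were tagged Tag1_OFFLINE as in the example above:

```json
{
  "instanceAssignmentConfigMap": {
    "OFFLINE": {
      "tagPoolConfig": {
        "tag": "Tag1_OFFLINE"
      },
      "replicaGroupPartitionConfig": {}
    }
  }
}
```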
On top of the Tag-Based Instance Assignment, we can also control the number of servers assigned to each table by configuring numInstances in the InstanceAssignmentConfig. This is useful when we want to serve multiple tables of different sizes on the same set of servers. For example, if we have 30 servers hosting hundreds of tables for different analytics use cases, we don't want to use all 30 servers for every table, especially the tiny tables with only megabytes of data.
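A sketch of limiting a table to 5 of the tagged servers; the tag and the instance count are illustrative values:

```json
{
  "instanceAssignmentConfigMap": {
    "OFFLINE": {
      "tagPoolConfig": {
        "tag": "Tag1_OFFLINE"
      },
      "replicaGroupPartitionConfig": {
        "numInstances": 5
      }
    }
  }
}
```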
In order to use the Replica-Group Segment Assignment, the servers need to be assigned to multiple replica-groups of the table, which is where the Replica-Group Instance Assignment comes into the picture. Enable it and configure numReplicaGroups and numInstancesPerReplicaGroup in the InstanceAssignmentConfig, and Pinot will assign the instances accordingly.
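For example, the fragment below (a sketch with illustrative values) asks for 2 replica-groups of 3 servers each, drawn from the tagged servers:

```json
{
  "instanceAssignmentConfigMap": {
    "OFFLINE": {
      "tagPoolConfig": {
        "tag": "Tag1_OFFLINE"
      },
      "replicaGroupPartitionConfig": {
        "replicaGroupBased": true,
        "numReplicaGroups": 2,
        "numInstancesPerReplicaGroup": 3
      }
    }
  }
}
```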
Similar to the Replica-Group Segment Assignment, in order to use the Partitioned Replica-Group Segment Assignment, servers not only need to be assigned to each replica-group, but also to a partition within the replica-group. Adding numPartitions and numInstancesPerPartition to the InstanceAssignmentConfig fulfills this requirement.
(Note: The numPartitions configured here does not have to match the actual number of partitions of the table, in case the number of table partitions changes for some reason. If they do not match, the table partitions are assigned to the server partitions in a round-robin fashion. For example, if there are 2 server partitions but 4 table partitions, table partitions 1 and 3 will be assigned to server partition 1, and table partitions 2 and 4 will be assigned to server partition 2.)
In order to use Partitioned Replica-Group Segment Assignment, partitionColumn is required in replicaGroupPartitionConfig.
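Putting these together, a sketch of a partitioned replica-group instance assignment; the counts and the memberId column are illustrative assumptions:

```json
{
  "instanceAssignmentConfigMap": {
    "OFFLINE": {
      "tagPoolConfig": {
        "tag": "Tag1_OFFLINE"
      },
      "replicaGroupPartitionConfig": {
        "replicaGroupBased": true,
        "numReplicaGroups": 2,
        "numPartitions": 2,
        "numInstancesPerPartition": 1,
        "partitionColumn": "memberId"
      }
    }
  }
}
```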
For an LLC real-time table, all the stream events are split into several stream partitions, and the events from each stream partition are consumed by a single server. Because the data is always partitioned, the LLC real-time table implicitly uses the Partitioned Replica-Group Instance Assignment with numPartitions equal to the number of stream partitions and numInstancesPerPartition of 1, and we don't allow configuring them explicitly. The replica-group based instance assignment can still be configured explicitly.
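A sketch of such a config for the CONSUMING instances, where only the replica-group settings are spelled out and the partitioning is derived from the stream; tag and counts are illustrative:

```json
{
  "instanceAssignmentConfigMap": {
    "CONSUMING": {
      "tagPoolConfig": {
        "tag": "Tag1_REALTIME"
      },
      "replicaGroupPartitionConfig": {
        "replicaGroupBased": true,
        "numReplicaGroups": 2,
        "numInstancesPerReplicaGroup": 3
      }
    }
  }
}
```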
Without explicitly configuring the replica-group based instance assignment, the replicas of the stream partitions will be evenly spread over all the available instances as shown in the following diagram:
With replica-group based instance assignment, the stream partitions will be evenly spread over the instances within the replica-group:
This strategy is designed to accelerate the no-downtime rolling restart of a large shared cluster.
For example, suppose we have a cluster of 100 servers hosting hundreds of tables, each with 2 replicas. Without organizing the segments, only one server can be shut down at a time to keep the cluster serving without downtime (at least 1 replica of each table has to stay alive); otherwise there is a very high chance that both replicas of some segment land on the servers that are down, causing downtime for that segment. Rolling restarting the servers one by one could take a very long time (even days) for a large cluster with petabytes of data. Pool-Based Instance Assignment is introduced to organize the segments so that multiple servers can be restarted at the same time without bringing down any segment.
To use the Pool-Based Instance Assignment, each server should be assigned to a pool under the tag via the Helix InstanceConfig, as shown below. Then the strategy can be enabled by setting poolBased in the InstanceAssignmentConfig. All the tables in this cluster should use the Replica-Group Instance Assignment, and Pinot will assign servers from different pools to each replica-group of the table. It is guaranteed that servers within one pool only host one replica of any table, so it is safe to shut down all servers within one pool without bringing down any table. This can significantly reduce the deploy time of the cluster: the 100 servers from the example above can be restarted in 2 rounds (less than an hour) instead of 100 rounds (days).
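A sketch of the two pieces involved, with illustrative tag and pool values. First, the pool assignment in the server's Helix InstanceConfig (assuming a pool map keyed by tag), then the poolBased flag in the table's InstanceAssignmentConfig:

```json
{
  "id": "Server_pinot-server-0.example.com_8098",
  "listFields": {
    "TAG_LIST": ["Tag1_OFFLINE"]
  },
  "mapFields": {
    "pool": {
      "Tag1_OFFLINE": "0"
    }
  }
}
```

```json
{
  "instanceAssignmentConfigMap": {
    "OFFLINE": {
      "tagPoolConfig": {
        "tag": "Tag1_OFFLINE",
        "poolBased": true
      },
      "replicaGroupPartitionConfig": {
        "replicaGroupBased": true,
        "numReplicaGroups": 2,
        "numInstancesPerReplicaGroup": 3
      }
    }
  }
}
```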
(Note: A table can have more replicas than the number of pools in the cluster, in which case the replica-groups will be assigned to the pools in a round-robin fashion, and the servers within a pool can host more than one replica of the table. It is still okay to shut down a whole pool without bringing down the table, because there are other replicas hosted by servers from other pools.)
In order to use Partitioned Replica-Group Segment Assignment, partitionColumn is required in replicaGroupPartitionConfig.
This strategy maximizes fault-domain diversity for the replica-group based assignment strategies. Data centers and cloud services (e.g. Azure) provide the notion of a rack or fault domain to ensure hardware resiliency upon power or network failure.
Specifically, if a table has R replicas and the underlying infrastructure provides F fault domains, the Fault-Domain-Aware Instance Assignment algorithm guarantees that if one fault domain goes down, at most Ceil(R/F) instances out of the R mirrored machines can go down.
The configuration has two parts:
Tag the servers of a specific Fault Domain with the same pool ID (see instance config tagging in pool based assignment).
Specify partitionSelector in instanceAssignmentConfigMap to use FD_AWARE_INSTANCE_PARTITION_SELECTOR, as shown in the sketch after this list.
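A hedged sketch of such a config, assuming the servers of each fault domain share a pool id as described above; the tag and counts are illustrative:

```json
{
  "instanceAssignmentConfigMap": {
    "OFFLINE": {
      "partitionSelector": "FD_AWARE_INSTANCE_PARTITION_SELECTOR",
      "tagPoolConfig": {
        "tag": "Tag1_OFFLINE",
        "poolBased": true
      },
      "replicaGroupPartitionConfig": {
        "replicaGroupBased": true,
        "numReplicaGroups": 3,
        "numInstancesPerReplicaGroup": 2
      }
    }
  }
}
```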
Sometimes the instance assignment is not configured optimally in the first shot, or the capacity or requirements of the use case change and the strategy has to be changed. To do so, simply apply the table config with the updated InstanceAssignmentConfig and kick off a rebalance of the table (read more about Rebalance Servers). Pinot will reassign the instances for the table and rebalance the segments onto the servers without downtime.
For details on how to set up ingestion, refer to:
The Minion merge rollup task lets you merge small segments into larger ones. This helps improve query performance and reduce disk usage by aggregating data at a coarser granularity, which reduces the amount of data processed during query execution.
This task is supported for the following use cases:
OFFLINE tables, APPEND only
REALTIME tables, without upsert or dedup
The Minion merge rollup task merges segments K time buckets at a time (default 1), working from the oldest records to the newest. After processing, the segments are time-aligned by bucket.
For example, if the table has hourly records starting at 11-01-2021T13:56:00 and is configured to use a bucket time of 1 day, the merge rollup task merges the records for the window [11-01-2021, 11-02-2021) in the first run, followed by [11-02-2021, 11-03-2021) in the next run, followed by [11-03-2021, 11-04-2021) in the next run, and so on.
Multi-level merge is supported to apply different rollup granularities to different time ranges. For example, you can retain hourly records for the last 24 hours, roll up data between 1 week and 1 day old into daily granularity, and roll up data older than a week into monthly granularity.
This feature uses the following metadata in Zookeeper:
CustomMap of SegmentZKMetadata: Keeps the mapping of { "MergeRollupTask.mergeLevel" : {mergeLevel} }. Indicates that the segment is the result of a merge rollup task. Used to skip time buckets that have all merged segments to avoid reprocessing.
MergeRollupTaskMetadata: Stored in the path MINION_TASK_METADATA/MergeRollupTask/{tableNameWithType}. This metadata keeps the mapping from mergeLevel to waterMarkMs. Used to determine when to schedule the next merge rollup task run. The watermark is the start time of the current processing buckets. All data before the watermark is merged and time aligned.
The merge rollup task uses SegmentReplacementProtocol to achieve broker-level atomic swap between the input segments and result segments. The broker refers to the SegmentLineage metadata to determine which segments should be routed.
This feature uses the pinot-minions and the Helix Task Executor framework, which consists of 2 parts:
MergeRollupTaskGenerator: The minion task scheduler, which schedules tasks of type MergeRollupTask. This task is scheduled by the controller periodic task, PinotTaskManager. For each mergeLevel from the highest to the lowest granularity (hourly -> daily -> monthly):
Time buckets calculation: Starting from the watermark, calculate up to K time buckets that have un-merged segments, on a best-effort basis. Bump up the watermark if necessary.
Segments scheduling: For each time bucket, select all overlapping segments and create minion tasks.
MergeRollupTaskExecutor: The minion task executor, which executes the MergeRollupTask generated by the task generator. These tasks are run by the pinot-minion component.
Process segments: Download the input segments as indicated in the task config. The segment processor framework partitions the data based on the time value and rolls it up if configured.
Upload segments: Upload output segments with the segment replacement protocol. Once completed, the input segments are ready to be deleted and cleaned up by the retention manager.
Start a pinot-minion.
Set up your OFFLINE table. Add "MergeRollupTask" in the task configs, like this:
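A minimal sketch of a single-level MergeRollupTask entry in the table's task config; the merge level name and time periods are illustrative values, not a definitive setup:

```json
{
  "task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1day.mergeType": "concat",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "3d"
      }
    }
  }
}
```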
(Optional) Add the following advanced configurations as needed:
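A hedged sketch of a multi-level config with a few advanced knobs, assuming the {mergeLevel}.{property} naming convention; all values, the merge level names, and the metric column metricColA are illustrative assumptions:

```json
{
  "task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1hour.mergeType": "rollup",
        "1hour.bucketTimePeriod": "1h",
        "1hour.bufferTimePeriod": "3h",
        "1day.mergeType": "rollup",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "1d",
        "metricColA.aggregationType": "sum",
        "maxNumRecordsPerSegment": "5000000",
        "maxNumRecordsPerTask": "50000000"
      }
    }
  }
}
```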
For details about these advanced configurations, see the following table:
This metric keeps track of the task delay in number of time buckets. For example, if this number is 7 and the merge task is configured with "bucketTimePeriod = 1d", there is a 7-day delay. This is useful for monitoring whether the merge task is stuck in production.
This page introduces all the segment assignment strategies, when to use them, and how to configure them.
Segment assignment refers to the strategy of assigning each segment from a table to the servers hosting the table. Picking the best segment assignment strategy can help reduce the overhead of the query routing, thus providing better performance.
Balanced Segment Assignment is the default assignment strategy, where each segment is assigned to the server with the least segments already assigned. With this strategy, each server will have balanced query load, and each query will be routed to all the servers. It requires minimum configuration, and works well for small use cases.
Balanced Segment Assignment is ideal for small use cases with a small number of servers, but as the number of servers increases, routing each query to all the servers could harm the query performance due to the overhead of the increased fanout.
Replica-Group Segment Assignment is introduced to solve the horizontal scalability problem of the large use cases, which makes Pinot linearly scalable. This strategy breaks the servers into multiple replica-groups, where each replica-group contains a full copy of all the segments.
When executing queries, each query will only be routed to the servers within the same replica-group. In order to scale up the cluster, more replica-groups can be added without affecting the fanout of the query, thus not impacting the query performance but increasing the overall throughput linearly.
In order to further increase the query performance, we can reduce the number of segments processed for each query by partitioning the data and using the Partitioned Replica-Group Segment Assignment.
Partitioned Replica-Group Segment Assignment extends the Replica-Group Segment Assignment by assigning the segments from the same partition to the same set of servers. To solve a query that hits only one partition (e.g. SELECT * FROM myTable WHERE memberId = 123, where myTable is partitioned on the memberId column), the query only needs to be routed to the servers for the target partition, which can significantly reduce the number of segments to be processed. This strategy is especially useful to achieve high throughput and low latency for use cases that filter on an id field.
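For reference, a sketch of how such partitioning is typically declared in the table config so that segments can be pruned by partition; the column name, partition function and partition count are illustrative:

```json
{
  "tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {
          "functionName": "Murmur",
          "numPartitions": 4
        }
      }
    }
  },
  "routing": {
    "segmentPrunerTypes": ["partition"]
  }
}
```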
Segment assignment is configured along with the instance assignment, check Instance Assignment for details.
There are multiple different sections in the documentation to help you get started with operating a Pinot cluster. If you are new to Pinot, start with the basics.
To get started with operating a Pinot cluster, first look at the tutorials in Getting Started on how to run a basic pinot cluster in various environments.
You can then proceed to the more advanced Pinot setup in production environment.
Here are some related blog posts from the Apache Pinot community. You can find all of our blog posts on our developer blog on Medium.
This page describes how to rebalance a table
The rebalance operation is used to recompute the assignment of brokers or servers in the cluster. This is not a single command, but rather a series of steps that need to be taken.
In the case of servers, the rebalance operation is used to balance the distribution of the segments amongst the servers used by a Pinot table. This is typically done after capacity changes, or config changes such as changes to replication or the segment assignment strategy.
In the case of brokers, the rebalance operation is used to recalculate the broker assignment to the tables. This is typically done after capacity changes (scaling brokers up or down).
In order to optimize for low latency, we often recommend using high-performance SSDs as server nodes. But if such a use case has a vast amount of data and needs high performance only when querying the few most recent days of data, it might be desirable to keep only the recent time ranges on SSDs and keep the less frequently queried ones on cheaper nodes such as HDDs.
By storing data separately in different storage tiers, one can keep large amounts of data in Pinot while keeping control over the cost of the cluster. Usually, it is recommended to put the most recent data in a storage tier with fast disk access to support real-time analytics queries with low latency and high throughput, and older data in cheaper and slower storage tiers for analytics where higher query latency can be accepted.
Note that separating data storage by age is not meant to achieve a compute-storage decoupled architecture for Pinot.
Original design doc:
Issue:
The Pinot managed offline flows feature allows a user to simply set up a REALTIME table, and let Pinot manage populating the OFFLINE table. For complete motivation and reasoning, please refer to the design doc above.
There are 3 kinds of tables in Pinot:
OFFLINE only - this feature is not relevant for this mode.
REALTIME only - this feature is built for this mode. While a real-time-only table setup (versus a hybrid table setup) is certainly lighter-weight and involves fewer operations, you lose some of the flexibility that comes with having a corresponding OFFLINE table.
For example, in real-time only mode, it is impossible to backfill a specific day's data, even if you have that data available offline somewhere, whereas you could've easily run a one off backfill job to correct data in an OFFLINE table.
It is also not possible to re-bootstrap the table using some offline data, as data for the REALTIME table strictly must come in through a stream. In OFFLINE tables, it is very easy to run jobs and replace segments in the table.
In REALTIME tables, the data often tends to be highly granular and we achieve very little aggregation. OFFLINE tables let you look at bigger windows of data, hence achieving rollups on the time column, aggregations across common dimensions, better compression, and even dedup.
This feature will automatically manage the movement of the data to a corresponding OFFLINE table, so you don't have to write any offline jobs.
HYBRID table - If you already have a hybrid table this feature again may not be relevant to you. But you could explore using this to replace your offline push jobs, and simply keep them for backfills.
The Pinot managed offline flows feature will move records from the REALTIME table to the OFFLINE table, one time window at a time. For example, if the REALTIME table has records with timestamps starting at 10-24-2020T13:56:00, then Pinot managed offline flows will move records for the time window [10-24-2020, 10-25-2020) in the first run, followed by [10-25-2020, 10-26-2020) in the next run, followed by [10-26-2020, 10-27-2020) in the next run, and so on. This window length of 1d is just the default; it can be configured to any length of your choice.
Note: Only completed (ONLINE) segments of the real-time table are used for movement. If the window's data falls into the CONSUMING segment, that run will be skipped. That window will be processed in a future run, when all its data has made it to the completed segments.
This feature uses the pinot-minions and the Helix Task Executor framework, and consists of 2 parts:
RealtimeToOfflineSegmentsTaskGenerator - This is the minion task scheduler, which schedules tasks of type "RealtimeToOfflineSegmentsTask". This task is scheduled by the controller periodic task PinotTaskManager. A watermark is maintained in ZooKeeper, which is the end time of the time window last successfully processed. The task generator refers to this watermark to determine the start of the time window for the next task it generates. The end time is calculated based on the window length (configurable, 1d by default). The task generator finds all segments that have data in [start, end) and sets them into the task configs, along with the start and end times. The generator will not schedule a new task unless the previous task has COMPLETED (or been stuck for over 24h). This ensures that we always move records in sequential time windows (exactly mimicking offline flows), because out-of-order data pushes would mess with the time boundary calculation of the hybrid table.
RealtimeToOfflineSegmentsTaskExecutor - This is a minion task executor to execute the RealtimeToOfflineSegmentsTask generated by the task generator. These tasks are run by the pinot-minion component. The task executor will download all segments from the REALTIME table, as indicated in the task config. Using the SegmentProcessorFramework, it will extract data for [start, end), build the segments, and push them to the OFFLINE table. The segment processor framework will do any required partitioning & sorting based on the OFFLINE table config. Before exiting from the task, it will update the watermark in zookeeper, to reflect the end time of the time window processed.
Step 0: Start a pinot-minion
Step 1: Set up your REALTIME table. Add "RealtimeToOfflineSegmentsTask" in the task configs
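A minimal sketch of enabling the task in the REALTIME table config; an empty config map falls back to the defaults (for example, the 1d window length):

```json
{
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {}
    }
  }
}
```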
Step 2: Create the corresponding OFFLINE table
Step 3: Enable PinotTaskManager
Step 4: Advanced configs
If needed, you can add more configs to the task configs in the REALTIME table, such as
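A hedged sketch of such additional configs; the time periods, the metric column score, and the record limit are illustrative assumptions rather than recommended values:

```json
{
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "6h",
        "bufferTimePeriod": "5d",
        "roundBucketTimePeriod": "1h",
        "mergeType": "rollup",
        "score.aggregationType": "max",
        "maxNumRecordsPerSegment": "100000"
      }
    }
  }
}
```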
where,
The following properties are deprecated/removed in release 0.8.0:
collectorType (deprecated): Replaced by mergeType
timeColumnTransformFunction (removed): Use or roundBucketTimePeriod instead
Once the time window has moved forward, it will never be processed again. If some data arrives in your stream after the window has moved on, that data will never be processed. Set the "bufferTimePeriod" accordingly to account for late data in your setup. We will potentially consider the ability to schedule ad hoc one-off tasks; for example, a user could specify "rerun for day 10/23", which would sweep all segments again and collect data, replacing the old segments. This would help resolve the problem of data arriving very late.
This feature automates the daily/hourly pushes to the offline counterpart of your hybrid table. And since you now have an OFFLINE table created, it opens up the possibility of doing an ad hoc backfill or re-bootstrap. However, there are no mechanisms for doing an automated backfill/re-bootstrap from some offline data. You still have to write your own flows for such scenarios.
The segment download, data extraction, transformation, aggregation and sorting all happen on a single minion node for every run. You will need to be mindful of the memory available on the minion machine. Adjust the bucketSize and maxNumRecordsPerSegment if you are running into memory issues.
We will potentially introduce smarter config adjustments based on memory, or consider using Spark/Hadoop MR.
This section provides information on various options to tune a Pinot cluster for storage and query efficiency. Unlike a key-value store, tuning Pinot can sometimes be tricky because the cost of a query can vary depending on the workload and data characteristics.
If you want to improve query latency for your use case, refer to the Index Techniques section. If your use case faces scalability issues after tuning the indexes, refer to Optimizing Scatter and Gather to improve the query throughput of the Pinot cluster. If you have identified a performance issue on a specific component (broker or server), refer to the Tuning Broker or Tuning Server section.
Enable PinotTaskManager (disabled by default) by adding the controller.task properties below to your controller configuration, and then restart the controller (required).
| Property | Description | Default |
| --- | --- | --- |
Original design doc:
Issue:
The PinotTaskManager periodic task is disabled by default. Enable it using one of the 2 methods described in the section. Set the frequency to some reasonable value (more frequent is better, as extra tasks will not be scheduled unless required). The controller will need a restart after setting this config.
| Property | Description | Default |
| --- | --- | --- |