Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
There are multiple different sections in the documentation to help you get started with operating a Pinot cluster. If you are new to Pinot, please start with the basics.
To get started with operating a Pinot cluster, first please look at the tutorials in Getting Started on how to run a basic pinot cluster in various environments.
You can then proceed to the more advanced Pinot setup in production environment.
Here are some related blog posts from the Apache Pinot community. You can find all of our blog posts on our developer blog on Medium.
For more details on how to setup a table, refer to Creating a table
For Real Time Pinot tables
In case of RealTime Pinot tables, whenever a Pinot server finishes consuming a segment, it goes through a segment completion protocol sequence. The default approach is to upload this segment to the lead Pinot controller which in turn will persist it in the segment store (eg: NFS, S3 or HDFS). As a result, since all the realtime segments flow through the controller, it can become a bottleneck and slow down the overall ingestion rate. To overcome this limitation, we've added a new policy which allows bypassing the controller in the segment completion protocol. This is internally named as "Peer Download policy".
When this is enabled, the Pinot servers will attempt to upload the completed segment to the segment store directly, thus by-passing the controller. Once this is finished, it will update the controller with the corresponding segment metadata. The reason this policy is named peer download
is because if the segment store is unavailable for whatever reason, the corresponding segments can still be downloaded directly from the Pinot servers.
Please Note: This is available in the latest master (not in 0.5.0 release)
This scheme only works for real-time tables using the Low Level Consumer (LLC) mode. The changes needed are as follows:
Add the following things to the Controller Config
Add the following things to the server config
Here URI of segment store should point to the desired full path in the corresponding segment store with both filesystem scheme and path (eg: file://dir
or hdfs://path
or s3://path
)
Replace the last field (i.e., scheme) of pinot.server.storage.factory.class.(scheme) with the corresponding scheme (e.g., hdfs, s3 or gcs) of the segment store URI configured above. Then put the PinotFS subclass for the scheme as the config value.
Add the following things to the real-time segments config:
In this case, the peerSegmentDownloadScheme
can be either http
or https
.
Enabling peer download may incur LLC segments failed to be uploaded to segment store in some failure cases, e.g. segment store is unavailable during segment completion. Add the following controller config to enable the upload retry by a controller periodic job asynchronously.
To setup a Pinot cluster, follow these steps
Start Controller instances
Start Broker instances
Start Server instances
For more details on how to setup ingestion, refer to
Tiered storage allows you to split your server storage into multiple tiers. All the tiers can use different filesystem to hold the data. Tiered storage can be used to optimise the cost to latency tradeoff in production Pinot systems.
Some example scenarios in which tiered storage can be used -
Tables with very long retention (more than 2 years) but most frequently queries are performed on the recent data.
Reduce storage cost for older data while tolerating slightly higher latencies In order to optimize for low latency, we often recommend using high performance SSDs. But if such a use case has 2 years of data, and need the high performance only when querying 1 month of data, it might become desirable to keep only the recent time ranges on SSDs, and keep the less frequently queried ones on cheaper nodes such as HDDs or a DFS such as S3.
The data age based tiers is just one of the examples. The logic to split data into tiers may change depending on the use case.
You can configured tiered storage by setting the tieredConfigs
key in your table config json.
In this example, the table uses servers tagged with base_OFFLINE
. We have created two tiers of Pinot servers, tagged with tier_a_OFFLINE
and tier_b_OFFLINE
. Segments older than 7 days will move from base_OFFLINE
to tier_a_OFFLINE
, and segments older than 15 days will move to tier_b_OFFLINE
.
Following properties are supported under tierConfigs
-
On adding tier config, a periodic task on the pinot-controller called "SegmentRelocator" will move segments from one tenant to another, as and when the segment crosses the segment age.
This periodic task runs every hour by default. You can configure this frequency by setting the config with any period string (60s, 2h, 5d)
This job can also be triggered manually
Under the hood, this job runs a rebalance. So you can achieve the same effect as a manual trigger by running a rebalance
name
Name of the tier. Every tier in the tierConfigs list must have a unique name
segmentSelectorType
The strategy used for selecting segments for tiers. The only supported strategy as of now is time
, which will pick segments based on segment age. In future, we expect to have strategies like column_value
, etc
segmentAge
This property is required when segmentSelectorType
is time
. Set a period string, eg. 15d, 24h, 60m. Segments which are older than the age will be moved to the the specific tier
storageType
The type of storage. The only supported type is pinot_server
, which will use Pinot servers as storage for the tier. In future, we expect to have some deep store modes here
serverTag
This property is required when storageType
is pinot_server
. Set the tag of the Pinot servers you wish to use for this tier.
Access control can be setup at various points in Pinot, such as controller endpoints and broker query endpoints. By default we will use AllowAllAccessFactory and hence not be enforcing any access controls. You can add access control by implementing the AccessControlFactory interface.
The access control factory can be configured in the controller configs by setting the fully qualified class name of the AccessControlFactory in the property controller.admin.access.control.factory.class
The access control factory can be configured in the broker configs by setting the fully qualified class name of the AccessControlFactory in the property pinot.broker.access.control.class
. Any other properties required for initializing the factory can be set in the broker configs as properties with the prefix pinot.broker.access.control
.
Rebalance operation is used to recompute assignment of brokers or servers in the cluster. This is not a single command, but more of a series of steps that need to be taken.
In case of brokers, rebalance operation is used to recalculate the broker assignment to the tables. This is typically done after capacity changes.
These are typically done when downsizing/uplifting a cluster, or replacing nodes of a cluster.
Every broker added to the Pinot cluster, has tags associated with it. A group of brokers with the same tag forms a Broker Tenant. By default, a broker in the cluster gets added to the DefaultTenant
i.e. gets tagged as DefaultTenant_BROKER
. Below is an example of how this tag looks in the znode, as seen in ZooInspector.
A Pinot table config has a tenants section, to define the tenant to be used by the table. More details about this in the Tenants section.
Using the tenant defined above, a mapping is created, from table name to brokers and stored in the IDEALSTATES/brokerResource
. This mapping can be used by external services that need to pick a broker for querying.
If you want to scale up brokers, add new brokers to the cluster, and then tag them based on the tenant used by the table. If you're using DefaultTenant
, no tagging needs to be done, as every broker node by default joins with tag DefaultTenant_BROKER
.
If you want to scale down brokers, untag the brokers you wish to remove.
To update the tags on the broker, use the following API:
PUT /instances/{instanceName}/updateTags?tags=<comma separated tags>
Example for tagging the broker as per your custom tenant:
PUT /instances/Broker_10.20.151.8_8000/updateTags?tags=customTenant_BROKER
Example for untagging a broker:
PUT /instances/Broker_10.20.151.8_8000/updateTags?tags=untagged_BROKER
After making any capacity changes to the broker, the brokerResource needs to be rebuilt. This can be done with the below API:
POST /tables/{tableNameWithType}/rebuildBrokerResourceFromHelixTags
This is when you untagged and now want to remove the node from the cluster.
First, shutdown the broker. Then, use API below to remove the node from the cluster.
DELETE /instances/{instanceName}
If you encounter the below message when dropping, it means the broker process hasn't been shut down.
If you encounter below message, it means the broker has not been removed from the ideal state. Check the untagging and rebuild steps went through successfully.
Rebalance operation is used to recompute assignment of brokers or servers in the cluster. This is not a single command, but more of a series of steps that need to be taken.
In case of servers, rebalance operation is used to balance the distribution of the segments amongst the servers being used by a Pinot table. This is typically done after capacity changes, or config changes such as replication or segment assignment strategies or table migration to a different tenant.
Here's some common scenarios where the changes need to be followed by a rebalance.
Capacity changes
Increasing/decreasing replication for a table
Changing segment assignment for a table
Moving table from one tenant to a different tenant
These are typically done when downsizing/uplifting a cluster, or replacing nodes of a cluster.
Every server added to the Pinot cluster, has tags associated with it. A group of servers with the same tag forms a Server Tenant. By default, a server in the cluster gets added to the DefaultTenant
i.e. gets tagged as DefaultTenant_OFFLINE
and DefaultTenant_REALTIME
. Below is an example of how this looks in the znode, as seen in ZooInspector.
A Pinot table config has a tenants section, to define the tenant to be used by the table. The Pinot table will use all the servers which belong to the tenant as described in this config. More details about this in the Tenants section.
Using master or 0.6.0 onwards
In order to change the server tags, the following API can be used.
PUT /instances/{instanceName}/updateTags?tags=<comma separated tags>
0.5.0 and prior
UpdateTags API is not available in 0.5.0 and prior. Instead use this API to update the Instance.
PUT /instances/{instanceName}
For example,
NOTE
The output of GET and input of PUT don't match for this API. Please make sure to use the right payload as shown in example above. Particularly, notice that instance name "Server_host_port" gets split up into their own fields in this PUT API.
In order to make change to the replication factor of a table, update the table config as follows
OFFLINE table - update the replication
field
REALTIME table - update the replicasPerPartition
field
The most common segment assignment change would be to move from the default segment assignment to replica group segment assignment. Discussing the details of the segment assignment is beyond the scope of this page. More details can be found in Routing and in this FAQ question.
In a scenario where you need to move table across tenants, for e.g table was assigned earlier to a different Pinot tenant and now you want to move it to a separate one, then you need to call the rebalance API with reassignInstances set to true.
After any of the above described changes are done, a rebalance is needed to make those changes take effect.
To run a rebalance, use the following API.
POST /tables/{tableName}/rebalance?type=<OFFLINE/REALTIME>
This API has a lot of knobs to control various behaviors. Make sure to go over them and change the defaults as needed.
Note
Typically, the flags that need to be changed from defaults are
includeConsuming=true for REALTIME
downtime=true if you have only 1 replica, or prefer faster rebalance at the cost of a momentary downtime
You can check the status of the rebalance by
Checking the controller logs
Running rebalance again after a while, you should receive status "status": "NO_OP"
Checking the External View of the table, to see the changes in capacity/replicas have taken effect.
This page introduces all the instance assignment strategies, when to use them, and how to configure them
Instance Assignment means the strategy of assigning the servers to host a table. Each instance assignment strategy is associated with one segment assignment strategy (read more about Segment Assignment).
Instance assignment is configured via the InstanceAssignmentConfig. Based on the config, Pinot can assign servers to a table, then assign segments to servers using the segment assignment strategy associated with the instance assignment strategy.
There are 3 types of instances for the InstanceAssignmentConfig: OFFLINE
, CONSUMING
and COMPLETED
. OFFLINE
represents the instances hosting the segments for the offline table; CONSUMING
represents the instances hosting the consuming segments for the real-time table; COMPLETED
represents the instances hosting the completed segments for the real-time table. For real-time table, if COMPLETED
instances are not configured, completed segments will use the same instance assignment strategy as the consuming segments. If it is configured, completed segments will be automatically moved to the COMPLETED
instances periodically.
The default instance assignment strategy simply assigns all the servers in the cluster to each table, and uses the Balanced Segment Assignment for the table. This strategy requires no extra configurations for the cluster, and it works well for small clusters with few tables where all the resources can be shared among all the tables.
For performance critical use cases, we might not want to share the server resources for multiple use cases to prevent the use case being impacted by other use cases hosted on the same set of servers. We can use the Tag-Based Instance Assignment to achieve isolation for tables.
(Note: Logically the Tag-Based Instance Assignment is identical to the Tenant concept in Pinot, but just a different way of configuring the table. We recommend using the instance assignment over the tenant config because it can achieve more complex assignment strategies, as described below.)
In order to use the Tag-Based Instance Assignment, the servers should be tagged via the Helix InstanceConfig, where the tag suffix (_OFFLINE
or _REALTIME
) denotes the type of table the server is going to serve. Each server can have multiple tags if necessary.
After configuring the server tags, the Tag-Based Instance Assignment can be enabled by setting the tag
within the InstanceAssignmentConfig for the table as shown below. Only the servers with this tag will be assigned to host this table, and the table will use the Balanced Segment Assignment.
On top of the Tag-Based Instance Assignment, we can also control the number of servers assigned to each table by configuring the numInstances
in the InstanceAssignmentConfig. This is useful when we want to serve multiple tables of different sizes on the same set of servers. For example, suppose we have 30 servers hosting hundreds of tables for different analytics, we don’t want to use all 30 servers for each table, especially the tiny tables with only megabytes of data.
In order to use the Replica-Group Segment Assignment, the servers need to be assigned to multiple replica-groups of the table, where the Replica-Group Instance Assignment comes into the picture. Enable it and configure the numReplicaGroups
and numInstancesPerReplicaGroup
in the InstanceAssignmentConfig, and Pinot will assign the instances accordingly.
Similar to the Replica-Group Segment Assignment, in order to use the Partitioned Replica-Group Segment Assignment, servers not only need to be assigned to each replica-group, but also the partition within the replica-group. Adding the numPartitions
and numInstancesPerPartition
in the InstanceAssignmentConfig can fulfill the requirement.
(Note: The numPartitions
configured here does not have to match the actual number of partitions for the table in case the partitions of the table changed for some reason. If they do not match, the table partition will be assigned to the server partition in a round-robin fashion. For example, if there are 2 server partitions, but 4 table partitions, table partition 1 and 3 will be assigned to server partition 1, and table partition 2 and 4 will be assigned to server partition 2.)
In order to use Partitioned Replica-Group Segment Assignment, replicaGroupStrategyConfig
is required.
For LLC real-time table, all the stream events are split into several stream partitions, and the events from each stream partition are consumed by a single server. Because the data is always partitioned, the LLC real-time table is using Partitioned Replica-Group Instance Assignment implicitly with numPartitions
the same as the number of stream partitions, and numInstancesPerPartition
of 1, and we don't allow configuring them explicitly. The replica-group based instance assignment can still be configured explicitly.
Without explicitly configuring the replica-group based instance assignment, the replicas of the stream partitions will be evenly spread over all the available instances as shown in the following diagram:
With replica-group based instance assignment, the stream partitions will be evenly spread over the instances within the replica-group:
This strategy is designed for accelerating the no-downtime rolling restart of the large shared cluster.
For example, suppose we have a cluster with 100 servers hosting hundreds of tables, each table has 2 replicas. Without organizing the segments, in order to keep no-downtime (at least 1 replica for each table has to be alive) for the cluster, only one server can be shut down at the same time, or there is a very high chance that both replicas of some segments are served on the down servers, which causes down time for the segment. Rolling restart servers one by one could take a very long time (even days) for a large cluster with petabytes of data. Pool-Based Instance Assignment is introduced to help organize the segments so that each time multiple servers can be restarted at the same time without bringing down any segment.
To use the Pool-Based Instance Assignment, each server should be assigned to a pool under the tag via the Helix InstanceConfig as shown below. Then the strategy can be configured by enabling the poolBased
in the InstanceAssignmentConfig. All the tables in this cluster should use the Replica-Group Instance Assignment, and Pinot will assign servers from different pools to each replica-group of the table. It is guaranteed that servers within one pool only host one replica of any table, and it is okay to shut down all servers within one pool without bringing down any table. This can significantly reduce the deploy time of the cluster, where the 100 servers for the above example can be restarted in 2 rounds (less than an hour) instead of 100 rounds (days).
(Note: A table can have more replicas than the number of pools for the cluster, in which case the replica-group will be assigned to the pools in a round-robin fashion, and the servers within a pool can host more than one replicas of the table. It is still okay to shut down the whole pool without bringing down the table because there are other replicas hosted by servers from other pools.)
In order to use Partitioned Replica-Group Segment Assignment, replicaGroupStrategyConfig
is required.
Sometimes we don’t have the instance assignment configured in the optimal way in the first shot, or the capacity or requirement of the use case changes and we have to change the strategy. In order to do that, simply apply the table config with the updated InstanceAssignmentConfig, and kick off a rebalance of the table (read more about Rebalance Servers). Pinot will reassign the instances for the table, and also rebalance the segments on the servers without downtime.
If you are deploying using the helm chart with Kubernetes, see the tutorial on setting up Prometheus and Grafana to monitor Pinot.
Please refer to key metrics documented in .
Pinot uses to collect metrics within our application components. These metrics can be published to a metrics server with the help of interface. By default, metrics are published to JMX using the .
You can write a listener to publish metrics to another metrics server by implementing the MetricsRegistryRegistrationListener
interface. This listener can be injected into the controller by setting the fully qualified name of the class in the controller configs for the property pinot.controller.metrics.metricsRegistryRegistrationListeners
.
You would have to design your own systems to view and monitor these metrics. A list of all the metrics published for each component can be found in:
This will expose a port at 8080 to dump metrics as Prometheus format for Prometheus scrapper to fetch.
Original design doc:
Issue:
The Pinot managed offline flows feature allows a user to simply setup a REALTIME table, and let Pinot manage populating the OFFLINE table. For complete motivation and reasoning, please refer to the design doc above.
There are 3 kinds of tables in Pinot
OFFLINE only - this feature is not relevant for this mode.
REALTIME only - this feature is built for this mode. While having a realtime-only table setup (versus a hybrid table setup) is certainly lightweight and lesser operations, you lose some of the flexibility that comes with having a corresponding OFFLINE table.
For example, in realtime only mode, it is impossible to backfill a specific day's data, even if you have that data available offline somewhere, whereas you could've easily run a one off backfill job to correct data in an OFFLINE table.
It is also not possible to re-bootstrap the table using some offline data, as data for the REALTIME table strictly must come in through a stream. In OFFLINE tables, it is very easy to run jobs and replace segments in the table.
In REALTIME tables, the data often tends to be highly granular and we achieve very little aggregations. OFFLINE tables let you look at bigger windows of data hence achieving rollups for time column, aggregations across common dimensions, better compression and even dedup.
This feature will automatically manage the movement of the data to a corresponding OFFLINE table, so you don't have to write any offline jobs.
HYBRID table - If you already have a hybrid table this feature again may not be relevant to you. But you could explore using this to replace your offline push jobs, and simply keep them for backfills.
The Pinot managed offline flows feature will move records from the REALTIME table to the OFFLINE table, one time window
at a time. For example, if the REALTIME table has records with timestamp starting 10-24-2020T13:56:00, then the Pinot managed offline flows will move records for the time window [10-24-2020, 10-25-2020) in the first run, followed by [10-25-2020, 10-26-1010) in the next run, followed by [10-26-2020, 10-27-2020) in the next run, and so on. This window length of 1d is just the default, and it can be configured to any length of your choice.
This feature uses the pinot-minions and the Helix Task Executor framework. This feature consists of 2 parts
RealtimeToOfflineSegmentsTaskGenerator - This is the minion task scheduler, which schedules tasks of type "RealtimeToOfflineSegmentsTask". This task is scheduled by the controller periodic task - PinotTaskManager. A watermark is maintained in zookeeper, which is the end time of the time window last successfully processed. The task generator refers to this watermark, to determine the start of the time window, for the next task it generates. The end time is calculated based on the window length (configurable, 1d default). The task generator will find all segments which have data in [start, end), and set it into the task configs, along with the start and end. The generator will not schedule a new task, unless the previous task has COMPLETED (or been stuck for over 24h). This is to ensure that we always move records in sequential time windows (exactly mimicking offline flows), because out-of-order data pushes will mess with the time boundary calculation of the hybrid table.
RealtimeToOfflineSegmentsTaskExecutor - This is a minion task executor to execute the RealtimeToOfflineSegmentsTask generated by the task generator. These tasks are run by the pinot-minion component. The task executor will download all segments from the REALTIME table, as indicated in the task config. Using the SegmentProcessorFramework, it will extract data for [start, end), build the segments, and push them to the OFFLINE table. The segment processor framework will do any required partitioning & sorting based on the OFFLINE table config. Before exiting from the task, it will update the watermark in zookeeper, to reflect the end time of the time window processed.
Step 0: Start a pinot-minion
Step 1: Setup your REALTIME table. Add "RealtimeToOfflineSegmentsTask" in the task configs
Step 2: Create the corresponding OFFLINE table
Step 3: Enable PinotTaskManager
Step 4: Advanced configs
If needed, you can add more configs to the task configs in the REALTIME table, such as
where,
The following properties are deprecated/removed in release 0.8.0
collectorType (deprecated): Replaced by mergeType
Once the time window has moved forward, it will never be processed again. If some data arrives into your stream after the window has moved on, that data will never be processed. Set the "bufferTimePeriod" accordingly, to account for late data issues in your setup. We will potentially consider ability to schedule ad-hoc one-off tasks. For example, user can specify "rerun for day 10/23", which would sweep all segments again and collect data, replacing the old segments. This will help resolve the problem of data arriving very late.
This feature automates the daily/hourly pushes to the offline counterpart of your hybrid table. And since you now have an OFFLINE table created, it opens up the possibility of doing an ad-hoc backfill or re-bootstrap. However, there are no mechanisms for doing an automated backfill/re-bootstrap from some offline data. You still have to write your own flows for such scenarios.
The segments download, data extraction, transformation, aggregations, sorting all happens on a single minion node for every run. You will need to be mindful of the memory available on the minion machine. Adjust the bucketSize
and maxNumRecordsPerSegment
if you are running into memory issues.
We will potentially introduce smarter config adjustments based on memory, or consider using Spark/Hadoop MR.
This section provides information on various options to tune Pinot cluster for storage and query efficiency. Unlike Key-Value store, tuning Pinot sometimes can be tricky because the cost of query can vary depending on the workload and data characteristics.
If you want to improve query latency for your use case, you can refer to Index Techniques
section. If your use case faces the scalability issue after tuning index, you can refer Optimizing Scatter and Gather
for improving query throughput for Pinot cluster. If you have identified a performance issue on the specific component (broker or server), you can refer to the Tuning Broker
or Tuning Server
section.
This page introduces all the segment assignment strategies, when to use them, and how to configure them
Segment assignment means the strategy of assigning each segment from a table to the servers hosting the table. Picking the best segment assignment strategy can help reduce the overhead of the query routing, thus providing better performance.
Balanced Segment Assignment is the default assignment strategy, where each segment is assigned to the server with the least segments already assigned. With this strategy, each server will have balanced query load, and each query will be routed to all the servers. It requires minimum configuration, and works well for small use cases.
Balanced Segment Assignment is ideal for small use cases with a small number of servers, but as the number of servers increases, routing each query to all the servers could harm the query performance due to the overhead of the increased fanout.
Replica-Group Segment Assignment is introduced to solve the horizontal scalability problem of the large use cases, which makes Pinot linearly scalable. This strategy breaks the servers into multiple replica-groups, where each replica-group contains a full copy of all the segments.
When executing queries, each query will only be routed to the servers within the same replica-group. In order to scale up the cluster, more replica-groups can be added without affecting the fanout of the query, thus not impacting the query performance but increasing the overall throughput linearly.
In order to further increase the query performance, we can reduce the number of segments processed for each query by partitioning the data and use the Partitioned Replica-Group Segment Assignment.
Partitioned Replica-Group Segment Assignment extends the Replica-Group Segment Assignment by assigning the segments from the same partition to the same set of servers. To solve a query which hits only one partition (e.g. SELECT * FROM myTable WHERE memberId = 123
where myTable
is partitioned with memberId
column), the query only needs to be routed to the servers for the targeting partition, which can significantly reduce the number of segments to be processed. This strategy is especially useful to achieve high throughput and low latency for use cases that filter on an id field.
Original design doc:
Issue:
The Minion merge/rollup task allows a user to merge small segments into larger ones, through which Pinot can potentially benefit from improved disk storage and the query performance. For complete motivation and reasoning, please refer to the design doc above. Currently, we only support OFFLINE table APPEND use cases.
The Pinot merge/rollup task will merge segments, k time buckets
(configurable, default 1) at best effort at a time from the oldest to the newest records. After processing, the segments will be time aligned according to the bucket. For example, if the table has hourly records starting 11-01-2021T13:56:00, and is configured to use bucket time of 1 day, then the merge/rollup task will merge the records for the window [11-01-2021, 11-02-2021) in the first run, followed by [11-02-2021, 11-03-2021) in the next run, followed by [11-03-2021, 11-04-2021) in the next run, and so on.
Multi-level merge is also allowed to achieve different compressions for different time ranges. For example, if the table has hourly records, we can keep them as is for the last day, rollup the data to daily granularity from 1 week ago to 1 day ago, rollup the data before 1 week to monthly granularity.
This feature uses the following metadata in zookeeper:
CustomMap of SegmentZKMetadata keeps the mapping of { "MergeRollupTask.mergeLevel" : {mergeLevel} }. This field indicates that the segment is the result of merge/rollup task. This field is used to skip time buckets that have all merged segments to avoid reprocessing.
MergeRollupTaskMetadata stored in the path: MINION_TASK_METADATA/MergeRollupTask/{tableNameWithType}. This metadata keeps the mapping from mergeLevel to waterMarkMs. The watermark is the start time of current processing buckets. All data before the watermark are merged, time aligned and need to use new backfill approaches (not supported yet). This metadata is useful to determine the next scheduling buckets.
Merge/rollup task uses SegmentReplacementProtocol to achieve Broker level atomic swap between the input segments and result segments. Broker refers to the SegmentLineage metadata to determine which segments should be routed.
This feature uses the pinot-minions and the Helix Task Executor framework. It consists of 2 parts:
MergeRollupTaskGenerator - This is the minion task scheduler, which schedules tasks of type "MergeRollupTask". This task is scheduled by the controller periodic task - PinotTaskManager.
For each mergeLevel
from the highest to the lowest granualrity (hourly -> daily -> monthly):
Time buckets calculation - Starting from the watermark, calculate up to k time buckets that has un-merged segments at best effort. Bump up the watermark if necessary.
Segments scheduling - For each time bucket, select all overlapping segments and create minion tasks.
MergeRollupTaskExecutor - This is a minion task executor to execute the MergeRollupTask generated by the task generator. These tasks are run by the pinot-minion component.
Process segments - Download input segments as indicated in the task config. The segment processor framework will partition the data based on time value and rollup if configured.
Upload segments - Upload output segments with the segment replacement protocol. Once completed, the input segments are ready to be deleted, and will be cleaned up by the retention manager.
Step 0: Start a pinot-minion
Step 1: Setup your OFFLINE table. Add "MergeRollupTask" in the task configs
Step 2: Enable PinotTaskManager
The PinotTaskManager periodic task is disabled by default. Enable it by adding this property to your controller conf. Set the frequency to some reasonable value (frequently is better, as extra tasks will not be scheduled unless required). The controller will need a restart after setting this config.
Step 3: Advanced configs
If needed, add more configs such as
where,
This metric keeps track of the task delay in the number of time buckets. For example, if we see this number to be 7, and the merge task is configured with "bucketTimePeriod = 1d", this means that we have 7 days of delay. It's useful to monitor if the merge task stuck in production.
If we can apply the feature to REALTIME tables, users can potential use long retention REALTIME tables instead of HYBRID tables for convenience. To add the support, we need to allow segment upload for realtime tables and handle potential corner cases.
Currently, Pinot data backfill is at segment level (replace segments with the same names), but the output segments have different names compared to the original segments. We need to introduce a new way to backfill the processed data, one potential approach:
Introduce a new API to get the list of segments for a given time window.
Use segment replacement protocol to swap the group of segments with the backfill ones.
This page describes how to rebalance a table
Rebalance operation is used to recompute assignment of brokers or servers in the cluster. This is not a single command, but more of a series of steps that need to be taken.
In case of servers, rebalance operation is used to balance the distribution of the segments amongst the servers being used by a Pinot table. This is typically done after capacity changes, or config changes such as replication or segment assignment strategies.
In case of brokers, rebalance operation is used to recalculate the broker assignment to the tables. This is typically done after capacity changes (scale up/down brokers).
Metrics published to JMX could also be exposed to Prometheus through tooling like .
To run as a javaagent, and run:
The PinotTaskManager periodic task is disabled by default. Enable this using one of the 2 methods described in section. Set the frequency to some reasonable value (frequently is better, as extra tasks will not be scheduled unless required). Controller will need a restart after setting this config.
timeColumnTransformFunction (removed): Use or roundBucketTimePeriod instead
Segment assignment is configured along with the instance assignment, check for details.
Property | Description | Default |
---|
Query param
Default value
Description
dryRun
false
If set to true, rebalance is run as a dry-run so that you can see the expected changes to the ideal state and instance partition assignment.
includeConsuming
false
Applicable for REALTIME tables. CONSUMING segments are rebalanced only if this is set to true. Moving a CONSUMING segment involves dropping the data consumed so far on old server, and re-consuming on the new server. If an application is sensitive to increased memory utilization due to re-consumption or to a momentary data staleness, they may choose to not include consuming in the rebalance. Whenever the CONSUMING segment completes, the completed segment will be assigned to the right instances, and the new CONSUMING segment will also be started on the correct instances. If you choose to includeConsuming=false and let the segments move later on, any downsized nodes need to remain untagged in the cluster, until the segment completion happens.
downtime
false
This controls whether Pinot allows downtime while rebalancing. If downtime = true, all replicas of a segment can be moved around in one go, which could result in a momentary downtime for that segment (time gap between ideal state updated to new servers and new servers downloading the segments). If downtime = false, Pinot will make sure to keep certain number of replicas (config in next row) always up. The rebalance will be done in multiple iterations under the hood, in order to fulfill this constraint.
Note: If you have only 1 replica for your table, rebalance with downtime=false is not possible.
minAvailableReplicas
1
Applicable for rebalance with downtime=false.
This is the minimum number of replicas that are expected to stay alive through the rebalance.
bestEfforts
false
Applicable for rebalance with downtime=false.
If a no-downtime rebalance cannot be performed successfully, this flag controls whether to fail the rebalance or do a best-effort rebalance.
reassignInstances
false
Applicable to tables where the instance assignment has been persisted to zookeeper. Setting this to true will make the rebalance first update the instance assignment, and then rebalance the segments.
bootstrap
false
Rebalances all segments again, as if adding segments to an empty table. If this is false, then the rebalance will try to minimize segment movements.
Property | Description | Default |
bucketTimePeriod | Time window size for each run. Adjust this to change the time window. E.g. if set to 1h, each task will process 1h data at a time. | 1d |
bufferTimePeriod | Buffer time. Will not schedule tasks unless time window is older than this buffer. Configure this according to how late you expect your data. E.g. if your system can emit events later than 3d, set this to 3d to make sure those are included. Note: Once a given time window has been processed, it will never be processed again. | 2d |
roundBucketTimePeriod (supported since release | Round the time value before merging the rows. This is useful if time column is highly granular in the REALTIME table and is not needed by the application. In the OFFLINE table you can rollup the time values (e.g. milliseconds granularity in REALTIME table, but okay with minute level granularity in the application - set to | None |
mergeType (supported since release | Allowed values are concat - no aggregations rollup - perform metrics aggregations across common dimensions + time dedup - deduplicates rows with the same values | concat |
{metricName}.aggregationType | Aggregation function to apply to the metric for aggregations. Only applicable for | sum |
maxNumRecordsPerSegment | Control the number of records you want in a segment generated. Useful if the time window has many records, but you don't want them all in the same segment. | 5,000,000 |
mergeType | Allowed values are concat - no aggregations rollup - perform metrics aggregations across common dimensions + time | concat |
bucketTimePeriod | Time bucket size. Adjust this to change the time bucket. E.g. if set to 1h, the output segments will have records in 1 hour range. | None |
bufferTimePeriod | Buffer time. Will not schedule tasks unless time bucket is older than this buffer. Configure this according to how late you expect your data. E.g. if your system can emit events later than 3d, set this to 3d to make sure those are included. Note: Once a given time window has been processed, it will never be processed again. | None |
roundBucketTimePeriod | Round the time value before merging the rows. This is useful if time column is highly granular than needed, you can rollup the time values (e.g. milliseconds granularity in the original data, but okay with minute level granularity in the application - set to | None |
{metricName}.aggregationType | Aggregation function to apply to the metric for aggregations. Only applicable for | sum |
maxNumRecordsPerSegment | Control the number of records you want in a segment generated. Useful if the time bucket has many records, but you don't want them all in the same segment. | 5,000,000 |
maxNumRecordsPerTask | Control single task workload. Useful to protect minion from overloading by a single task. | 50,000,000 |
maxNumParallelBuckets | Control number of processing buckets per run. Useful to speed up the task scheduling for bootstrapping. E.g. if set to 10, the task generator will schedule 10 buckets per run. | 1 |
This page describes the pinot cross-release compatibility test suite.
Pinot has unit and integration tests that verify that the system can work well as long as all components are in the same version. Further, each PR goes through reviews in which Pinot committers can decide whether a PR may break compatibility, and if so, how it can be avoided. Even with all this, it is useful to be able to test an upgrade before actually subjecting a live installation to upgrades.
Pinot has multiple components that run independently of each other. Therefore upgrading a mission-critical pinot cluster will result in scenarios where one component is running an old version and the other a new version of Pinot. It can also happen that this state (of multiple versions) is in place for days together. Or, we may need to revert the upgrade process (usually done in reverse order) -- possibly due to reasons outside of Pinot.
Pinot is highly configurable, so it is possible that there are few installations that use the same combination of configuration options as any one site does. Therefore, it may be that a defect or incompatibility exists with that particular combination of configurations, and went undetected in reviews.
In practice, installations upgrade their deployments to newer versions periodically, or when an urgent bug-fix is needed, or when a new release is published. It is also possible that an installation has not upgraded Pinot for a long time. Either way, it is usually the case that installations will pull in a lot more new/modified software than the feature or bug fix they need.
In a mission-critical pinot installation, the administrators require that during (and certainly after) the upgrade, correctness of normal operations (segment pushes, ingestion from streams, queries, monitoring, etc.) is not compromised..
For the reasons stated above, it is useful to have a way to test an upgrade before applying to the production cluster. Further, it is useful to be able to customize the tests to run using the unique table/schema/configurations/queries combination that an installation is using. If an installation has not upgraded pinot for a long time, it is useful to know what parts may be incompatible during the upgrade process, and schedule downtime if required.
As of release 0.8.0, Pinot has a compatibility tester that you can run before upgrading your installation with a new release. You can specify your own configuration for the pinot components, your table configurations and schema, your queries with your sample data, and run the compatibility suite (you can build one based on the sample test suite provided).
We recommend that you upgrade Pinot components in the following order (if you need to roll back a release, do it in the reverse order).
Controller
Broker
Server
Minion
The test suite runs through an upgrade sequence of upgrading each component one at a time (Controller, Broker, and Server in that order), and then reverting the new versions back to old version (Server, Broker and Controller, in that order). In between each upgrade or downgrade (referred to as a "phase"), a set of test operations (as specified in the test suite) is executed. The operations are specified in a declarative way in yaml files. At present the following operations are supported:
Create a table with a specific table config and schema
Create a segment from an input file and add it to a table
Run the queries specified in a file and verify the results as specified in a file
Create a Kafka topic with specified number of partitions
Ingest rows into the Kafka topic (so that server can consume them)
Delete a table
Delete a segment from a table
One or more of the above set of test operations can be done during each phase in the rollout or roll-back sequence. The test suite does the following steps in sequence
Set up a cluster with old version of controller, broker and server
Stop old controller, start new controller
Stop old broker and start new broker
Stop old server and start new server
Stop new server and start old server
Stop new broker and start old broker
Stop new controller and start old controller
Tests can be run in each phase, (i.e. between any two steps outlined above, or, after the last step). You can create a test suite by writing yaml files for each phase. You may decide to skip any phase by not providing a yaml file for that phase.
The idea here is as follows:
Any persisted files (such as table configs, schemas, data segments, etc.) are readable during and after upgrade.
Any persisted files while in the new release are readable after a rollback (in case that is required).
Protocols between the components evolve in a backward compatible manner.
Minion upgrades is currently not supported in the test framework. Also, testing compatibility of the controller APIs is not supported at this time. We welcome contributions in these areas.
See the yaml files provided along with the source code for examples on how to specify operations for each roll forward/backward stage of the upgrade process.
There are two commands available. The first one allows you to identify the versions or builds between which you wish to ascertain compatibility. The second one runs the test suite.
Depending on how old your versions are, you may have some build failures. It will be useful to create the following file as compat-settings.xml
and set it in an environment variable before running the checkoutAndBuild.sh
command:
And the command to run the compatibility test suite is as follows:
You can use command line tools to verify compatibility against a previous release of Pinot (the tools support a --help
option).
Here are the steps to follow before you upgrade your installation
This can be a commit hash, or a release tag (such as release-0.7.1
). You can obtain the commit hash from the controller URI /version
.
This can be a tag or a commit hash.
Clone the current source code from Pinot and go to the appropriate directory. This will get you the latest compatibility tester.
Checkout and build the sources of the two releases you want to verify. Make sure your working directory (-w argument) has enough space to hold two build trees, logs, etc.
The command will exit with a status of 0 if all tests pass, 1 otherwise.
NOTE:
You can run the compCheck.sh
command multiple times against the same build, you just need to make sure to provide a new working directory name each time.
You can specify a -k
option to the compCheck.sh
command to keep the cluster (Kafka, Pinot components) running. You can then attempt the operation (e.g. a query) that failed.
So we can use the same data files and queries, upload them as new set of rows (both in Realtime and Offline tables), we encourage you to modify your table schema by adding an integer column called generationNumber
. Each time data is uploaded, the values written as __GENERATION_NUMBER__
in your input data files (or in the query files) are substituted with a new integer value.
This allows the test suite to upload the same data as different segments, and verify that the current data as well as the previously uploaded ones are all working correctly in terms of responding to queries. The test driver automatically tests all previous generation numbers as well.
See the input file and query file in sample test suite for use of this feature.
Consider an input line in the data file like the following:
When this input line is processed to generate a segment or push data into Kafka, the string __GENERATION_NUMBER__
will be replaced with an integer (each yaml file is one generation, starting with 0).
Similarly, consider a query like the following:
Before issuing this query, the tests will substitute the string __GENERATION_NUMBER__
with the actual generation number like above.
Use of generation number is optional (the test suite will try to substitute the string __GENERATION_NUMBER__
, but not find it if your input files do not have the string in them). Another way is to ensure that the set of queries you provide for each phase also includes results from the previous phases. That will make sure that all previously loaded data are also considered in the results when the queries are issued.
The first time you set up your result files, it is important that you look over the results carefully and make sure that they are correct.
In some cases, Pinot may provide different results each time you execute a query. For example, consider the query:
Since ORDER BY
is not specified, if there are more than 5 results, there is no guarantee that Pinot will return the same five rows every time. In such a case, you can include all possible values of foo
where x = 7
matches, and indicate that in your result file by specifying isSuperset: true
. An example of this feature is shown below:
See the sample test suite for an example of how to use this in the result file.
The sample test suite provided does the following between each stage of the upgrade:
Add a segment to an offline table
Run queries against new segments, and all old segments added thus far.
Add more rows to Kafka, ensuring that at least one segment is completed and at
least some rows are left uncommitted, so that we can test correct re-consumption of those
rows after rollout/rollback.
Run queries against the data ingested so far.
The table configurations schemas, data and queries have been chosen in such a way as to cover the major features that Pinot supports.
As a good practice, we suggest that you build your own test suite that has the tables, schemas, queries, and system configurations used in your installation of Pinot, so that you can verify compatibility for the features/configurations that your cluster uses.
See the section on Ingesting Realtime Data before reading this section.
Pinot servers ingest rows into a consuming segment that resides in volatile memory. Therefore, pinot servers hosting consuming segments tend to be memory bound. They may also have long garbage collection cycles when the segment is completed and memory is released.
You can configure pinot servers to use off-heap memory for dictionary and forward indices of consuming segments by setting the value of pinot.server.instance.realtime.alloc.offheap
to true
. With this configuration in place, the server allocates off-heap memory by memory-mapping files. These files are never flushed to stable storage by Pinot (the Operating System may do so depending on demand for memory on the host). The files are discarded when the consuming segment is turned into a completed segment.
By default the files are created under the directory where the table’s segments are stored in local disk attached to the consuming server. You can set a specific directory for consuming segments with the configuration pinot.server.consumerDir
. Given that there is no control over flushing of pages from the memory mapped for consuming segments, you may want to set the directory to point to a memory-based file system, eliminating wasteful disk I/O.
If memory-mapping is not desirable, you can set pinot.server.instance.realtime.alloc.offheap.direct
to true
. In this case, pinot allocates direct ByteBuffer objects for consuming segments. Using direct allocation can potentially result in address space fragmentation.
Note
We still use heap memory to store inverted indices for consuming segments.
The number of rows in a consuming segment needs to be balanced. Having too many rows can result in memory pressure. On the other hand, having too few rows results in having too many small segments. Having too many segments can be detrimental to query performance, and also increase pressure on the Helix.
The recommended way to do this is to use the realtime.segment.flush.threshold.segment.size
setting as described in StreamConfigs Section. You can run the administrative tool pinot-admin.sh RealtimeProvisioningHelper
that will help you to come up with an optimal setting for the segment size.
This feature is available only if the consumption type is LowLevel
.
The structure of the consuming segments and the completed segments are very different. The memory, CPU, I/O and GC characteristics could be very different while processing queries on these segments. Therefore it may be useful to move the completed segments onto different set of hosts in some use cases.
You can host completed segments on a different set of hosts using the tagOverrideConfig
as described in Table Config. Pinot will automatically move them once the consuming segments are completed.
If you require more fine-tuned control over how segments are hosted on different hosts, we recommend that you use the Tag-Based Instance Assignment feature to accomplish this.
This feature is available only if the consumption type is LowLevel
.
When a realtime segment completes, a winner server is chosen as a committer amongst all replicas by the controller. That committer builds the segment and uploads to the controller. The non-committer servers are asked to catchup to the winning offset. If the non-committer servers are able to catch up, they are asked to build the segment and replace the in-memory segment. If they are unable to catchup, they are asked to download the segment from the controller.
Building a segment can cause excessive garbage and may result in GC pauses on the server. Long GC pauses can affect query processing. In order to avoid this, we have a configuration that allows you to control whether
It might become desirable to force the non-committer servers to download the segment from the controller, instead of building it again. The completionConfig
as described in Table Config can be used to configure this.
This feature is available only if the consumption type is LowLevel
.
Once a committer is asked to commit the segment, it builds a segment, and issues an HTTP POST to the controller, with the segment. The controller than commits the segment in Zookeeper and starts the next consuming segment.
It is possible to conifigure the servers to do a split commit, in which the committer performs the following steps:
Build the segment
Start a transaction with the lead controller to commit the segment (CommitStart phase)
Post the completed segment to any of the controllers (and the controller posts it to segment store)
End the transaction with the lead controller (CommentEnd phase). Optionally, this step can be done with the segment metadata.
This method of committing can be useful if the network bandwidth on the lead controller is limiting segment uploads.In order to accomplish this, you will need to set the following configurations:
On the controller, set pinot.controller.enable.split.commit
to true
(default is false
).
On the server, set pinot.server.enable.split.commit
to true
(default is false
).
On the server, set pinot.server.enable.commitend.metadata
to true
(default is false).
This tool can help decide the optimum segment size and number of hosts for your table. You will need one sample Pinot segment from your table before you run this command. There are three ways to get a sample segment:
If you have an offline segment, you can use that.
You can provision a test version of your table with some minimum number of hosts that can consume the stream, let it create a few segments with large enough number of rows (say, 500k to 1M rows), and use one of those segments to run the command. You can drop the test version table, and re-provision it once the command outputs some parameters to set.
If you don't have a segment in hand or provisioning of a test version of your table is not an easy option, you can provide schema which is decorated with data characteristics. Then the tool generates a segment based on the provided characteristics behind the scene and proceeds with the realtime analysis. In case the characteristics of real data is very different, you may need to modify the parameters. You can always change the config after you get segments from real data.
As of Pinot version 0.5.0, this command has been improved to display the number of pages mapped, as well as take in the push frequency as an argument if the realtime table being provisioned is a part of a hybrid table. If you are using an older version of this command, please download a later version and re-run the command. The arguments to the command are as follows:
tableConfigFile
: This is the path to the table config file
numPartitions
: Number of partitions in your stream
numHosts
: This is a list of the number of hosts for which you need to compute the actual parameters. For example, if you are planning to deploy between 4 and 8 hosts, you may specify 4,6,8. In this case, the parameters will be computed for each configuration -- that of 4 hosts, 6 hosts, and 8 hosts. You can then decide which of these configurations to use.
numHours
: This is a list of maximum number of hours you want your consuming segments to be in consuming state. After these many hours the segment will move to completed state, even if other criteria (like segment size or number of rows) are not met yet. This value must be smaller than the retention of your stream. If you specify too small a value, then you run the risk of creating too many segments, this resulting in sub-optimal query performance. If you specify this value to be too big, then you may run the risk of having too large segments, running out of "hot" memory (consuming segments are in read-write memory). Specify a few different (comma-separated) values, and the command computes the segment size for each of these.
sampleCompletedSegmentDir
: The path of the directory in which the sample segment is present. See above if you do not have a sample segment.
pushFrequency
: This is optional. If this is a hybrid table, then enter the frequency with which offline segments are pushed (one of "hourly", "daily", "weekly" or "monthly"). This argument is ignored if retentionHours
is specified.
maxUsableHostMemory
: This is the total memory available in each host for hosting retentionHours
worth of data (i.e. "hot" data) of this table. Remember to leave some for query processing (or other tables, if you have them in the same hosts). If your latency needs to be very low, this value should not exceed the physical memory available to store pinot segments of this table, on each host in your cluster. On the other hand, if you are trying to lower cost and can take higher latencies, consider specifying a bigger value here. Pinot will leave the rest to the Operating System to page memory back in as necessary.
retentionHours
: This argument should specify how many hours of data will typically be queried on your table. It is assumed that these are the most recent hours. If pushFrequency
is specified, then it is assumed that the older data will be served by the offline table, and the value is derived automatically. For example, if pushFrequency
is daily
, this value defaults to 72
. If hourly
, then 24
. If weekly
, then 8d
. If monthly
, then 32d
. If neither pushFrequency
nor retentionHours
is specified, then this value is assumed to be the retention time of the realtime table (e.g. if the table is retained for 6 months, then it is assumed that most queries will retrieve all six months of data). As an example, if you have a realtime only table with a 21 day retention, and expect that 90% of your queries will be for the most recent 3 days, you can specify a retentionHours
value of 72. This will help you configure a system that performs much better for most of your queries while taking a performance hit for those that occasionally query older data.
ingestionRate
: Specify the average number of rows ingested per second per partition of your stream.
schemaWithMetadataFile
: This is needed if you do not have a sample segment from the topic to be ingested. This argument allows you to specify a schema file with additional information to describe the data characteristics (like number of unique values each column can have, etc.).
numRows
: This is an optional argument if you want the tool to generate a segment for you. If it is not give, then a default value of 10000
is used.
One you run the command, it produces an output as below:
The idea here is to choose an optimal segment size so that :
The number of segments searched for your queries are minimized
The segment size is neither too large not too small (where "large" and "small" are as per the range for your table).
Overall memory is optimized for your table, considering the other tables in the host, the query traffic, etc.
You can pick the appropriate value for segment size and number of hours in the table config, and set the number of rows to zero. Note that you don't have to pick values exactly as given in each of these combinations (they are calculated guesses anyway). Feel free to choose some values in between or out of range as you feel fit, and adjust them after your table is in production (no restarts required, things will slowly adjust themselves to the new configuration). The example given below chooses from the output.
Case 1: Optimize for performance, high QPS
From the above output you may decide that 6 hours is an optimal consumption time given the number of active segments looked at for a query, and you can afford about 4G of active memory per host. You can choose either 8 or 10 hosts, you choose 10. In this case, the optimal segment size will be 111.98M. You can then enter your realtime table config as below:
Case 2: Optimize for cost, low QPS
You may decide from the output that you want to make do with 6 hosts. You have only 2G of memory per host for active segments but you are willing to map 8G of active memory on that, with plenty of paging for each query. Since QPS is low, you may have plenty of CPU per query so huge segments may not be a problem. Choose 12 or 24h or consumption and pick an appropriate segment size. You may then configure something like:
When the use case has very high qps along with low latency requirements (usually site facing use cases), we need to consider optimizing the scatter-and-gather.
Below table summarizes the two issues with the default behavior of Pinot.
By default, Pinot uniformly distributes all the segments to all servers of a table. When scatter-and-gathering query requests, broker also uniformly distributes the workload among servers for each segment. As a result, each query will span out to all servers with balanced workload. It works pretty well when qps is low and you have a small number of servers in the cluster. However, as we add more servers or have more qps, the probability of hitting slow servers (e.g. gc) increases steeply and Pinot will suffer from a long tail latency.
In order to address this issue, we have introduced a concept of Replica Group
, which allows us to control the number of servers to fan out for each query.
Replica Group
is a set of servers that contains a ‘complete’ set of segments of a table. Once we assign the segment based on replica group, each query can be answered by fanning out to a single replica group instead of all servers.
Replica Group
can be configured by setting the InstanceAssignmentConfig
in the table config. Replica group based routing can be configured by setting replicaGroup
as the instanceSelectorType
in the RoutingConfig
.
As seen above, you can use numReplicaGroups
to control the number of replica groups (replications), and use numInstancesPerReplicaGroup
to control the number of servers to span. For instance, let’s say that you have 12 servers in the cluster. Above configuration will generate 3 replica groups (numReplicaGroups=3
), and each replica group will contain 4 servers (numInstancesPerPartition=4
). In this example, each query will span to a single replica group (4 servers).
As you seen above, replica group gives you the control on the number of servers to span for each query. When you try to decide the proper number of numReplicaGroups
and numInstancesPerReplicaGroup
, you should consider the trade-off between throughput and latency. Given a fixed number of servers, increasing numReplicaGroups
factor while decreasing numInstancesPerReplicaGroup
will give you more throughput because each server requires to process less number of queries. However, each server will need to process more number of segments per query, thus increasing overall latency. Similarly, decreasing numReplicaGroups
while increasing numInstancesPerReplicaGroup
will make each server processing more number of queries but each server needs to process less number of segments per query. So, this number has to be decided based on the use case requirements.
By default, Pinot broker will distribute all segments for query processing and segment pruning is happening in Server. In other words, Server will look at the segment metadata such as min/max time value and discard the segment if it does not contain any data that the query is asking for. Server side pruning works pretty well when the qps is low; however, it becomes the bottleneck if qps is very high (hundreds to thousands queries per second) because unnecessary segments still need to be scheduled for processing and consume cpu resources.
Currently, we have two different mechanisms to prune segments on the broker side to minimize the number of segment for processing before scatter-and-gather.
When the data is partitioned on a dimension, each segment will contain all the rows with the same partition value for a partitioning dimension. In this case, a lot of segments can be pruned if a query requires to look at a single partition to compute the result. Below diagram gives the example of data partitioned on member id while the query includes an equality filter on member id.
Partitoning
can be enabled by setting the following configuration in the table config.
Pinot currently supports Modulo
, Murmur
, ByteArray
and HashCode
hash functions. After setting the above config, data needs to be partitioned with the same partition function and number of partitions before running Pinot segment build and push job for offline push. Realtime partitioning depends on the kafka for partitioning. When emitting an event to kafka, a user need to feed partitioning key and partition function for Kafka producer API.
When applied correctly, partition information should be available in the segment metadata.
Broker side pruning for partitioning can be configured by setting the segmentPrunerTypes
in the RoutingConfig
. Note that the current implementation for partitioning only works for EQUALITY and IN filter (e.g. memberId = xx
, memberId IN (x, y, z)
).
Problem
Impact
Solution
Querying all servers
Bad tail latency, not scalable
Control the number of servers to fan out
Querying all segments
More CPU work on server
Minimize the number of segment