As explained in architecture, Apache Pinot is a distributed system where different components have specific roles. Queries arrive at one of the brokers, that calculates which servers are going to participate in the query. In most clusters, a single server cannot hold all the segments, so each server has a partial view of the complete data. At the same time, segments are usually replicated on different servers in order to have high availability and better performance. In order to produce a complete result, the broker needs to calculate a subset of servers that contains all the segments that are required to resolve the query. Once this subset is calculated, the router uses a scatter and gather algorithm that sends the query to each server and then merges the partial results into the complete one.
There may be several subsets of servers that contain all the required segments. Any of them will return the correct result for the given query, but the subset used may affect the query performance. It is clear that the more servers participate in the query, the more CPUs, memory and IO can be used and therefore the better peak performance can be achieved. This is why, by default, brokers uniformly distribute the workload among as many servers as possible with balanced workload. Given that servers use segments as the minimum unit of work, the maximum parallelism is defined by the number of segments that are required to resolve the query.
What may be less clear is that the more servers participate in the query, the worse the tail latency (and even the resilience) will be. The reason is that the scatter and gather algorithm cannot produce a correct complete result until the partial results of all participating servers are collected. Therefore the latency of the query must be at least the higher latency of the servers participating and if any server crashes during the execution, the result merged in the broker may be incorrect. The more servers participate in a query, the higher the possibility to touch a server that requires some time to answer or to fail.
An easy-to-understand scenario is the impact of a full GC on a server. Let say a given query takes, on average, 30ms to execute. Depending on the JVM configuration that is used, a full GC may stop the process for a few hundreds of milliseconds, so if our query hits one server doing a full GC the latency will increase by at least one order or magnitude. Given that a correctly configured server shouldn't be stopped by full GCs very often, if to resolve this query a broker uses 3 servers, the possibility of hitting a server running a full GC is very small. Therefore, the tail latency of this query hitting 3 servers is expected to be close to 30ms. But if instead of 3 servers our query is spread on 30 servers, the possibility of hitting a server running a full GC increases significantly and therefore the tail latencies (p95, p99, etc) will increase.
To improve the tail latency, Apache Pinot provides two techniques at routing level:
Reduce the query fanout by exploding data distribution.
Reduce the query fanout by exploding data replication.
As explained above, the broker must calculate the subset of servers that contains all the segments required to have a complete result. By default, the broker doesn't know anything about segments, so the subset of servers must contain all the segments in the table. This increases the number of servers that need to be asked. For example, in an extreme case where somehow the broker were able to know that only one segment was required, it could just ask one of the servers that contains that segment. By skipping servers that will not contain interesting data, the tail latency can be reduced without impacting the maximum peak performance.
Given that the number of segments may be large, the broker cannot store too much information per segment. One example of this is the bloom filters, which are used by servers to prune segments that will not contain relevant data to the given query. Given that the number of segments in a cluster can be quite large, brokers cannot afford to store them in memory and therefore bloom filters cannot be used to improve the rooting.
Instead, the user can inform Apache Pinot about specific patterns on which the data is actually distributed among segments. There are two patterns that can brokers can take advantage of and both can be used at the same time:
Data ingested ordered by the optional time column.
Data ingested partitioned by some column.
When the schema defines a primary time column and data is ingested in approximate order by that column, brokers can optimize routing of queries that filter by that column. This is a common case for example when rows represent events (like logs, metrics, etc). As usual, Apache Pinot does not require events to be inserted in strict order. The closer the data distribution is to the strict order, the more selective this pruner will be but this technique can be used even when the data is not completely ordered.
To enable this optimization, segmentPartitionConfig
must contain the time
pruner type:
Apart from the ascending time, Apache Pinot can also take advantage of other distribution data patterns: Partition by segments. In order to use this feature, the table needs to be configured to use a partition function. Pinot will apply this function to each row in a segment and will store the set of partitions seen in the segment. Later, when a query is executed, Pinot will analyze the query in order to look for partition constraints. For example, when executing a query like select col1 from Table1 where col2 = 3
, Pinot will calculate the partition associated with col2 = 3
and will prune segments that do not contain rows on that partition.
In order to make this pruning more efficient, segments should have the least number of partitions possible, which ideally is 1. More formally, given a function p
, for all segments s
, given any pair of rows r1
and r2
, it should be true that p(r1) = p(r2)
. For example, in a table configured to have 3 partitions by memberId
column, using modulo
as the partition function, a segment that contains a row with memberId
= 101 may also contain another row with memberId
= 2 and another with memberId
= 335, but it should not contain a row with memberId
= 336 or memberId
= 334.
Data cannot always be partitioned by a dimension column or even when it is, not all queries can take advantage of the distribution. But when this optimization can be applied, a lot of segments can be pruned. The current implementation for partitioning only works for EQUALITY and IN filter (e.g. memberId = xx
, memberId IN (x, y, z)
). Below diagram gives the example of data partitioned on member ID while the query includes an equality filter on member ID.
Apache Pinot currently supports Modulo
, Murmur
, ByteArray
and HashCode
hash functions and partitioning can be enabled by setting the following configuration in the table config.
After setting the above config, data should be partitioned with the same partition function and number of partitions before running Pinot segment build and push job for offline push. Here's a scala UDF example of a partition function that Pinot understands, which is to be used for data partitioning.
When applied correctly, partition information should be available in the segment metadata.
In order to maximize the partition pruning efficiency, it is important to reduce the number of partitions contained on each segment. Pinot will not move rows between segments to maximize partition efficiency, so when possible, rows should be correctly distribute before ingesting them into Pinot.
When using real-time tables, usually the input source can be used to do this partition. For example, one very usual input source in real-time tables is Kafka. In this case, in order to achieve maximum partition efficiency, Pinot and Kafka should use the same partition configuration.
When using offline tables, each input file should be crafted to contain rows on the same partition. Imagine we are creating a table from a list of CSV files. Ideally all rows in the csv should belong to the same partition. In the example above where we used modulo
and 3 partitions, the value memberId = 101
will be associated with partition 2 (memberId % 3 = 2
). Therefore to maximize the partition efficiency other rows in the same CSVv should have a memberId
equal to 2, 5 or 104 but should not have values like 1, 3, 100 or 102.
Remember that Pinot does not impose a hard requirement here. It is fine if segments contain rows of more than one partition. What should be avoided is to have most segments associated with most partitions, given that in that case Pinot won't be actually able to prune most segments.
Although using partitions can drastically reduce the fanout, it only applies to specific queries and requires a specific data distribution between segments. The most consistent way to limit the fanout is to define replica group segment alignment. A Replica Group
is a subset of servers that contains a ‘complete’ set of segments of a table. Once we assign the segment based on the replica group, each query can be answered by fanning out to a single replica group instead of all servers.
To use replica groups, the table configuration must be changed in the following ways:
Replica groups must be declared in the InstanceAssignmentConfig
section.
RoutingConfig.instanceSelectorType
must be changed to replicaGroup
.
As seen above, you can use numReplicaGroups
to control the number of replica groups (replications), and use numInstancesPerReplicaGroup
to control the number of servers to span. For instance, let’s say that you have 12 servers in the cluster. Above configuration will generate 3 replica groups (numReplicaGroups=3
), and each replica group will contain 4 servers (numInstancesPerPartition=4
). In this example, each query will span to a single replica group (4 servers).
As seen above, replica groups give you the control on the number of servers to span for each query. When you try to decide the proper number of numReplicaGroups
and numInstancesPerReplicaGroup
, consider the trade-off between throughput and latency. Given a fixed number of servers, increasing numReplicaGroups
factor while decreasing numInstancesPerReplicaGroup
will make each query use less servers, which may reduce the possibility of one of them having a full GC. However, each server will need to process more number of segments per query, reducing the throughput, to the point that extreme values may even increase the average latency. Similarly, decreasing numReplicaGroups
while increasing numInstancesPerReplicaGroup
will make each query use more servers, increasing the possibility of one of them having a full GC but making each server process less number of segments per query. So, this number has to be decided based on the use case requirements.
By default, the Pinot broker will route queries to the segment replica that is currently under the least load. It is possible to have the Pinot broker route all queries for a specific table to the same server for a given segment. You might do this if you are finding inconsistencies in query results due to an offset for consuming segments across different replicas.
You can enable this feature at different levels, as shown in the following examples.
To enable for a specific query via query options, which overrides the table/broker level configuration:
To enable for a specific table using table config settings, which overrides the broker level configuration, add the following in your table config:
To enable for all tables in the cluster using broker config settings:
It's important to note that this feature operates on a best-effort basis and routing may revert to routing to other replicas if there are alterations in segment assignments or if one or more servers become unavailable.
Additionally, adopting this feature could lead to potential skew in server resource utilization, particularly in clusters with a smaller number of tables, as the query load may no longer be evenly distributed across servers.
Adaptive Server Selection is a new routing capability for Pinot Brokers where incoming queries are routed to the best available server instead of following the default round robin approach while choosing servers. With this feature, Brokers will be sensitive to changes on the Servers like GC issues, slowness, network slowness, etc. The broker will thus adaptively route more queries to faster servers and lesser queries to slower servers
There are two main components:
Stats Collection
Routing using Adaptive Server Selection
Each broker maintains stats individually for all servers. These stats are collected at the broker during query processing when the query is routed to the servers and after the response is received from the servers. These stats are maintained in-memory. Some of the stats collected at broker per server are as follows:
Number of in-progress / in-flight queries
EWMA (Exponential Weighted Moving Average) for latencies seen by queries
EWMA (Exponential Weighted Moving Average) for number of ongoing queries at any time
When the broker receives a query, it will use the above stats to pick the best available server. This enables the broker to automatically reduces the number of queries it sends to slow servers and increase the number of queries it sends to faster servers. We currently support the following strategies:
NO_OP : Uses the default RoundRobin approach. In other words, this will give existing behavior where stats are not used by broker when picking the servers to route the query to.
NUM_INFLIGHT_REQ : Uses the number of in-flight requests stat to determine the best server
LATENCY : Uses the EWMA latency stat to determine the best server
HYBRID : Uses a combination of in-flight requests and latency to determine the best server
The above strategies works in tandem with the following available Routing mechanisms today:
Balanced Routing
ReplicaGroup Routing
So, a table can be configured to use Balanced or Replica group segment assignment + routing and can still leverage the adaptive server selection feature.
The configuration for enabling/disabling this feature and the knobs for performance tuning are present at the Broker instance level. The feature is currently turned off by default.
To enable Stats Collection, set pinot.broker.adaptive.server.selector.enable.stats.collection = true
. Note that setting this property alone will only enable stats collection and not perform Adaptive Routing
To enable an Adaptive Routing Strategy, use one of the following configs. The HYBRID
strategy works well for most use cases. Unless you are an advanced user, we recommend using the HYBRID
strategy.
pinot.broker.adaptive.server.selector.type=HYBRID
pinot.broker.adaptive.server.selector.type=NUM_INFLIGHT_REQ
pinot.broker.adaptive.server.selector.type=LATENCY
The following configs are already set to default values that work well for most usecases. For advanced users, the following knobs are available to tune Adaptive Routing Strategies
Prefix all the below properties with pinot.broker.adaptive.server.selector.
Property | Description | Default Value |
---|---|---|
ewma.alpha
Alpha value for Exponential Moving Average. A higher value would provide more weightage to incoming values and lower weightage to older values
0.666
autodecay.window.ms
If the EWMA value has not been updated for a while, the duration after which the value should be decayed
10000
avg.initialization.val
Initial value for EWMA average
1.0
stats.manager.threadpool.size
Number of threads reserved to process Adaptive Server Selection Stats.
2
This section provides information on various options to tune Pinot cluster for storage and query efficiency. Unlike key-value store, tuning Pinot sometimes can be tricky because the cost of query can vary depending on the workload and data characteristics.
If you want to improve query latency for your use case, you can refer to Index Techniques
section. If your use case faces the scalability issue after tuning index, you can refer Optimizing Scatter and Gather
for improving query throughput for Pinot cluster. If you have identified a performance issue on the specific component (broker or server), you can refer to the Tuning Broker
or Tuning Server
section.
Learn about tuning real-time tables.
See the section on Ingesting Real-time Data before reading this section.
Pinot servers ingest rows into a consuming segment that resides in volatile memory. Therefore, pinot servers hosting consuming segments tend to be memory bound. They may also have long garbage collection cycles when the segment is completed and memory is released.
You can configure Pinot servers to use off-heap memory for dictionary and forward indices of consuming segments by setting the value of pinot.server.instance.realtime.alloc.offheap
to true
. With this configuration in place, the server allocates off-heap memory by memory-mapping files. These files are never flushed to stable storage by Pinot (the Operating System may do so depending on demand for memory on the host). The files are discarded when the consuming segment is turned into a completed segment.
By default, the files are created under the directory where the table’s segments are stored in local disk attached to the consuming server. You can set a specific directory for consuming segments with the configuration pinot.server.consumerDir
. Given that there is no control over flushing of pages from the memory mapped for consuming segments, you may want to set the directory to point to a memory-based file system, eliminating wasteful disk I/O.
If you don't want to use memory-mapping, set pinot.server.instance.realtime.alloc.offheap.direct
to true
. In this case, pinot allocates direct ByteBuffer objects for consuming segments. Using direct allocation can potentially result in address space fragmentation.
Note that we still use heap memory to store inverted indices for consuming segments.
The number of rows in a consuming segment needs to be balanced. Having too many rows can result in memory pressure. On the other hand, having too few rows results in having too many small segments. Having too many segments can be detrimental to query performance, and also increase pressure on the Helix.
The recommended way to do this is to use the realtime.segment.flush.threshold.segment.size
setting as described in StreamConfigs Section. You can run the administrative tool pinot-admin.sh RealtimeProvisioningHelper
that will help you to come up with an optimal setting for the segment size.
This feature is available only if the consumption type is LowLevel
.
The structure of the consuming segments and the completed segments are very different. The memory, CPU, I/O and GC characteristics could be very different while processing queries on these segments. Therefore it may be useful to move the completed segments onto different set of hosts in some use cases.
You can host completed segments on a different set of hosts using the tagOverrideConfig
as described in Table Config. Pinot will automatically move them once the consuming segments are completed.
If you require more fine-tuned control over how segments are hosted on different hosts, we recommend that you use the Tag-Based Instance Assignment feature to accomplish this.
This feature is available only if the consumption type is LowLevel
.
When a real-time segment completes, a winner server is chosen as a committer amongst all replicas by the controller. That committer builds the segment and uploads to the controller. The non-committer servers are asked to catchup to the winning offset. If the non-committer servers are able to catch up, they are asked to build the segment and replace the in-memory segment. If they are unable to catchup, they are asked to download the segment from the controller.
Building a segment can cause excessive garbage and may result in GC pauses on the server. Long GC pauses can affect query processing. You might want to force the non-committer servers to download the segment from the controller instead of building it again. The completionConfig
as described in Table Config can be used to configure this.
This feature is available only if the consumption type is LowLevel
.
Once a committer is asked to commit the segment, it builds a segment, and issues an HTTP POST to the controller, with the segment. The controller then commits the segment in Zookeeper and starts the next consuming segment.
It is possible to configure the servers to do a split commit, in which the committer performs the following steps:
Build the segment
Start a transaction with the lead controller to commit the segment (CommitStart phase)
Post the completed segment to any of the controllers (and the controller posts it to segment store)
End the transaction with the lead controller (CommentEnd phase). Optionally, this step can be done with the segment metadata.
This method of committing can be useful if the network bandwidth on the lead controller is limiting segment uploads.In order to accomplish this, you will need to set the following configurations:
On the controller, set pinot.controller.enable.split.commit
to true
(default is false
).
On the server, set pinot.server.enable.split.commit
to true
(default is false
).
On the server, set pinot.server.enable.commitend.metadata
to true
(default is false).
This tool can help decide the optimum segment size and number of hosts for your table. You will need one sample Pinot segment from your table before you run this command. There are three ways to get a sample segment:
If you have an offline segment, you can use that.
You can provision a test version of your table with some minimum number of hosts that can consume the stream, let it create a few segments with large enough number of rows (say, 500k to 1M rows), and use one of those segments to run the command. You can drop the test version table, and re-provision it once the command outputs some parameters to set.
If you don't have a segment in hand or provisioning of a test version of your table is not an easy option, you can provide schema which is decorated with data characteristics. Then the tool generates a segment based on the provided characteristics behind the scene and proceeds with the real-time analysis. In case the characteristics of real data is very different, you may need to modify the parameters. You can always change the config after you get segments from real data.
As of Pinot version 0.5.0, this command has been improved to display the number of pages mapped, as well as take in the push frequency as an argument if the real-time table being provisioned is a part of a hybrid table. If you are using an older version of this command, download a later version and re-run the command. The arguments to the command are as follows:
tableConfigFile
: This is the path to the table config file
numPartitions
: Number of partitions in your stream
numHosts
: This is a list of the number of hosts for which you need to compute the actual parameters. For example, if you are planning to deploy between 4 and 8 hosts, you may specify 4,6,8. In this case, the parameters will be computed for each configuration -- that of 4 hosts, 6 hosts, and 8 hosts. You can then decide which of these configurations to use.
numHours
: This is a list of maximum number of hours you want your consuming segments to be in consuming state. After these many hours the segment will move to completed state, even if other criteria (like segment size or number of rows) are not met yet. This value must be smaller than the retention of your stream. If you specify too small a value, then you run the risk of creating too many segments, this resulting in sub-optimal query performance. If you specify this value to be too big, then you may run the risk of having too large segments, running out of "hot" memory (consuming segments are in read-write memory). Specify a few different (comma-separated) values, and the command computes the segment size for each of these.
sampleCompletedSegmentDir
: The path of the directory in which the sample segment is present. See above if you do not have a sample segment.
pushFrequency
: This is optional. If this is a hybrid table, then enter the frequency with which offline segments are pushed (one of "hourly", "daily", "weekly" or "monthly"). This argument is ignored if retentionHours
is specified.
maxUsableHostMemory
: This is the total memory available in each host for hosting retentionHours
worth of data (i.e. "hot" data) of this table. Remember to leave some for query processing (or other tables, if you have them in the same hosts). If your latency needs to be very low, this value should not exceed the physical memory available to store pinot segments of this table, on each host in your cluster. On the other hand, if you are trying to lower cost and can take higher latencies, consider specifying a bigger value here. Pinot will leave the rest to the Operating System to page memory back in as necessary.
retentionHours
: This argument should specify how many hours of data will typically be queried on your table. It is assumed that these are the most recent hours. If pushFrequency
is specified, then it is assumed that the older data will be served by the offline table, and the value is derived automatically. For example, if pushFrequency
is daily
, this value defaults to 72
. If hourly
, then 24
. If weekly
, then 8d
. If monthly
, then 32d
. If neither pushFrequency
nor retentionHours
is specified, then this value is assumed to be the retention time of the real-time table (e.g. if the table is retained for 6 months, then it is assumed that most queries will retrieve all six months of data). As an example, if you have a real-time only table with a 21 day retention, and expect that 90% of your queries will be for the most recent 3 days, you can specify a retentionHours
value of 72. This will help you configure a system that performs much better for most of your queries while taking a performance hit for those that occasionally query older data.
ingestionRate
: Specify the average number of rows ingested per second per partition of your stream.
schemaWithMetadataFile
: This is needed if you do not have a sample segment from the topic to be ingested. This argument allows you to specify a schema file with additional information to describe the data characteristics (like number of unique values each column can have, etc.).
numRows
: This is an optional argument if you want the tool to generate a segment for you. If it is not give, then a default value of 10000
is used.
One you run the command, it produces an output as below:
The idea here is to choose an optimal segment size so that :
The number of segments searched for your queries are minimized
The segment size is neither too large not too small (where "large" and "small" are as per the range for your table).
Overall memory is optimized for your table, considering the other tables in the host, the query traffic, etc.
You can pick the appropriate value for segment size and number of hours in the table config, and set the number of rows to zero. Note that you don't have to pick values exactly as given in each of these combinations (they are calculated guesses anyway). Feel free to choose some values in between or out of range as you feel fit, and adjust them after your table is in production (no restarts required, things will slowly adjust themselves to the new configuration). The example given below chooses from the output.
Case 1: Optimize for performance, high QPS
From the above output you may decide that 6 hours is an optimal consumption time given the number of active segments looked at for a query, and you can afford about 4G of active memory per host. You can choose either 8 or 10 hosts, you choose 10. In this case, the optimal segment size will be 111.98M. You can then enter your real-time table config as below:
Case 2: Optimize for cost, low QPS
You may decide from the output that you want to make do with 6 hosts. You have only 2G of memory per host for active segments but you are willing to map 8G of active memory on that, with plenty of paging for each query. Since QPS is low, you may have plenty of CPU per query so huge segments may not be a problem. Choose 12 or 24h or consumption and pick an appropriate segment size. You may then configure something like:
Schedule queries to prioritize them.
Pinot supports various different query scheduling mechanisms to allow for more control around which queries have priority when executing on the server instances.
Currently, there are the following options that can be configured using the pinot.query.scheduler.name
configuration:
First Come First Serve (fcfs
) (Default)
Bounded First Come First Serve (bounded_fcfs
)
Token Bucket (tokenbucket
)
This is the default scheduling mechanism, which simply allows all queries to execute in a first come, first serve mechanism. For most deployments, this is likely sufficient, but high QPS deployments that have skewed query workloads may struggle under this scheduling mechanism.
Bounded query schedulers operate under a context of a single table, and additionally respect the following configurations:
pinot.query.scheduler.threads_per_query_pct
(default 20%) will allow individual threads to take up to this percentage of the threads allocated to a table resource group
pinot.query.scheduler.table_threads_soft_limit_pct
(default 30%) indicates that once this percentage of the available threads are taken up by this resource group, the scheduler should prefer other resource groups but still allow queries in this group
pinot.query.scheduler.table_threads_hard_limit_pct
(default 45%) indicates that once this percentage of the available threads are taken up, the scheduler should not schedule any more queries for this resource group
Similarly to the "first come first serve" scheduling mechanism, this option will bound the resource utilization by ensuring that only a certain number of queries are running concurrently (set using the pinot.query.scheduler.query_runner_threads
configuration).
This query scheduling mechanism will periodically grant tokens to each scheduler group, and will select the group to run with the highest number of tokens assigned to it. This can be configured using the following configurations:
pinot.query.scheduler.tokens_per_ms
will indicate how many tokens to generate per second for each group. The default value for this is the number of threads avaialble to run queries (which will essentially attempt to schedule more queries than is possible to run)
pinot.query.scheduler.token_lifetime_ms
indicates the lifetime of every allocated token
This scheduler applies a linear decay for groups that have recently been scheduled to avoid starvation and allow groups with light workloads to be scheduled.