This section provides information on various options for tuning a Pinot cluster for storage and query efficiency. Unlike a key-value store, tuning Pinot can be tricky because the cost of a query varies with the workload and data characteristics.
If you want to improve query latency for your use case, refer to the Index Techniques section. If your use case still faces scalability issues after tuning the indexes, refer to Optimizing Scatter and Gather to improve query throughput for the Pinot cluster. If you have identified a performance issue in a specific component (broker or server), refer to the Tuning Broker or Tuning Server section.
To set up a Pinot cluster, you need to set up the following types of instances:
- Controller instances
- Broker instances
- Server instances
Access control can be set up at various points in Pinot, such as the controller endpoints and the broker query endpoints. By default, the AllowAllAccessFactory is used, so no access control is enforced. You can add access control by implementing the AccessControlFactory interface.

The access control factory is configured in the controller configs by setting the fully qualified class name of the AccessControlFactory in the property controller.admin.access.control.factory.class.

The access control factory is configured in the broker configs by setting the fully qualified class name of the AccessControlFactory in the property pinot.broker.access.control.class. Any other properties required for initializing the factory can be set in the broker configs as properties with the prefix pinot.broker.access.control.
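A minimal sketch of these settings, assuming hypothetical factory implementations of your own (the class names and the extra prefixed property are placeholders, not classes or properties shipped with Pinot):

```
# Controller config: plug in a custom access control factory (hypothetical class name)
controller.admin.access.control.factory.class=com.example.pinot.MyControllerAccessControlFactory

# Broker config: plug in a custom access control factory (hypothetical class name)
pinot.broker.access.control.class=com.example.pinot.MyBrokerAccessControlFactory
# Any additional initialization properties are read under the pinot.broker.access.control prefix
pinot.broker.access.control.allowed.principals=admin,svc-reporting
```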
Please refer to the key metrics documented in Monitoring Pinot.
Pinot uses the Yammer MetricsRegistry to collect metrics within its application components. These metrics can be published to a metrics server with the help of the MetricsRegistryRegistrationListener interface. By default, metrics are published to JMX using the JmxReporterMetricsRegistryRegistrationListener.

You can write a listener that publishes metrics to another metrics server by implementing the MetricsRegistryRegistrationListener interface. This listener can be injected into the controller by setting the fully qualified class name in the controller configs for the property pinot.controller.metrics.metricsRegistryRegistrationListeners.
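For example, a sketch of the controller config entry, assuming a hypothetical listener class of your own:

```
# Controller config: inject a custom metrics registry registration listener (hypothetical class name)
pinot.controller.metrics.metricsRegistryRegistrationListeners=com.example.pinot.MyMetricsServerListener
```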
You would have to design your own systems to view and monitor these metrics. A list of all the metrics published by each component can be found in that component's metrics documentation.
Metrics published to JMX can also be exposed to Prometheus through tooling like the Prometheus jmx_exporter. To run it as a javaagent, download the jmx_prometheus_javaagent jar and a pinot.yml exporter configuration, then start the Pinot component with the agent attached.
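A sketch of such an invocation, assuming the pinot-admin.sh launcher picks up JVM options from the ALL_JAVA_OPTS environment variable; jar versions, paths, and the start command for your component will differ:

```
# Attach the Prometheus JMX exporter as a javaagent, exposing metrics on port 8080
# using the mapping rules defined in pinot.yml
export ALL_JAVA_OPTS="-javaagent:/opt/pinot/lib/jmx_prometheus_javaagent.jar=8080:/opt/pinot/etc/pinot.yml"
bin/pinot-admin.sh StartServer -zkAddress localhost:2181
```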
This exposes an endpoint on port 8080 that dumps metrics in Prometheus format for the Prometheus scraper to fetch.
When a use case has very high qps along with low latency requirements (usually site-facing use cases), we need to consider optimizing the scatter-and-gather. The table below summarizes the two issues with the default behavior of Pinot.

| Problem | Impact | Solution |
| --- | --- | --- |
| Querying all servers | Bad tail latency, not scalable | Control the number of servers to fan out to |
| Querying all segments | More CPU work on servers | Minimize the number of segments to process |
By default, Pinot uniformly distributes all the segments of a table to all servers. When scatter-and-gathering query requests, the broker also uniformly distributes the workload among servers for each segment. As a result, each query fans out to all servers with a balanced workload. This works well when the qps is low and there is a small number of servers in the cluster. However, as we add more servers or serve more qps, the probability of hitting slow servers (e.g. ones undergoing GC) increases steeply, and Pinot suffers from long tail latency.

In order to address this issue, we have introduced the concept of a Replica Group, which allows us to control the number of servers to fan out to for each query. A Replica Group is a set of servers that contains a 'complete' set of segments of a table. Once segments are assigned based on replica groups, each query can be answered by fanning out to a single replica group instead of all servers.
Replica groups can be configured by setting the InstanceAssignmentConfig in the table config. Replica-group-based routing can be configured by setting replicaGroup as the instanceSelectorType in the RoutingConfig, as sketched below.
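A sketch of the relevant portions of an offline table config, using the numbers from the example that follows (3 replica groups of 4 servers each); the tenant tag is a placeholder for your own tenant:

```json
{
  "instanceAssignmentConfigMap": {
    "OFFLINE": {
      "tagPoolConfig": {
        "tag": "DefaultTenant_OFFLINE"
      },
      "replicaGroupPartitionConfig": {
        "replicaGroupBased": true,
        "numReplicaGroups": 3,
        "numInstancesPerReplicaGroup": 4
      }
    }
  },
  "routing": {
    "instanceSelectorType": "replicaGroup"
  }
}
```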
As seen above, you can use numReplicaGroups to control the number of replica groups (replications), and numInstancesPerReplicaGroup to control the number of servers to fan out to. For instance, say you have 12 servers in the cluster. The above configuration will generate 3 replica groups (numReplicaGroups=3), and each replica group will contain 4 servers (numInstancesPerReplicaGroup=4). In this example, each query will fan out to a single replica group (4 servers).
As seen above, replica groups give you control over the number of servers each query fans out to. When deciding on proper values for numReplicaGroups and numInstancesPerReplicaGroup, you should consider the trade-off between throughput and latency. Given a fixed number of servers, increasing numReplicaGroups while decreasing numInstancesPerReplicaGroup gives you more throughput, because each server processes fewer queries; however, each server has to process more segments per query, which increases overall latency. Similarly, decreasing numReplicaGroups while increasing numInstancesPerReplicaGroup makes each server process more queries but fewer segments per query. These numbers therefore have to be decided based on the requirements of the use case.
By default, the Pinot broker distributes all segments for query processing, and segment pruning happens on the server. In other words, the server looks at segment metadata such as the min/max time value and discards a segment if it does not contain any data the query is asking for. Server-side pruning works well when the qps is low; however, it becomes a bottleneck when the qps is very high (hundreds to thousands of queries per second), because unnecessary segments still need to be scheduled for processing and consume CPU resources.

Currently, there are two mechanisms to prune segments on the broker side, minimizing the number of segments to process before scatter-and-gather.
When the data is partitioned on a dimension, each segment contains all the rows with the same partition value of the partitioning dimension. In this case, many segments can be pruned if the query only needs to look at a single partition to compute its result. For example, if the data is partitioned on member id and the query includes an equality filter on member id, only the segments of the matching partition need to be queried.

Partitioning can be enabled by setting the following configuration in the table config.
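A sketch of the relevant table config section; the column name, hash function and partition count are example values:

```json
"tableIndexConfig": {
  "segmentPartitionConfig": {
    "columnPartitionMap": {
      "memberId": {
        "functionName": "Murmur",
        "numPartitions": 32
      }
    }
  }
}
```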
Pinot currently supports the Modulo, Murmur, ByteArray and HashCode hash functions. After setting the above config, the data needs to be partitioned with the same partition function and number of partitions before running the Pinot segment build and push job for an offline push. Real-time partitioning relies on Kafka: when emitting an event to Kafka, the user needs to provide the partitioning key and partition function to the Kafka producer API.
When applied correctly, partition information should be available in the segment metadata.
Broker-side pruning for partitioning can be configured by setting segmentPrunerTypes in the RoutingConfig. Note that the current implementation of partition pruning only works for EQUALITY and IN filters (e.g. memberId = xx, memberId IN (x, y, z)).
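A sketch of the routing section with partition pruning enabled; "partition" is the pruner type used for partition-based pruning:

```json
"routing": {
  "segmentPrunerTypes": ["partition"]
}
```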
Dictionary encoding stores the array of unique values of a column. Pinot allows creating a bloom filter on these unique values for each column; the bloom filter can quickly determine whether a value exists in a segment.

Bloom filters can be enabled by setting the following configuration in the table config.
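A sketch of the relevant table config section; memberId stands in for whichever column you want a bloom filter on:

```json
"tableIndexConfig": {
  "bloomFilterColumns": ["memberId"]
}
```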
Our implementation limits the size of the bloom filter to less than 1MB per segment, along with a maximum false positive rate of 5%, to avoid consuming too much memory. We recommend putting a bloom filter on columns with a cardinality of less than 1 million. Note that the current implementation of bloom filters works for EQUALITY filters only.
For more details on how to set up ingestion, refer to the ingestion documentation.
See the section on ingesting real-time data before reading this section.
Pinot servers ingest rows into a consuming segment that resides in volatile memory. Therefore, Pinot servers hosting consuming segments tend to be memory bound. They may also have long garbage collection cycles when the segment is completed and memory is released.
You can configure Pinot servers to use off-heap memory for the dictionary and forward indices of consuming segments by setting pinot.server.instance.realtime.alloc.offheap to true. With this configuration in place, the server allocates off-heap memory by memory-mapping files. These files are never flushed to stable storage by Pinot (the operating system may do so depending on demand for memory on the host), and they are discarded when the consuming segment is turned into a completed segment.
By default, the files are created under the directory where the table's segments are stored on the local disk attached to the consuming server. You can set a specific directory for consuming segments with the configuration pinot.server.consumerDir. Given that there is no control over the flushing of pages from the memory map of a consuming segment, you may want to point this directory at a memory-based file system, eliminating wasteful disk I/O.
If memory mapping is not desirable, you can set pinot.server.instance.realtime.alloc.offheap.direct to true. In this case, Pinot allocates direct memory for consuming segments. Note that direct allocation can potentially result in address space fragmentation.
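A sketch of the server config entries discussed above; the consumer directory path is an example:

```
# Allocate consuming-segment dictionaries and forward indices off-heap via memory-mapped files
pinot.server.instance.realtime.alloc.offheap=true
# Optional: point the consumer directory at a memory-backed file system to avoid disk I/O
pinot.server.consumerDir=/mnt/ramdisk/pinot/consumerDir

# Alternative to memory mapping: allocate direct memory instead
# pinot.server.instance.realtime.alloc.offheap.direct=true
```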
Note: we still use heap memory to store inverted indices for consuming segments.
The number of rows in a consuming segment needs to be balanced. Having too many rows can result in memory pressure; on the other hand, having too few rows results in too many small segments. Having too many segments can be detrimental to query performance and also increases pressure on Helix.
The recommended way to balance this is to use the realtime.segment.flush.desired.size setting described in the stream configuration section. You can run the administrative tool pinot-admin.sh RealtimeProvisioningHelper to help you come up with an optimal setting for the segment size.
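A sketch of an invocation; the exact options vary between Pinot versions, and the paths and numbers here are placeholders:

```
bin/pinot-admin.sh RealtimeProvisioningHelper \
  -tableConfigFile /path/to/myTable_REALTIME_table_config.json \
  -numPartitions 16 \
  -numHosts 8,10,12 \
  -numHours 6,12,18,24 \
  -sampleCompletedSegmentDir /path/to/a/completed/segment \
  -ingestionRate 5000
```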
This feature is available only if the consumption type is LowLevel.
The structure of consuming segments and completed segments is very different, and the memory, CPU, I/O and GC characteristics of processing queries on them can differ significantly. Therefore, in some use cases it may be useful to move the completed segments onto a different set of hosts.
This feature is available only if the consumption type is LowLevel.
When a real-time segment completes, the controller chooses one server among all the replicas as the committer. The committer builds the segment and uploads it to the controller. The non-committer servers are asked to catch up to the winning offset. If they are able to catch up, they build the segment themselves and replace the in-memory segment; if they are unable to catch up, they are asked to download the segment from the controller.
Building a segment can cause excessive garbage and may result in GC pauses on the server, and long GC pauses can affect query processing. To avoid this, there is a configuration that lets you control whether the non-committer servers build the segment themselves or simply download it (see the completion configuration described below).
This feature is available only if the consumption type is LowLevel.
Once a committer is asked to commit the segment, it builds the segment and issues an HTTP POST to the controller with the segment. The controller then commits the segment in Zookeeper and starts the next consuming segment.
It is possible to configure the servers to do a split commit, in which the committer performs the following steps:
1. Build the segment.
2. Start a transaction with the lead controller to commit the segment (CommitStart phase).
3. Post the completed segment to any of the controllers (and that controller posts it to the segment store).
4. End the transaction with the lead controller (CommitEnd phase). Optionally, this step can be done with the segment metadata.
This method of committing can be useful if network bandwidth on the lead controller is limiting segment uploads. To enable it, you need to set the following configurations:
- On the controller, set pinot.controller.enable.split.commit to true (default is false).
- On the server, set pinot.server.enable.split.commit to true (default is false).
- On the server, set pinot.server.enable.commitend.metadata to true (default is false).
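Put together, a sketch of the relevant config entries:

```
# Controller config
pinot.controller.enable.split.commit=true

# Server config
pinot.server.enable.split.commit=true
pinot.server.enable.commitend.metadata=true
```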
You can host completed segments on a different set of hosts using the tagOverrideConfig described in the table config documentation. Pinot will automatically move segments to those hosts once the consuming segments complete.
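A sketch of the tenants section of a real-time table config; the tenant names and tags are placeholders:

```json
"tenants": {
  "broker": "DefaultTenant",
  "server": "DefaultTenant",
  "tagOverrideConfig": {
    "realtimeConsuming": "DefaultTenant_REALTIME",
    "realtimeCompleted": "DefaultTenant_OFFLINE"
  }
}
```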
It might become desirable to force the non-committer servers to download the segment from the controller instead of building it again. The completionConfig described in the table config documentation can be used to configure this.
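A sketch of the segmentsConfig entry that forces download-based completion:

```json
"segmentsConfig": {
  "completionConfig": {
    "completionMode": "DOWNLOAD"
  }
}
```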
For more details on how to set up a table, refer to the table configuration documentation.