This page has a collection of frequently asked questions about ingestion with answers from the community.
This is a list of questions frequently asked in our troubleshooting channel on Slack. To contribute additional questions and answers, make a pull request.
While Apache Pinot can work with segments of various sizes, for optimal use of Pinot, you want to get your segments sized in the 100MB to 500MB (un-tarred/uncompressed) range. Having too many (thousands or more) tiny segments for a single table creates overhead in terms of the metadata storage in Zookeeper as well as in the Pinot servers' heap. At the same time, having too few really large (GBs) segments reduces parallelism of query execution, as on the server side, the thread parallelism of query execution is at segment level.
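As a rough illustration of the sizing guidance above, the sketch below estimates how many segments a table would end up with at a chosen target segment size. The function name and the 10 GB example are hypothetical; only the 100MB to 500MB range comes from the text.

```python
import math

# Recommended uncompressed segment size range from the guidance above.
MIN_SEGMENT_MB = 100
MAX_SEGMENT_MB = 500


def estimate_segment_count(total_data_mb: float, target_segment_mb: float) -> int:
    """Return how many segments of roughly target_segment_mb are needed."""
    if not MIN_SEGMENT_MB <= target_segment_mb <= MAX_SEGMENT_MB:
        raise ValueError("target segment size is outside the 100MB to 500MB range")
    return math.ceil(total_data_mb / target_segment_mb)


# Hypothetical table with 10 GB (10,240 MB) of uncompressed data.
print(estimate_segment_count(10_240, 250))  # 41 segments of about 250 MB
```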
Yes. Each table can be independently configured to consume from any given Kafka topic, regardless of whether there are other tables that are also consuming from the same Kafka topic.
Pinot automatically detects new partitions in Kafka topics. It checks for new partitions whenever the RealtimeSegmentValidationManager periodic job runs, and starts consumers for any new partitions it finds.
You can configure the interval for this job using the controller.realtime.segment.validation.frequencyPeriod property in the controller configuration.
Pinot supports multi-column partitioning for offline tables. Map multiple columns under tableIndexConfig.segmentPartitionConfig.columnPartitionMap. Pinot assigns the input data to each partition according to the partition configuration individually for each column.
The following example partitions the segment based on two columns, memberID and caseNumber. Note that each partition column is handled separately, so in this case the segment is partitioned on memberID (partition ID 1) and also partitioned on caseNumber (partition ID 2).
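A minimal sketch of such a configuration, built as a Python dictionary and printed as JSON. The column names follow the text; the partition functions (Modulo, Murmur) and the partition counts are illustrative assumptions, not recommendations.

```python
import json

# Illustrative values: the partition functions and partition counts are
# assumptions. Each column gets its own, independent partition config.
table_index_config = {
    "segmentPartitionConfig": {
        "columnPartitionMap": {
            "memberID": {"functionName": "Modulo", "numPartitions": 3},
            "caseNumber": {"functionName": "Murmur", "numPartitions": 12},
        }
    }
}

# This fragment would sit under "tableIndexConfig" in the table config.
print(json.dumps({"tableIndexConfig": table_index_config}, indent=2))
```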
For multi-column partitioning to work, you must also set routing.segmentPrunerTypes as follows:
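A minimal sketch of the routing fragment, again expressed as a Python dictionary and printed as JSON. It assumes the partition pruner is the only pruner type you need.

```python
import json

# "partition" turns on partition-based segment pruning for the table.
routing_config = {"routing": {"segmentPrunerTypes": ["partition"]}}

print(json.dumps(routing_config, indent=2))
```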
Set up a partitioner in the Kafka producer: https://docs.confluent.io/current/clients/producer.html
The partitioning logic in the stream should match the partitioning config in Pinot. Kafka uses murmur2, and the equivalent in Pinot is the Murmur function.
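To make the matching requirement concrete, here is a Python sketch of how a key could map to a partition. It ports the murmur2 variant commonly associated with Kafka's Java client and assumes the key is hashed as UTF-8 bytes. Treat it as an illustration under those assumptions, and verify against your actual producer and Pinot's Murmur function before relying on it. The key and partition count are hypothetical.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash (seed 0x9747b28c), returned as an unsigned int."""
    seed, m, r = 0x9747B28C, 0x5BD1E995, 24
    mask = 0xFFFFFFFF
    length = len(data)
    h = (seed ^ length) & mask

    # Mix four bytes at a time, read as little-endian 32-bit words.
    full = length - (length % 4)
    for i in range(0, full, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Mix the remaining 1 to 3 bytes, if any.
    tail = length % 4
    if tail == 3:
        h ^= data[full + 2] << 16
    if tail >= 2:
        h ^= data[full + 1] << 8
    if tail >= 1:
        h ^= data[full]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: str, num_partitions: int) -> int:
    """Map a key to a partition: clear the sign bit, then take the modulo."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


# Hypothetical key and partition count.
print(partition_for_key("member-42", 12))
```

If the producer and Pinot disagree on the hash function, the key encoding, or the number of partitions, the same key lands in different partitions on each side and partition-based pruning can no longer be trusted.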
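Set the partitioning configuration under tableIndexConfig.segmentPartitionConfig.columnPartitionMap using the same column that is used for partitioning in Kafka, and also set routing.segmentPrunerTypes to include partition.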
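A minimal sketch of both settings together, built as a Python dictionary and printed as JSON. The column name column_foo and the partition count of 12 are placeholders; the assumption is that numPartitions matches the number of partitions in the Kafka topic.

```python
import json

# Hypothetical values: "column_foo" stands for the Kafka partitioning key
# column, and 12 for the number of partitions in the Kafka topic.
kafka_partitioned_table_config = {
    "tableIndexConfig": {
        "segmentPartitionConfig": {
            "columnPartitionMap": {
                "column_foo": {"functionName": "Murmur", "numPartitions": 12}
            }
        }
    },
    "routing": {"segmentPrunerTypes": ["partition"]},
}

print(json.dumps(kafka_partitioned_table_config, indent=2))
```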
To learn how partitioning works, see routing tuning.
For JSON, you can use a hex encoded string to ingest BYTES.
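For instance, a producer could hex-encode binary data before placing it in a JSON record. This is a minimal sketch; the field name payload and the sample bytes are hypothetical.

```python
import json

raw = bytes([0xDE, 0xAD, 0xBE, 0xEF])  # hypothetical binary payload

# Hex-encode the bytes so they can travel as a JSON string.
record = {"payload": raw.hex()}
print(json.dumps(record))  # {"payload": "deadbeef"}

# Decoding the hex string recovers the original bytes.
assert bytes.fromhex(record["payload"]) == raw
```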
See the json_format(field) function, which can store a top-level JSON field as a STRING in Pinot.
You can then use the JSON functions at query time to extract fields from the JSON string.
NOTE: This works well if some of your fields are nested JSON but most of your fields are top-level JSON keys. If all of your fields are within a nested JSON key, you will have to store the entire payload as one column, which is not ideal.
To use explicit code points, you must double-quote (not single-quote) the string, and escape the code point via "\uHHHH", where HHHH is the four digit hex code for the character. See https://yaml.org/spec/spec.html#escaping/in%20double-quoted%20scalars/ for more details.
By default, Pinot limits the length of a String column to 512 bytes. If you want to overwrite this value, you can set the maxLength attribute in the schema as follows:
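A minimal sketch of such a schema fragment, built as a Python dictionary and printed as JSON. The column name textDim1 and the length of 1000 are hypothetical; only the maxLength attribute comes from the text.

```python
import json

# Hypothetical column name and length; maxLength is the attribute of interest.
schema_fragment = {
    "dimensionFieldSpecs": [
        {"name": "textDim1", "dataType": "STRING", "maxLength": 1000}
    ]
}

print(json.dumps(schema_fragment, indent=2))
```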
Events are available to queries as soon as they are ingested. This is because events are instantly indexed in memory upon ingestion.
The ingestion of events into the real-time table is not transactional, so replicas of the open segment are not immediately consistent. Pinot trades consistency for availability upon network partitioning (CAP theorem) to provide ultra-low ingestion latencies at high throughput.
However, when the open segment is closed and its in-memory indexes are flushed to persistent storage, all its replicas are guaranteed to be consistent, with the commit protocol.
This typically happens if:
The consumer is lagging a lot.
The consumer was down (server down, cluster down), and the stream moved on, resulting in offset not found when consumer comes back up.
In case of Kafka, to recover, set property "auto.offset.reset":"earliest" in the streamConfigs section and reset the CONSUMING segment. See Real-time table configs for more details about the configuration.
You can also use the "Resume Consumption" endpoint with the "resumeFrom" parameter set to "smallest" (or "largest" if you want). See Pause Stream Ingestion for more details.
Inverted indexes are set in the tableConfig's tableIndexConfig -> invertedIndexColumns list. For more info on table configuration, see Table Config Reference. For an example showing how to configure an inverted index, see Inverted Index.
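A minimal sketch of where the list sits in the table config, built as a Python dictionary and printed as JSON. The column names are hypothetical.

```python
import json

# Hypothetical column names; each listed column gets an inverted index.
table_config_fragment = {
    "tableIndexConfig": {
        "invertedIndexColumns": ["column_foo", "column_bar"]
    }
}

print(json.dumps(table_config_fragment, indent=2))
```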
Applying inverted indexes to a table configuration will generate an inverted index for all new segments. To apply the inverted indexes to all existing segments, see How to apply an inverted index to existing segments?
Add the columns you want to index to the tableIndexConfig -> invertedIndexColumns list. To update the table configuration use the Pinot Swagger API: http://localhost:9000/help#!/Table/updateTableConfig.
Invoke the reload API: http://localhost:9000/help#!/Segment/reloadAllSegments.
Once you've done that, you can check whether the index has been applied by querying the segment metadata API at http://localhost:9000/help#/Segment/getServerMetadata. Don't forget to include the names of the columns on which you have applied the index.
The output from this API should look something like the following:
Not all indexes can be retrospectively applied to existing segments.
If you want to add or change the sorted index column or adjust the dictionary encoding of the default forward index you will need to manually re-load any existing segments.
Star-tree indexes are configured in the table config under the tableIndexConfig -> starTreeIndexConfigs (list) and enableDefaultStarTree (boolean). See here for more about how to configure star-tree indexes: https://docs.pinot.apache.org/basics/indexing/star-tree-index#index-generation
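A minimal sketch of these two settings, built as a Python dictionary and printed as JSON. The dimension and metric names, as well as the maxLeafRecords value, are hypothetical; consult the star-tree index page linked above for the authoritative list of options.

```python
import json

# Hypothetical dimensions and metric; values are for illustration only.
star_tree_fragment = {
    "tableIndexConfig": {
        "starTreeIndexConfigs": [
            {
                "dimensionsSplitOrder": ["Country", "Browser", "Locale"],
                "skipStarNodeCreationForDimensions": [],
                "functionColumnPairs": ["SUM__Impressions"],
                "maxLeafRecords": 10000,
            }
        ],
        "enableDefaultStarTree": False,
    }
}

print(json.dumps(star_tree_fragment, indent=2))
```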
The new segments will have star-tree indexes generated after applying the star-tree index configurations to the table configuration.
Pinot does not require ordering of event timestamps. Out-of-order events are still consumed and indexed into the "currently consuming" segment. In a pathological case, if a 2-day-old event comes in "now", it is still stored in the segment that is open for consumption "now". There is no strict time-based partitioning for segments, but star-tree indexes and hybrid tables will handle this as appropriate.
See Components > Broker for more details about how hybrid tables handle this. Specifically, the time-boundary is computed as max(OfflineTime) - 1 unit of granularity. Pinot does store the min-max time for each segment and uses it for pruning segments, so segments with multiple time intervals may not be perfectly pruned.
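The time-boundary rule above can be sketched as simple date arithmetic. The function name, the sample segment end times, and the one-day granularity are all hypothetical.

```python
from datetime import datetime, timedelta, timezone


def time_boundary(offline_segment_end_times, granularity):
    """Return max(offline end time) minus one unit of granularity."""
    return max(offline_segment_end_times) - granularity


# Hypothetical offline segments ending on three consecutive days.
ends = [datetime(2024, 1, day, tzinfo=timezone.utc) for day in (1, 2, 3)]
boundary = time_boundary(ends, timedelta(days=1))
print(boundary.isoformat())  # 2024-01-02T00:00:00+00:00
```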
When generating star-tree indexes, the time column will be part of the star-tree, so the tree can still be efficiently queried for segments with multiple time intervals.
Why does a hybrid table not use max(OfflineTime) to determine the time-boundary, and instead use an offset?
This lets you have an old event come in without building complex offline pipelines that perfectly partition your events by event timestamps. With this offset, even if your offline data pipeline produces segments with a maximum timestamp, Pinot will not use the offline dataset for that last chunk of segments. The expectation is that when you process the next time range of data offline, your data pipeline will include any late events.
It might seem odd that segments are not strictly time-partitioned, unlike similar systems such as Apache Druid. This allows real-time ingestion to consume out-of-order events. Even though segments are not strictly time-partitioned, Pinot will still index, prune, and query segments intelligently by time intervals for the performance of hybrid tables and time-filtered data.
When generating offline segments, the segments are generated such that each segment contains only one time interval and is well partitioned by the time column.