Ingestion FAQ
This page has a collection of frequently asked questions about ingestion with answers from the community.
While Apache Pinot can work with segments of various sizes, for optimal use of Pinot you want your segments sized in the 100MB to 500MB (un-tarred/uncompressed) range. Having too many (thousands or more) tiny segments for a single table creates overhead in metadata storage in ZooKeeper as well as in the Pinot servers' heap. At the same time, having too few really large (GBs) segments reduces the parallelism of query execution, because on the server side, query execution is parallelized at the segment level.
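As a rough illustration of the sizing guidance, the sketch below picks a segment count for a given amount of uncompressed data so that each segment lands inside the 100MB to 500MB window. The 300MB target and the helper name `plan_segment_count` are hypothetical choices for illustration, not Pinot settings, and the sketch assumes data splits evenly across segments.

```python
MB = 1024 * 1024
# Recommended window from the FAQ (uncompressed, un-tarred size per segment).
MIN_SEGMENT_BYTES = 100 * MB
MAX_SEGMENT_BYTES = 500 * MB
# Hypothetical target in the middle of the window; tune for your own data.
TARGET_SEGMENT_BYTES = 300 * MB


def plan_segment_count(total_bytes: int, target: int = TARGET_SEGMENT_BYTES) -> int:
    """Return a segment count that keeps segments near the target size.

    Hypothetical helper: assumes the data splits evenly across segments.
    """
    if total_bytes <= 0:
        raise ValueError("total_bytes must be positive")
    count = max(1, round(total_bytes / target))
    # Add segments if the average segment would exceed the upper bound.
    while total_bytes / count > MAX_SEGMENT_BYTES:
        count += 1
    # Remove segments if the average would fall below the lower bound.
    while count > 1 and total_bytes / count < MIN_SEGMENT_BYTES:
        count -= 1
    return count


if __name__ == "__main__":
    total = 60 * 1024 * MB  # 60 GB of uncompressed data
    n = plan_segment_count(total)
    print(n, "segments of about", total // n // MB, "MB each")
```

Tables smaller than the lower bound simply end up with a single segment.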
Yes. Each table can be independently configured to consume from any given Kafka topic, regardless of whether there are other tables that are also consuming from the same Kafka topic.
Pinot automatically detects new partitions in Kafka topics. It checks for new partitions whenever the RealtimeSegmentValidationManager periodic job runs, and starts consumers for any new partitions it finds.
You can configure the interval for this job using the controller.realtime.segment.validation.frequencyPeriod property in the controller configuration.
Pinot supports multi-column partitioning for offline tables. Map multiple columns in the table's segment partition configuration, and Pinot assigns the input data to partitions according to the partition configuration of each column individually.
For example, a segment can be partitioned on two columns, memberID and caseNumber. Each partition column is handled separately, so in this case the segment is partitioned on memberID (partition ID 1) and also partitioned on caseNumber (partition ID 2).
For multi-column partitioning to work, you must also set routing.segmentPrunerTypes to enable partition-based segment pruning.
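A sketch of what the memberID/caseNumber setup could look like as table-config JSON, built here as a Python dict. The key names (segmentPartitionConfig, columnPartitionMap, segmentPrunerTypes) follow the Pinot table config; the partition functions and partition counts are hypothetical placeholders, so substitute your own.

```python
import json

# Column names come from the example above; functionName and numPartitions
# values are hypothetical placeholders for illustration.
table_config_fragment = {
    "tableIndexConfig": {
        "segmentPartitionConfig": {
            "columnPartitionMap": {
                "memberID": {"functionName": "Modulo", "numPartitions": 3},
                "caseNumber": {"functionName": "Murmur", "numPartitions": 12},
            }
        }
    },
    # Lets the broker prune segments using the partition metadata.
    "routing": {"segmentPrunerTypes": ["partition"]},
}

print(json.dumps(table_config_fragment, indent=2))
```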
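When ingesting from a partitioned Kafka topic, the partitioning logic in the stream should match the partitioning config in Pinot. Kafka's default partitioner uses murmur2, and the equivalent in Pinot is the Murmur function.
Set the partitioning configuration in Pinot on the same column that Kafka uses for partitioning, with the Murmur function and the same number of partitions as the Kafka topic, and also set routing.segmentPrunerTypes to enable partition-based segment pruning.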
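To make these requirements concrete, the sketch below checks a table-config dict against the Kafka side: the partition column must use Murmur, its partition count must equal the topic's, and partition pruning must be enabled. The helper `check_kafka_alignment`, the userId column, and the 12-partition topic are hypothetical; the check only inspects the dict and does not talk to Kafka or Pinot.

```python
from typing import Any, Dict, List


def check_kafka_alignment(table_config: Dict[str, Any], column: str, kafka_partitions: int) -> List[str]:
    """Return a list of mismatches between a Pinot table config and a Kafka topic.

    Hypothetical helper for illustration; it only inspects the config dict.
    """
    problems: List[str] = []
    column_map = (
        table_config.get("tableIndexConfig", {})
        .get("segmentPartitionConfig", {})
        .get("columnPartitionMap", {})
    )
    spec = column_map.get(column)
    if spec is None:
        problems.append(f"{column} is missing from columnPartitionMap")
    else:
        # Pinot's Murmur function is the counterpart of Kafka's murmur2.
        if spec.get("functionName") != "Murmur":
            problems.append("functionName should be Murmur")
        # Pinot's partition count must equal the topic's partition count.
        if spec.get("numPartitions") != kafka_partitions:
            problems.append(f"numPartitions should be {kafka_partitions}")
    pruners = table_config.get("routing", {}).get("segmentPrunerTypes", [])
    if "partition" not in pruners:
        problems.append('routing.segmentPrunerTypes should include "partition"')
    return problems


# Example: a table partitioned on a hypothetical userId column, topic with 12 partitions.
aligned_config = {
    "tableIndexConfig": {
        "segmentPartitionConfig": {
            "columnPartitionMap": {"userId": {"functionName": "Murmur", "numPartitions": 12}}
        }
    },
    "routing": {"segmentPrunerTypes": ["partition"]},
}
print(check_kafka_alignment(aligned_config, "userId", 12))
```

An empty list means the config matches the assumed topic layout.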
When ingesting JSON, you can provide the value of a BYTES column as a hex-encoded string.
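A minimal sketch of producing such a record in Python: raw bytes are hex-encoded with bytes.hex() before being placed in the JSON message. The field names name and payload are hypothetical; payload stands for a column declared as BYTES in the schema.

```python
import json


def to_json_record(name: str, payload: bytes) -> str:
    """Build a JSON message whose BYTES field is carried as a hex string."""
    return json.dumps({"name": name, "payload": payload.hex()})


record = to_json_record("sensor-1", b"\x00\xffPinot")
print(record)
```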
Note: Storing nested JSON as a string column works well if some of your fields are nested JSON but most of your fields are top-level JSON keys. If all of your fields are within a nested JSON key, you will have to store the entire payload as one column, which is not ideal.
By default, Pinot limits the length of a String column to 512 bytes. To override this value, set the maxLength attribute on the column's field spec in the schema.
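A minimal schema fragment with a raised limit, built as a Python dict and printed as JSON. The schema name, the column name description, and the value 1000 are hypothetical.

```python
import json

# Hypothetical schema: only the maxLength attribute is the point here.
schema = {
    "schemaName": "myTable",
    "dimensionFieldSpecs": [
        {
            "name": "description",
            "dataType": "STRING",
            # Overrides the 512 default length limit for this column only.
            "maxLength": 1000,
        }
    ],
}

print(json.dumps(schema, indent=2))
```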
Events are available to queries as soon as they are ingested. This is because events are instantly indexed in memory upon ingestion.
The ingestion of events into the real-time table is not transactional, so replicas of the open segment are not immediately consistent. Pinot trades consistency for availability upon network partitioning (CAP theorem) to provide ultra-low ingestion latencies at high throughput.
This typically happens if:
The consumer is lagging a lot.
The consumer was down (server down, cluster down) and the stream moved on, so the offset the consumer had stored no longer existed in the stream when it came back up.
The output from this API should look something like the following:
Not all indexes can be retrospectively applied to existing segments.
The new segments will have star-tree indexes generated after applying the star-tree index configurations to the table configuration.
Pinot does not require event timestamps to be ordered. Out-of-order events are still consumed and indexed into the "currently consuming" segment. In a pathological case, if a 2-day-old event comes in "now", it is still stored in the segment that is open for consumption "now". There is no strict time-based partitioning for segments, but star-tree indexes and hybrid tables handle this appropriately.
When generating star-tree indexes, the time column is part of the star-tree, so the tree can still be queried efficiently for segments with multiple time intervals.
Why does Pinot not use max(OfflineTime) to determine the time boundary, and instead use an offset? This lets an old event come in without building complex offline pipelines that perfectly partition your events by event timestamp. With this offset, even if your offline data pipeline produces segments up to some maximum timestamp, Pinot will not use the offline dataset for that last chunk of segments. The expectation is that when you process the next time range of data offline, your data pipeline will include any late events.
It might seem odd that segments are not strictly time-partitioned, unlike similar systems such as Apache Druid. This allows real-time ingestion to consume out-of-order events. Even though segments are not strictly time-partitioned, Pinot will still index, prune, and query segments intelligently by time intervals for the performance of hybrid tables and time-filtered data.
When generating offline segments, the segments are generated so that each one contains only one time interval, which keeps them well partitioned by the time column.
In the Kafka producer, set up a partitioner that partitions records on the same column that is configured for partitioning in Pinot.
To learn how partitioning works, see .
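For keyed records, Kafka's default partitioner picks the partition as (murmur2(keyBytes) & 0x7fffffff) % numPartitions, and the FAQ names Pinot's Murmur function as the equivalent. The sketch below ports Kafka's murmur2 to Python so you can check which partition a key lands in. It assumes the producer uses the default partitioner (not a custom one) and serializes keys as the UTF-8 bytes of their string form; `kafka_partition` is a hypothetical helper name.

```python
MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """Port of Kafka's Utils.murmur2; returns the 32-bit hash as an unsigned int."""
    length = len(data)
    seed = 0x9747B28C
    m = 0x5BD1E995
    r = 24

    h = (seed ^ length) & MASK32
    length4 = length // 4

    # Process the input four bytes at a time, little-endian.
    for i in range(length4):
        i4 = i * 4
        k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24)
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k

    # Mix in the remaining 1 to 3 bytes (mirrors the Java switch fall-through).
    tail = length & ~3
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32

    # Final avalanche steps.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def kafka_partition(key: str, num_partitions: int) -> int:
    """Partition chosen by Kafka's default partitioner for a non-null string key."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


print(kafka_partition("21", 12))
```

The hash is kept unsigned here; as a signed Java int, murmur2 of the bytes "21" is -973932308, which is a handy cross-check against the Java implementation.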
See the function that can store a top-level JSON field as a STRING in Pinot.
You can then use JSON functions at query time to extract fields from the JSON string.
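The idea is that a nested object is kept as a JSON string in a STRING column, and fields are pulled out of that string at query time. The sketch below mimics only the ingestion side in plain Python: scalar top-level keys stay as their own columns and nested objects are serialized to compact JSON strings. It illustrates the approach and is not Pinot's implementation; `flatten_for_pinot` and the record fields are hypothetical.

```python
import json
from typing import Any, Dict


def flatten_for_pinot(record: Dict[str, Any]) -> Dict[str, Any]:
    """Keep scalar top-level fields; turn nested objects into JSON strings."""
    row: Dict[str, Any] = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # Compact, key-sorted JSON, as a STRING column would hold it.
            row[key] = json.dumps(value, separators=(",", ":"), sort_keys=True)
        else:
            row[key] = value
    return row


event = {"userId": "u1", "ts": 1700000000, "device": {"os": "ios", "version": "17.1"}}
row = flatten_for_pinot(event)
print(row)
```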
To use explicit code points, you must double-quote (not single-quote) the string and escape each code point as "\uHHHH", where HHHH is the four-digit hex code for the character. See for more details.
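If you need to produce such a string programmatically, the sketch below wraps text in double quotes and escapes every character outside printable ASCII (plus the double quote and backslash) as \uHHHH. Where the string is used is not specified above, so this is a generic helper; `to_code_point_literal` is a hypothetical name. Characters above U+FFFF need more than four hex digits, so the sketch rejects them.

```python
import json


def to_code_point_literal(text: str) -> str:
    """Double-quote text, escaping non-printable-ASCII characters as \\uHHHH."""
    parts = []
    for ch in text:
        cp = ord(ch)
        if cp > 0xFFFF:
            raise ValueError(f"{ch!r} needs more than four hex digits")
        if 0x20 <= cp <= 0x7E and ch not in '"\\':
            parts.append(ch)
        else:
            parts.append(f"\\u{cp:04X}")
    return '"' + "".join(parts) + '"'


print(to_code_point_literal("caf\u00e9 \u2603"))
```

Because the output is also a valid JSON string literal, json.loads can be used to confirm it decodes back to the original text.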
Although replicas of the open segment are not immediately consistent, once the open segment is closed and its in-memory indexes are flushed to persistent storage, all its replicas are guaranteed to be consistent, with the .
In the case of Kafka, to recover, set the property "auto.offset.reset":"earliest" in the streamConfigs section and reset the CONSUMING segment. See for more details about the configuration.
You can also use the "Resume Consumption" endpoint with the "resumeFrom" parameter set to "smallest" (or "largest" if you want). See for more details.
Inverted indexes are set in the tableConfig's tableIndexConfig -> invertedIndexColumns list. For more info on table configuration, see . For an example showing how to configure an inverted index, see .
Applying inverted indexes to a table configuration generates an inverted index for all new segments. To apply the inverted indexes to all existing segments, follow these steps:
1. Add the columns you want to index to the tableIndexConfig -> invertedIndexColumns list. To update the table configuration, use the Pinot Swagger API.
2. Invoke the reload API.
3. Check whether the index has been applied by querying the segment metadata API. Don't forget to include the names of the columns on which you applied the index.
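The update, reload, and metadata-check steps above can be scripted. The sketch below edits a table-config dict and builds (but does not send) the three HTTP requests with the standard library. ASSUMPTIONS: the controller address http://localhost:9000, the table and column names, and the endpoint paths (PUT /tables/{tableName}, POST /segments/{tableName}/reload, GET /segments/{tableName}/metadata?columns=...) are illustrative; confirm the paths in your controller's Swagger UI before sending anything.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Iterable, List


def add_inverted_index_columns(table_config: Dict[str, Any], columns: Iterable[str]) -> Dict[str, Any]:
    """Return a copy of table_config with columns added to invertedIndexColumns (no duplicates)."""
    updated = json.loads(json.dumps(table_config))  # deep copy of a JSON-compatible dict
    index_config = updated.setdefault("tableIndexConfig", {})
    existing: List[str] = index_config.setdefault("invertedIndexColumns", [])
    for column in columns:
        if column not in existing:
            existing.append(column)
    return updated


def build_requests(controller: str, table_name: str, table_config: Dict[str, Any]) -> List[urllib.request.Request]:
    """Build, without sending, the update, reload, and metadata-check requests.

    ASSUMPTION: endpoint paths mirror the Pinot controller REST API; verify them first.
    """
    columns = table_config["tableIndexConfig"]["invertedIndexColumns"]
    query = urllib.parse.urlencode([("columns", c) for c in columns])
    return [
        # Update the table configuration.
        urllib.request.Request(
            f"{controller}/tables/{table_name}",
            data=json.dumps(table_config).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="PUT",
        ),
        # Reload existing segments so they pick up the new index.
        urllib.request.Request(f"{controller}/segments/{table_name}/reload", method="POST"),
        # Read segment metadata for the indexed columns.
        urllib.request.Request(f"{controller}/segments/{table_name}/metadata?{query}", method="GET"),
    ]


table_config = {"tableName": "myTable_OFFLINE", "tableIndexConfig": {"invertedIndexColumns": ["country"]}}
new_config = add_inverted_index_columns(table_config, ["country", "browser"])
for req in build_requests("http://localhost:9000", "myTable", new_config):
    print(req.get_method(), req.full_url)  # send with urllib.request.urlopen(req)
```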
If you want to add or change the or adjust , you will need to manually reload any existing segments.
Star-tree indexes are configured in the table config under tableIndexConfig -> starTreeIndexConfigs (list) and enableDefaultStarTree (boolean). For more about how to configure star-tree indexes, see .
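A sketch of a starTreeIndexConfigs entry, built as a Python dict. The keys (dimensionsSplitOrder, skipStarNodeCreationForDimensions, functionColumnPairs, maxLeafRecords) follow Pinot's star-tree config; the column names, aggregations, and the leaf limit are hypothetical placeholders. The small `parse_pair` helper is also hypothetical and only splits the FUNCTION__column naming used in functionColumnPairs.

```python
import json
from typing import Tuple

# Hypothetical star-tree configuration; column names and values are placeholders.
star_tree_fragment = {
    "tableIndexConfig": {
        "enableDefaultStarTree": False,
        "starTreeIndexConfigs": [
            {
                "dimensionsSplitOrder": ["country", "browser", "locale"],
                "skipStarNodeCreationForDimensions": [],
                "functionColumnPairs": ["COUNT__*", "SUM__impressions"],
                "maxLeafRecords": 10000,
            }
        ],
    }
}


def parse_pair(pair: str) -> Tuple[str, str]:
    """Split a FUNCTION__column pair into (function, column)."""
    function, sep, column = pair.partition("__")
    if not sep or not function or not column:
        raise ValueError(f"malformed function-column pair: {pair!r}")
    return function, column


print(json.dumps(star_tree_fragment, indent=2))
```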
See the for more details about how hybrid tables handle this. Specifically, the time boundary is computed as max(OfflineTime) - 1 unit of granularity. Pinot does store the min and max time for each segment and uses them to prune segments, so segments with multiple time intervals may not be perfectly pruned.
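A small worked example of the time-boundary rule, assuming epoch-millisecond timestamps and a granularity of one hour (both assumptions; a table pushed daily would use one day). The sketch also assumes the usual hybrid-table split in which the offline table answers for time <= boundary and the real-time table for time > boundary; the helper names are hypothetical.

```python
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000  # assumed granularity: one hour, in epoch milliseconds


def time_boundary(max_offline_time_ms: int, granularity_ms: int = HOUR_MS) -> int:
    """Time boundary = max(OfflineTime) - 1 unit of granularity."""
    return max_offline_time_ms - granularity_ms


def serving_side(event_time_ms: int, boundary_ms: int) -> str:
    """Which half of a hybrid table answers for this timestamp (sketch of broker routing)."""
    return "OFFLINE" if event_time_ms <= boundary_ms else "REALTIME"


max_offline = int(datetime(2024, 1, 2, tzinfo=timezone.utc).timestamp() * 1000)
boundary = time_boundary(max_offline)
# An event in the last hour before max(OfflineTime) is still served from real-time data.
late_event = max_offline - 30 * 60 * 1000
print(serving_side(late_event, boundary))
```

Holding back that last unit of time is what gives late events a window to arrive before the offline data takes over.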