The new multi-stage query engine (a.k.a. the V2 query engine) is designed to support more complex SQL semantics such as `JOIN`, `OVER` window, and `MATCH_RECOGNIZE`, and to eventually bring Pinot closer to full ANSI SQL semantics. It also resolves the bottleneck in the broker reduce stage, where only a single machine is dedicated to heavy lifting such as merging high-cardinality `GROUP BY` results, `ORDER BY` sorting, etc.
To enable the V2 engine, make sure to either:

- Build Apache Pinot from the latest master commit, or
- Download the latest Apache Pinot Docker image using the official guide.

Then add the following configurations to your cluster config:
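A minimal sketch of the relevant cluster config keys, based on the flags documented around the release that introduced the multi-stage engine (treat the exact keys and port values as version-dependent assumptions and check the docs for your release):

```json
{
  "pinot.multistage.engine.enabled": "true",
  "pinot.server.instance.currentDataTableVersion": "4",
  "pinot.query.server.port": "8421",
  "pinot.query.runner.port": "8442"
}
```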
Start the cluster normally, and you should see the following window on the controller query page:
The overall PEP design doc and discussion can be found at the following links:
By default, Pinot transforms null values coming from the data source into a default value determined by the type of the corresponding column (or as specified in the schema). For example, for an INT column the default is 0, and for a STRING column the default is "null". This transformation is necessary to ensure all the indices can be built correctly during segment creation. However, it means Pinot can no longer keep track of the null values in the table, and hence cannot support queries with `IS NULL` or `IS NOT NULL` predicates.

There is a workaround: matching against the default values in the filter predicate. However, this is error prone, since it is often difficult to distinguish valid values from the default null values. Therefore, we added first-class NULL value support in Pinot to overcome this limitation. As of today, the latest version supports NULL filter predicates only. Generic support for NULL handling in query execution is in progress (e.g., within aggregation functions such as `count` or `sum`).
To turn on NULL handling, simply enable the boolean flag in the table index config called `nullHandlingEnabled` (see the tableIndexConfig section). Please note: this will cause Pinot to use additional memory and disk space per segment, as detailed below.
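For example, in the table config:

```json
{
  "tableIndexConfig": {
    "nullHandlingEnabled": true
  }
}
```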
During data ingestion (either realtime or offline), each `GenericRow` object derived from the original data source record keeps track of all the column names containing null values. This is done as part of the `NullValueTransformer`. For each such column, the segment creation logic updates a NULL value vector (implemented by a roaring bitmap) with the corresponding document ID. Effectively, at the end of the segment creation process we get a per-column NULL value vector which can give us the set of document IDs containing null values for that column. This per-column vector is then exposed through the `DataSource` interface for use in query execution.
During query execution, if the query includes an `IS NULL` or `IS NOT NULL` predicate as shown above, we fetch the NULL value vector for the corresponding column within `FilterPlanNode` and retrieve the corresponding bitmap, which represents all document IDs containing NULL values for that column. This bitmap is then used to create a `BitmapBasedFilterOperator`, which does the actual filtering operation.
Raw source data often needs to undergo some transformations before it is pushed to Pinot. Transformations include extracting records from nested objects, applying simple transform functions on certain columns, filtering out unwanted columns, as well as more advanced operations like joining between datasets. A preprocessing job is usually needed to perform these operations; for streaming data sources you might write a Samza job and create an intermediate topic to store the transformed data. For simple transformations, this can result in inconsistencies between the batch and stream data sources and increase maintenance and operational overhead.
To make things easier, Pinot supports transformations that can be applied via the ingestion config of the table config.
Pinot supports the following functions:

- Groovy functions
- Inbuilt functions
A transformation function cannot mix Groovy and inbuilt functions - you can only use one type of function at a time.
Groovy functions can be defined using the syntax below. Any valid Groovy expression can be used.
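The general shape, as described in the Pinot ingestion docs (check your version's docs for the exact form):

```
Groovy({groovy script}, argument1, argument2...argumentN)
```

The script body references the listed argument columns by name.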
Enabling Groovy

Allowing executable Groovy in ingestion transformations can be a security vulnerability. If you would like to enable Groovy for ingestion, you can set the following controller config:
controller.disable.ingestion.groovy=false
If not set, Groovy for ingestion transformation is disabled by default.
There are also several inbuilt functions that can be used directly as ingestion transform functions.
These functions enable time transformations.
- `toEpochXXX`: Converts from epoch milliseconds to a higher granularity.
- `toEpochXXXRounded`: Converts from epoch milliseconds to another granularity, rounding to the nearest rounding bucket. For example, 1588469352000 (2020-05-03 01:29:12 UTC) is 26474489 minutesSinceEpoch, and `toEpochMinutesRounded(1588469352000, 10) = 26474480` (2020-05-03 01:20:00 UTC).
- `fromEpochXXX`: Converts from an epoch granularity to milliseconds.
- Simple date format: Converts simple date format strings to milliseconds and vice versa, as per the provided pattern string.
Note: letters that are not part of the Simple Date Format pattern legend need to be escaped. For example:

"transformFunction": "fromDateTime(dateTimeStr, 'yyyy-MM-dd''T''HH:mm:ss')"
Records can be filtered as they are being ingested. A filter function can be specified in the `filterConfigs` section of the `ingestionConfigs` in the table config. If the expression evaluates to true, the record will be filtered out. The expressions can use any of the transform functions described in the previous section.
Consider a table that has a column `timestamp`. If you want to filter out records that are older than timestamp 1589007600000, you could apply the following function:
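A sketch of the corresponding filter config, mirroring the Groovy filter examples in the Pinot docs (the exact key names may vary by version):

```json
"ingestionConfig": {
  "filterConfig": {
    "filterFunction": "Groovy({timestamp < 1589007600000}, timestamp)"
  }
}
```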
Consider a table that has a string column `campaign` and a multi-value double column `prices`. If you want to filter out records where campaign = 'X' or 'Y' and the sum of all elements in prices is less than 100, you could apply the following function:
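A sketch of the filter config for this case (Groovy's `sum()` works on the multi-value column):

```json
"ingestionConfig": {
  "filterConfig": {
    "filterFunction": "Groovy({(campaign == \"X\" || campaign == \"Y\") && prices.sum() < 100}, campaign, prices)"
  }
}
```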
Transform functions can be defined on columns in the ingestion config of the table config. For example, imagine that our source data contains the `prices` and `timestamp` fields. We want to extract the maximum price and store it in the `maxPrices` field, and convert the timestamp into the number of hours since the epoch, storing it in the `hoursSinceEpoch` field. You can do this by applying the following transformation:
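A sketch of the transform configs, using a Groovy `max()` and the inbuilt `toEpochHours`:

```json
"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "maxPrices",
      "transformFunction": "Groovy({prices.max()}, prices)"
    },
    {
      "columnName": "hoursSinceEpoch",
      "transformFunction": "toEpochHours(timestamp)"
    }
  ]
}
```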
Below are some examples of commonly used functions.
Concat `firstName` and `lastName` to get `fullName`:
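One way to do this, using a Groovy transform:

```json
{
  "columnName": "fullName",
  "transformFunction": "Groovy({firstName + ' ' + lastName}, firstName, lastName)"
}
```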
Find the max value in the array `bids`:
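A sketch (the destination column name `maxBid` is illustrative):

```json
{
  "columnName": "maxBid",
  "transformFunction": "Groovy({bids.max()}, bids)"
}
```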
Convert `timestamp` from MILLISECONDS to HOURS:
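Using the inbuilt function (the destination column name `timestampHours` is illustrative):

```json
{
  "columnName": "timestampHours",
  "transformFunction": "toEpochHours(timestamp)"
}
```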
Change the name of the column from `user_id` to `userId`:
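A sketch using a pass-through Groovy transform:

```json
{
  "columnName": "userId",
  "transformFunction": "Groovy({user_id}, user_id)"
}
```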
Pinot doesn't support columns that have spaces, so if a source data column has a space, we'll need to store that value in a column with a supported name. To extract the value from `first Name` into the column `firstName`, run the following:
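A sketch; per the convention in the Pinot docs, quoting the original name lets the transform reference a column containing a space (treat the exact escaping as version-dependent):

```json
{
  "columnName": "firstName",
  "transformFunction": "\"first Name\""
}
```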
If `eventType` is IMPRESSION, set `impression` to 1. Similarly for CLICK.
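A sketch using Groovy ternary expressions:

```json
[
  {
    "columnName": "impression",
    "transformFunction": "Groovy({eventType == 'IMPRESSION' ? 1 : 0}, eventType)"
  },
  {
    "columnName": "click",
    "transformFunction": "Groovy({eventType == 'CLICK' ? 1 : 0}, eventType)"
  }
]
```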
Store an AVRO Map in Pinot as two multi-value columns. Sort the keys to maintain the mapping:

1. The keys of the map as `map_keys`
2. The values of the map as `map_values`
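A sketch, assuming the source field is called `map_field` (Groovy's spread operator collects the keys and values of the sorted map):

```json
[
  {
    "columnName": "map_keys",
    "transformFunction": "Groovy({map_field.sort()*.key}, map_field)"
  },
  {
    "columnName": "map_values",
    "transformFunction": "Groovy({map_field.sort()*.value}, map_field)"
  }
]
```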
Transformations can be chained. This means that you can use a field created by a transformation in another transformation function.
For example, we might have a JSON document like the one below in the `data` field of our source data. We can apply one transformation to extract the `userId`, and then another one to pull out the numerical part of the identifier:
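A sketch, assuming `data` holds a JSON string such as `{"userId": "user_1234"}` (field and column names here are illustrative):

```json
"transformConfigs": [
  {
    "columnName": "userId",
    "transformFunction": "jsonPathString(data, '$.userId')"
  },
  {
    "columnName": "userIdInt",
    "transformFunction": "Groovy({Long.valueOf(userId.split('_')[1])}, userId)"
  }
]
```

The second transform consumes `userId`, the output of the first, which is what makes the chaining work.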
There are two kinds of flattening: turning one record into many records, and flattening fields within a single record.
Flattening one record into many is not natively supported as of yet. You can write a custom Decoder/RecordReader if you want to use this. Once the Decoder generates the multiple GenericRows from the provided input record, a `List<GenericRow>` should be set into the destination GenericRow with the key `$MULTIPLE_RECORDS_KEY$`. The segment generation drivers will treat this as a special case and handle the multiple-records case.
Flattening fields within a single record: feature TBD.
For reference, here are the inbuilt functions and their usage:

| Function | Description | Usage |
| --- | --- | --- |
| toEpochSeconds | Converts epoch milliseconds to epoch seconds. | `toEpochSeconds(millis)` |
| toEpochMinutes | Converts epoch milliseconds to epoch minutes. | `toEpochMinutes(millis)` |
| toEpochHours | Converts epoch milliseconds to epoch hours. | `toEpochHours(millis)` |
| toEpochDays | Converts epoch milliseconds to epoch days. | `toEpochDays(millis)` |
| toEpochSecondsRounded | Converts epoch milliseconds to epoch seconds, rounding to the nearest rounding bucket. | `toEpochSecondsRounded(millis, 30)` |
| toEpochMinutesRounded | Converts epoch milliseconds to epoch minutes, rounding to the nearest rounding bucket. | `toEpochMinutesRounded(millis, 10)` |
| toEpochHoursRounded | Converts epoch milliseconds to epoch hours, rounding to the nearest rounding bucket. | `toEpochHoursRounded(millis, 6)` |
| toEpochDaysRounded | Converts epoch milliseconds to epoch days, rounding to the nearest rounding bucket. | `toEpochDaysRounded(millis, 7)` |
| fromEpochSeconds | Converts epoch seconds to milliseconds. | `fromEpochSeconds(secondsSinceEpoch)` |
| fromEpochMinutes | Converts epoch minutes to milliseconds. | `fromEpochMinutes(minutesSinceEpoch)` |
| fromEpochHours | Converts epoch hours to milliseconds. | `fromEpochHours(hoursSinceEpoch)` |
| fromEpochDays | Converts epoch days to milliseconds. | `fromEpochDays(daysSinceEpoch)` |
| toDateTime | Converts milliseconds to a formatted date time string, as per the provided pattern. | `toDateTime(millis, 'yyyy-MM-dd')` |
| fromDateTime | Converts a formatted date time string to milliseconds, as per the provided pattern. | `fromDateTime(dateTimeStr, 'EEE MMM dd HH:mm:ss ZZZ yyyy')` |
| json_format | Converts a JSON/AVRO complex object to a string. The resulting string can then be queried using the `jsonExtractScalar` function. | `json_format(jsonMapField)` |
Segments for offline tables are constructed outside of Pinot, typically in Hadoop via map-reduce jobs, and are ingested into Pinot via the REST API provided by the Controller. Pinot provides libraries to create Pinot segments out of input files in AVRO, JSON, or CSV formats in a Hadoop job, and to push the constructed segments to the controllers via REST APIs.
When an Offline segment is ingested, the controller looks up the table’s configuration and assigns the segment to the servers that host the table. It may assign multiple servers for each segment depending on the number of replicas configured for that table.
Pinot supports different segment assignment strategies that are optimized for various use cases.
Once segments are assigned, Pinot servers get notified via Helix to “host” the segment. The segments are downloaded from the remote segment store to the local storage, untarred, and memory-mapped.
Once the server has loaded (memory-mapped) the segment, Helix notifies brokers of the availability of these segments. The brokers start to include the new segments for queries. Brokers support different routing strategies depending on the type of table, the segment assignment strategy, and the use case.
Data in offline segments is immutable (rows cannot be added, deleted, or modified). However, segments may be replaced with modified data.
Starting from release-0.11.0, Pinot supports uploading offline segments to real-time tables. This is useful when a user wants to bootstrap a real-time table with some initial data, or add some offline data to a real-time table without changing the data stream. Note that this is different from the hybrid table setup, and no time boundary is maintained between the offline segments and the real-time segments.
Segments for realtime tables are constructed by Pinot servers with rows ingested from data streams such as Kafka. Rows ingested from streams are made available for query processing as soon as they are ingested, thus enabling applications such as those that need real-time charts on analytics.
In large scale installations, data in streams is typically split across multiple stream partitions. The underlying stream may provide consumer implementations that allow applications to consume data from any subset of partitions, including all partitions (or, just from one partition).
A Pinot table can be configured to consume from streams in one of two modes:
LowLevel: This is the preferred mode of consumption. Pinot creates independent partition-level consumers for each partition. Depending on the configured number of replicas, multiple consumers may be created for each partition, taking care that no two replicas exist on the same server host. Therefore, you need to provision at least as many hosts as the number of replicas configured.
HighLevel: Pinot creates one stream-level consumer that consumes from all partitions. Each message consumed could be from any of the partitions of the stream. Depending on the configured number of replicas, multiple stream-level consumers are created, taking care that no two replicas exist on the same server host. Therefore, you need to provision exactly as many hosts as the number of replicas configured.
Of course, the underlying stream should support either mode of consumption in order for a Pinot table to use that mode. Kafka has support for both of these modes. See Stream ingestion for more information on the support of other data streams in Pinot.
In either mode, Pinot servers store the ingested rows in volatile memory until one of the following conditions is met:
A certain number of rows are consumed
The consumption has gone on for a certain length of time
(See the StreamConfigs section on how to set these values, or have Pinot compute them for you.)
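A sketch of the relevant entries in `streamConfigs` (threshold key names as documented for Pinot stream ingestion; the values here are illustrative):

```json
"streamConfigs": {
  "realtime.segment.flush.threshold.rows": "1000000",
  "realtime.segment.flush.threshold.time": "6h"
}
```

Per the docs, setting the rows threshold to 0 lets Pinot compute the row limit automatically based on a desired segment size.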
Upon reaching either one of these limits, the servers do the following:
Pause consumption
Persist the rows consumed so far into non-volatile storage
Continue consuming new rows into volatile memory again.
The persisted rows form what we call a completed segment (as opposed to a consuming segment that resides in volatile memory).
In LowLevel mode, the completed segments are persisted into the local non-volatile store of the Pinot server as well as the segment store of the Pinot cluster (see Pinot Architecture Overview). This allows for easy and automated mechanisms for replacing Pinot servers or expanding capacity. Pinot has special mechanisms that ensure that the completed segment is equivalent across all replicas.
During segment completion, one winner is chosen by the controller from all the replicas as the committer server. The committer server builds the segment and uploads it to the controller. All the other non-committer servers follow one of these two paths:

- If the in-memory segment is equivalent to the committed segment, the non-committer server also builds the segment locally and replaces the in-memory segment.
- If the in-memory segment is not equivalent to the committed segment, the non-committer server downloads the segment from the controller.
For more details on this protocol, please refer to this doc.
In HighLevel mode, the servers persist the consumed rows into the local store (and not the segment store). Since consumption of rows can be from any partition, it is not possible to guarantee equivalence of segments across replicas.
See Consuming and Indexing rows in Realtime for details.
Many data analytics use-cases only need aggregated data. For example, data used in charts can be aggregated down to one row per time bucket per dimension combination.
Doing this results in much less storage and better query performance. Configuring this for a table is done via the Aggregation Config in the table config.
The aggregation config controls the aggregations that happen during realtime data ingestion. Offline aggregations must be handled separately.
The config is defined in the ingestion config of the table config, as sketched below.
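A sketch of an aggregation config for the sales example that follows (the `aggregationConfigs` key is from the Pinot table config; column names are illustrative):

```json
"ingestionConfig": {
  "aggregationConfigs": [
    {
      "columnName": "quantity",
      "aggregationFunction": "SUM(quantity)"
    },
    {
      "columnName": "amount",
      "aggregationFunction": "SUM(amount)"
    }
  ]
}
```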
Here is an example of sales data, where only the daily sales aggregates per product are needed (the column names below are illustrative). Note that the schema only reflects the final table structure.
| product | quantity | amount | daysSinceEpoch |
| --- | --- | --- | --- |
| car | 2 | 2800.00 | 18193 |
| truck | 1 | 2200.00 | 18193 |
| truck | 1 | 700.00 | 18199 |
| car | 2 | 3300.00 | 18200 |
| truck | 1 | 800.00 | 18202 |
| car | 3 | 3700.00 | 18202 |
The following are required for ingestion aggregation to work:

- Stream ingestion type must be lowLevel.
- All metrics must have aggregation configs.
- All metrics must be noDictionaryColumns.
The supported aggregation functions are:

- MAX
- MIN
- SUM
- COUNT (specify as `COUNT(*)`)
- DISTINCTCOUNTHLL (not available yet, but coming soon)
Star-trees can only be added to realtime segments after the segments have been sealed, and creating star-trees is CPU-intensive. Ingestion aggregation works for consuming segments and uses no additional CPU. Star-trees take additional memory to store, while ingestion aggregation stores less data than the original dataset.
If the original rows in non-aggregated form are needed, then ingestion-aggregation cannot be used.
How does this relate to the existing `aggregateMetrics` setting? The `aggregateMetrics` setting works the same as ingestion aggregation, but only allows the SUM function. The current changes are backward compatible, so there is no need to change your table config unless you need a different aggregation function.
Ingestion Aggregation only works for realtime ingestion. For offline data, the offline process needs to generate the aggregates separately.
If a metric isn't aggregated, the result will contain more than one row per unique set of dimensions.