Raw source data often needs to undergo some transformations before it is pushed to Pinot.
Transformations include extracting records from nested objects, applying simple transform functions on certain columns, filtering out unwanted columns, as well as more advanced operations like joining between datasets.
A preprocessing job is usually needed to perform these operations. For streaming data sources, you might write a Samza job and create an intermediate topic to store the transformed data.
For simple transformations, this can result in inconsistencies in the batch/stream data source and increase maintenance and operator overhead.
To make things easier, Pinot supports transformations that can be applied via the table config.
Pinot supports the following functions:
Groovy functions
Inbuilt functions
A transformation function cannot mix Groovy and built-in functions - you can only use one type of function at a time.
Groovy functions can be defined using the syntax shown in the sketch below. Any valid Groovy expression can be used.
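The documented general form is Groovy({groovy script}, argument1, argument2...argumentN). As an illustrative sketch (the column names firstName and lastName are hypothetical):

```
Groovy({firstName + ' ' + lastName}, firstName, lastName)
```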
Enabling Groovy
Allowing executable Groovy in ingestion transformations can be a security vulnerability. If you would like to enable Groovy for ingestion, you can set the following controller config.
controller.disable.ingestion.groovy=false
If not set, Groovy for ingestion transformation is disabled by default.
All the functions defined in the Pinot codebase and annotated with @ScalarFunction
(e.g. toEpochSeconds) are supported as ingestion transformation functions.
Below are some commonly used built-in Pinot functions for ingestion transformations.
These functions enable time transformations.
toEpochXXX
Converts from epoch milliseconds to a higher granularity.
toEpochSeconds
Converts epoch millis to epoch seconds.
Usage:"toEpochSeconds(millis)"
toEpochMinutes
Converts epoch millis to epoch minutes
Usage: "toEpochMinutes(millis)"
toEpochHours
Converts epoch millis to epoch hours
Usage: "toEpochHours(millis)"
toEpochDays
Converts epoch millis to epoch days
Usage: "toEpochDays(millis)"
toEpochXXXRounded
Converts from epoch milliseconds to another granularity, rounding to the nearest rounding bucket. For example, 1588469352000 (2020-05-03 01:29:12 UTC) is 26474489 minutesSinceEpoch, and toEpochMinutesRounded(1588469352000, 10) = 26474480 (2020-05-03 01:20:00 UTC).
toEpochSecondsRounded
Converts epoch millis to epoch seconds, rounding to the nearest rounding bucket.
Usage: "toEpochSecondsRounded(millis, 30)"
toEpochMinutesRounded
Converts epoch millis to epoch minutes, rounding to the nearest rounding bucket.
Usage: "toEpochMinutesRounded(millis, 10)"
toEpochHoursRounded
Converts epoch millis to epoch hours, rounding to the nearest rounding bucket.
Usage: "toEpochHoursRounded(millis, 6)"
toEpochDaysRounded
Converts epoch millis to epoch days, rounding to the nearest rounding bucket.
Usage: "toEpochDaysRounded(millis, 7)"
fromEpochXXX
Converts from an epoch granularity to milliseconds.
fromEpochSeconds
Converts from epoch seconds to milliseconds.
Usage: "fromEpochSeconds(secondsSinceEpoch)"
fromEpochMinutes
Converts from epoch minutes to milliseconds.
Usage: "fromEpochMinutes(minutesSinceEpoch)"
fromEpochHours
Converts from epoch hours to milliseconds.
Usage: "fromEpochHours(hoursSinceEpoch)"
fromEpochDays
Converts from epoch days to milliseconds.
Usage: "fromEpochDays(daysSinceEpoch)"
Simple date format
Converts simple date format strings to milliseconds and vice versa, as per the provided pattern string.
toDateTime
Converts from milliseconds to a formatted date time string, as per the provided pattern.
Usage: "toDateTime(millis, 'yyyy-MM-dd')"
fromDateTime
Converts a formatted date time string to milliseconds, as per the provided pattern.
Usage: "fromDateTime(dateTimeStr, 'EEE MMM dd HH:mm:ss ZZZ yyyy')"
Note
Letters that are not part of the SimpleDateFormat legend (https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) need to be escaped. For example:
"transformFunction": "fromDateTime(dateTimeStr, 'yyyy-MM-dd''T''HH:mm:ss')"
json_format
Converts a complex JSON/AVRO object into a string.
Usage: "json_format(jsonMapField)"
Records can be filtered as they are being ingested. A filter function can be specified in the filterConfig section of the ingestionConfig in the table config.
If the expression evaluates to true, the record is filtered out. The expressions can use any of the transform functions described in the previous section.
Consider a table that has a column timestamp. If you want to filter out records that are older than timestamp 1589007600000, you could apply the following function:
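(a sketch; the Groovy expression is one way to express the condition, wrapped in the standard filterConfig keys)

```json
"ingestionConfig": {
  "filterConfig": {
    "filterFunction": "Groovy({timestamp < 1589007600000}, timestamp)"
  }
}
```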
Consider a table that has a string column campaign and a multi-value double column prices. If you want to filter out records where campaign is 'X' or 'Y' and the sum of all elements in prices is less than 100, you could apply the following function:
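(a sketch of one way to express this condition with a Groovy filter function)

```json
"ingestionConfig": {
  "filterConfig": {
    "filterFunction": "Groovy({(campaign == 'X' || campaign == 'Y') && prices.sum() < 100}, prices, campaign)"
  }
}
```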
The filter config also supports SQL-like expressions of built-in scalar functions for filtering records (starting in v0.11.0). Example:
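(a sketch using built-in scalar functions; the exact expression reuses the earlier column names and is only illustrative)

```json
"ingestionConfig": {
  "filterConfig": {
    "filterFunction": "strcmp(campaign, 'X') = 0 OR timestamp < 1589007600000"
  }
}
```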
Transform functions can be defined on columns in the ingestion config of the table config.
For example, imagine that our source data contains the prices and timestamp fields. We want to extract the maximum price and store it in the maxPrices field, and convert the timestamp into the number of hours since the epoch and store it in the hoursSinceEpoch field. You can do this by applying the following transformations:
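(a sketch of the corresponding transformConfigs; the Groovy expression for the maximum is one possible way to write it)

```json
"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "maxPrices",
      "transformFunction": "Groovy({prices.max()}, prices)"
    },
    {
      "columnName": "hoursSinceEpoch",
      "transformFunction": "toEpochHours(timestamp)"
    }
  ]
}
```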
Below are some examples of commonly used functions.
Concat firstName and lastName to get fullName
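A sketch of the transform config entry:

```json
{
  "columnName": "fullName",
  "transformFunction": "Groovy({firstName + ' ' + lastName}, firstName, lastName)"
}
```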
Find max value in array bids
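A sketch (maxBid is a hypothetical destination column name):

```json
{
  "columnName": "maxBid",
  "transformFunction": "Groovy({bids.max()}, bids)"
}
```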
Convert timestamp from MILLISECONDS to HOURS
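A sketch using the built-in toEpochHours function (hoursSinceEpoch is a hypothetical destination column name):

```json
{
  "columnName": "hoursSinceEpoch",
  "transformFunction": "toEpochHours(timestamp)"
}
```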
Change the name of the column from user_id to userId
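A sketch, passing the source column through a Groovy identity expression:

```json
{
  "columnName": "userId",
  "transformFunction": "Groovy({user_id}, user_id)"
}
```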
Pinot doesn't support columns that have spaces, so if a source data column has a space, we'll need to store that value in a column with a supported name. To extract the value from first Name into the column firstName, run the following:
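(a sketch; one common approach is to reference the original column by quoting its name in the transform function, though exact quoting may vary by source format)

```json
{
  "columnName": "firstName",
  "transformFunction": "\"first Name\""
}
```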
If eventType is IMPRESSION, set impression to 1. Similarly for CLICK.
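A sketch of the two transform config entries (click is a hypothetical destination column for the CLICK case):

```json
{
  "columnName": "impression",
  "transformFunction": "Groovy({eventType == 'IMPRESSION' ? 1 : 0}, eventType)"
},
{
  "columnName": "click",
  "transformFunction": "Groovy({eventType == 'CLICK' ? 1 : 0}, eventType)"
}
```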
Store an AVRO Map in Pinot as two multi-value columns, sorting the keys to maintain the mapping: 1) the keys of the map as map_keys, and 2) the values of the map as map_values.
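A sketch (map_field is a hypothetical name for the source map field; the Groovy spread operator collects keys and values from the sorted map):

```json
{
  "columnName": "map_keys",
  "transformFunction": "Groovy({map_field.sort()*.key}, map_field)"
},
{
  "columnName": "map_values",
  "transformFunction": "Groovy({map_field.sort()*.value}, map_field)"
}
```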
Transformations can be chained. This means that you can use a field created by a transformation in another transformation function.
For example, we might have the following JSON document in the data field of our source data:
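(a hypothetical payload)

```json
{
  "userId": "user_id:123"
}
```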
We can apply one transformation to extract the userId and then another one to pull out the numerical part of the identifier:
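(a sketch of the chained transformConfigs; userIdInt is a hypothetical destination column, jsonPathString extracts the field, and the Groovy expression parses the numeric part)

```json
"transformConfigs": [
  {
    "columnName": "userId",
    "transformFunction": "jsonPathString(data, '$.userId')"
  },
  {
    "columnName": "userIdInt",
    "transformFunction": "Groovy({Long.valueOf(userId.split(':')[1])}, userId)"
  }
]
```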
There are 2 kinds of flattening:
This is not natively supported as of yet. You can write a custom Decoder/RecordReader if you want to use this. Once the Decoder generates multiple GenericRows from the provided input record, a List<GenericRow> should be set into the destination GenericRow with the key $MULTIPLE_RECORDS_KEY$. The segment generation drivers treat this as a special case and handle the multiple-records case.
Feature TBD
By default, Pinot transforms null values coming from the data source to a default value determined by the type of the corresponding column (or as specified in the schema). For example, for an INT column the default will be 0, and for a STRING column the default is "null". This transformation is necessary to ensure all the indices can be built correctly during segment creation. However, we're then unable to keep track of the null values in the Pinot table and hence cannot support queries such as:
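(a sketch with hypothetical table and column names)

```sql
SELECT COUNT(*) FROM myTable WHERE someColumn IS NOT NULL
```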
There is a workaround by matching with default values in the filter predicate. However, this is error prone since oftentimes it's difficult to distinguish valid values from the default null values. Therefore, we added first-class NULL value support in Pinot to overcome this limitation. As of today, the latest version supports NULL filter predicates only. Generic support for NULL handling in query execution is in progress (e.g. within aggregation functions such as count or sum).
To turn on NULL handling, enable the boolean flag nullHandlingEnabled in the table index config.
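A minimal sketch of the relevant table config snippet:

```json
"tableIndexConfig": {
  "nullHandlingEnabled": true
}
```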
Note that this will cause Pinot to use additional memory and disk space per segment. The details are as follows:
During data ingestion (either real-time or offline), each GenericRow
object derived from the original data source record keeps track of all the column names containing null values. This is done as part of the NullValueTransformer
. For each such column, the segment creation logic updates a NULL value vector (implemented by a roaring bitmap) with the corresponding document ID. Effectively, at the end of the segment creation process we get a per column NULL value vector which can give us the set of document IDs containing null values for that column. This per column vector is then exposed through the DataSource
interface for use in query execution.
During query execution, if the query includes an IS NULL
or IS NOT NULL
predicate as shown above, we fetch the NULL value vector for the corresponding column within FilterPlanNode
and retrieve the corresponding bitmap which represents all document IDs containing NULL values for that column. This bitmap is then used to create a BitmapBasedFilterOperator
which does the actual filtering operation.
Troubleshoot issues with the multi-stage query engine (v2).
We are continuously improving the v2 multi-stage query engine. A few limitations to call out:
Support for multi-value columns is limited to projections, and predicates must use the arrayToMv function. For example, to successfully run the following query:
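(a sketch with a hypothetical table events and multi-value column tags)

```sql
SELECT tags
FROM events
WHERE tags = 'foo'
```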
You must include arrayToMv in the query as follows:
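(the same hypothetical table and column as above)

```sql
SELECT tags
FROM events
WHERE arrayToMv(tags) = 'foo'
```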
Schema and other prefixes are not supported in queries. For example, the following queries are not supported:
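For example (myTable is a hypothetical table name):

```sql
SELECT * FROM default.myTable;
SELECT * FROM schemaName.myTable;
```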
Queries without prefixes are supported:
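(the same hypothetical table)

```sql
SELECT * FROM myTable;
```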
Modifying query behavior based on the cluster configuration is not supported. distinctcounthll, distinctcounthllmv, distinctcountrawhll, and distinctcountrawhllmv use a different default value of log2mParam in the multi-stage v2 engine. In v2, this value can no longer be configured. Therefore, the following query may produce different results in the v1 and v2 engines:
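(a sketch; userUUID and myTable are hypothetical)

```sql
SELECT distinctcounthll(userUUID) FROM myTable
```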
To ensure v2 returns the same result, specify the log2mParam value in your query:
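(the second argument sets log2mParam; the value 8 here is only illustrative)

```sql
SELECT distinctcounthll(userUUID, 8) FROM myTable
```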
If a column is repeated more than once in the SELECT statement, that column requires disambiguating aliasing. For example, in the following query, the reference to colA is ambiguous as to whether it refers to the first or the second projected colA:
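(a sketch; myTable is a hypothetical table)

```sql
SELECT colA, colA FROM myTable ORDER BY colA
```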
The solution is to rewrite the query to either use aliasing:
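(one possible rewrite)

```sql
SELECT colA AS tmpA, colA AS tmpB FROM myTable ORDER BY tmpA
```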
Or use index-based referencing:
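(another possible rewrite)

```sql
SELECT colA, colA FROM myTable ORDER BY 1
```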
The Pinot single-stage query engine (v1) automatically removes the underscore (_) character from function names, so co_u_n_t() is equivalent to count(). In v2, function naming restrictions were tightened: the underscore (_) character is only allowed to separate word boundaries in a function name, and camel case is also supported. For example, the following function names are allowed:
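(a purely illustrative sketch; startsWith is used only as an example of a multi-word name, not a claim about a specific Pinot function)

```
starts_with
startsWith
```

Whereas an arbitrarily split name such as sta_rts_with would no longer be accepted in v2.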
Default names for projections with function calls are different between v1 and v2.
For example, in v1, the following query:
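(a hypothetical table mytable)

```sql
SELECT count(*) FROM mytable
```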
Returns the following result:
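Roughly, v1 uses the expression text as the column name, so the result header looks like this (no row values shown):

```
count(*)
```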
In v2, the following query:
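(the same hypothetical query as above)

```sql
SELECT count(*) FROM mytable
```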
Returns the following result:
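Roughly, v2 derives a default column name in the Calcite style (this exact name is an assumption):

```
EXPR$0
```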
In v2, table and column names are case sensitive. In v1 they were not. For example, the following two queries are not equivalent in v2:
select * from myTable
select * from mytable
Note: Function names are not case sensitive in v2 or v1.
An arbitrary number of arguments is no longer supported in v2. For example, in v1, the following query worked:
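For example (add is used here purely to illustrate a variadic call; myTable is hypothetical):

```sql
SELECT add(1, 2, 3, 4, 5) FROM myTable
```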
In v2, this query must be rewritten as follows:
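(the same expression rewritten as nested two-argument calls)

```sql
SELECT add(1, add(2, add(3, add(4, 5)))) FROM myTable
```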
IS NULL and IS NOT NULL functions do not work correctly in v2.
Using the COUNT function on a NULL column does not work correctly in v2.
Troubleshoot semantic/runtime errors and timeout errors.
Try downloading the latest Docker image or building from the latest master commit.
We continuously push bug fixes for the multi-stage engine, so bugs you encountered might have already been fixed in the latest master build.
Try rewriting your query.
Some functions previously supported in the single-stage query engine (v1) may need to be expressed differently in the multi-stage engine (v2). Check whether you are using any non-standard SQL functions or semantics.