Ingestion Transformations
Last updated
Last updated
Raw source data often needs to undergo some transformations before it is pushed to Pinot.
Transformations include extracting records from nested objects, applying simple transform functions on certain columns, filtering out unwanted columns, as well as more advanced operations like joining between datasets.
A preprocessing job is usually needed to perform these operations. In streaming data sources, you might write a Samza job and create an intermediate topic to store the transformed data.
For simple transformations, this can result in inconsistencies in the batch/stream data source and increase maintenance and operator overhead.
To make things easier, Pinot supports transformations that can be applied via the table config.
If a new column is added to your table or schema configuration during ingestion, incorrect data may appear in the consuming segment(s). To ensure accurate values are reloaded, see how to add a new column during ingestion.
Pinot supports the following functions:
Groovy functions
Built-in functions
A transformation function cannot mix Groovy and built-in functions; only use one type of function at a time.
Groovy functions can be defined using the syntax:
Any valid Groovy expression can be used.
Enabling Groovy
Allowing executable Groovy in ingestion transformation can be a security vulnerability. To enable Groovy for ingestion, set the following controller configuration:
controller.disable.ingestion.groovy=false
If not set, Groovy for ingestion transformation is disabled by default.
All the functions defined in this directory annotated with @ScalarFunction
(for example, toEpochSeconds) are supported ingestion transformation functions.
Below are some commonly used built-in Pinot functions for ingestion transformations.
These functions enable time transformations.
toEpochXXX
Converts from epoch milliseconds to a higher granularity.
toEpochSeconds
Converts epoch millis to epoch seconds.
Usage:"toEpochSeconds(millis)"
toEpochMinutes
Converts epoch millis to epoch minutes
Usage: "toEpochMinutes(millis)"
toEpochHours
Converts epoch millis to epoch hours
Usage: "toEpochHours(millis)"
toEpochDays
Converts epoch millis to epoch days
Usage: "toEpochDays(millis)"
toEpochXXXRounded
Converts from epoch milliseconds to another granularity, rounding to the nearest rounding bucket. For example, 1588469352000
(2020-05-01 42:29:12) is 26474489
minutesSinceEpoch. `toEpochMinutesRounded(1588469352000) = 26474480
(2020-05-01 42:20:00)
toEpochSecondsRounded
Converts epoch millis to epoch seconds, rounding to nearest rounding bucket"toEpochSecondsRounded(millis, 30)"
toEpochMinutesRounded
Converts epoch millis to epoch seconds, rounding to nearest rounding bucket"toEpochMinutesRounded(millis, 10)"
toEpochHoursRounded
Converts epoch millis to epoch seconds, rounding to nearest rounding bucket"toEpochHoursRounded(millis, 6)"
toEpochDaysRounded
Converts epoch millis to epoch seconds, rounding to nearest rounding bucket"toEpochDaysRounded(millis, 7)"
fromEpochXXX
Converts from an epoch granularity to milliseconds.
fromEpochSeconds
Converts from epoch seconds to milliseconds
"fromEpochSeconds(secondsSinceEpoch)"
fromEpochMinutes
Converts from epoch minutes to milliseconds
"fromEpochMinutes(minutesSinceEpoch)"
fromEpochHours
Converts from epoch hours to milliseconds
"fromEpochHours(hoursSinceEpoch)"
fromEpochDays
Converts from epoch days to milliseconds
"fromEpochDays(daysSinceEpoch)"
Simple date format
Converts simple date format strings to milliseconds and vice versa, per the provided pattern string.
Converts from milliseconds to a formatted date time string, as per the provided pattern
"toDateTime(millis, 'yyyy-MM-dd')"
Converts a formatted date time string to milliseconds, as per the provided pattern
"fromDateTime(dateTimeStr, 'EEE MMM dd HH:mm:ss ZZZ yyyy')"
Note
Letters that are not part of Simple Date Time legend (https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) need to be escaped. For example:
"transformFunction": "fromDateTime(dateTimeStr, 'yyyy-MM-dd''T''HH:mm:ss')"
json_format
"json_format(jsonMapField)"
Records can be filtered as they are ingested. A filter function can be specified in the filterConfigs in the ingestionConfigs of the table config.
If the expression evaluates to true, the record will be filtered out. The expressions can use any of the transform functions described in the previous section.
Consider a table that has a column timestamp
. If you want to filter out records that are older than timestamp 1589007600000, you could apply the following function:
Consider a table that has a string column campaign
and a multi-value column double column prices
. If you want to filter out records where campaign = 'X' or 'Y' and sum of all elements in prices is less than 100, you could apply the following function:
Filter config also supports SQL-like expression of built-in scalar functions for filtering records (starting v 0.11.0+). Example:
Transform functions can be defined on columns in the ingestion config of the table config.
For example, imagine that our source data contains the prices
and timestamp
fields. We want to extract the maximum price and store that in the maxPrices
field and convert the timestamp into the number of hours since the epoch and store it in the hoursSinceEpoch
field. You can do this by applying the following transformation:
Below are some examples of commonly used functions.
Concat firstName
and lastName
to get fullName
Find max value in array bids
Convert timestamp
from MILLISECONDS
to HOURS
Change name of the column from user_id
to userId
Pinot doesn't support columns that have spaces, so if a source data column has a space, we'll need to store that value in a column with a supported name. To extract the value from first Name
into the column firstName
, run the following:
If eventType
is IMPRESSION
set impression
to 1
. Similar for CLICK
.
Store an AVRO Map in Pinot as two multi-value columns. Sort the keys, to maintain the mapping.
1) The keys of the map as map_keys
2) The values of the map as map_values
Transformations can be chained. This means that you can use a field created by a transformation in another transformation function.
For example, we might have the following JSON document in the data
field of our source data:
We can apply one transformation to extract the userId
and then another one to pull out the numerical part of the identifier:
There are 2 kinds of flattening:
This is not natively supported as of yet. You can write a custom Decoder/RecordReader if you want to use this. Once the Decoder generates the multiple GenericRows from the provided input record, a List<GenericRow> should be set into the destination GenericRow, with the key $MULTIPLE_RECORDS_KEY$
. The segment generation drivers will treat this as a special case and handle the multiple records case.
Feature TBD
If a new column is added to table or schema configuration during ingestion, incorrect data may appear in the consuming segment(s).
To ensure accurate values are reloaded, do the following:
Pause consumption (and wait for pause status success): $ curl -X POST {controllerHost}/tables/{tableName}/pauseConsumption
Apply new table or schema configurations.
Reload segments using the Pinot Controller API or Pinot Admin Console.
Resume consumption: $ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption
Converts a JSON/AVRO complex object to a string. This json map can then be queried using function.