Many data analytics use-cases only need aggregated data. For example, data used in charts can be aggregated down to one row per time bucket per dimension combination.
Doing this results in much less storage and better query performance. Configuring this for a table is done via the Aggregation Config in the table config.
The aggregation config controls the aggregations that happen during real-time data ingestion. Offline aggregations must be handled separately.
Below is a description of the config, which is defined in the ingestion config of the table config.
The following are required for ingestion aggregation to work:
Ingestion aggregation config is effective only for real-time tables. (There is no ingestion-time aggregation support for offline tables. For those, use the Merge/Rollup Task or pre-process aggregations in the offline data flow using batch processing engines like Spark/MapReduce.)
Stream ingestion type must be lowLevel.
All metrics must have aggregation configs.
All metrics must be noDictionaryColumns.
aggregatedFieldName must be in the Pinot schema and originalFieldName must not exist in the Pinot schema.
Here is an example of sales data, where only the daily sales aggregates per product are needed.
Note that the schema only reflects the final table structure.
From the below aggregation config example, note that price exists in the input data while total_sales exists in the Pinot schema.
car | 2 | 2800.00 | 18193
truck | 1 | 2200.00 | 18193
truck | 1 | 700.00 | 18199
car | 2 | 3300.00 | 18200
truck | 1 | 800.00 | 18202
car | 3 | 3700.00 | 18202
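The rollup behind this example can be pictured with a small simulation. This is only a sketch of the idea, not Pinot's implementation. The raw input rows and the names product_name, sales_count, and daysSinceEpoch are hypothetical, and it assumes a SUM(price) aggregation into total_sales plus a COUNT(*) aggregation. The input values are chosen so that the result reproduces the two rows for day 18193.

```python
from collections import defaultdict

# Hypothetical raw events; only `price` and `total_sales` are named in the text.
raw_rows = [
    {"product_name": "car", "price": 1000.00, "daysSinceEpoch": 18193},
    {"product_name": "car", "price": 1800.00, "daysSinceEpoch": 18193},
    {"product_name": "truck", "price": 2200.00, "daysSinceEpoch": 18193},
]


def rollup(rows):
    """Keep one row per (product, day), applying SUM(price) and COUNT(*)."""
    totals = defaultdict(lambda: {"sales_count": 0, "total_sales": 0.0})
    for row in rows:
        key = (row["product_name"], row["daysSinceEpoch"])
        totals[key]["sales_count"] += 1             # COUNT(*)
        totals[key]["total_sales"] += row["price"]  # SUM(price)
    return dict(totals)


aggregated = rollup(raw_rows)
for (product, day), metrics in sorted(aggregated.items()):
    print(product, metrics["sales_count"], metrics["total_sales"], day)
```

Three input rows collapse into two stored rows, which is where the storage and query-time savings come from.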
MAX
MIN
SUM
COUNT: Specify as COUNT(*).
DISTINCTCOUNTHLL
DISTINCTCOUNTHLLPLUS
SUMPRECISION: Specify as SUMPRECISION(field, precision); precision must be defined. It is used to compute the maximum possible size of the field. It cannot be changed later; a new field must be used. The schema for the output field should be of BIG_DECIMAL type.
Startrees can only be added to real-time segments after the segments have been sealed, and creating startrees is CPU-intensive. Ingestion aggregation works for consuming segments and uses no additional CPU.
Startrees take additional memory to store, while ingestion aggregation stores less data than the original dataset.
If the original rows in non-aggregated form are needed, then ingestion-aggregation cannot be used.
aggregateMetrics setting?
The aggregateMetrics setting works the same as Ingestion Aggregation, but only allows for the SUM function.
The current changes are backward compatible, so no need to change your table config unless you need a different aggregation function.
Ingestion Aggregation only works for real-time ingestion. For offline data, the offline process needs to generate the aggregates separately.
If a metric isn't aggregated then it will result in more than one row per unique set of dimensions.
DISTINCTCOUNTHLL: Specify as DISTINCTCOUNTHLL(field, log2m); the default log2m is 12. See the function's documentation for how to define log2m. It cannot be changed later; a new field must be used. The schema for the output field should be of BYTES type.
DISTINCTCOUNTHLLPLUS: Specify as DISTINCTCOUNTHLLPLUS(field, s, p). See the function's documentation for how to define s and p; they cannot be changed later. The schema for the output field should be of BYTES type.
For performance reasons, null handling support is disabled by default in Apache Pinot. When null support is disabled, all columns are treated as not null. Predicates like IS NOT NULL evaluate to true, and IS NULL evaluates to false. Aggregation functions like COUNT, SUM, AVG, MODE, etc. treat all columns as not null.
For example, the predicate in the query below matches all records.
To handle null values in your data, you must configure your tables to store nulls at ingestion time. This has to be done before ingesting the data. Tables where null values are stored support basic null handling, which is limited to IS NULL and IS NOT NULL predicates, and can optionally be queried with advanced null handling support.
The following table summarizes the behavior of null handling support in Pinot:
Feature | Disabled (default) | Basic | Advanced
IS NULL | always false | depends on data | depends on data
IS NOT NULL | always true | depends on data | depends on data
Transformation functions | use default value | use default value | null aware
Null aware aggregations | use default value | use default value | null aware
Pinot always stores column values in a forward index. The forward index never stores null values but has to store a value for each row. Therefore, independent of the null handling configuration, Pinot always stores a default value for null rows in the forward index. The default value used in a column can be specified in the schema configuration by setting the defaultNullValue field spec. The defaultNullValue depends on the type of data.
Remember that in the JSON used as table configuration, defaultNullValue must always be a String. If the column type is not String, Pinot will convert that value to the column type automatically.
To support null handling, Pinot must store null values in segments. The forward index stores the default value for null rows whether null storing is enabled or not. When null storing is enabled, Pinot creates a new index called the null index or null vector index. This index stores the document IDs of the rows that have null values for the column.
Although null storing can be enabled after data has been ingested, data ingested before this mode is enabled will not store the null index and therefore it will be treated as not null.
Null support is configured per table. You can configure one table to store nulls, and configure another table to not store nulls. There are two ways to define null storing support in Pinot:
Column based null handling, where each column in a table is configured as nullable or not nullable. We recommend enabling null storing support by column. This is the only way to support null handling in the multi-stage query engine.
Table based null handling, where all columns in the table are considered nullable. This is how null values were handled before Pinot 1.1.0, and it is now deprecated.
We recommend configuring column based null storing, which lets you specify null handling per column and supports null handling in the multi-stage query engine.
To enable column based null handling:
Set enableColumnBasedNullHandling to true in the schema configuration before ingesting data.
Then specify which columns are not nullable using the notNull field spec, which defaults to false.
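As a rough illustration of these two steps, the sketch below builds a schema as a Python dictionary and prints it as JSON. The schema name, column names, and data types are hypothetical; only enableColumnBasedNullHandling and notNull come from the text above, and the helper function is purely illustrative.

```python
import json

# Schema sketch: schema and column names are hypothetical.
schema = {
    "schemaName": "my_table",
    "enableColumnBasedNullHandling": True,
    "dimensionFieldSpecs": [
        # Explicitly marked as not nullable.
        {"name": "notNullColumn", "dataType": "STRING", "notNull": True},
        # notNull omitted: it defaults to false, so this column is nullable.
        {"name": "nullableColumn", "dataType": "STRING"},
    ],
}


def nullable_columns(schema):
    """List the dimension columns that are nullable under column based handling."""
    return [
        spec["name"]
        for spec in schema.get("dimensionFieldSpecs", [])
        if not spec.get("notNull", False)
    ]


print(json.dumps(schema, indent=2))
print(nullable_columns(schema))
```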
This was the only way to enable null storing in Pinot before 1.1.0, and it has been deprecated since then. Table based null storing is more expensive in terms of disk space and query performance than column based null storing. Also, it is not possible to support null handling in the multi-stage query engine using table based null storing.
To enable table based null storing, set tableIndexConfig.nullHandlingEnabled to true in the table configuration before ingesting data. All columns in the table are then nullable.
Remember that the nullHandlingEnabled table configuration enables table based null handling, while enableNullHandling is the query option that enables advanced null handling at query time. See advanced null handling support for more information.
As an example:
To enable basic null handling at query time, enable Pinot to store nulls at ingestion time. Advanced null handling support can be optionally enabled.
The multi-stage query engine requires column based null storing. Tables with table based null storing are considered not nullable.
If you are converting from null support for the single-stage query engine, you can simplify your model by removing nullHandlingEnabled at the same time you set enableColumnBasedNullHandling. Also, when converting:
No reingestion is needed.
If the columns are changed from nullable to not nullable and there is a value that was previously null, the default value will be used instead.
The basic null support is automatically enabled when null values are stored on a segment (see storing nulls at ingestion time).
In this mode, Pinot is able to handle simple predicates like IS NULL or IS NOT NULL. Other transformation functions (like CASE, COALESCE, +, etc.) and aggregation functions (like COUNT, SUM, AVG, etc.) will use the default value specified in the schema for null values.
For example, in the following table:
0 | null
1 | 1
2 | 2
3 | 2
4 | null
If the default value for col1 is 1, the following query:
Will return the following result:
1 | 1
2 | 2
3 | 2
While
Will return the following:
0 | 2
1 | 2
2 | 3
3 | 3
4 | 2
And queries like
Will return
0 | null
1 | 1
4 | null
Also
5 | 1
This is because neither the count nor the mode function ignores null values as one might expect. Instead, they read the default value (in this case 1) stored in the forward index.
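The numbers in this example can be reproduced with a small model of the forward index and the null index. This is a conceptual sketch in plain Python, not Pinot code; it assumes the five-row table and the default value of 1 described above, and the last lines show what a null-aware count and mode would return for comparison.

```python
from statistics import mode

DEFAULT_NULL_VALUE = 1
col1 = [None, 1, 2, 2, None]  # rows 0..4 of the example table

# Forward index: every row gets a value, nulls are replaced by the default.
forward_index = [DEFAULT_NULL_VALUE if v is None else v for v in col1]
# Null index: document IDs of the rows whose value was null.
null_index = {doc_id for doc_id, v in enumerate(col1) if v is None}

# Basic null handling: IS NULL consults the null index...
is_null_rows = sorted(null_index)
# ...but transformations and aggregations read the forward index.
col1_plus_one = [v + 1 for v in forward_index]
basic_count = len(forward_index)
basic_mode = mode(forward_index)

# Null-aware behaviour: aggregations skip rows listed in the null index.
non_null = [v for doc_id, v in enumerate(forward_index) if doc_id not in null_index]
null_aware_count = len(non_null)
null_aware_mode = mode(non_null)

print(is_null_rows, col1_plus_one, basic_count, basic_mode)
print(null_aware_count, null_aware_mode)
```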
Advanced null handling has two requirements:
Segments must store null values (see storing nulls at ingestion time).
The query must enable null handling by setting the enableNullHandling query option to true.
The latter can be done in one of the following ways:
Set enableNullHandling=true at the beginning of the query.
If using JDBC, set the connection option enableNullHandling=true (either in the URL or as a property).
Even though they have similar names, the nullHandlingEnabled table configuration and the enableNullHandling query option are different. Remember that the nullHandlingEnabled table configuration modifies how segments are stored, while the enableNullHandling query option modifies how queries are executed.
When the enableNullHandling option is set to true, the Pinot query engine uses a different execution path that interprets nulls in a standard SQL way. This means that IS NULL and IS NOT NULL predicates will evaluate to true or false according to whether a null is detected (like in basic null support mode), and also that aggregation functions like COUNT, SUM, AVG, MODE, etc. will deal with null values as expected (usually ignoring null values).
In this mode, some indexes may not be usable, and queries may be significantly more expensive. Performance degradation impacts all the columns in the table, including columns in the query that do not contain null values. This degradation happens even when the table uses column based null storing.
If you're not able to generate the null index for your use case, you may filter for null values using a default value specified in your schema or a specific value included in your query.
The following example queries work when the null value is not used in a dataset. Unexpected values may be returned if the specified null value is a valid value in the dataset.
Specify a default null value (defaultNullValue) in your schema for dimension fields (dimensionFieldSpecs), metric fields (metricFieldSpecs), and date time fields (dateTimeFieldSpecs).
Ingest the data.
To filter out the specified default null value, for example, you could write a query like the following:
Filter for a specific value in your query that will not be included in the dataset. For example, to calculate the average age, use -1 to indicate the value of Age is null.
Rewrite the following query:
To cover null values as follows:
Segments for offline tables are constructed outside of Pinot, typically in Hadoop via MapReduce jobs, and ingested into Pinot via the REST API provided by the Controller. Pinot provides libraries to create Pinot segments out of input files in AVRO, JSON, or CSV formats in a Hadoop job, and to push the constructed segments to the controllers via REST APIs.
When an Offline segment is ingested, the controller looks up the table’s configuration and assigns the segment to the servers that host the table. It may assign multiple servers for each segment depending on the number of replicas configured for that table.
Pinot supports different segment assignment strategies that are optimized for various use cases.
Once segments are assigned, Pinot servers get notified via Helix to “host” the segment. The segments are downloaded from the remote segment store to the local storage, untarred, and memory-mapped.
Once the server has loaded (memory-mapped) the segment, Helix notifies brokers of the availability of these segments. The brokers start to include the new segments for queries. Brokers support different routing strategies depending on the type of table, the segment assignment strategy, and the use case.
Data in offline segments is immutable (rows cannot be added, deleted, or modified). However, segments may be replaced with modified data.
Starting from release-0.11.0, Pinot supports uploading offline segments to real-time tables. This is useful when you want to bootstrap a real-time table with some initial data, or add some offline data to a real-time table without changing the data stream. Note that this is different from the setup, and no time boundary is maintained between the offline segments and the real-time segments.
Segments for real-time tables are constructed by Pinot servers with rows ingested from data streams such as Kafka. Rows ingested from streams are made available for query processing as soon as they are ingested, thus enabling applications such as those that need real-time charts on analytics.
In large scale installations, data in streams is typically split across multiple stream partitions. The underlying stream may provide consumer implementations that allow applications to consume data from any subset of partitions, including all partitions (or, just from one partition).
A Pinot table can be configured to consume from streams in one of two modes:
LowLevel: This is the preferred mode of consumption. Pinot creates independent partition-level consumers for each partition. Depending on the configured number of replicas, multiple consumers may be created for each partition, taking care that no two replicas exist on the same server host. Therefore you need to provision at least as many hosts as the number of replicas configured.
HighLevel: Pinot creates one stream-level consumer that consumes from all partitions. Each message consumed could be from any of the partitions of the stream. Depending on the configured number of replicas, multiple stream-level consumers are created, taking care that no two replicas exist on the same server host. Therefore you need to provision exactly as many hosts as the number of replicas configured.
Of course, the underlying stream should support either mode of consumption in order for a Pinot table to use that mode. Kafka has support for both of these modes. See for more information on the support of other data streams in Pinot.
In either mode, Pinot servers store the ingested rows in volatile memory until either one of the following conditions is met:
A certain number of rows are consumed
The consumption has gone on for a certain length of time
Upon reaching either one of these limits, the servers do the following:
Pause consumption
Persist the rows consumed so far into non-volatile storage
Continue consuming new rows into volatile memory again.
The persisted rows form what we call a completed segment (as opposed to a consuming segment that resides in volatile memory).
During segment completion, one winner is chosen by the controller from all the replicas as the committer server. The committer server builds the segment and uploads it to the controller. All the other non-committer servers follow one of these two paths:
If the in-memory segment is equivalent to the committed segment, the non-committer server also builds the segment locally and replaces the in-memory segment.
If the in-memory segment is not equivalent to the committed segment, the non-committer server downloads the segment from the controller.
In HighLevel mode, the servers persist the consumed rows into local store (and not the segment store). Since consumption of rows can be from any partition, it is not possible to guarantee equivalence of segments across replicas.
To query using distributed joins, window functions, and other multi-stage operators in real time, you must enable the multi-stage query engine (v2). To enable v2, do any of the following:
Enable the multi-stage query engine
Programmatically access the multi-stage query engine:
Query
Query outside of the APIs
To learn more about what the multi-stage query engine is, see .
To enable the multi-stage query engine, in the Pinot Query Console, select the Use Multi-Stage Engine check box.
To query the Pinot multi-stage query engine, use REST APIs or the query option:
The Controller admin API and the Broker query API allow optional JSON payload for configuration. For example:
To enable the multi-stage engine via a query outside of the API, add the useMultistageEngine=true option to the top of your query.
For example:
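The two approaches can be sketched as follows. This is a hedged illustration rather than an authoritative client: it assumes the JSON payload carries the SQL text under a sql key and the option under a queryOptions key, and that the in-query form uses a SET statement. The table name is hypothetical and nothing is sent over the network.

```python
import json


def build_query_request(sql, use_multistage=True):
    """Build a JSON body for a SQL query request (key names are assumptions)."""
    payload = {"sql": sql}
    if use_multistage:
        payload["queryOptions"] = "useMultistageEngine=true"
    return json.dumps(payload)


def with_multistage_option(sql):
    """Prefix a query with the option, for use outside of the APIs."""
    return "SET useMultistageEngine=true;\n" + sql


query = "SELECT * FROM myTable LIMIT 10"  # hypothetical table name
print(build_query_request(query))
print(with_multistage_option(query))
```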
Raw source data often needs to undergo some transformations before it is pushed to Pinot.
Transformations include extracting records from nested objects, applying simple transform functions on certain columns, filtering out unwanted columns, as well as more advanced operations like joining between datasets.
A preprocessing job is usually needed to perform these operations. In streaming data sources, you might write a Samza job and create an intermediate topic to store the transformed data.
For simple transformations, this can result in inconsistencies in the batch/stream data source and increase maintenance and operator overhead.
To make things easier, Pinot supports transformations that can be applied via the .
If a new column is added to your table or schema configuration during ingestion, incorrect data may appear in the consuming segment(s). To ensure accurate values are reloaded, see how to .
Pinot supports the following functions:
Groovy functions
Built-in functions
A transformation function cannot mix Groovy and built-in functions; only use one type of function at a time.
Groovy functions can be defined using the syntax:
Any valid Groovy expression can be used.
Enabling Groovy
Allowing executable Groovy in ingestion transformation can be a security vulnerability. To enable Groovy for ingestion, set the following controller configuration:
controller.disable.ingestion.groovy=false
If not set, Groovy for ingestion transformation is disabled by default.
Below are some commonly used built-in Pinot functions for ingestion transformations.
These functions enable time transformations.
toEpochXXX
Converts from epoch milliseconds to a higher granularity.
toEpochXXXRounded
Converts from epoch milliseconds to another granularity, rounding to the nearest rounding bucket. For example, 1588469352000 (2020-05-03 01:29:12 UTC) is 26474489 minutesSinceEpoch, and toEpochMinutesRounded(1588469352000, 10) = 26474480 (2020-05-03 01:20:00 UTC).
fromEpochXXX
Converts from an epoch granularity to milliseconds.
Simple date format
Converts simple date format strings to milliseconds and vice versa, per the provided pattern string.
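The arithmetic behind the epoch conversions can be checked with a few lines of Python. This is a sketch of the calculation only, not Pinot's code, and the Python function names are illustrative. Note that the worked example for toEpochMinutesRounded rounds down to the start of the bucket, so the sketch uses integer division.

```python
MILLIS_PER_MINUTE = 60_000


def to_epoch_minutes(millis):
    """Epoch milliseconds -> whole minutes since the epoch."""
    return millis // MILLIS_PER_MINUTE


def to_epoch_minutes_rounded(millis, bucket):
    """Epoch milliseconds -> minutes since the epoch, floored to a bucket."""
    return (to_epoch_minutes(millis) // bucket) * bucket


def from_epoch_minutes(minutes):
    """Minutes since the epoch -> epoch milliseconds."""
    return minutes * MILLIS_PER_MINUTE


print(to_epoch_minutes(1588469352000))
print(to_epoch_minutes_rounded(1588469352000, 10))
print(from_epoch_minutes(26474480))
```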
Note
"transformFunction": "fromDateTime(dateTimeStr, 'yyyy-MM-dd''T''HH:mm:ss')"
Records can be filtered as they are ingested. A filter function can be specified in the filterConfig of the ingestionConfig in the table config.
If the expression evaluates to true, the record will be filtered out. The expressions can use any of the transform functions described in the previous section.
Consider a table that has a column timestamp. If you want to filter out records that are older than timestamp 1589007600000, you could apply the following function:
Consider a table that has a string column campaign and a multi-value double column prices. If you want to filter out records where campaign = 'X' or 'Y' and the sum of all elements in prices is less than 100, you could apply the following function:
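The two filters can be sketched as follows. The config fragment assumes the filter is written as a Groovy expression under a filterFunction key; that key and the expression string are assumptions to verify against your Pinot version. The Python predicates mirror the same logic so the "true means filtered out" semantics can be checked locally.

```python
import json

# Assumed shape of the config fragment; key names should be verified.
filter_config = {
    "ingestionConfig": {
        "filterConfig": {
            "filterFunction": "Groovy({timestamp < 1589007600000}, timestamp)"
        }
    }
}


def is_too_old(record):
    """True means the record is dropped: timestamp is before the cutoff."""
    return record["timestamp"] < 1589007600000


def is_low_value_campaign(record):
    """True means the record is dropped: campaign X or Y with prices summing below 100."""
    return record["campaign"] in ("X", "Y") and sum(record["prices"]) < 100


print(json.dumps(filter_config, indent=2))
print(is_too_old({"timestamp": 1589007599999}))
print(is_low_value_campaign({"campaign": "X", "prices": [10.0, 20.0]}))
```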
Transform functions can be defined on columns in the ingestion config of the table config.
For example, imagine that our source data contains the prices and timestamp fields. We want to extract the maximum price and store that in the maxPrices field, and convert the timestamp into the number of hours since the epoch and store it in the hoursSinceEpoch field. You can do this by applying the following transformation:
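One way this could look is sketched below. The transformConfigs, columnName, and transformFunction keys and the two function strings are assumptions about the config shape, not confirmed by the text above. The apply_transforms helper is a hypothetical Python equivalent that shows what the two derived columns would contain.

```python
import json

MILLIS_PER_HOUR = 3_600_000

# Assumed config shape: one Groovy function and one built-in function.
transform_config = {
    "ingestionConfig": {
        "transformConfigs": [
            {"columnName": "maxPrices",
             "transformFunction": "Groovy({prices.max()}, prices)"},
            {"columnName": "hoursSinceEpoch",
             "transformFunction": "toEpochHours(timestamp)"},
        ]
    }
}


def apply_transforms(record):
    """Python equivalent of the two transformations, for illustration only."""
    out = dict(record)
    out["maxPrices"] = max(record["prices"])
    out["hoursSinceEpoch"] = record["timestamp"] // MILLIS_PER_HOUR
    return out


print(json.dumps(transform_config, indent=2))
print(apply_transforms({"prices": [1.5, 9.0, 3.0], "timestamp": 1589007600000}))
```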
Below are some examples of commonly used functions.
Concat firstName and lastName to get fullName
Find max value in array bids
Convert timestamp from MILLISECONDS to HOURS
Change name of the column from user_id to userId
Pinot doesn't support columns that have spaces, so if a source data column has a space, we'll need to store that value in a column with a supported name. To extract the value from first Name into the column firstName, run the following:
If eventType is IMPRESSION, set impression to 1. Similar for CLICK.
Store an AVRO Map in Pinot as two multi-value columns. Sort the keys, to maintain the mapping.
1) The keys of the map as map_keys
2) The values of the map as map_values
Transformations can be chained. This means that you can use a field created by a transformation in another transformation function.
For example, we might have the following JSON document in the data field of our source data:
We can apply one transformation to extract the userId and then another one to pull out the numerical part of the identifier:
There are 2 kinds of flattening:
This is not natively supported as of yet. You can write a custom Decoder/RecordReader if you want to use this. Once the Decoder generates the multiple GenericRows from the provided input record, a List<GenericRow> should be set into the destination GenericRow, with the key $MULTIPLE_RECORDS_KEY$. The segment generation drivers will treat this as a special case and handle the multiple records case.
Feature TBD
If a new column is added to table or schema configuration during ingestion, incorrect data may appear in the consuming segment(s).
To ensure accurate values are reloaded, do the following:
Pause consumption (and wait for pause status success): $ curl -X POST {controllerHost}/tables/{tableName}/pauseConsumption
Apply new table or schema configurations.
Resume consumption: $ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption
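The three steps above can be sketched as a small helper. It only uses the two endpoints shown in the curl commands; the post and apply_config callables are hypothetical hooks that you would supply, and the sketch does not poll for the pause status, which the first step asks you to wait for.

```python
def consumption_urls(controller_host, table_name):
    """Build the pause and resume URLs from the endpoints listed above."""
    base = f"{controller_host}/tables/{table_name}"
    return {
        "pause": f"{base}/pauseConsumption",
        "resume": f"{base}/resumeConsumption",
    }


def update_config_safely(controller_host, table_name, post, apply_config):
    """Pause consumption, apply the new configuration, then resume."""
    urls = consumption_urls(controller_host, table_name)
    post(urls["pause"])    # step 1 (a real client should wait for success here)
    apply_config()         # step 2: apply new table or schema configurations
    post(urls["resume"])   # step 3


# Dry run that records the calls instead of sending HTTP requests.
calls = []
update_config_safely("http://localhost:9000", "myTable",
                     calls.append, lambda: calls.append("apply config"))
print(calls)
```

In a real script, post could wrap urllib.request.urlopen with a Request whose method is "POST".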
(See on how to set these values, or have pinot compute them for you)
In LowLevel
mode, the completed segments are persisted into the local non-volatile store of pinot server as well as the segment store of the pinot cluster (See ). This allows for easy and automated mechanisms for replacing pinot servers, or expanding capacity, etc. Pinot has that ensure that the completed segment is equivalent across all replicas.
For more details on this protocol, refer to .
See for details.
All the functions defined in annotated with @ScalarFunction
(for example, ) are supported ingestion transformation functions.
Letters that are not part of Simple Date Time legend () need to be escaped. For example:
Filter config also supports SQL-like expression of built-in for filtering records (starting v 0.11.0+). Example:
using the or .
toEpochSeconds
Converts epoch millis to epoch seconds.
Usage: "toEpochSeconds(millis)"
toEpochMinutes
Converts epoch millis to epoch minutes.
Usage: "toEpochMinutes(millis)"
toEpochHours
Converts epoch millis to epoch hours.
Usage: "toEpochHours(millis)"
toEpochDays
Converts epoch millis to epoch days.
Usage: "toEpochDays(millis)"
toEpochSecondsRounded
Converts epoch millis to epoch seconds, rounding to the nearest rounding bucket.
Usage: "toEpochSecondsRounded(millis, 30)"
toEpochMinutesRounded
Converts epoch millis to epoch minutes, rounding to the nearest rounding bucket.
Usage: "toEpochMinutesRounded(millis, 10)"
toEpochHoursRounded
Converts epoch millis to epoch hours, rounding to the nearest rounding bucket.
Usage: "toEpochHoursRounded(millis, 6)"
toEpochDaysRounded
Converts epoch millis to epoch days, rounding to the nearest rounding bucket.
Usage: "toEpochDaysRounded(millis, 7)"
fromEpochSeconds
Converts from epoch seconds to milliseconds.
Usage: "fromEpochSeconds(secondsSinceEpoch)"
fromEpochMinutes
Converts from epoch minutes to milliseconds.
Usage: "fromEpochMinutes(minutesSinceEpoch)"
fromEpochHours
Converts from epoch hours to milliseconds.
Usage: "fromEpochHours(hoursSinceEpoch)"
fromEpochDays
Converts from epoch days to milliseconds.
Usage: "fromEpochDays(daysSinceEpoch)"
toDateTime
Converts from milliseconds to a formatted date time string, as per the provided pattern.
Usage: "toDateTime(millis, 'yyyy-MM-dd')"
fromDateTime
Converts a formatted date time string to milliseconds, as per the provided pattern.
Usage: "fromDateTime(dateTimeStr, 'EEE MMM dd HH:mm:ss ZZZ yyyy')"
json_format
Converts a JSON/AVRO complex object to a string. This JSON map can then be queried using the jsonExtractScalar function.
Usage: "json_format(jsonMapField)"
Set up Pinot by starting each component individually
Start Pinot Components using docker
Prerequisites
If running locally, ensure your Docker cluster has enough resources. Below is a sample config.
Pull Docker image
You can try out the pre-built Pinot all-in-one Docker image.
(Optional) You can also follow the instructions here to build your own images.
0. Create a network
Create an isolated bridge network in Docker.
1. Start Zookeeper
Start Zookeeper in daemon mode.
Start ZKUI to browse Zookeeper data at http://localhost:9090.
2. Start Pinot Controller
Start Pinot Controller in daemon mode and connect to Zookeeper.
3. Start Pinot Broker
Start Pinot Broker in daemon mode and connect to Zookeeper.
4. Start Pinot Server
Start Pinot Server in daemon mode and connect to Zookeeper.
Now all Pinot related components are started as an empty cluster.
You can run the command below to check the container status.
Sample Console Output
Download Pinot Distribution from http://pinot.apache.org/download/
Start Pinot components via launcher scripts
Start Zookeeper
Start Pinot Controller
See the controller page for more details.
Start Pinot Broker
Start Pinot Server
Often we need to customize the setup of Pinot components. To do so, you can write a config file and use it to start Pinot components.
Below are example config files and sample commands to start Pinot.
Below is a sample pinot-controller.conf used in the HelmChart setup.
In order to run Pinot Controller, the command is:
Below are some configurations you can set in Pinot Controller. See Controller for the complete list of available configs.
Config | Description | Default
controller.helix.cluster.name | Pinot Cluster name | PinotCluster
controller.host | Pinot Controller Host | Required if config pinot.set.instance.id.to.hostname is false.
pinot.set.instance.id.to.hostname | When enabled, use server hostname to infer controller.host | false
controller.port | Pinot Controller Port | 9000
controller.vip.host | The VIP hostname used to set the download URL for segments | ${controller.host}
controller.vip.port | The VIP port used to set the download URL for segments | ${controller.port}
controller.data.dir | Directory to host segment data | ${java.io.tmpdir}/PinotController
controller.zk.str | Zookeeper URL | localhost:2181
cluster.tenant.isolation.enable | Enable Tenant Isolation, default is single tenant cluster | true
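As a rough illustration, a config file using some of these keys could be generated as below. The keys come from the list above; the values are illustrative only (the data directory in particular is a made-up path), and the helper function is hypothetical rather than part of any Pinot tooling.

```python
# Keys are taken from the configuration list above; values are illustrative.
controller_props = {
    "controller.helix.cluster.name": "PinotCluster",
    "controller.port": "9000",
    "controller.zk.str": "localhost:2181",
    "controller.data.dir": "/tmp/PinotController",  # hypothetical path
    "pinot.set.instance.id.to.hostname": "true",
}


def render_properties(props):
    """Render a dict as key=value lines, one property per line."""
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


conf_text = render_properties(controller_props)
print(conf_text)
```

Writing conf_text to a file named pinot-controller.conf would give a starting point to adapt to your own cluster.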
Below is a sample pinot-broker.conf used in the HelmChart setup.
In order to run Pinot Broker, the command is:
Below are some configurations you can set in Pinot Broker. See Broker for the complete list of available configs.
Config | Description | Default
instanceId | Unique id to register Pinot Broker in the cluster. | BROKER_${BROKER_HOST}_${pinot.broker.client.queryPort}
pinot.set.instance.id.to.hostname | When enabled, use server hostname to set ${BROKER_HOST} in above config, else use IP address. | false
pinot.broker.client.queryPort | Port to query Pinot Broker | 8099
pinot.broker.timeoutMs | Timeout for Broker Query in Milliseconds | 10000
pinot.broker.enable.query.limit.override | Configuration to enable Query LIMIT Override, to protect Pinot Broker and Server from fetching too many records back. | false
pinot.broker.query.response.limit | When config pinot.broker.enable.query.limit.override is enabled, reset limit for selection query if it exceeds this value. | 2147483647
pinot.broker.startup.minResourcePercent | Configuration to consider the broker ServiceStatus as being STARTED if the percent of resources (tables) that are ONLINE for this broker has crossed the threshold percentage of the total number of tables that it is expected to serve | 100.0
Below is a sample pinot-server.conf used in the HelmChart setup.
In order to run Pinot Server, the command is:
Below are some configurations you can set in Pinot Server. See Server for the complete list of available configs.
Config | Description | Default
instanceId | Unique id to register Pinot Server in the cluster. | Server_${SERVER_HOST}_${pinot.server.netty.port}
pinot.set.instance.id.to.hostname | When enabled, use server hostname to set ${SERVER_HOST} in above config, else use IP address. | false
pinot.server.netty.port | Port to query Pinot Server | 8098
pinot.server.adminapi.port | Port for Pinot Server Admin UI | 8097
pinot.server.instance.dataDir | Directory to hold all the data | ${java.io.tmpDir}/PinotServer/index
pinot.server.instance.segmentTarDir | Directory to hold temporary segments downloaded from Controller or Deep Store | ${java.io.tmpDir}/PinotServer/segmentTar
pinot.server.query.executor.timeout | Timeout for Server to process Query in Milliseconds | 15000
A TABLE in the regular database world is represented as <TABLE>_OFFLINE and/or <TABLE>_REALTIME in Pinot, depending on the ingestion mode (batch, real-time, hybrid).
See examples for all possible batch/streaming tables.
See Batch Tables for table configuration details and how to customize it.
Sample Console Output
By default, the inverted index is the only type of index that isn't created automatically during segment generation. Instead, it is generated when the segments are loaded on the server. However, waiting to build indexes until load time increases the startup time and takes up resources with every new segment push, which increases the time for other operations such as rebalance.
To automatically create an inverted index during segment generation, add an entry to your table index config in the table configuration file.
This setting works with batch (offline) tables.
When set to true, Pinot creates an inverted index for the columns that you specify in the invertedIndexColumns list in the table configuration.
This setting is false by default.
Set createInvertedIndexDuringSegmentGeneration to true in your table config, as follows:
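A minimal sketch of such a table config fragment is shown below, built as a Python dictionary and printed as JSON. The table name and column name are hypothetical, and placing both settings under tableIndexConfig is an assumption based on the description above.

```python
import json

# Table and column names are hypothetical placeholders.
table_config = {
    "tableName": "myTable",
    "tableType": "OFFLINE",
    "tableIndexConfig": {
        "invertedIndexColumns": ["columnA"],
        "createInvertedIndexDuringSegmentGeneration": True,
    },
}

print(json.dumps(table_config, indent=2))
```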
When you update this setting in your table configuration, you must reload the table segment to apply the inverted index to all existing segments.
See Streaming Tables for table configuration details and how to customize it.
Start Kafka
Create a Kafka Topic
Create a Streaming table
Sample output
Start Kafka-Zookeeper
Start Kafka
Create stream table
sortedColumn with streaming tables
For streaming tables, you can use a sorted index with sortedColumn to sort data when generating segments as the segment is created. See Real-time tables for more information.
A sorted forward index can be used as an inverted index with better performance, but with the limitation that the search is only applied to one column per table. See Sorted inverted index to learn more.
Now that the table is configured, let's load some data. Data can be loaded in batch mode or streaming mode. See the ingestion overview page for details. Loading data involves generating Pinot segments from raw data and pushing them to the Pinot cluster.
You can always generate and push segments to Pinot via standalone scripts or using frameworks such as Hadoop or Spark. See this page for more details on setting up Data Ingestion Jobs.
The example below uses standalone mode.
Sample Console Output
The JobSpec YAML file has all the information regarding the data format, input data location, and Pinot cluster coordinates. Note that this assumes that the controller is RUNNING so that the table config and schema can be fetched. If not, you will have to configure the spec to point at their location. See Pinot Ingestion Job for more details.
Run below command to stream JSON data into Kafka topic: flights-realtime