Apache Pinot™ uses a variety of terms which can refer to either abstractions that model the storage of data or infrastructure components that drive the functionality of the system, including:
As opposed to RDBMS schemas, multiple tables can be created in Pinot (real-time or batch) that inherit a single schema definition. Tables are independently configured for concerns such as indexing strategies, partitioning, tenants, data sources, and replication.
Pinot schemas are defined in a JSON file. Because that schema definition is in its own file, multiple tables can share a single schema. Each table can have a unique name, indexing strategy, partitioning, data sources, and other metadata.
Pinot table types include:
real-time: Ingests data from a streaming source like Apache Kafka®
offline: Loads data from a batch source
hybrid: Loads data from both a batch source and a streaming source
By default, all tables, brokers, and servers belong to a tenant called DefaultTenant, but you can configure multiple tenants in a Pinot cluster.
A Pinot cluster consists of the following processes, which are typically deployed on separate hardware resources in production. In development, they can fit comfortably into Docker containers on a typical laptop.
Controller: Maintains cluster metadata and manages cluster resources.
Zookeeper: Manages the Pinot cluster on behalf of the controller. Provides fault-tolerant, persistent storage of metadata, including table configurations, schemas, segment metadata, and cluster state.
Broker: Accepts queries from client processes and forwards them to servers for processing.
Server: Provides storage for segment files and compute for query processing.
(Optional) Minion: Computes background tasks other than query processing, minimizing impact on query latency. Optimizes segments and builds additional indexes to maintain performance, even as data is deleted.
The simplest possible Pinot cluster consists of four components: a server, a broker, a controller, and a Zookeeper node. In production environments, these components typically run on separate server instances, and scale out as needed for data volume, load, availability, and latency. Pinot clusters in production range from fewer than ten total instances to more than 1,000.
Helix is a cluster management solution created by the authors of Pinot. Helix maintains a persistent, fault-tolerant map of the intended state of the Pinot cluster. It constantly monitors the cluster to ensure that the right hardware resources are allocated to implement the present configuration. When the configuration changes, Helix schedules or decommissions hardware resources to reflect the new configuration. When elements of the cluster change state catastrophically, Helix schedules hardware resources to keep the actual cluster consistent with the ideal represented in the metadata. From a physical perspective, Helix takes the form of a controller process plus agents running on servers and brokers.
Real-time and offline servers have very different resource usage requirements: real-time servers continually consume new messages from external systems (such as Kafka topics) and ingest them into segments allocated to a tenant. Because of this, resource isolation can be used to prioritize high-throughput real-time data streams that are ingested and then made available for query through a broker.
A production Pinot cluster contains many brokers. In general, the more brokers, the more concurrent queries a cluster can process, and the lower latency it can deliver on queries.
Pinot minion is an optional component that can be used to run background tasks such as "purge" for GDPR (General Data Protection Regulation). Because Pinot is an immutable aggregate store, records containing sensitive private data need to be purged on a request-by-request basis. Minion provides a GDPR-compliant solution for this purpose while optimizing Pinot segments and building additional indices that guarantee performance even when data can be deleted. One can also write a custom task that runs on a periodic basis. While it's possible to perform these tasks on the Pinot servers directly, having a separate process (Minion) lessens the degradation of query latency as segments are affected by mutable writes.
Minions isolate the computational burden of out-of-band data processing from the servers. Although a Pinot cluster can function with or without minions, they are typically present to support routine tasks like batch data ingest.
Pinot's storage model defines the following abstractions:
Table: to store data
Segment: to partition data
Tenant: to isolate data
Cluster: to manage data
Pinot has a distributed systems architecture that scales horizontally. Pinot expects the size of a table to grow infinitely over time. To achieve this, all data needs to be distributed across multiple nodes. Pinot achieves this by breaking data into smaller chunks known as segments (similar to shards/partitions in HA relational databases). Segments can also be seen as time-based partitions.
Similar to traditional databases, Pinot has the concept of a table: a logical abstraction that refers to a collection of related data. As is the case with relational database management systems (RDBMS), a table is a construct that consists of columns and rows (documents) that are queried using SQL. A table is associated with a schema, which defines the columns in a table as well as their data types.
Pinot stores data in tables. A Pinot table is conceptually identical to a relational database table with rows and columns. Columns have the same name and data type, known as the table's schema.
Pinot tables are stored in one or more independent shards called segments. A small table may be contained by a single segment, but Pinot lets tables grow to an unlimited number of segments. There are different processes for creating segments (see ). Segments have time-based partitions of table data, and are stored on Pinot servers that scale horizontally as needed for both storage and computation.
To support multi-tenancy, Pinot has first-class support for tenants. A table is associated with a tenant. This allows all tables belonging to a particular logical namespace to be grouped under a single tenant name and isolated from other tenants. Isolation between tenants provides separate namespaces for applications and teams, so they don't share tables or schemas. Development teams building applications do not have to operate an independent deployment of Pinot. An organization can operate a single cluster and scale it out as new tenants increase the overall volume of queries. Developers can manage their own schemas and tables without being impacted by any other tenant on a cluster.
Every table is associated with a tenant, or a logical namespace that restricts where the cluster processes queries on the table. A Pinot tenant takes the form of a text tag in the logical tenant namespace. Physical cluster hardware resources (i.e., brokers and servers) are also associated with a tenant tag in the common tenant namespace. Tables of a particular tenant tag will only be scheduled for storage and query processing on hardware resources that belong to the same tenant tag. This lets Pinot cluster operators assign specified workloads to certain hardware resources, preventing data from separate workloads from being stored or processed on the same physical hardware.
A Pinot cluster is a collection of the software processes and hardware resources required to ingest, store, and process data. For detail about Pinot cluster components, see .
Pinot uses Apache Zookeeper as a distributed metadata store and Apache Helix for cluster management.
A controller is the core orchestrator that drives the consistency and routing in a Pinot cluster. Controllers are horizontally scaled as an independent component (container) and have visibility into the state of all other components in a cluster. The controller reacts and responds to state changes in the system and schedules the allocation of resources for tables, segments, or nodes. As mentioned earlier, Helix is embedded within the controller as an agent that is a participant responsible for observing and driving state changes that are subscribed to by other components.
The Pinot controller schedules and re-schedules resources in a Pinot cluster when metadata changes or a node fails. As an Apache Helix Controller, it schedules the resources that comprise the cluster and orchestrates connections between certain external processes and cluster components (e.g., ingest of real-time and offline data). It can be deployed as a single process on its own server or as a group of redundant servers in an active/passive configuration.
The controller exposes a REST API for cluster-wide administrative operations as well as a web-based query console to execute interactive SQL queries and perform simple administrative tasks.
Pinot servers host segments (shards) that are scheduled and allocated across multiple nodes and assigned to a tenant (there is a single tenant by default). Servers are independent containers that scale horizontally and are notified by Helix through state changes driven by the controller. A server can be either a real-time server or an offline server.
Pinot brokers take query requests from client processes, scatter them to applicable servers, gather the results, and return them to the client. The controller shares cluster metadata with the brokers, which allows the brokers to create a plan for executing the query involving a minimal subset of servers with the source data and, when required, other servers to shuffle and consolidate results.
A Pinot minion is an optional cluster component that executes background tasks on table data apart from the query processes performed by brokers and servers. Minions run on independent hardware resources, and are responsible for executing minion tasks as directed by the controller. Examples of minion tasks include converting batch data from a standard format like Avro or JSON into segment files to be loaded into an offline table, and rewriting existing segment files to purge records as required by data privacy laws like GDPR. Minion tasks can run once or be scheduled to run periodically.
Explore the fundamental concepts of Apache Pinot™ as a distributed OLAP database.
Apache Pinot™ is a database designed to deliver highly concurrent, ultra-low-latency queries on large datasets through a set of common data model abstractions. Delivering on these goals requires several foundational architectural commitments, including:
Storing data in columnar form to support high-performance scanning
Sharding of data to scale both storage and computation
A distributed architecture designed to scale capacity linearly
A tabular data model read by SQL queries
To learn about Pinot components and terminology, and to gain a conceptual understanding of how data is stored in Pinot, review the following sections:
Uncover the efficient data processing and storage capabilities of Apache Pinot's server component, optimizing performance for data-driven applications.
Servers are typically segregated into real-time and offline workloads, with "real-time" servers hosting only real-time tables, and "offline" servers hosting only offline tables. This is a common operational convention, not an explicit configuration in the server process itself. There are two types of servers:
Offline servers are responsible for downloading segments from the segment store and hosting them to serve queries. When a new segment is uploaded to the controller, the controller decides which servers (as many as the replication factor) will host the new segment and notifies them to download it from the segment store. On receiving this notification, the servers download the segment file and load the segment to serve queries.
Real-time servers directly ingest from a real-time stream (such as Kafka or EventHubs). Periodically, they build segments from the in-memory ingested data, based on certain thresholds, and persist those segments to the segment store.
Pinot servers are modeled as Helix participants, hosting Pinot tables (referred to as resources in Helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more Helix partitions of one or more Helix resources (i.e., one or more segments of one or more tables).
Pinot servers provide the primary storage for and perform the computation required to execute queries. A production Pinot cluster contains many servers. In general, the more servers, the more data the cluster can retain in tables, the lower latency the cluster can deliver on queries, and the more concurrent queries the cluster can process.
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a server:
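A minimal sketch using the launcher scripts, assuming Zookeeper is reachable at localhost:2181:

```bash
bin/pinot-admin.sh StartServer \
  -zkAddress localhost:2181
```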
Discover the tenant component of Apache Pinot, which facilitates efficient data isolation and resource management within Pinot clusters.
By default, all tables, brokers, and servers belong to a tenant called DefaultTenant, but you can configure multiple tenants in a Pinot cluster.
To support multi-tenancy, Pinot has first-class support for tenants. Every table is associated with a server tenant and a broker tenant, which controls the nodes used by the table as servers and brokers. Multi-tenancy lets Pinot group all tables belonging to a particular use case under a single tenant name.
The concept of tenants is very important when multiple use cases are using Pinot and there is a need to provide quotas or some sort of isolation across tenants. For example, consider two tables, Table A and Table B, in the same Pinot cluster. We can configure Table A with server tenant Tenant A and Table B with server tenant Tenant B. We can tag some of the server nodes for Tenant A and some for Tenant B. This ensures that segments of Table A only reside on servers tagged with Tenant A, and segments of Table B only reside on servers tagged with Tenant B. The same isolation can be achieved at the broker level by configuring broker tenants for the tables. There is no need to create separate clusters for every table or use case!
This section contains two main fields, broker and server, which decide the tenants used for the broker and server components of this table.
In the above example:
The table will be served by brokers that have been tagged as brokerTenantName_BROKER in Helix.
If this were an offline table, the offline segments for the table would be hosted on Pinot servers tagged in Helix as serverTenantName_OFFLINE.
If this were a real-time table, the real-time segments (both consuming as well as completed ones) would be hosted on Pinot servers tagged in Helix as serverTenantName_REALTIME.
Here's a sample broker tenant config. This will create a broker tenant sampleBrokerTenant by tagging three untagged broker nodes as sampleBrokerTenant_BROKER.
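A sketch of such a config, assuming the standard tenant-config fields:

```json
{
  "tenantRole": "BROKER",
  "tenantName": "sampleBrokerTenant",
  "numberOfInstances": 3
}
```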
To create this tenant, use the following command. The creation will fail if the number of untagged broker nodes is less than numberOfInstances.
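For example, with the pinot-admin launcher script's AddTenant command:

```bash
bin/pinot-admin.sh AddTenant \
  -name sampleBrokerTenant \
  -role BROKER \
  -instanceCount 3 -exec
```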
Here's a sample server tenant config. This will create a server tenant sampleServerTenant by tagging one untagged server node as sampleServerTenant_OFFLINE and one untagged server node as sampleServerTenant_REALTIME.
To create this tenant, use the following command. The creation will fail if the number of untagged server nodes is less than offlineInstances + realtimeInstances.
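For example, again with the AddTenant command:

```bash
bin/pinot-admin.sh AddTenant \
  -name sampleServerTenant \
  -role SERVER \
  -offlineInstanceCount 1 \
  -realtimeInstanceCount 1 -exec
```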
Discover how Apache Pinot's broker component optimizes query processing, data retrieval, and enhances data-driven applications.
Pinot brokers take query requests from client processes, scatter them to applicable servers, gather the results, and return results to the client. The controller shares cluster metadata with the brokers, which allows the brokers to create a plan for executing the query involving a minimal subset of servers with the source data and, when required, other servers to shuffle and consolidate results.
A production Pinot cluster contains many brokers. In general, the more brokers, the more concurrent queries a cluster can process, and the lower latency it can deliver on queries.
Pinot brokers are modeled as Helix spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried.
The broker ensures that all the rows of the table are queried exactly once, so as to return correct, consistent results for a query. The brokers may prune some of the segments as an optimization, as long as accuracy is not sacrificed.
Helix provides the framework by which spectators can learn the location in which each partition of a resource (i.e. participant) resides. The brokers use this mechanism to learn the servers that host specific segments of a table.
In the case of hybrid tables, the brokers ensure that the overlap between real-time and offline segment data is queried exactly once, by performing offline and real-time federation.
Consider this example: we have real-time data for five days (March 23 to March 27), while offline data has been pushed until March 25, two days behind real-time. The brokers maintain this time boundary.
Suppose we get a query to this table: select sum(metric) from table. The broker will split the query into two queries based on this time boundary, one for offline and one for real-time: select sum(metric) from table_REALTIME where date >= Mar 25 and select sum(metric) from table_OFFLINE where date < Mar 25. The broker merges results from both these queries before returning the result to the client.
Discover the controller component of Apache Pinot, enabling efficient data and query management.
The Pinot controller is responsible for the following:
Maintaining global metadata (e.g., configs and schemas) of the system with the help of Zookeeper, which is used as the persistent metadata store.
Hosting the Helix Controller and managing other Pinot components (brokers, servers, minions)
Maintaining the mapping of which servers are responsible for which segments. This mapping is used by the servers to download the portion of the segments that they are responsible for. This mapping is also used by the broker to decide which servers to route the queries to.
Serving admin endpoints for viewing, creating, updating, and deleting configs, which are used to manage and operate the cluster.
Serving endpoints for segment uploads, which are used in offline data pushes. Controllers are also responsible for initializing real-time consumption and coordinating the periodic persistence of real-time segments into the segment store.
Undertaking other management activities, such as managing segment retention and validations.
Use the GET /periodictask/names API to fetch the names of all the periodic tasks running on your Pinot cluster.
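For example, assuming the controller is at localhost:9000:

```bash
curl -X GET "http://localhost:9000/periodictask/names" -H "accept: application/json"
```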
To manually run a named periodic task, use the GET /periodictask/run API:
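A sketch of such a call; the parameter names follow the controller API, and the task and table names are illustrative:

```bash
curl -X GET "http://localhost:9000/periodictask/run?taskname=RetentionManager&tableName=myTable&type=OFFLINE" \
  -H "accept: application/json"
```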
The Log Request Id (api-09630c07) can be used to search through the pinot-controller log file to see log entries related to the execution of the periodic task that was manually run.
If tableName (and its type, OFFLINE or REALTIME) is not provided, the task will run against all tables.
Every table is associated with a tenant, or a logical namespace that restricts where the cluster processes queries on the table. A Pinot tenant takes the form of a text tag in the logical tenant namespace. Physical cluster hardware resources (i.e., brokers and servers) are also associated with a tenant tag in the common tenant namespace. Tables of a particular tenant tag will only be scheduled for storage and query processing on hardware resources that belong to the same tenant tag. This lets Pinot cluster operators assign specified workloads to certain hardware resources, preventing data in separate workloads from being stored or processed on the same physical hardware.
This tenant is defined in the tenants section of the table config.
Follow the instructions in the getting started guide to get Pinot running locally, and then
Check out the table config in the Rest API to make sure it was successfully uploaded.
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a broker:
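A minimal sketch using the launcher scripts, assuming Zookeeper at localhost:2181:

```bash
bin/pinot-admin.sh StartBroker \
  -zkAddress localhost:2181
```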
The Pinot controller schedules and reschedules resources in a Pinot cluster when metadata changes or a node fails. As an Apache Helix Controller, the Pinot controller schedules the resources that comprise the cluster and orchestrates connections between certain external processes and cluster components (for example, ingest of real-time and offline data). The Pinot controller can be deployed as a single process on its own server or as a group of redundant servers in an active/passive configuration.
The controller exposes a REST API for cluster-wide administrative operations as well as a web-based query console to execute interactive SQL queries and perform simple administrative tasks.
For redundancy, there can be multiple instances of Pinot controllers. Pinot expects that all controllers are configured with the same back-end storage system so that they have a common view of the segments (e.g., NFS). Pinot can use other storage systems, such as HDFS.
The controller runs several periodic tasks in the background to perform activities such as management and validation. Each periodic task has a run frequency with a default value; each task runs on its own schedule and can also be triggered manually if needed. The task runs on the lead controller for each table.
For periodic task configuration details, see .
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a controller:
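A minimal sketch using the launcher scripts, assuming Zookeeper at localhost:2181:

```bash
bin/pinot-admin.sh StartController \
  -zkAddress localhost:2181
```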
Explore the table component in Apache Pinot, a fundamental building block for organizing and managing data in Pinot clusters, enabling effective data processing and analysis.
Pinot schemas are defined in a JSON file. Because that schema definition is in its own file, multiple tables can share a single schema. Each table can have a unique name, indexing strategy, partitioning, data sources, and other metadata.
Pinot table types include:
real-time: Ingests data from a streaming source like Apache Kafka®
offline: Loads data from a batch source
hybrid: Loads data from both a batch source and a streaming source
Pinot supports the following types of tables:
Offline: Offline tables ingest pre-built Pinot segments from external data stores and are generally used for batch ingestion.
Real-time: Real-time tables ingest data from streams (such as Kafka) and build segments from the consumed data.
Hybrid: Hybrid Pinot tables have both real-time as well as offline tables under the hood. By default, all tables in Pinot are hybrid.
Use the following properties to make your tables faster or leaner:
Segment
Indexing
Tenants
For real-time tables, segments are built at specific intervals inside Pinot. You can tune the following for real-time segments (a config sketch follows the list below).
The Pinot real-time consumer ingests the data, creates the segment, and then flushes the in-memory segment to disk. Pinot allows you to configure when to flush the segment in the following ways:
Number of consumed rows: After consuming the specified number of rows from the stream, Pinot will persist the segment to disk.
Number of rows per segment: Pinot learns and then estimates the number of rows that need to be consumed. The learning phase starts by setting the number of rows to 100,000 (this value can be changed) and adjusts it to reach the appropriate segment size. Because Pinot corrects the estimate as it goes along, the segment size might go significantly over the correct size during the learning phase. You should set this value to optimize the performance of queries.
Max time duration to wait: Pinot consumers wait for the configured time duration after which segments are persisted to the disk.
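A sketch of these thresholds in the streamConfigs section of a real-time table config; the key names follow the stream ingestion configs, and the values are illustrative:

```json
"streamConfigs": {
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "24h",
  "realtime.segment.flush.threshold.segment.size": "150M"
}
```

Setting the row threshold to 0 lets Pinot learn the number of rows that produces the desired segment size, as described above.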
However, in certain scenarios, the segment build can get very memory-intensive. In these cases, you might want to enforce that the non-committer servers just download the segment from the controller instead of building it again. You can do this by setting completionMode: "DOWNLOAD" in the table configuration.
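For example, assuming the completionConfig section of the table config:

```json
"completionConfig": {
  "completionMode": "DOWNLOAD"
}
```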
Download Scheme
A Pinot server might fail to download segments from the deep store, such as HDFS, after the segment completes. However, you can configure servers to download these segments from peer servers instead of the deep store. Currently, only HTTP and HTTPS download schemes are supported. More methods, such as gRPC/Thrift, are planned to be added in the future.
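A sketch of enabling peer download, assuming the peerSegmentDownloadScheme field of the segmentsConfig section:

```json
"segmentsConfig": {
  "peerSegmentDownloadScheme": "http"
}
```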
You can create multiple indices on a table to increase the performance of the queries. The following types of indices are supported:
Dictionary-encoded forward index with bit compression
Raw value forward index
Sorted forward index with run-length encoding
Bitmap inverted index
Sorted inverted index
Aggregate the real-time stream data as it is consumed to reduce segment sizes. We add the metric column values of all rows that have the same values for all dimension and time columns and create a single row in the segment. This feature is only available on REALTIME tables.
The only supported aggregation is SUM. The columns to pre-aggregate need to satisfy the following requirements:
All metrics should be listed in noDictionaryColumns.
No multi-value dimensions.
All dimension columns are treated to have a dictionary, even if they appear as noDictionaryColumns in the config.
The following table config snippet shows an example of enabling pre-aggregation during real-time ingestion:
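A minimal sketch, with hypothetical metric column names:

```json
"tableIndexConfig": {
  "noDictionaryColumns": ["metric1", "metric2"],
  "aggregateMetrics": true
}
```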
Optionally, override whether a table should move to a server with a different tenant based on segment status. The example below adds a tagOverrideConfig under the tenants section for real-time tables to override tags for consuming and completed segments.
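A sketch of that config; the tenant names are placeholders:

```json
"tenants": {
  "broker": "brokerTenantName",
  "server": "serverTenantName",
  "tagOverrideConfig": {
    "realtimeConsuming": "serverTenantName_REALTIME",
    "realtimeCompleted": "serverTenantName_OFFLINE"
  }
}
```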
In the above example, the consuming segments will still be assigned to serverTenantName_REALTIME hosts, but once they are completed, the segments will be moved to serverTenantName_OFFLINE.
A hybrid table is a table composed of two tables, one offline and one real-time, that share the same name. In a hybrid table, offline segments can be pushed periodically. The retention on the offline table can be set to a high value because segments are coming in on a periodic basis, whereas the retention on the real-time part can be small.
Once an offline segment is pushed to cover a recent time period, the brokers automatically switch to using the offline table for segments for that time period and use the real-time table only for data not available in the offline table.
A typical use case for hybrid tables is pushing deduplicated, cleaned-up data into an offline table every day while consuming real-time data as it arrives. Data can remain in offline tables for as long as a few years, while the real-time data would be cleaned every few days.
To create a hybrid table, you have to create the offline and real-time tables individually. You don't need to create a separate hybrid table.
Leverage Apache Pinot's deep store component for efficient large-scale data storage and management, enabling impactful data processing and analysis.
Note: The deep store by itself is not sufficient for restore operations. Pinot stores metadata such as table configs, schemas, and segment metadata in Zookeeper. For restore operations, both the deep store and the Zookeeper metadata are required.
There are several different ways that segments are persisted in the deep store.
For offline tables, the batch ingestion job writes the segment directly into the deep store, as shown in the diagram below:
The ingestion job then sends a notification about the new segment to the controller, which in turn notifies the appropriate server to pull down that segment.
For real-time tables, by default, a segment is first built in memory by the server. It is then uploaded to the lead controller (as part of the Segment Completion Protocol sequence), which writes the segment into the deep store, as shown in the diagram below:
When the server is configured to write completed segments to the deep store directly (bypassing the controller upload), the server writes each completed segment straight to the deep store, as shown in the diagram below:
For hands-on examples of how to configure the deep store, see the following tutorials:
Explore the minion component in Apache Pinot, empowering efficient data movement and segment generation within Pinot clusters.
A Pinot minion is an optional cluster component that executes background tasks on table data apart from the query processes performed by brokers and servers. Minions run on independent hardware resources, and are responsible for executing minion tasks as directed by the controller. Examples of minion tasks include converting batch data from a standard format like Avro or JSON into segment files to be loaded into an offline table, and rewriting existing segment files to purge records as required by data privacy laws like GDPR. Minion tasks can run once or be scheduled to run periodically.
Minions isolate the computational burden of out-of-band data processing from the servers. Although a Pinot cluster can function with or without minions, they are typically present to support routine tasks like batch data ingest.
The Pinot task generator interface defines the APIs for the controller to generate tasks for minions to execute.
Factory for PinotTaskExecutor, which defines the APIs for the minion to execute the tasks.
Factory for MinionEventObserver, which defines the APIs for task event callbacks on the minion.
The PushTask can fetch files from an input folder (e.g., from an S3 bucket) and convert them into segments. The PushTask converts one file into one segment and keeps the file name in segment metadata to avoid duplicate ingestion. Below is an example task config to put in the TableConfig to enable this task. The task is scheduled every 10 minutes to keep ingesting remaining files, with at most 10 parallel tasks and one file per task.
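A sketch of such a config, assuming the SegmentGenerationAndPushTask task type; the cron schedule and task cap match the description above, and other task properties (such as the input location) are omitted:

```json
"task": {
  "taskTypeConfigsMap": {
    "SegmentGenerationAndPushTask": {
      "schedule": "0 */10 * * * ?",
      "tableMaxNumTasks": "10"
    }
  }
}
```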
NOTE: You may want to simply omit "tableMaxNumTasks" due to this caveat: the task generates one segment per file and derives the segment name from the time column of the file. If two files happen to have the same time range and are ingested by tasks from different schedules, there might be a segment name conflict. To overcome this issue for now, you can omit "tableMaxNumTasks"; by default it's Integer.MAX_VALUE, meaning Pinot schedules as many tasks as possible to ingest all input files in a single batch. Within one batch, a sequence number suffix is used to ensure no segment name conflict. Because the sequence number suffix is scoped within one batch, tasks from different batches might still encounter the segment name conflict described above.
Tasks are enabled on a per-table basis. To enable a certain task type (e.g., myTask) on a table, update the table config to include the task type:
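For instance (myTask and its properties are placeholders):

```json
"task": {
  "taskTypeConfigsMap": {
    "myTask": {
      "myProperty1": "value1",
      "myProperty2": "value2"
    }
  }
}
```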
Under each enabled task type, custom properties can be configured for the task type.
There are also two task configs to be set as part of cluster configs, like below. One controls a task's overall timeout (1 hour by default) and the other controls how many tasks can run concurrently on a single minion worker (1 by default).
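A sketch of these cluster configs for a hypothetical task type; the keys are assumed to follow the taskType.property pattern:

```json
{
  "RealtimeToOfflineSegmentsTask.timeoutMs": "600000",
  "RealtimeToOfflineSegmentsTask.numConcurrentTasksPerInstance": "4"
}
```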
There are two ways to enable task scheduling:
Tasks can be scheduled periodically for all task types on all enabled tables. Enable auto task scheduling by configuring the schedule frequency in the controller config with the key controller.task.frequencyPeriod. This takes period strings as values, e.g., 2h, 30m, 1d.
Tasks can also be scheduled based on cron expressions. The cron expression is set in the schedule config for each task type separately. The controller config controller.task.scheduler.enabled should be set to true to enable cron scheduling.
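For example, a per-table schedule for the RealtimeToOfflineSegmentsTask in Quartz cron format, firing at the first second of every minute:

```json
"task": {
  "taskTypeConfigsMap": {
    "RealtimeToOfflineSegmentsTask": {
      "schedule": "0 * * * * ?"
    }
  }
}
```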
Tasks can be manually scheduled using the following controller REST APIs:
Tasks can be scheduled on specific instances using the following config at task level:
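A sketch, using the minionInstanceTag property (the task type and tag are illustrative):

```json
"task": {
  "taskTypeConfigsMap": {
    "myTask": {
      "minionInstanceTag": "tag1_MINION"
    }
  }
}
```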
By default, the value is minion_untagged for backward compatibility. This allows users to schedule tasks on specific nodes and isolate tasks among tables and task types.
When a task is executed on a segment, the minion node fetches the segment from the deep store. If the deep store is not accessible, the minion node can download the segment from the server node. This is controlled by the allowDownloadFromServer config in the task config. By default, this is set to false.
This config can also be set at the minion instance level with pinot.minion.task.allow.download.from.server (default is false). The instance-level config helps enforce this behavior when the number of tables or tasks is high and we want to enable it for all of them. Note: the task-level config will override the instance-level config value.
To plug in a custom task, implement PinotTaskGenerator, PinotTaskExecutorFactory, and MinionEventObserverFactory (optional) for the task type (all of them should return the same string for getTaskType()), and annotate them with the following annotations:
After annotating the classes, put them under a package named org.apache.pinot.*.plugin.minion.tasks.*, and they will be auto-registered by the controller and minion.
In the Pinot UI, there is a Minion Task Manager tab under the Cluster Manager page. From that tab, one can find a lot of task-related info for troubleshooting. This info is mainly collected from the Pinot controller that schedules tasks, or from Helix, which tracks task runtime status. There are also buttons to schedule tasks in an ad hoc way. Below are brief introductions to some pages under the Minion Task Manager tab.
The first page shows which types of minion task have been used, essentially which task types have created their task queues in Helix.
Clicking into a task type, one can see the tables using that task, along with a few buttons to stop the task queue, clean up ended tasks, and so on.
Then, clicking into any table in this list, one can see how the task is configured for that table, and the task metadata if there is any in ZK. For example, MergeRollupTask tracks a watermark in ZK. If the task is cron scheduled, the current and next schedules are also shown on this page.
At the bottom of this page is a list of tasks generated for this table for this specific task type. Here, one MergeRollup task has been generated and completed.
Clicking into a task from that list, we can see its start and end times, and the subtasks generated for that task (as context, one minion task can have multiple subtasks to process data in parallel). In this example, there happened to be one subtask, and the page shows when it started and stopped and which minion worker it ran on.
Clicking into this subtask, one can see more details about it, like the input task configs and error info if the task failed.
There is a controller job that runs every 5 minutes by default and emits metrics about Minion tasks scheduled in Pinot. The following metrics are emitted for each task type:
NumMinionTasksInProgress: Number of running tasks
NumMinionSubtasksRunning: Number of running sub-tasks
NumMinionSubtasksWaiting: Number of waiting sub-tasks (unassigned to a minion as yet)
NumMinionSubtasksError: Number of error sub-tasks (completed with an error/exception)
PercentMinionSubtasksInQueue: Percent of sub-tasks in waiting or running states
PercentMinionSubtasksInError: Percent of sub-tasks in error
The controller also emits metrics about how tasks are cron scheduled:
cronSchedulerJobScheduled: Number of current cron schedules registered to be triggered regularly according to their cron expressions. It's a Gauge.
cronSchedulerJobTrigger: Number of cron-scheduled jobs triggered, as a Meter.
cronSchedulerJobSkipped: Number of late cron-scheduled jobs skipped, as a Meter.
cronSchedulerJobExecutionTimeMs: Time used to complete task generation, as a Timer.
For each task, the minion will emit these metrics:
TASK_QUEUEING: Task queueing time (task_dequeue_time - task_inqueue_time), assuming the time drift between the Helix controller and the Pinot minion is minor; otherwise, the value may be negative.
TASK_EXECUTION: Task execution time, which is the time spent on executing the task
NUMBER_OF_TASKS: Number of tasks in progress on that minion, as a Gauge. Whenever a minion starts a task, it is increased by 1; whenever a minion completes (either succeeds or fails) a task, it is decreased by 1.
NUMBER_TASKS_EXECUTED: Number of tasks executed, as a Meter.
NUMBER_TASKS_COMPLETED: Number of tasks completed, as a Meter.
NUMBER_TASKS_CANCELLED: Number of tasks cancelled, as a Meter.
NUMBER_TASKS_FAILED: Number of tasks failed, as a Meter. Different from a fatal failure, the task encountered an error that cannot be recovered from in this run, but it may still succeed by retrying the task.
NUMBER_TASKS_FATAL_FAILED: Number of tasks fatally failed, as a Meter. Different from a failure, the task encountered an error that will not be recoverable even by retrying the task.
Pinot stores data in tables. A Pinot table is conceptually identical to a relational database table with rows and columns. Columns have the same name and data type, known as the table's schema.
Pinot breaks a table into multiple segments and stores these segments in a deep store, such as Hadoop Distributed File System (HDFS), as well as on Pinot servers.
In the Pinot cluster, a table is modeled as a Helix resource, and each segment of a table is modeled as a Helix partition.
A table config is used to define the table properties, such as name, type, indexing, routing, and retention. It is written in JSON format and is stored in Zookeeper, along with the table schema.
A table comprises small chunks of data known as segments. Learn more about how Pinot creates and manages segments.
For offline tables, segments are built outside of Pinot and uploaded using a distributed executor such as Spark or Hadoop. For details, see .
Replicas: A segment can have multiple replicas to provide higher availability. You can configure the number of replicas for a table segment.
Completion Mode: By default, if the in-memory segment on the non-winner server is equivalent to the committed segment, then the non-winner server builds and replaces the segment; if the available segment is not equivalent to the committed segment, the server just downloads the committed segment from the controller.
For details, see .
For more details about peer segment download during real-time ingestion, refer to this design doc on
For more details on each indexing mechanism and corresponding configurations, see .
Set up indexes on columns to make queries faster. You can also keep segments in off-heap instead of on-heap memory for faster queries.
Each table is associated with a tenant. A segment resides on servers that have the same tenant as the table. For details, see .
You can specify the full name of any tag in this section. For example, you could decide that completed segments for this table should be on Pinot servers tagged as allTables_COMPLETED. To learn more, see the section on moving completed segments.
To learn how time boundaries work for hybrid tables, see .
Create a table config for your data, or see the examples for all possible batch/streaming tables.
Check out the table config in the Rest API to make sure it was successfully uploaded.
The deep store (or deep storage) is the permanent store for segment files.
It is used for backup and restore operations. New nodes in a cluster will pull down a copy of segment files from the deep store. If the local segment files on a server get damaged in some way (or accidentally deleted), a new copy will be pulled down from the deep store on server restart.
The deep store stores a compressed version of the segment files and it typically won't include any indexes. These compressed files can be stored on a local file system or on a variety of other file systems. For more details on supported file systems, see .
Having all segments go through the controller can become a system bottleneck under heavy load, in which case you can use the peer download policy, as described in .
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a minion:
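A minimal sketch using the launcher scripts, assuming Zookeeper at localhost:2181:

```bash
bin/pinot-admin.sh StartMinion \
  -zkAddress localhost:2181
```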
As shown below, the RealtimeToOfflineSegmentsTask will be scheduled at the first second of every minute (following Quartz cron syntax).
See where the TestTask is plugged in.
POST /tasks/schedule: Schedule tasks for all task types on all enabled tables
POST /tasks/schedule?taskType=myTask: Schedule tasks for the given task type on all enabled tables
POST /tasks/schedule?tableName=myTable_OFFLINE: Schedule tasks for all task types on the given table
POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE: Schedule tasks for the given task type on the given table
POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE&minionInstanceTag=tag1_MINION: Schedule tasks for the given task type of the given table on the minion nodes tagged as tag1_MINION
PinotTaskGenerator: @TaskGenerator
PinotTaskExecutorFactory: @TaskExecutorFactory
MinionEventObserverFactory: @EventObserverFactory
This quick start guide will help you bootstrap a Pinot standalone instance on your local machine.
In this guide, you'll learn how to download and install Apache Pinot as a standalone instance.
First, download the Pinot distribution for this tutorial. You can either download a packaged release or build a distribution from the source code.
Install with JDK 11 or 21. JDK 17 should work, but it is not officially supported.
For JDK 8 support, Pinot 0.12.1 is the last version compilable from the source code.
Pinot 1.0+ doesn't support JDK 8 anymore; build with JDK 11+.
Note that some installations of the JDK do not contain the JNI bindings necessary to run all tests. If you see an error like java.lang.UnsatisfiedLinkError while running tests, you might need to change your JDK.
Download the distribution or build from source by selecting one of the following tabs:
Now that we've downloaded Pinot, it's time to set up a cluster. There are two ways to do this: through quick start or through setting up a cluster manually.
Pinot comes with quick start commands that launch instances of Pinot components in the same process and import pre-built datasets.
For example, the following quick start command launches Pinot with a baseball dataset pre-loaded:
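A sketch of that command, assuming the batch quick start type (which loads the baseball dataset):

```bash
./bin/pinot-admin.sh QuickStart -type batch
```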
If you want to play with bigger datasets (more than a few megabytes), you can launch each component individually.
The video below is a step-by-step walk through for launching the individual components of Pinot and scaling them to multiple instances.
You can customize the cluster by modifying the config files, then start each component with its config file:
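For example, using the -configFileName flag (the config file paths are illustrative):

```bash
bin/pinot-admin.sh StartController -configFileName conf/pinot-controller.conf
bin/pinot-admin.sh StartBroker -configFileName conf/pinot-broker.conf
bin/pinot-admin.sh StartServer -configFileName conf/pinot-server.conf
```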
Set break points and inspect variables by starting a Pinot component with debug mode in IntelliJ.
The following example demonstrates server debugging:
Discover the segment component in Apache Pinot for efficient data storage and querying within Pinot clusters, enabling optimized data processing and analysis.
Pinot achieves this by breaking the data into smaller chunks known as segments (similar to shards/partitions in relational databases). Segments can be seen as time-based partitions.
A segment is a horizontal shard representing a chunk of table data with some number of rows. The segment stores data for all columns of the table. Each segment packs the data in a columnar fashion, along with the dictionaries and indices for the columns. The segment is laid out in a columnar format so that it can be directly mapped into memory for serving queries.
Columns can be single-valued or multi-valued, and the following types are supported: STRING, BOOLEAN, INT, LONG, FLOAT, DOUBLE, TIMESTAMP, and BYTES. BIG_DECIMAL is supported only as a single-valued type.
Columns may be declared to be metric or dimension (or specifically as a time dimension) in the schema. Columns can have default null values. For example, the default null value of an integer column can be 0. The default value for BYTES columns must be hex-encoded before it's added to the schema.
Pinot uses dictionary encoding to store values as dictionary IDs. Columns may be configured as "no-dictionary" columns, in which case raw values are stored. Dictionary IDs are encoded using the minimum number of bits for efficient storage (e.g., a column with a cardinality of 3 will use only 2 bits for each dictionary ID).
To create and push the segment in one go, use the following:
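A sketch using the pinot-admin launcher; the job spec path is illustrative:

```bash
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /path/to/job-spec.yml
```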
Sample Console Output
Alternately, you can create and then push separately, by changing the jobType to SegmentCreation or SegmentTarPush.
The Ingestion job spec supports templating with Groovy Syntax.
This is convenient if you want to generate one ingestion job template file and schedule it on a daily basis with extra parameters updated daily.
For example, you could set inputDirURI with parameters to indicate the date, so that the ingestion job only processes the data for a particular date. Below is an example that templates the date for input and output directories.
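A sketch of the templated fields in the job spec; the paths are illustrative:

```yaml
inputDirURI: 'file:///path/to/input/${year}/${month}/${day}'
outputDirURI: 'file:///path/to/output/${year}/${month}/${day}'
```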
You can pass in arguments containing values for ${year}, ${month}, and ${day} when kicking off the ingestion job with the -values flag (e.g., -values key1=value1 key2=value2 ...).
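For instance, to generate segments for 2014-01-03 (the job spec path is illustrative):

```bash
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /path/to/job-spec.yml \
  -values year=2014 month=01 day=03
```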
This ingestion job only generates segments for the date 2014-01-03.
Below is an example of how to publish sample data to your stream. As soon as data is available to the real-time stream, it starts getting consumed by the real-time servers.
Run the command below to stream JSON data into the Kafka topic flights-realtime:
Learn to build and manage Apache Pinot clusters, uncovering key components for efficient data processing and optimized analysis.
A Pinot cluster consists of the following processes, which are typically deployed on separate hardware resources in production. In development, they can fit comfortably into Docker containers on a typical laptop:
Controller: Maintains cluster metadata and manages cluster resources.
Zookeeper: Manages the Pinot cluster on behalf of the controller. Provides fault-tolerant, persistent storage of metadata, including table configurations, schemas, segment metadata, and cluster state.
Broker: Accepts queries from client processes and forwards them to servers for processing.
Server: Provides storage for segment files and compute for query processing.
(Optional) Minion: Computes background tasks other than query processing, minimizing impact on query latency. Optimizes segments and builds additional indexes to maintain performance, even as data is deleted.
The simplest possible Pinot cluster consists of four components: a server, a broker, a controller, and a Zookeeper node. In production environments, these components typically run on separate server instances, and scale out as needed for data volume, load, availability, and latency. Pinot clusters in production range from fewer than ten total instances to more than 1,000.
Helix is a cluster management solution that maintains a persistent, fault-tolerant map of the intended state of the Pinot cluster. Helix constantly monitors the cluster to ensure that the right hardware resources are allocated for the present configuration. When the configuration changes, Helix schedules or decommissions hardware resources to reflect the new configuration. When elements of the cluster change state catastrophically, Helix schedules hardware resources to keep the actual cluster consistent with the ideal represented in the metadata. From a physical perspective, Helix takes the form of a controller process plus agents running on servers and brokers.
Helix divides nodes into logical components based on their responsibilities:
Participants are the nodes that host distributed, partitioned resources
Spectators are the nodes that observe the current state of each participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).
Controller: The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.
Another way to visualize the cluster is a logical view, where:
To set up a cluster, see one of the following guides:
In this Apache Pinot concepts guide, we'll learn how segment retention works.
Segments in Pinot tables have a retention time, after which the segments are deleted. Typically, offline tables retain segments for a longer period of time than real-time tables.
The removal of segments is done by the retention manager. By default, the retention manager runs once every 6 hours.
The retention manager purges two types of segments:
Expired segments: Segments whose end time has exceeded the retention period.
There are a couple of scenarios where segments in offline tables won't be purged:
If the segment doesn't have an end time. This would happen if the segment doesn't contain a time column.
If the segment's table has a segmentIngestionType of REFRESH.
If the retention period isn't specified, segments aren't purged from tables.
The retention manager initially moves these segments into a Deleted Segments area, from where they will eventually be permanently removed.
Download the latest binary release from the Apache Pinot downloads page, or use this command:
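A sketch; substitute the desired release for PINOT_VERSION:

```bash
PINOT_VERSION=1.2.0
wget https://downloads.apache.org/pinot/apache-pinot-${PINOT_VERSION}/apache-pinot-${PINOT_VERSION}-bin.tar.gz
```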
Extract the TAR file:
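For example:

```bash
tar -zxvf apache-pinot-${PINOT_VERSION}-bin.tar.gz
```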
Navigate to the directory containing the launcher scripts:
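For example:

```bash
cd apache-pinot-${PINOT_VERSION}-bin
```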
You can also find older versions of Apache Pinot in the Apache archive. For example, to download Pinot 0.10.0, run the following command:
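Presumably along these lines; the archive URL pattern is an assumption:

```bash
wget https://archive.apache.org/dist/pinot/apache-pinot-0.10.0/apache-pinot-0.10.0-bin.tar.gz
```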
Follow these steps to check out code from the Pinot GitHub repository and build Pinot locally.
Check out Pinot:
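For example:

```bash
git clone https://github.com/apache/pinot.git
cd pinot
```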
Build Pinot:
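A sketch, assuming Maven is installed:

```bash
mvn clean install -DskipTests -Pbin-dist
```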
Navigate to the directory containing the setup scripts. Note that Pinot scripts are located under pinot-distribution/target, not the target directory under root.
For a list of all the available quick start commands, see the .
You can find the commands that are shown in this video in the .
You can use Zooinspector to browse the Zookeeper instance.
Once your cluster is up and running, you can head over to to learn how to run queries against the data.
First, start zookeeper, controller, and broker using the steps described above.
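A sketch using the launcher scripts, with default ports:

```bash
bin/pinot-admin.sh StartZookeeper -zkPort 2181
bin/pinot-admin.sh StartController -zkAddress localhost:2181
bin/pinot-admin.sh StartBroker -zkAddress localhost:2181
```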
Then, use the following configuration (under $PROJECT_DIR$\.run) to start the server, replacing the metrics-core version and cluster name as needed.
Pinot tables are stored in one or more independent shards called segments. A small table may be contained by a single segment, but Pinot lets tables grow to an unlimited number of segments. There are different processes for creating segments (see ). Segments have time-based partitions of table data, and are stored on Pinot servers that scale horizontally as needed for both storage and computation.
A forward index is built for each column and compressed for efficient memory use. In addition, you can optionally configure inverted indices for any set of columns. Inverted indices take up more storage, but improve query performance. Specialized indexes like Star-Tree index are also supported. For more details, see .
Once the table is configured, we can load some data. Loading data involves generating Pinot segments from raw data and pushing them to the Pinot cluster. Data can be loaded in batch mode or streaming mode. For more details, see the data ingestion page.
Below are instructions to generate and push segments to Pinot via standalone scripts. For a production setup, you should use frameworks such as Hadoop or Spark. For more details on setting up data ingestion jobs, see
To generate a segment, we need to first create a job spec YAML file. This file contains all the information regarding data format, input data location, and Pinot cluster coordinates. Note that this assumes the controller is running, so the job can fetch the table config and schema. If not, you will have to configure the spec to point at their location. For full configurations, see .
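A minimal sketch of a standalone job spec; the class names are the standard Pinot plugins, while the paths, table name, and controller URI are illustrative:

```yaml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 'file:///path/to/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: 'file:///path/to/output'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
tableSpec:
  tableName: 'myTable'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
```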
A Pinot cluster is a collection of the software processes and hardware resources required to ingest, store, and process data. For detail about Pinot cluster components, see .
Pinot uses Apache Zookeeper as a distributed metadata store and Apache Helix for cluster management.
For details of cluster configuration settings, see .
Pinot servers are modeled as participants. For details about server nodes, see .
Pinot brokers are modeled as spectators. For details about broker nodes, see .
Pinot controllers are modeled as controllers. For details about controller nodes, see .
A cluster contains tenants
Tenants contain tables
Tables contain segments
Typically, there is only one cluster per environment/data center. There is no need to create multiple Pinot clusters because Pinot supports multi-tenancy.
Replaced segments: Segments that have been replaced as part of the segment replacement process.
Pinot can also be installed on macOS using the Brew package manager. For instructions on installing Brew, see the Homebrew documentation.
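Then, assuming the pinot formula is available in your Brew setup:

```bash
brew install pinot
```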
Apache Pinot is a real-time distributed OLAP datastore purpose-built for low-latency, high-throughput analytics, and perfect for user-facing analytical workloads.
Apache Pinot™ is a real-time distributed online analytical processing (OLAP) datastore. Use Pinot to ingest and immediately query data from streaming or batch data sources (including Apache Kafka, Amazon Kinesis, Hadoop HDFS, Amazon S3, Azure ADLS, and Google Cloud Storage).
Apache Pinot includes the following:
Ultra low-latency analytics even at extremely high throughput.
Columnar data store with several smart indexing and pre-aggregation techniques.
Scaling up and out with no upper bound.
Consistent performance based on the size of your cluster and an expected query per second (QPS) threshold.
It's perfect for user-facing real-time analytics and other analytical use cases, including internal dashboards, anomaly detection, and ad hoc data exploration.
User-facing analytics refers to the analytical tools exposed to the end users of your product. In a user-facing analytics application, all users receive personalized analytics on their devices, resulting in hundreds of thousands of queries per second. Queries triggered by apps may grow quickly in proportion to the number of active users on the app, reaching millions of events per second. Data ingested into Pinot is immediately available for analytics, with latencies under one second.
User-facing real-time analytics requires the following:
Fresh data. The system needs to be able to ingest data in real time and make it available for querying, also in real time.
Support for high-velocity, highly dimensional event data from a wide range of actions and from multiple sources.
Low latency. Queries are triggered by end users interacting with apps, resulting in hundreds of thousands of queries per second with arbitrary patterns.
Reliability and high availability.
Scalability.
Low cost to serve.
Pinot is designed to execute OLAP queries with low latency. It works well where you need fast analytics, such as aggregations, on both mutable and immutable data.
User-facing, real-time analytics
Real-time dashboards for business metrics
Enterprise business intelligence
For analysts and data scientists, Pinot works well as a highly scalable data platform for business intelligence. Pinot converges big data platforms with the traditional role of a data warehouse, making it a suitable replacement for analysis and reporting.
Enterprise application development
For application developers, Pinot works well as an aggregate store that sources events from streaming data sources, such as Kafka, and makes them available for query using SQL. You can also use Pinot to aggregate data across a microservice architecture into one easily queryable view of the domain.
If you're new to Pinot, take a look at our Getting Started guide:
To start importing data into Pinot, see how to import batch and stream data:
To start querying data in Pinot, check out our Query guide:
For a conceptual overview that explains how Pinot works, check out the Concepts guide:
To understand the distributed systems architecture that explains Pinot's operating model, take a look at our basic architecture section:
Understand how the components of Apache Pinot™ work together to create a scalable OLAP database that can deliver low-latency, high-concurrency queries at scale.
Apache Pinot™ is a distributed OLAP database designed to serve real-time, user-facing use cases, which means handling large volumes of data and many concurrent queries with very low query latencies. Pinot supports the following requirements:
Ultra low-latency queries (as low as 10ms P95)
High query concurrency (as many as 100,000 queries per second)
High data freshness (streaming data available for query immediately upon ingestion)
Large data volume (up to petabytes)
To accommodate large data volumes with stringent latency and concurrency requirements, Pinot is designed as a distributed database that supports the following requirements:
Highly available: Pinot has no single point of failure. When tables are configured for replication, and a node goes down, the cluster is able to continue processing queries.
Immutable data: Pinot assumes all stored data is immutable, which helps simplify the parts of the system that handle data storage and replication. However, Pinot still supports upserts on streaming entity data and background purges of data to comply with data privacy regulations.
Dynamic configuration changes: Operations like adding new tables, expanding a cluster, ingesting data, modifying an existing table, and adding indexes do not impact query availability or performance.
Helix maintains a picture of the intended state of the cluster, including the number of servers and brokers, the configuration and schema of all tables, connections to streaming ingest sources, currently executing batch ingestion jobs, the assignment of table segments to the servers in the cluster, and more. All of these configuration items are potentially mutable quantities, since operators routinely change table schemas, add or remove streaming ingest sources, begin new batch ingestion jobs, and so on. Additionally, physical cluster state may change as servers and brokers fail or suffer network partition. Helix works constantly to drive the actual state of the cluster to match the intended state, pushing configuration changes to brokers and servers as needed.
There are three physical node types in a Helix cluster:
Participant: These nodes do things, like store data or perform computation. Participants host resources, which are Helix's fundamental storage abstraction. Because Pinot servers store segment data, they are participants.
Spectator: These nodes see things, observing the evolving state of the participants through events pushed to the spectator. Because Pinot brokers need to know which servers host which segments, they are spectators.
Controller: This node observes and manages the state of participant nodes. The controller is responsible for coordinating all state transitions in the cluster and ensures that state constraints are satisfied while maintaining cluster stability.
In addition, Helix defines two logical components to express its storage abstraction:
Partition: A unit of data storage that lives on at least one participant. Partitions may be replicated across multiple participants. A Pinot segment is a partition.
Resource: A logical collection of partitions, providing a single view over a potentially large set of data stored across a distributed system. A Pinot table is a resource.
In summary, the Pinot architecture maps onto Helix components as follows:
Segment: Helix Partition
Table: Helix Resource
Controller: Helix Controller or Helix agent that drives the overall state of the cluster
Server: Helix Participant
Broker: Helix Spectator that observes the cluster for changes in the state of segments and servers. To support multi-tenancy, brokers are also modeled as Helix Participants.
Minion: Helix Participant that performs computation rather than storing data
Helix uses Zookeeper to maintain cluster state. Zookeeper sends Helix spectators notifications of changes in cluster state, which correspond to changes in ZNodes. Zookeeper stores the following information about the cluster:
Controller: The controller that is assigned as the current leader.
Servers and brokers: The list of servers and brokers, the configuration of all current servers and brokers, and the health status of all current servers and brokers.
Tables: The list of tables, table configurations, table schemas, and the list of each table's segments.
Segments: The exact server locations of a segment, the state of each segment (online/offline/error/consuming), and metadata about each segment.
As a first-class citizen of a Pinot cluster, Zookeeper exposes a well-known ZNode structure that can be used for operations and troubleshooting purposes. Be advised that this structure can change in future Pinot releases.
Only one controller can be active at a time, so when multiple controllers are present in a cluster, they elect a leader. When that controller instance becomes unavailable, the remaining instances automatically elect a new leader. Leader election is achieved using Apache Helix. A Pinot cluster can serve queries without an active controller, but it can't perform any metadata-modifying operations, like adding a table or consuming a new segment.
Sends the query to each of those servers for local execution against their segments.
Receives the results from each server and merges them.
Sends the query result to the client.
For multi-stage queries, the broker performs the following:
Computes a query plan that runs on multiple sets of servers. The servers selected for the first stage are chosen based on the segments required to execute the query, determined in a process similar to single-stage queries.
Sends the relevant portions of the query plan to one or more servers in the cluster for each stage of the query plan.
Receives a complete result set from the final stage of the query, which always executes on a single server.
Sends the query result to the client.
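For illustration, here is one way a client might submit a query over HTTP; the broker address localhost:8099 is an assumption based on a default local setup:

```bash
# Submit a SQL query to the broker's HTTP endpoint; the response is JSON
curl -s -X POST http://localhost:8099/query/sql \
  -H 'Content-Type: application/json' \
  -d '{"sql": "SELECT COUNT(*) FROM baseballStats"}'
```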
Because offline tables tend to have long retention periods, offline servers tend to scale based on the size of the data they store.
Real-time servers ingest data from streaming sources, like Apache Kafka®, Apache Pulsar®, or AWS Kinesis. Streaming data ends up in conventional segment files just like batch data, but is first accumulated in an in-memory data structure known as a consuming segment. Each message consumed from a streaming source is written immediately to the relevant consuming segment, and is available for query processing from the consuming segment immediately, since consuming segments participate in query processing as first-class citizens. Consuming segments get flushed to disk periodically based on a completion threshold, which can be calculated by row count, ingestion time, or segment size. A flushed segment on a real-time table is called a completed segment, and is functionally equivalent to a segment created during offline ingest.
Real-time servers tend to be scaled based on the rate at which they ingest streaming data.
Minions isolate the computational burden of out-of-band data processing from the servers. Although a Pinot cluster can function without minions, they are typically present to support routine tasks like ingesting batch data.
The controller (in its capacity as a Helix controller) updates the ideal state of the cluster in its cluster metadata map.
The servers then download the newly created segments directly from the deep store.
The cluster's brokers, which watch for state changes as Helix spectators, detect the new segments and update their segment routing tables accordingly. The cluster is now able to query the new offline segments.
Ingestion is established at the time a real-time table is created, and continues as long as the table exists. When the controller receives the metadata update to create a new real-time table, the table configuration specifies the source of the streaming input data—often a topic in a Kafka cluster. This kicks off a process like this:
The controller picks one or more servers to act as direct consumers of the streaming input source.
The controller creates consuming segments for the new table. It does this by creating an entry in the global metadata map for a new consuming segment for each of the real-time servers selected in step 1.
Through Helix functionality on the controller and the relevant servers, the servers proceed to create consuming segments in memory and establish a connection to the streaming input source. When this input source is Kafka, each server acts as a Kafka consumer directly, with no other components involved in the integration.
Through Helix functionality on the controller and all of the cluster's brokers, the brokers become aware of the consuming segments, and begin including them in query routing immediately.
The consuming servers simultaneously begin consuming messages from the streaming input source, storing them in the consuming segment.
When a server decides its consuming segment is complete, it commits the in-memory consuming segment to a conventional segment file, uploads it to the deep store, and notifies the controller.
The controller and the server create a new consuming segment to continue real-time ingestion.
The controller marks the newly committed segment as online. Brokers then discover the new segment through the Helix notification mechanism, allowing them to route queries to it in the usual fashion.
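As a rough sketch, the streaming connection described above is declared in the real-time table config's streamConfigs block. All names, the topic, and the threshold value below are illustrative placeholders, not recommendations:

```bash
# Hypothetical real-time table config, posted to a local controller
cat > /tmp/events-realtime.json <<'EOF'
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": { "timeColumnName": "ts", "schemaName": "events", "replication": "1" },
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events-topic",
      "stream.kafka.broker.list": "localhost:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "1000000"
    }
  }
}
EOF
curl -X POST -H 'Content-Type: application/json' \
  -d @/tmp/events-realtime.json localhost:9000/tables
```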
This section describes quick start commands that launch all Pinot components in a single process.
Prerequisites
Pinot versions in examples
The Docker-based examples on this page use pinot:latest, which instructs Docker to pull and use the most recent release of Apache Pinot. If you prefer to use a specific release instead, you can designate it by replacing latest with the release number, like this: pinot:0.12.1.
The local install-based examples that are run using the launcher scripts will use the Apache Pinot version you installed.
Stopping a running example
To stop a running example, enter Ctrl+C in the same terminal where you ran the docker run command to start the example.
macOS Monterey Users
By default, the AirPlay receiver server runs on port 7000, which is also the port used by the Pinot server in the quick start. You may see the following error when running these examples:
If you disable the AirPlay receiver server and try again, you shouldn't see this error message anymore.
This example demonstrates how to do batch processing with Pinot. The command:
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the baseballStats table
Launches a standalone data ingestion job that builds one segment for a given CSV data file for the baseballStats table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
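With a local installation, the command takes roughly this form (a Docker variant appears later in this guide); -type batch selects this example:

```bash
./bin/pinot-admin.sh QuickStart -type batch
```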
This example demonstrates how to import and query JSON documents in Pinot. The command:
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the githubEvents table
Launches a standalone data ingestion job that builds one segment for a given JSON data file for the githubEvents table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
This example demonstrates how to do batch processing in Pinot where the data items have complex fields that need to be unnested. The command:
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the githubEvents table
Launches a standalone data ingestion job that builds one segment for a given JSON data file for the githubEvents table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
This example demonstrates how to do stream processing with Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing with JSON documents in Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing in Pinot with RealtimeToOfflineSegmentsTask and MergeRollupTask minion tasks continuously optimizing segments as data gets ingested. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, Pinot Minion, and Pinot Server.
Creates the githubEvents table
Launches a GitHub events stream
Publishes data to a Kafka topic githubEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing in Pinot where the stream contains items that have complex fields that need to be unnested. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, Pinot Minion, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do hybrid stream and batch processing with Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the airlineStats table
Launches a standalone data ingestion job that builds segments under a given directory of Avro files for the airlineStats table and pushes the segments to the Pinot Controller.
Launches a stream of flight stats
Publishes data to a Kafka topic airlineStatsEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server in the same container.
Creates the baseballStats table
Launches a data ingestion job that builds one segment for a given CSV data file for the baseballStats table and pushes the segment to the Pinot Controller.
Creates the dimBaseballTeams table
Launches a data ingestion job that builds one segment for a given CSV data file for the dimBaseballTeams table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
Pinot quick start in Kubernetes
Get started running Pinot in Kubernetes.
This guide assumes that you already have a running Kubernetes cluster.
If you haven't yet set up a Kubernetes cluster, see the links below for instructions:
Make sure to run with enough resources: minikube start --vm=true --cpus=4 --memory=8g --disk-size=50g
Note: Specify StorageClass based on your cloud vendor. Don't mount a blob store (such as AzureFile, GoogleCloudStorage, or S3) as the data serving file system. Use only Amazon EBS/GCP Persistent Disk/Azure Disk-style disks.
For AWS: "gp2"
For GCP: "pd-ssd" or "standard"
For Azure: "AzureDisk"
For Docker-Desktop: "hostpath"
1.1.1 Update Helm dependency
1.1.2 Start Pinot with Helm
Ensure the Kafka deployment is ready before executing the scripts in the following steps. Run the following command:
Below is an example output showing the deployment is ready:
Run the scripts below to create two Kafka topics for data ingestion:
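The topic-creation commands look roughly like the following; the pod name kafka-0, the Zookeeper address, and the namespace are assumptions based on typical Helm chart defaults:

```bash
kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics \
  --zookeeper kafka-zookeeper:2181 --create \
  --topic flights-realtime --partitions 1 --replication-factor 1
kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics \
  --zookeeper kafka-zookeeper:2181 --create \
  --topic flights-realtime-avro --partitions 1 --replication-factor 1
```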
The script below does the following:
Ingests 19492 JSON messages to Kafka topic flights-realtime at a speed of 1 msg/sec
Ingests 19492 Avro messages to Kafka topic flights-realtime-avro at a speed of 1 msg/sec
Uploads Pinot schema airlineStats
Creates Pinot table airlineStats to ingest data from the JSON encoded Kafka topic flights-realtime
Creates Pinot table airlineStatsAvro to ingest data from the Avro encoded Kafka topic flights-realtime-avro
The following script (located at ./pinot/helm/pinot) performs local port forwarding and opens the Pinot query console in your default web browser.
Install the Superset Helm repository:
Get the Helm values configuration file:
For Superset to install Pinot dependencies, edit the /tmp/superset-values.yaml file to add the pinotdb pip dependency to the bootstrapScript field.
You can also build your own image with this dependency or use the image apachepinot/pinot-superset:latest instead.
Replace the default admin credentials inside the init section with a meaningful user profile and a stronger password.
Install Superset using Helm:
Ensure your cluster is up by running:
Run the below command to port forward Superset to your localhost:18088.
Navigate to Superset in your browser with the admin credentials you set in the previous section.
Create a new database connection with the following URI: pinot+http://pinot-broker.pinot-quickstart:8099/query?controller=http://pinot-controller.pinot-quickstart:9000/
Once the database is added, you can add more data sets and explore the dashboard options.
Deploy Trino with the Pinot plugin installed:
See the charts in the Trino Helm chart repository:
In order to connect Trino to Pinot, you'll need to add the Pinot catalog, which requires extra configuration. Run the below command to get all the configurable values.
To add the Pinot catalog, edit the additionalCatalogs section by adding:
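A sketch of that addition; the connector name is standard, but the controller URL depends on your namespace and service names:

```yaml
additionalCatalogs:
  pinot: |
    connector.name=pinot
    pinot.controller-urls=pinot-controller.pinot-quickstart:9000
```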
After modifying the /tmp/trino-values.yaml file, deploy Trino with:
Once you've deployed Trino, check the deployment status:
Once Trino is deployed, run the below command to get a runnable Trino CLI.
Download the Trino CLI:
Port forward the Trino service to your localhost if it's not already exposed:
Use the Trino console client to connect to the Trino service:
Query Pinot data using the Trino CLI, like in the sample queries below.
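For example, once the CLI is connected (the forwarded port, catalog name, and table come from the assumptions above and the airline-stats quick start):

```bash
./trino --server localhost:18080 --catalog pinot --schema default \
  --execute 'SELECT COUNT(*) FROM airlinestats'
```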
First, deploy Presto with default configurations:
To customize your deployment, run the below command to get all the configurable values.
After modifying the /tmp/presto-values.yaml file, deploy Presto:
Once you've deployed the Presto instance, check the deployment status:
Download the Presto CLI:
Port forward presto-coordinator port 8080 to localhost port 18080:
Start the Presto CLI with the Pinot catalog:
Query Pinot data with the Presto CLI, like in the sample queries below.
To delete your Pinot cluster in Kubernetes, run the following command:
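If you installed everything into a single namespace as in this guide, one way to clean up is to delete that namespace; the namespace name is an assumption:

```bash
kubectl delete namespace pinot-quickstart
```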
Discover the core components of Apache Pinot, enabling efficient data processing and analytics. Unleash the power of Pinot's building blocks for high-performance data-driven applications.
Apache Pinot™ is a database designed to deliver highly concurrent, ultra-low-latency queries on large datasets through a set of common data model abstractions. Delivering on these goals requires several foundational architectural commitments, including:
Storing data in columnar form to support high-performance scanning
Sharding of data to scale both storage and computation
A distributed architecture designed to scale capacity linearly
A tabular data model read by SQL queries
Learn about the major components and logical abstractions used in Pinot.
Learn how segment thresholds work in Pinot.
The segment threshold determines when a segment is committed in real-time tables.
When data is first ingested from a streaming provider like Kafka, Pinot stores the data in a consuming segment.
This segment is on the disk of the server(s) processing a particular partition from the streaming provider.
The segment threshold is important because it ensures segments are a reasonable size.
When queries are processed, smaller segments may increase query latency due to more per-segment overhead (number of threads spawned, metadata processing, and so on).
Larger segments may cause servers to run out of memory. Also, when a server is restarted, the consuming segment must start consuming from its first row again, causing a lag between Pinot and the streaming provider.
Mark Needham explains the segment threshold
Learn about time boundaries in hybrid tables.
A hybrid table is a pair of offline and real-time tables that share the same name.
When querying these tables, the Pinot broker decides which records to read from the offline table and which to read from the real-time table. It does this using the time boundary.
The time boundary is determined by looking at the maximum end time of the offline segments and the segment ingestion frequency specified for the offline table.
If it's set to hourly, then: the time boundary is the maximum end time of the offline segments minus one hour.
Otherwise: the time boundary is the maximum end time of the offline segments minus one day.
It is possible to force the hybrid table to use max(all offline segments' end time) by calling the API (v0.12.0+).
Note that this will not automatically update the time boundary as more segments are added to the offline table; the API must be called each time a segment with a more recent end time is uploaded to the offline table. You can revert back to using the derived time boundary by calling the API:
When a Pinot broker receives a query for a hybrid table, the broker sends a time boundary annotated version of the query to the offline and real-time tables.
For example, if we executed the following query:
The broker would send the following query to the offline table:
And the following query to the real-time table:
The results of the two queries are merged by the broker before being returned to the client.
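As an illustration, assume a hypothetical hybrid table named events with time column ts and a time boundary of 1639137600000; all names and values here are placeholders:

```bash
# The client sends a single query to the broker:
curl -s -X POST http://localhost:8099/query/sql \
  -H 'Content-Type: application/json' \
  -d '{"sql": "SELECT COUNT(*) FROM events"}'
# Internally the broker issues, roughly:
#   offline:   SELECT COUNT(*) FROM events_OFFLINE  WHERE ts <= 1639137600000
#   real-time: SELECT COUNT(*) FROM events_REALTIME WHERE ts >  1639137600000
# and merges the two result sets before responding.
```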
This section contains quick start guides to help you get up and running with Pinot.
Explore the Schema component in Apache Pinot, vital for defining the structure and data types of Pinot tables, enabling efficient data processing and analysis.
Each table in Pinot is associated with a schema. A schema defines:
Fields in the table with their data types.
The schema is stored in Zookeeper along with the table configuration.
A schema also defines what category a column belongs to. Columns in a Pinot table can be categorized into three categories:
Pinot does not enforce strict rules on which of these categories columns belong to; rather, the categories can be thought of as hints to Pinot for internal optimizations.
For example, metrics may be stored without a dictionary and can have a different default null value.
The categories are also relevant when doing segment merge and rollups. Pinot uses the dimension and time fields to identify records against which to apply merges and rollups.
Metrics aggregation is another example where Pinot uses the dimension and time columns as the key and automatically aggregates values for the metric columns.
Let's create a schema and put it in a JSON file. For this example, we have created a schema for flight data.
Then, we can upload the sample schema using either a Bash command or REST API call, as sketched below.
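A minimal sketch of that flow; the field names are illustrative, not the actual sample schema:

```bash
# Write a minimal flight-data schema...
cat > /tmp/flights-schema.json <<'EOF'
{
  "schemaName": "flights",
  "dimensionFieldSpecs": [
    { "name": "carrier", "dataType": "STRING" },
    { "name": "origin",  "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "delayMinutes", "dataType": "INT" }
  ],
  "dateTimeFieldSpecs": [
    { "name": "ts", "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }
  ]
}
EOF
# ...and upload it to the controller (assumed at localhost:9000)
curl -F schemaName=@/tmp/flights-schema.json localhost:9000/schemas
```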
Pinot Data Explorer is a user-friendly interface in Apache Pinot for interactive data exploration, querying, and visualization.
Once you have set up a cluster, you can start exploring the data and the APIs using the Pinot Data Explorer.
The first screen that you'll see when you open the Pinot Data Explorer is the Cluster Manager. The Cluster Manager provides a UI to operate and manage your cluster.
If you want to view the contents of a server, click on its instance name. You'll then see the following:
To view the baseballStats table, click on its name, which will show the following screen:
From this screen, we can edit or delete the table, edit or adjust its schema, and perform several other operations.
For example, if we want to add yearID to the list of inverted indexes, click on Edit Table, add the extra column, and click Save:
You can also execute a sample query select * from baseballStats limit 10 by typing it in the text box and clicking the Run Query button. Cmd + Enter can also be used to run the query when focused on the console.
Here are some sample queries you can try:
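For instance (issued here through the broker's SQL endpoint, localhost:8099 assumed; the same statement can be typed into the console):

```bash
curl -s -X POST http://localhost:8099/query/sql \
  -H 'Content-Type: application/json' \
  -d '{"sql": "SELECT playerName, SUM(hits) FROM baseballStats GROUP BY playerName ORDER BY SUM(hits) DESC LIMIT 5"}'
```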
This guide will show you how to run a Pinot cluster using Docker.
Get started setting up a Pinot cluster with Docker using the guide below.
Prerequisites:
Configure Docker memory with the following minimum resources:
CPUs: 8
Memory: 16.00 GB
Swap: 4 GB
Disk Image size: 60 GB
Pull the latest Docker image onto your machine by running the following command:
To pull a specific version, modify the command like below:
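A sketch of both forms:

```bash
docker pull apachepinot/pinot:latest
# or pin a specific release:
docker pull apachepinot/pinot:0.12.1
```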
Once you've downloaded the Pinot Docker image, it's time to set up a cluster. There are two ways to do this.
Pinot comes with quick start commands that launch instances of Pinot components in the same process and import pre-built datasets.
For example, the following quick start command launches Pinot with a baseball dataset pre-loaded:
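That command takes roughly this form; port 9000 exposes the controller and query console:

```bash
docker run -p 9000:9000 apachepinot/pinot:latest QuickStart -type batch
```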
Below are the usages of different ports:
2123: Zookeeper Port
9000: Pinot Controller Port
8000: Pinot Broker Port
7050: Pinot Server Port
6000: Pinot Minion Port
The quick start scripts launch Pinot with minimal resources. If you want to play with bigger datasets (more than a few MB), you can launch each of the Pinot components individually.
Create an isolated bridge network in docker
Export the necessary docker image tags for Pinot, Zookeeper, and Kafka.
Start the Pinot Controller in daemon mode and connect to Zookeeper.
Start the Pinot Broker in daemon mode and connect to Zookeeper.
Start the Pinot Server in daemon mode and connect to Zookeeper.
Optionally, you can also start Kafka for setting up real-time streams. This brings up the Kafka broker on port 9092.
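A condensed sketch of this sequence; the image tags, container names, and the network name pinot-demo are assumptions, and the official guide includes more flags:

```bash
docker network create -d bridge pinot-demo

docker run -d --network=pinot-demo --name pinot-zookeeper zookeeper:latest
docker run -d --network=pinot-demo --name pinot-controller -p 9000:9000 \
  apachepinot/pinot:latest StartController -zkAddress pinot-zookeeper:2181
docker run -d --network=pinot-demo --name pinot-broker \
  apachepinot/pinot:latest StartBroker -zkAddress pinot-zookeeper:2181
docker run -d --network=pinot-demo --name pinot-server \
  apachepinot/pinot:latest StartServer -zkAddress pinot-zookeeper:2181
```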
Now all Pinot-related components are running as an empty cluster.
Run the below command to check container status:
Sample Console Output
Optionally, export the necessary docker image tags for Pinot, Zookeeper, and Kafka.
Create a file called docker-compose.yml that contains the following:
Run the following command to launch all the required components:
Or, optionally, run the following command to launch all the components, including Kafka:
Run the below command to check the container status:
Sample Console Output
Step-by-step guide for pushing your own data into the Pinot cluster
Let's gather our data files and put them in pinot-quick-start/rawdata.
Supported file formats are CSV, JSON, AVRO, PARQUET, THRIFT, ORC. If you don't have sample data, you can use this sample CSV.
Columns are categorized into 3 types:
In our example transcript-schema, the studentID, firstName, lastName, gender, and subject columns are the dimensions, the score column is the metric, and timestampInEpoch is the time column.
Once you have identified the dimensions, metrics and time columns, create a schema for your data, using the following reference.
Here's the table configuration for the sample CSV file. You can use this as a reference to build your own table configuration. Edit the tableName and schemaName.
Review the directory structure so far.
Upload the table configuration using the following command.
To generate a segment, first create a job specification (JobSpec) yaml file. A JobSpec yaml file contains all the information regarding data format, input data location, and pinot cluster coordinates. Copy the following job specification file (example from Pinot quickstart file). If you're using your own data, be sure to do the following:
Replace transcript with your table name
Set the correct recordReaderSpec
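A sketch of such a job spec for the transcript example; the class names follow the standalone ingestion plugin layout, and the paths are placeholders:

```bash
cat > /tmp/pinot-quick-start/batch-job-spec.yml <<'EOF'
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
EOF
```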
Depending on whether you're using Docker or a launcher script, choose one of the following commands to generate a segment to upload to Pinot:
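The launcher-script form looks roughly like this (the Docker form is analogous, with paths inside the container):

```bash
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yml
```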
Here is some sample output.
We'd love to hear from you! Join us in the Pinot community Slack to ask questions, troubleshoot, and share feedback.
Pinot was originally built at LinkedIn to power rich interactive real-time analytics applications, and it now powers many user-facing analytics products there and at other companies.
Pinot can perform typical analytical operations such as slice and dice, drill down, roll up, and pivot on large scale multi-dimensional data. For instance, at LinkedIn, Pinot powers dashboards for thousands of business metrics. You can connect various business intelligence (BI) tools, such as Superset, to visualize data in Pinot.
Pinot prevents sharing ownership of database tables across microservice teams. Developers can create their own query models of data from multiple systems of record depending on their use case and needs. As with all aggregate stores, query models are eventually consistent.
Horizontally scalable: Operators can scale a Pinot cluster by adding new nodes when the workload increases. There are even two node types (brokers and servers) to scale query volume, query complexity, and data size independently.
As described in the Pinot architecture, Pinot has four node types:
Distributed systems do not maintain themselves, and in fact require sophisticated scheduling and resource management to function. Pinot uses Apache Helix for this purpose. Helix exists as an independent project, but it was designed by the original creators of Pinot for Pinot's own cluster management purposes, so the architectures of the two systems are well-aligned. Helix takes the form of a process on the controller, plus embedded agents on the brokers and servers. It uses Apache Zookeeper as a fault-tolerant, strongly consistent, durable state store.
The Pinot controller schedules and re-schedules resources in a Pinot cluster when metadata changes or a node fails. As an Apache Helix Controller, it schedules the resources that comprise the cluster and orchestrates connections between certain external processes and cluster components (e.g., ingestion of streaming and batch data). It can be deployed as a single process on its own server or as a group of redundant servers in an active/passive configuration.
The controller provides a REST interface that allows read and write access to all logical storage resources (e.g., servers, brokers, tables, and segments). See the Pinot Data Explorer section for more information on the web-based admin tool.
The broker's responsibility is to route queries to the appropriate server instances, or in the case of multi-stage queries, to compute a complete query plan and distribute it to the servers required to execute it. The broker collects and merges the responses from all servers into a final result, then sends the result back to the requesting client. The broker exposes an HTTP endpoint that accepts SQL queries in JSON format and returns the response in JSON.
Each broker maintains a query routing table. The routing table maps segments to the servers that store them. (When replication is configured on a table, each segment is stored on more than one server.) The broker computes multiple routing tables depending on the configured strategy for a table. The default strategy is to balance the query load across all available servers.
Every query processed by a broker uses either the single-stage engine or the multi-stage engine. For single-stage queries, the broker does the following:
Computes query routes based on the routing strategy defined in the configuration.
Computes the list of segments to query on each server.
The servers that received query plans each execute their part of the query; see the server documentation for more details on this process.
Servers host segments on locally attached storage and process queries on those segments. By convention, operators speak of "real-time" and "offline" servers, although there is no difference in the server process itself or even its configuration that distinguishes between the two. This is merely a convention reflected in the assignment strategy to confine the two different kinds of workloads to two groups of physical instances, since the performance-limiting factors differ between the two kinds of workloads. For example, offline servers might optimize for larger storage capacity, whereas real-time servers might optimize for memory and CPU cores.
Offline servers host segments created by ingesting batch data. The controller writes these segments to the offline servers according to the table's replication factor and segment assignment strategy. Typically, the controller writes new segments to the deep store, and affected servers download the segments from the deep store. The controller then notifies brokers that a new segment exists and is available to participate in queries.
A Pinot minion is an optional cluster component that executes background tasks on table data apart from the query processes performed by brokers and servers. Minions run on independent hardware resources, and are responsible for executing minion tasks as directed by the controller. Examples of minion tasks include converting batch data from a standard format like Avro or JSON into segment files to be loaded into an offline table, and rewriting existing segment files to purge records as required by data privacy laws like GDPR. Minion tasks can run once or be scheduled to run periodically.
Pinot tables exist in two varieties: offline (or batch) and real-time. Offline tables contain data from batch sources like CSV, Avro, or Parquet files, and real-time tables contain data from streaming sources like Apache Kafka®, Apache Pulsar®, or AWS Kinesis.
Pinot ingests batch data using an ingestion job, which follows a process like this:
The job transforms a raw data source (such as a CSV file) into segments. This is a potentially complex process resulting in a file that is typically several hundred megabytes in size.
The job then transfers the segment file to the cluster's deep store and notifies the controller that a new segment exists.
The controller then assigns the segment to one or more offline servers (depending on the replication factor) and notifies them that new segments are available.
Pinot ships with QuickStart commands that launch Pinot components in a single process and import pre-built datasets. These quick start examples are a good place to begin if you're just getting started with Pinot. The examples start with the batch processing example, after the following notes:
You must have either a local Pinot installation or Docker. The examples are available for each option and work the same. The decision of which to choose depends on your installation preference and how you generally like to work. If you don't know which to choose, using Docker will make your cleanup easier after you are done with the examples.
This example demonstrates how to do batch processing with Pinot. The command:
This example demonstrates how to do batch processing with JSON documents in Pinot. The command:
This example demonstrates how to do joins in Pinot using the multi-stage query engine. The command:
Make sure that you've downloaded Apache Pinot. The scripts for the setup in this guide can be found in the Pinot repository.
The Pinot repository has pre-packaged Helm charts for Pinot and Presto. The Helm repository index file is available in the repository.
Once Presto is deployed, you can run the below command, or follow the steps below.
However, it's not until a segment is committed that the segment is written to the deep store. The segment threshold decides when that should happen.
For a full list of these guides, see the quick start examples.
Getting data into Pinot is easy. Take a look at these two quick start guides, which will help you get up and running with sample data for offline and real-time tables.
Whether the table uses column-based or table-based null handling.
For configuration details, see the table configuration reference.
Since Pinot doesn't have dedicated DATETIME datatype support, you need to input time in either STRING, LONG, or INT format. However, Pinot needs to convert the date into an understandable format, such as an epoch timestamp, to do operations. Refer to the DateTime field documentation for more details on supported formats.
First, make sure your cluster is up and running.
For more details on constructing a schema file, see the schema documentation.
Check out the schema in the Rest API to make sure it was successfully uploaded.
Navigate to localhost:9000 in your browser to open the Data Explorer UI.
Let's run some queries on the data in the Pinot cluster. Navigate to the Query Console to see the querying interface.
We can see our baseballStats table listed on the left (you will see meetupRSVP or airlineStats if you used the streaming or hybrid quick start). Click on the table name to display the names and data types of the table's columns.
Pinot supports a subset of standard SQL. For more information, see the Pinot query documentation.
The controller REST API contains all the APIs that you will need to operate and manage your cluster. It provides a set of APIs for Pinot cluster management, including health checks, instance management, schema and table management, and data segment management.
Let's check out the tables in this cluster by going to the table list endpoint, clicking Try it out, and then clicking Execute. We can see the baseballStats table listed here. We can also see the exact cURL call made to the controller API.
You can look at the configuration of this table by going to the table config endpoint, clicking Try it out, typing baseballStats in the table name, and then clicking Execute.
Let's check out the schemas in the cluster by going to the schema list endpoint, clicking Try it out, and then clicking Execute. We can see a schema called baseballStats in this list.
Take a look at the schema by going to the schema endpoint, clicking Try it out, typing baseballStats in the schema name, and then clicking Execute.
Finally, let's check out the data segments in the cluster by going to the segment list endpoint, clicking Try it out, typing baseballStats in the table name, and then clicking Execute. There's one segment for this table, called baseballStats_OFFLINE_0.
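The same information is also available directly from the controller REST API; the host assumes a local controller:

```bash
curl localhost:9000/tables                   # list all tables
curl localhost:9000/tables/baseballStats     # table configuration
curl localhost:9000/schemas                  # list all schemas
curl localhost:9000/schemas/baseballStats    # schema definition
curl localhost:9000/segments/baseballStats   # segments for the table
```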
To learn how to upload your own data and schema, see the batch ingestion or streaming ingestion guides.
Install
The latest Pinot Docker image is published at apachepinot/pinot:latest. A list of all published tags is available on Docker Hub.
For a list of all available quick start commands, see the quick start examples.
Start Zookeeper in daemon mode. This is a single-node Zookeeper setup. Zookeeper is the central metadata store for Pinot and should be set up with replication for production use. For more information, see the Zookeeper documentation on replicated deployments.
Once your cluster is up and running, see the query guide to learn how to run queries against the data.
If you have minikube or a Kubernetes environment installed, you can also try running the Kubernetes quick start.
This example assumes you have set up your cluster using Docker.
A schema is used to define the columns and data types of the Pinot table. A detailed overview of the schema can be found in the schema documentation.
A table configuration is used to define the configuration related to the Pinot table. A detailed overview of the table can be found in the table documentation.
Use the Rest API that is running on your Pinot instance to review the table configuration and schema and make sure they were successfully uploaded. This link uses localhost as an example.
Pinot table data is stored as Pinot segments. A detailed overview of segments can be found in the segment documentation.
If everything worked, find your table in the Data Explorer to run queries against it.
Dimensions: Typically used in filters and group by, for slicing and dicing into data
Metrics: Typically used in aggregations; represents the quantitative data
Time: Optional column; represents the timestamp associated with each row
Dimension
Dimension columns are typically used in slice and dice operations for answering business queries. Some operations for which dimension columns are used:
GROUP BY - group by one or more dimension columns along with aggregations on one or more metric columns
Filter clauses such as WHERE
Metric
These columns represent the quantitative data of the table. Such columns are used for aggregation. In data warehouse terminology, these can also be referred to as fact or measure columns.
Some operations for which metric columns are used:
Aggregations such as SUM, MIN, MAX, COUNT, and AVG
Filter clauses such as WHERE
DateTime
Common operations that can be done on the time column:
GROUP BY
Filter clauses such as WHERE
Create and edit a table configuration in the Pinot UI or with the API.
In Apache Pinot, create a table by creating a JSON file, generally referred to as your table config. Update, add, or delete parameters as needed, and then reload the file.
Before you create a Pinot table configuration, you must first have a running Pinot cluster with broker and server tenants.
Use the Pinot API to upload your table config file: POST @fileName.json URL:9000/tables
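In curl form, that call might look like this (the file name and host are placeholders):

```bash
curl -X POST -H 'Content-Type: application/json' \
  -d @fileName.json localhost:9000/tables
```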
To modify your Pinot table configuration, use the Pinot UI or the API.
Any time you make a change to your table config, you may need to do one or more of the following, depending on the change.
Simple changes only require updating and saving your modified table configuration file. These include:
Changing the data or segment retention time
To update existing data and segments, after you update and save the changes to the table config file, do the following as applicable:
In the Pinot UI, from the table page, click Reload All Segments.
Using the Pinot API, send POST /segments/{tableName}/reload.
When you change the transform function used to populate a derived field or increase the number of partitions in an upsert-enabled table, perform a table re-bootstrap. One way to do this is to delete and recreate the table:
Using the Pinot API, first send DELETE /tables/{tableName}, followed by POST /tables with the new table configuration.
When you change the stream topic or change the Kafka cluster containing the Kafka topic you want to consume from, perform a real-time ingestion pause and resume. To pause and resume real-time ingestion:
Using the Pinot API, first send POST /tables/{tableName}/pauseConsumption, followed by POST /tables/{tableName}/resumeConsumption.
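Sketched with curl against a local controller (the table name is a placeholder):

```bash
curl -X POST localhost:9000/tables/myTable/pauseConsumption
# ...update the stream configs in the table config...
curl -X POST localhost:9000/tables/myTable/resumeConsumption
```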
To update a table configuration in the Pinot UI, do the following:
In the Cluster Manager, click the Tenant Name of the tenant that hosts the table you want to modify.
Click the Table Name in the list of tables in the tenant.
Click the Edit Table button. This opens a pop-up window containing the table configuration. Edit the contents in this window and click Save when you are done.
To update a table configuration using the Pinot API, do the following:
Get the current table configuration with GET /tables/{tableName}.
Modify the file locally.
Upload the edited file with PUT /tables/{tableName} fileName.json.
The Docker instructions on this page are still WIP
Let's set up a demo Kafka cluster locally and create a sample topic transcript-topic.
Next, upload the table and schema to the cluster. As soon as the real-time table is created, it will begin ingesting from the Kafka topic.
Use the following sample JSON file for transcript table data in the following step.
Push the sample JSON file into the Kafka topic, using the Kafka script from the Kafka download.
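The push might look like this, assuming a local Kafka download and the broker port 9876 used elsewhere in this guide (file paths are placeholders):

```bash
bin/kafka-console-producer.sh --broker-list localhost:9876 \
  --topic transcript-topic < /tmp/pinot-quick-start/rawdata/transcript.json
```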
This page has a collection of frequently asked questions of a general nature with answers from the community.
When data is pushed to Apache Pinot, Pinot makes a backup copy of the data and stores it on the configured deep storage (S3/GCP/ADLS/NFS/etc). This copy is stored as tar.gz Pinot segments. Note that Pinot servers keep an untarred copy of the segments on their local disks as well, for performance reasons.
Pinot uses Apache Helix for cluster management, which in turn is built on top of Zookeeper. Helix uses Zookeeper to store the cluster state, including Ideal State, External View, Participants, and so on. Pinot also uses Zookeeper to store information such as Table configurations, schemas, Segment Metadata, and so on.
Pinot uses the local timezone by default. To change the timezone, set the pinot.timezone value in the .conf config file. It is set once for all Pinot components (Controller, Broker, Server, Minion). See the following sample configuration:
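For example (the value is illustrative):

```
pinot.timezone=UTC
```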
This quickstart guide helps you get started running Pinot on Amazon Web Services (AWS).
For Mac users
Check kubectl version after installation.
For Mac users
Check helm version after installation.
For Mac users
For Mac users
The script below will create a one-node cluster named pinot-quickstart in us-west-2 with a t3.xlarge machine for demo purposes:
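A sketch using eksctl, with flag values mirroring that description:

```bash
eksctl create cluster \
  --name pinot-quickstart \
  --region us-west-2 \
  --nodes 1 \
  --node-type t3.xlarge
```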
For k8s 1.23+, run the following commands to allow the containers to provision their storage:
Use the following command to monitor the cluster status:
Once the cluster is in ACTIVE status, it's ready to be used.
Run the following command to get the credential for the cluster pinot-quickstart that you just created:
To verify the connection, run the following:
This guide shows how to set up HDFS as deep storage for a Pinot segment.
To use HDFS as deep storage you need to include HDFS dependency jars and plugins.
If you receive an error that says No FileSystem for scheme "hdfs", the problem is likely a class loading issue. To fix it, try adding the property fs.hdfs.impl with the value org.apache.hadoop.hdfs.DistributedFileSystem to core-site.xml, and then export /opt/pinot/lib/hadoop-common-<release-version>.jar in the classpath.
Insert a file into Pinot from Query Console
Ensure you have available Pinot Minion instances deployed within the cluster.
Pinot version is 0.11.0 or above
Parses the query, extracting the table name and directory URI along with a list of options for the ingestion job.
Calls the controller minion task execution API endpoint to schedule the task on a minion.
The response contains the table name and the task's job ID.
INSERT INTO [database.]table FROM FILE dataDirURI OPTION ( k=v ) [, OPTION (k=v)]*
We are actively developing this feature...
The details will be revealed soon.
This page lists pages with frequently asked questions with answers from the community.
This page has a collection of frequently asked questions about queries with answers from the community.
This implies that the Pinot Broker assigned to the table specified in the query was not found. A common root cause for this is a typo in the table name in the query. Another, less common, reason could be that there wasn't actually a broker with the required broker tenant tag for the table.
"timestamp" is a reserved keyword in SQL. Escape timestamp with double quotes.
Other commonly encountered reserved keywords are date, time, and table.
For filtering on STRING columns, use single quotes:
The fields in the ORDER BY clause must be one of the group by clauses or aggregations, BEFORE applying the alias. Therefore, this will not work:
But, this will work:
No. Pagination only works for SELECTION queries.
You can add this at the end of your query: option(timeoutMs=X). For example, the following uses a timeout of 20 seconds for the query: SELECT COUNT(*) FROM myTable option(timeoutMs=20000)
You can also use SET "timeoutMs" = 20000; SELECT COUNT(*) from myTable.
For changing the timeout on the entire cluster, set the property pinot.broker.timeoutMs in either broker configs or cluster configs (using the POST /cluster/configs API from Swagger).
Add these two configs for the Pinot server and broker to start tracking running queries. Query tracking entries are added and cleaned up as queries start and end, so they should not consume many resources.
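To the best of our reading of the docs, these are the relevant flags, set in the server and broker configs respectively; verify the names against your release:

```
pinot.server.enable.query.cancellation=true
pinot.broker.enable.query.cancellation=true
```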
Then use the Rest APIs on the Pinot controller to list running queries and cancel them via the query ID and broker ID (as the query ID is only local to a broker), like in the following:
There are two ways to verify this:
Log in to a server that hosts segments of this table. Inside the data directory, locate the segment directory for this table. In this directory, there is a file named index_map which lists all the indexes and other data structures created for each segment. Verify that the requested index is present here.
During query: Use the column in the filter predicate and check the value of numEntriesScannedInFilter. If this value is 0, then indexing is working as expected (this works for the inverted index).
Yes, Pinot uses a default value of LIMIT 10 in queries. The reason behind this default value is to avoid unintentionally submitting expensive queries that end up fetching or processing a lot of data from Pinot. Users can always overwrite this by explicitly specifying a LIMIT value.
Pinot does not cache query results. Each query is computed in its entirety. Note, though, that running the same or a similar query multiple times will naturally pull segment pages into memory, making subsequent calls faster. Also, for real-time systems, the data is changing in real time, so results cannot be cached. For offline-only systems, a caching layer can be built on top of Pinot, with an invalidation mechanism built in to invalidate the cache when data is pushed into Pinot.
Pinot memory maps segments. It warms up during the first query, when segments are pulled into the memory by the OS. Subsequent queries will have the segment already loaded in memory, and hence will be faster. The OS is responsible for bringing the segments into memory, and also removing them in favor of other segments when other segments not already in memory are accessed.
The query execution engine will prefer to use the star-tree index for all queries where it can be used. The criteria to determine whether the star-tree index can be used is as follows:
All aggregation function + column pairs in the query must exist in the star-tree index.
All dimensions that appear in filter predicates and group-by should be star-tree dimensions.
For queries where the above is true, the star-tree index is used. For other queries, the execution engine defaults to the next best index available.
This column represents time columns in the data. There can be multiple time columns in a table, but only one of them can be treated as primary. The primary time column is the one set in the table config's segmentsConfig section.
The primary time column is used by Pinot to maintain the time boundary between offline and real-time data in a hybrid table and for retention management. A primary time column is mandatory if the table's push type is APPEND and optional if the push type is REFRESH.
Create a plaintext file table configuration locally, using settings appropriate for your use case. You may find it useful to download an example and then modify it. An example is included at the end of this page.
Changing the realtime settings
When you add or modify indexes or the table schema, perform a segment reload. To reload all segments:
When you re-partition data, perform a segment refresh. To refresh, replace an existing segment with a new one by uploading a segment reusing the existing filename. Using the Pinot API, send POST /segments?tableName={yourTableName}.
This example comes from the Pinot quick start. This table configuration defines a table called airlineStats_OFFLINE, which you can interact with by running the example.
This example assumes you have set up your cluster using Docker.
First, we need to set up a stream. Pinot has out-of-the-box real-time ingestion support for Kafka. Other streams can be plugged in; see the pluggable streams documentation.
Start Kafka
Create a Kafka Topic
Start Kafka
Start the Kafka cluster on port 9876 using the same Zookeeper from the quick-start examples.
Create a Kafka topic
Download the latest Kafka release, then create a topic.
If you followed the batch quick start, you have already pushed a schema for your sample table. If not, see creating a schema to learn how to create a schema for your sample data.
If you followed the batch quick start, you pushed an offline table and schema. To create a real-time table configuration for the sample, use the following table configuration for the transcript table. For a more detailed overview, see the table documentation.
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Browse to the Query Console running in your Pinot instance (we use localhost in this link as an example) to examine the real-time data.
This is a list of questions frequently asked in our troubleshooting channel on Slack. To contribute additional questions and answers, make a pull request.
Check the JDK version you are using. You may be getting this error if you are using an older version than the one the current Pinot binary release was built with. If so, you have two options: switch to the same JDK release that Pinot was built with, or download the source code for the Pinot release and build it locally.
In this quickstart guide, you will set up a Kubernetes cluster on Amazon Elastic Kubernetes Service (Amazon EKS).
To install kubectl, see the Kubernetes documentation.
To install Helm, see the Helm documentation.
To install the AWS CLI, see the AWS documentation.
For first-time AWS users, register your account at aws.amazon.com.
Once you have created the account, go to the IAM console to create a user and create access keys under the Security Credentials tab.
Follow this guide to deploy your Pinot demo.
This feature is supported after the 0.11.0 release. Reference PR:
This is a list of questions frequently asked in our troubleshooting channel on Slack. To contribute additional questions and answers, make a pull request.
See the documentation explaining the Pinot response format.
In order to speed up aggregations, you can enable metrics aggregation on the required column by adding a metric field in the corresponding schema and setting aggregateMetrics to true in the table configuration. You can also use a star-tree index config for columns like these.
This page links to multiple quick start guides for deploying Pinot to different public cloud providers.
These quickstart guides show you how to run an Apache Pinot cluster using Kubernetes on different public cloud providers.
This quickstart guide helps you get started running Pinot on Google Cloud Platform (GCP).
For Mac users
Check kubectl version after installation.
For Mac users
Check helm version after installation.
Install Google Cloud SDK
Restart your shell
This script will create a 3-node cluster named pinot-quickstart in us-west1-b with n1-standard-2 machines for demo purposes.
Modify the parameters in the following example command with your gcloud details:
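A sketch of that command; the zone, machine type, and node count mirror the description above:

```bash
gcloud container clusters create pinot-quickstart \
  --zone us-west1-b \
  --machine-type n1-standard-2 \
  --num-nodes 3
```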
Use the following command to monitor cluster status:
Once the cluster is in RUNNING status, it's ready to be used.
Run the following command to get the credential for the cluster pinot-quickstart that you just created:
To verify the connection, run the following:
This quickstart guide helps you get started running Pinot on Microsoft Azure.
For Mac users
Check kubectl version after installation.
For Mac users
Check helm version after installation.
For Mac users
This script will open your default browser to sign-in to your Azure Account.
Use the following script to create a resource group in the location eastus.
This script will create a 3 node cluster named pinot-quickstart for demo purposes.
Modify the parameters in the following example command with your resource group and cluster details:
Once the command succeeds, the cluster is ready to be used.
Run the following command to get the credential for the cluster pinot-quickstart that you just created:
To verify the connection, run the following:
This page has a collection of frequently asked questions about ingestion with answers from the community.
While Apache Pinot can work with segments of various sizes, for optimal use of Pinot, you want to get your segments sized in the 100MB to 500MB (un-tarred/uncompressed) range. Having too many (thousands or more) tiny segments for a single table creates overhead in terms of the metadata storage in Zookeeper as well as in the Pinot servers' heap. At the same time, having too few really large (GBs) segments reduces parallelism of query execution, as on the server side, the thread parallelism of query execution is at segment level.
Yes. Each table can be independently configured to consume from any given Kafka topic, regardless of whether there are other tables that are also consuming from the same Kafka topic.
Pinot automatically detects new partitions in Kafka topics. It checks for new partitions whenever the RealtimeSegmentValidationManager periodic job runs and starts consumers for new partitions.
You can configure the interval for this job using the controller.realtime.segment.validation.frequencyPeriod property in the controller configuration.
The following example partitions the segment based on two columns, memberID and caseNumber. Note that each partition column is handled separately, so in this case the segment is partitioned on memberID (partition ID 1) and also partitioned on caseNumber (partition ID 2).
For multi-column partitioning to work, you must also set routing.segmentPrunerTypes as follows:
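A sketch of the columnPartitionMap half of that configuration, reusing the columns from the example; numPartitions is an arbitrary placeholder:

```json
"tableIndexConfig": {
  "segmentPartitionConfig": {
    "columnPartitionMap": {
      "memberID":   { "functionName": "Murmur", "numPartitions": 3 },
      "caseNumber": { "functionName": "Murmur", "numPartitions": 3 }
    }
  }
}
```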
The partitioning logic in the stream should match the partitioning config in Pinot. Kafka uses murmur2, and the equivalent in Pinot is the Murmur function.
Set the partitioning configuration as below, using the same column used in Kafka:
and also set:
For JSON, you can use a hex encoded string to ingest BYTES.
NOTE: This works well if some of your fields are nested JSON, but most of your fields are top-level JSON keys. If all of your fields are within a nested JSON key, you will have to store the entire payload as one column, which is not ideal.
By default, Pinot limits the length of a String column to 512 bytes. If you want to overwrite this value, you can set the maxLength attribute in the schema as follows:
Events are available to queries as soon as they are ingested. This is because events are instantly indexed in memory upon ingestion.
The ingestion of events into the real-time table is not transactional, so replicas of the open segment are not immediately consistent. Pinot trades consistency for availability upon network partitioning (CAP theorem) to provide ultra-low ingestion latencies at high throughput.
This typically happens if:
The consumer is lagging a lot.
The consumer was down (server down, cluster down), and the stream moved on, resulting in offset not found when consumer comes back up.
The output from this API should look something like the following:
Not all indexes can be retrospectively applied to existing segments.
The new segments will have star-tree indexes generated after applying the star-tree index configurations to the table configuration.
Pinot does not require ordering of event time stamps. Out of order events are still consumed and indexed into the "currently consuming" segment. In a pathological case, if you have a 2 day old event come in "now", it will still be stored in the segment that is open for consumption "now". There is no strict time-based partitioning for segments, but star-indexes and hybrid tables will handle this as appropriate.
When generating star-tree indexes, the time column will be part of the star-tree, so the tree can still be efficiently queried for segments with multiple time intervals.
Why doesn't Pinot use max(OfflineTime) directly to determine the time boundary, instead applying an offset?
This lets you have an old event come in without building complex offline pipelines that perfectly partition your events by event timestamps. With this offset, even if your offline data pipeline produces segments with a maximum timestamp, Pinot will not use the offline dataset for that last chunk of segments. The expectation is that when you next process the offline time range, your data pipeline will include any late events.
It might seem odd that segments are not strictly time-partitioned, unlike similar systems such as Apache Druid. This allows real-time ingestion to consume out-of-order events. Even though segments are not strictly time-partitioned, Pinot will still index, prune, and query segments intelligently by time intervals for the performance of hybrid tables and time-filtered data.
When generating offline segments, generate them such that each segment contains only one time interval and is well partitioned by the time column.
Batch ingestion of data into Apache Pinot using Apache Spark.
Pinot supports Apache Spark (2.x and 3.x) as a processor to create and push segment files to the database. Pinot distribution is bundled with the Spark code to process your files and convert and upload them to Pinot.
To set up Spark, do one of the following:
Follow the instructions below.
If you build Pinot from source, consider opting into the build-shaded-jar
profile with -Pbuild-shaded-jar
. While Pinot does not bundle Spark into its jar, it does bundle certain Hadoop libraries.
To run Spark ingestion, you need the following jars in your classpath:
pinot-batch-ingestion-spark
plugin jar - available in plugins-external
directory in the package
pinot-all
jar - available in lib
directory in the package
These jars can be specified using spark.driver.extraClassPath
or any other option.
For loading any other plugins that you want to use, use:
The complete spark-submit command should look like this:
Ensure environment variables PINOT_ROOT_DIR
and PINOT_VERSION
are set properly.
Note: You should change the master
to yarn
and deploy-mode
to cluster
for production environments.
If you want to run the Spark job in cluster mode on a YARN/EMR cluster, do the following:
Build Pinot from source with option -DuseProvidedHadoop
Copy Pinot binaries to S3, HDFS or any other distributed storage that is accessible from all nodes.
Copy the ingestion spec YAML file to S3, HDFS, or any other distributed storage. Mention this path as part of the --files
argument in the command
Add --jars
options that contain the s3/hdfs paths to all the required plugin jars and the pinot-all jar
Point classPath
to the Spark working directory. Generally, specifying just the jar names without any paths works. Do the same for the main jar as well as the spec YAML file.
Example
For Spark 3.x, replace pinot-batch-ingestion-spark-2.4
with pinot-batch-ingestion-spark-3.2
in all places in the commands.
Also, ensure the classpath in ingestion spec is changed from org.apache.pinot.plugin.ingestion.batch.spark.
to
org.apache.pinot.plugin.ingestion.batch.spark3.
Q - I am getting the following exception - Class has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
Q - I am not able to find pinot-batch-ingestion-spark
jar.
For Pinot versions prior to 0.10.0, the Spark plugin is located in the plugins
dir of the binary distribution. For 0.10.0 and later, it is located in the plugins-external
dir.
Q - Spark is not able to find the jars leading to java.nio.file.NoSuchFileException
This means the classpath for spark job has not been configured properly. If you are running spark in a distributed environment such as Yarn or k8s, make sure both spark.driver.classpath
and spark.executor.classpath
are set. Also, the jars in driver.classpath
should be added to --jars
argument in spark-submit
so that Spark can distribute those jars to all the nodes in your cluster. You also need to provide the appropriate scheme with the file path when running the jar. In this doc, we have used local://
, but it can be different depending on your cluster setup.
Q - Spark job failing while pushing the segments.
It can be because of a misconfigured controllerURI
in the job spec YAML file. If the controllerURI is correct, make sure it is accessible from all the nodes of your YARN or k8s cluster.
Q - My data gets overwritten during ingestion.
Q - I am getting java.lang.RuntimeException: java.io.IOException: Failed to create directory: pinot-plugins-dir-0/plugins/*
Removing -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
from spark.driver.extraJavaOptions
should fix this. As long as the plugins are mentioned in the classpath and --jars
argument, it should not be an issue.
Q - Getting Class not found:
exception
Check if the extraClassPath
arguments contain all the plugin jars for both the driver and the executors, and that all the plugin jars are mentioned in the --jars
argument. If both of these are correct, check whether the extraClassPath
contains local filesystem classpaths and not S3, HDFS, or other distributed file system classpaths.
Pinot offers various ways to assist with troubleshooting and debugging problems that might happen.
The table debug API can be invoked via the Swagger UI, as in the following image:
It can also be invoked directly by accessing the URL as follows. The API requires the tableName
, and can optionally take tableType (offline|realtime)
and verbosity
level.
Finally, all Pinot components log debug information related to error conditions.
Use the following steps:
If the query executes, look at the query result. Specifically look at numEntriesScannedInFilter
and numDocsScanned
.
If numEntriesScannedInFilter
is very high, consider adding indexes for the corresponding columns being used in the filter predicates. You should also think about partitioning the incoming data based on the dimension most heavily used in your filter queries.
If numDocsScanned
is very high, that means the selectivity for the query is low and lots of documents need to be processed after the filtering. Consider refining the filter to increase the selectivity of the query.
If the query is not executing, you can extend the query timeout by appending a timeoutMs
parameter to the query, for example, select * from mytable limit 10 option(timeoutMs=60000)
. Then repeat step 1, as needed.
Look at garbage collection (GC) stats for the corresponding Pinot servers. If a particular server seems to be running full GC all the time, you can do a couple of things:
Increase Java Virtual Machine (JVM) heap (java -Xmx<size>
).
Consider using off-heap memory for segments.
Decrease the total number of segments per server (by partitioning the data in a more efficient way).
In this quickstart guide, you will set up a Kubernetes cluster on Google Kubernetes Engine (GKE).
Follow this link to install kubectl.
Follow this link to install Helm.
To install the Google Cloud SDK, see the linked instructions.
Follow this guide to deploy your Pinot demo.
In this quickstart guide, you will set up a Kubernetes cluster on Azure Kubernetes Service (AKS).
Follow this link to install kubectl.
To install Helm, see the linked instructions.
Follow this link to install the Azure CLI.
Follow this guide to deploy your Pinot demo.
This is a list of questions frequently asked in our troubleshooting channel on Slack. To contribute additional questions and answers, reach out in that channel.
Pinot supports multi-column partitioning for offline tables. Map multiple columns under tableIndexConfig.segmentPartitionConfig.columnPartitionMap. Pinot assigns the input data to each partition according to the partition configuration individually for each column.
Set up partitioner in the Kafka producer:
To learn how partitioning works, see .
See the function which can store a top level json field as a STRING in Pinot.
Then you can use these during query time, to extract fields from the json string.
To use explicit code points, you must double-quote (not single-quote) the string, and escape the code point via "\uHHHH", where HHHH is the four digit hex code for the character. See for more details.
However, when the open segment is closed and its in-memory indexes are flushed to persistent storage, all its replicas are guaranteed to be consistent, with the .
In case of Kafka, to recover, set property "auto.offset.reset":"earliest"
in the streamConfigs
section and reset the CONSUMING
segment. See for more details about the configuration.
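A minimal sketch of the relevant streamConfigs entry (other stream properties omitted for brevity):

```json
"streamConfigs": {
  "streamType": "kafka",
  "auto.offset.reset": "earliest"
}
```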
You can also use the "Resume Consumption" endpoint with the "resumeFrom" parameter set to "smallest" (or "largest" if you prefer). See for more details.
Inverted indexes are set in the tableConfig
's tableIndexConfig
-> invertedIndexColumns
list. For more info on table configuration, see . For an example showing how to configure an inverted index, see .
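For example, a sketch with hypothetical column names:

```json
{
  "tableIndexConfig": {
    "invertedIndexColumns": ["memberID", "caseNumber"]
  }
}
```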
Applying inverted indexes to a table configuration will generate an inverted index for all new segments. To apply the inverted indexes to all existing segments, see
Add the columns you want to index to the tableIndexConfig
-> invertedIndexColumns
list. To update the table configuration use the Pinot Swagger API: .
Invoke the reload API: .
Once you've done that, you can check whether the index has been applied by querying the segment metadata API at . Don't forget to include the names of the columns on which you have applied the index.
If you want to add or change the sorted index column or adjust the dictionary encoding, you will need to manually reload any existing segments.
Star-tree indexes are configured in the table config under the tableIndexConfig
-> starTreeIndexConfigs
(list) and enableDefaultStarTree
(boolean). See here for more about how to configure star-tree indexes:
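A hedged sketch of a star-tree configuration (dimension and metric names are illustrative):

```json
{
  "tableIndexConfig": {
    "enableDefaultStarTree": false,
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": ["country", "browser"],
        "skipStarNodeCreationForDimensions": [],
        "functionColumnPairs": ["SUM__impressions"],
        "maxLeafRecords": 10000
      }
    ]
  }
}
```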
See the for more details about how hybrid tables handle this. Specifically, the time boundary is computed as max(OfflineTime) - 1 unit of granularity
. Pinot does store the min-max time for each segment and uses it for pruning segments, so segments with multiple time intervals may not be perfectly pruned.
Use the Spark-Pinot Connector. For more information, see the .
You can follow the to build Pinot from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
Next, you need to change the execution config in the to the following:
We have stopped including spark-core
dependency in our jars post 0.10.0 release. Users can try 0.11.0-SNAPSHOT and later versions of pinot-batch-ingestion-spark
in case of any runtime issues. You can either or download latest master build jars.
Since 0.8.0 release, Pinot binaries are compiled with JDK 11. If you are using Spark along with Hadoop 2.7+, you need to use the Java8 version of Pinot. Currently, you need to .
Set segmentPushType to APPEND
in the tableConfig.
If it is already set to APPEND
, this is likely due to a missing timeColumnName
in your table config. If you can't provide a time column, use our segment name generator configs in the ingestion spec. Generally, using the inputFile
segment name generator should fix your issue.
Start with the table debug API, which will surface many of the commonly occurring problems. The debug API provides information such as tableSize, ingestion status, and error messages related to state transitions in the server.
Pinot also provides a variety of operational metrics that can be used for creating dashboards, alerting, and monitoring.
This guide shows you how to ingest a stream of records from an Amazon Kinesis topic into a Pinot table.
To ingest events from an Amazon Kinesis stream into Pinot, set the following configs into your table config:
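A hedged sketch of the streamConfigs section (stream name, region, and the JSON decoder are illustrative assumptions; adjust them to your setup):

```json
"streamConfigs": {
  "streamType": "kinesis",
  "stream.kinesis.topic.name": "my-kinesis-stream",
  "region": "us-west-1",
  "shardIteratorType": "LATEST",
  "maxRecordsToFetch": "20",
  "stream.kinesis.consumer.type": "lowlevel",
  "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
  "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
}
```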
where the Kinesis specific properties are:
streamType
This should be set to "kinesis"
stream.kinesis.topic.name
Kinesis stream name
region
Kinesis region e.g. us-west-1
accessKey
Kinesis access key
secretKey
Kinesis secret key
shardIteratorType
Set to LATEST to consume only new records, TRIM_HORIZON to start from the earliest sequence number, or AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER to start consumption from a particular sequence number
maxRecordsToFetch
... Default is 20.
Environment Variables - AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
(RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY
and AWS_SECRET_KEY
(only recognized by Java SDK)
Java System Properties - aws.accessKeyId
and aws.secretKey
Web Identity Token credentials from the environment or container
Credential profiles file at the default location (~/.aws/credentials)
shared by all AWS SDKs and the AWS CLI
Credentials delivered through the Amazon EC2 container service if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
environment variable is set and the security manager has permission to access the variable.
Instance profile credentials delivered through the Amazon EC2 metadata service
Although you can also specify the accessKey
and secretKey
in the properties above, we don't recommend this insecure method; use it only for non-production proof-of-concept (POC) setups. You can also specify other AWS fields, such as AWS_SESSION_TOKEN, as environment variables or in the config, and they will work.
In Kinesis, whenever you reshard a stream, it is done via split or merge operations on shards. If you split a shard, the shard closes and creates 2 new children shards. So if you started with shard0, and then split it, it would result in shard1 and shard2. Similarly, if you merge 2 shards, both those will close and create a child shard. So in the same example, if you merge shards 1 and 2, you'll end up with shard3 as the active shard, while shard0, shard1, shard2 will remain closed forever.
1. We finish ingesting from the parent shards completely.
2. After 1, the RealtimeValidationManager runs.
You will see a period where the ideal state shows all segments ONLINE, as the parents have naturally completed ingesting, and we're waiting for the RealtimeValidationManager to kickstart ingestion from the children.
ShardID
is of the format "shardId-000000000001". We use the numeric part as partitionId
. Our partitionId
variable is an integer. If shardIds grow beyond Integer.MAX_VALUE
, we will overflow into the partitionId space.
Segment size based thresholds for segment completion will not work. It assumes that partition "0" always exists. However, once the shard 0 is split/merged, we will no longer have partition 0.
This guide shows you how to ingest a stream of records into a Pinot table.
Apache Pinot lets users consume data from streams and push it directly into the database. This process is called stream ingestion. Stream ingestion makes it possible to query data within seconds of publication.
Stream ingestion provides support for checkpoints for preventing data loss.
To set up Stream ingestion, perform the following steps, which are described in more detail in this page:
Create schema configuration
Create table configuration
Create ingestion configuration
Upload table and schema spec
Here's an example where we assume the data to be ingested is in the following format:
For our sample data, the schema configuration looks like this:
For our sample data and schema, the table config will look like this:
ingestionConfig for multi-topics ingestion: all transform functions would apply to both topics' ingestion.
Existing instance assignment strategy would all work as usual.
Underlying ingestion still works as LOWLEVEL
mode, where
transcript-topic1
segments would be named like transcript__0__0__20250101T0000Z
transcript-topic2
segments would be named like transcript__10000__0__20250101T0000Z
Now that we have our table and schema configurations, let's upload them to the Pinot cluster. As soon as the configs are uploaded, Pinot will start ingesting available records from the topic.
There are some scenarios where the message rate in the input stream can come in bursts which can lead to long GC pauses on the Pinot servers or affect the ingestion rate of other real-time tables on the same server. If this happens to you, throttle the consumption rate during stream ingestion to better manage overall performance.
Stream consumption throttling can be tuned using the stream config topic.consumption.rate.limit
which indicates the upper bound on the message rate for the entire topic.
Here is the sample configuration on how to configure the consumption throttling:
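A minimal sketch (the 1000 messages/sec limit is illustrative; other stream properties omitted):

```json
"streamConfigs": {
  "streamType": "kafka",
  "topic.consumption.rate.limit": "1000"
}
```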
Some things to keep in mind while tuning this config are:
Since this configuration applies to the entire topic, internally this rate is divided by the number of partitions in the topic and applied to each partition's consumer.
In the case of a multi-tenant deployment (where you have more than one table on the same server instance), make sure that the rate limit on one table doesn't starve the rate limiting of another table. When there is more than one table on the same server (which is likely), you may need to re-tune the throttling threshold for all the streaming tables.
Once throttling is enabled for a table, you can verify by searching for a log that looks similar to:
In addition, you can monitor the consumption rate utilization with the metric CONSUMPTION_QUOTA_UTILIZATION
.
There are some scenarios in which you may want to pause real-time ingestion while your table remains available for queries. For example, if there is a problem with the stream ingestion and, while you are troubleshooting the issue, you still want queries to be executed on the already ingested data. For these scenarios, first issue a Pause request to a Controller host. After troubleshooting the stream is done, issue another request to the Controller to resume consumption.
When a Pause
request is issued, the controller instructs the real-time servers hosting your table to commit their consuming segments immediately. However, the commit process may take some time to complete. Note that Pause
and Resume
requests are async. An OK
response means that the instructions for pausing or resuming have been successfully sent to the real-time server. If you want to know whether consumption has actually stopped or resumed, issue a pause status request.
It's worth noting that consuming segments on real-time servers are stored in volatile memory, and their resources are allocated when the consuming segments are first created. These resources cannot be altered if consumption parameters are changed midway through consumption. It may take hours before these changes take effect. Furthermore, if the parameters are changed in an incompatible way (for example, changing the underlying stream with a completely new set of offsets, or changing the stream endpoint from which to consume messages), it will result in the table getting into an error state.
The pause and resume feature is helpful in these instances. When a pause request is issued by the operator, consuming segments are committed without starting new mutable segments. Instead, new mutable segments are started only when the resume request is issued. This mechanism provides the operators as well as developers with more flexibility. It also enables Pinot to be more resilient to the operational and functional constraints imposed by underlying streams.
There is another feature called Force Commit
which utilizes the primitives of the pause and resume feature. When the operator issues a force commit request, the current mutable segments will be committed and new ones started right away. Operators can now use this feature for all compatible table config parameter changes to take effect immediately.
(v 0.12.0+) Once submitted, the forceCommit API returns a jobId that can be used to get the current progress of the forceCommit operation. A sample response and status API call:
For incompatible parameter changes, an option is added to the resume request to handle the case of a completely new set of offsets. Operators can now follow a three-step process: First, issue a pause request. Second, change the consumption parameters. Finally, issue the resume request with the appropriate option. These steps will preserve the old data and allow the new data to be consumed immediately. All through the operation, queries will continue to be served.
Often, it is important to understand the rate of ingestion of data into your real-time table. This is commonly done by looking at the consumption lag of the consumer. The lag itself can be observed in many dimensions. Pinot supports observing consumption lag along the offset dimension and time dimension, whenever applicable (as it depends on the specifics of the connector).
The ingestion status of a connector can be observed by querying either the /consumingSegmentsInfo
API or the table's /debug
API, as shown below:
A sample response from a Kafka-based real-time table is shown below. The ingestion status is displayed for each of the CONSUMING segments in the table.
Real-time ingestion includes 3 stages of message processing: Decode, Transform, and Index.
In each of these stages, a failure can happen which may or may not result in an ingestion failure. The following metrics are available to investigate ingestion issues:
Decode stage -> an error here is recorded as INVALID_REALTIME_ROWS_DROPPED
Transform stage -> possible errors here are:
When the transform pipeline sets the $INCOMPLETE_RECORD_KEY$
key in the message, it is recorded as INCOMPLETE_REALTIME_ROWS_CONSUMED
, only when continueOnError
configuration is enabled. If the continueOnError
is not enabled, the ingestion fails.
Index stage -> When there is failure at this stage, the ingestion typically stops and marks the partition as ERROR.
There is yet another metric called ROWS_WITH_ERROR
which is the sum of all error counts in the 3 stages above.
Furthermore, the metric REALTIME_CONSUMPTION_EXCEPTIONS
gets incremented whenever there is a transient/permanent stream exception seen during consumption.
These metrics can be used to understand why ingestion failed for a particular table partition before diving into the server logs.
Kinesis supports authentication using the . The credential provider looks for the credentials in the following order:
You must provide all read access level permissions for Pinot to work with an AWS Kinesis data stream. See the for details.
Please check out this recipe for more details:
In Pinot, resharding of any stream is detected by the periodic task RealtimeValidationManager, which runs hourly. If you reshard, your new shards will not get detected unless:
If you need the ingestion to happen sooner, you can manually invoke the RealtimeValidationManager:
The schema defines the fields along with their data types. The schema also defines whether fields serve as dimensions
, metrics
, or timestamp
. For more details on schema configuration, see .
The next step is to create a table where all the ingested data will flow and can be queried. For details about each table component, see the reference.
The table configuration contains an ingestion configuration (ingestionConfig
), which specifies how to ingest streaming data into Pinot. For details, see the reference.
Starting from a recent release, Pinot supports ingesting data from multiple streams into one table. (It is currently in beta mode, and only supports multiple Kafka topics. Other stream types will be supported in the near future.) For our sample data and schema, assume that we duplicate it to 2 topics, transcript-topic1
and transcript-topic2
. If we want to ingest from both topics, then the table config will look like this:
With multi-topic ingestion (see the reference for details):
would still be handled in the same way.
Note that any configuration change for topic.consumption.rate.limit
in the stream config will NOT take effect immediately. The new configuration will be picked up from the next consuming segment. In order to enforce the new configuration, you need to trigger forceCommit APIs. Refer to for more details.
You can also write an ingestion plugin if the platform you are using is not supported out of the box. For a walkthrough, see .
If a Pinot table is configured to consume using a (partition-based) stream type, then it is possible that the partitions of the table change over time. In Kafka, for example, the number of partitions may increase. In Kinesis, the number of partitions may increase or decrease -- some partitions could be merged to create a new one, or existing partitions split to create new ones.
Pinot runs a periodic task called RealtimeSegmentValidationManager
that monitors such changes and starts consumption on new partitions (or stops consumption from old ones) as necessary. Since this is a periodic task that runs on the controller, it may take some time for Pinot to recognize new partitions and start consuming from them. This may delay the data in new partitions appearing in the results that Pinot returns.
If you want to recognize the new partitions sooner, you can manually trigger the periodic task so as to recognize such data immediately.
When a message gets dropped due to the transform, it is recorded as REALTIME_ROWS_FILTERED
currentOffsetsMap
Current consuming offset position per partition
latestUpstreamOffsetMap
(Wherever applicable) Latest offset found in the upstream topic partition
recordsLagMap
(Whenever applicable) Defines how far behind the current record's offset / pointer is from upstream latest record. This is calculated as the difference between the latestUpstreamOffset
and currentOffset
for the partition when the lag computation request is made.
recordsAvailabilityLagMap
(Whenever applicable) Defines how soon after record ingestion was the record consumed by Pinot. This is calculated as the difference between the time the record was consumed and the time at which the record was ingested upstream.
Batch ingestion of data into Apache Pinot using Apache Hadoop.
Next, you need to change the execution config in the job spec to the following:
You can check out the sample job spec here.
Finally, execute the Hadoop job using the following command:
Ensure environment variables PINOT_ROOT_DIR
and PINOT_VERSION
are set properly.
We’ve seen some requests that data should be massaged (like partitioning, sorting, resizing) before creating and pushing segments to Pinot.
The MapReduce job called SegmentPreprocessingJob
would be the best fit for this use case, regardless of whether the input data is of AVRO or ORC format.
Check the below example to see how to use SegmentPreprocessingJob
.
In Hadoop properties, set the following to enable this job:
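A hedged sketch, assuming the enable.preprocessing flag (verify the exact key for your version):

```
enable.preprocessing = true
```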
In table config, specify the operations in preprocessing.operations
that you'd like to enable in the MR job, and then specify the exact configs regarding those operations:
Minimum number of reducers. Optional. Used when partitioning is disabled and resizing is enabled. This parameter avoids producing too many small files for Pinot, which leads to the Pinot server holding too many small segments and spawning too many threads.
Maximum number of records per reducer. Optional. Unlike preprocessing.num.reducers, this parameter avoids producing too few large files for Pinot, which misses the advantage of multi-threading when querying. When not set, each reducer generates one output file. When set (e.g. to M), the original output file will be split into multiple files, each containing at most M records. It does not matter whether partitioning is enabled or not.
Batch ingestion of data into Apache Pinot using Apache Flink.
Pinot supports Apache Flink as a processing framework to push segment files to the database.
PinotSinkFunction uses mostly the TableConfig object to infer the batch ingestion configuration to start a SegmentWriter and SegmentUploader to communicate with the Pinot cluster.
Note that even though in the above example the Flink application is running in streaming mode, the data is still batched together and flushed/uploaded to Pinot once the flush threshold is reached. It is not a direct streaming write into Pinot.
Here is an example table config. The only required configurations are:
"outputDirURI"
: where PinotSinkFunction should write the constructed segment file to
"push.controllerUri"
: which Pinot cluster (controller) URL PinotSinkFunction should communicate with.
The rest of the configurations are standard for any Pinot table.
Upsert support in Apache Pinot.
Pinot provides native support of upserts during real-time ingestion. There are scenarios where records need modifications, such as correcting a ride fare or updating a delivery status.
Partial upserts are convenient as you only need to specify the columns where values change, and you ignore the rest.
See an overview of how upserts work in Pinot 1.0.
To enable upserts on a Pinot table, do the following:
To update a record, you need a primary key to uniquely identify the record. To define a primary key, add the field primaryKeyColumns
to the schema definition. For example, the schema definition of UpsertMeetupRSVP
in the quick start example has this definition.
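A minimal sketch of such a schema (field names follow the quick start example; treat this as illustrative):

```json
{
  "schemaName": "meetupRsvp",
  "dimensionFieldSpecs": [
    { "name": "event_id", "dataType": "STRING" }
  ],
  "primaryKeyColumns": ["event_id"]
}
```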
Note this field expects a list of columns, as the primary key can be a composite.
When two records of the same primary key are ingested, the record with the greater comparison value (timeColumn by default) is used. When records have the same primary key and event time, then the order is not determined. In most cases, the later ingested record will be used, but this may not be true in cases where the table has a column to sort by.
Partition the input stream by the primary key
Additionally, if using segmentPartitionConfig
to leverage broker segment pruning, it's important to ensure that the partition function matches on both the Kafka producer side and in Pinot. In Kafka, the default for the Java client is a 32-bit murmur2 hash, while for all other languages, such as Python, it's CRC32 (Cyclic Redundancy Check 32-bit).
To enable upsert, make the following configurations in the table configurations.
Full upsert
The upsert mode defaults to FULL
. FULL upsert means that a new record will replace the older record completely if they have the same primary key. Example config:
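A minimal sketch of the table config section:

```json
{
  "upsertConfig": {
    "mode": "FULL"
  }
}
```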
Partial upserts
Partial upsert lets you choose to update only specific columns and ignore the rest.
To enable the partial upsert, set the mode
to PARTIAL
and specify partialUpsertStrategies
for partial upsert columns. Since release-0.10.0
, OVERWRITE
is used as the default strategy for columns without a specified strategy. defaultPartialUpsertStrategy
is also introduced to change the default strategy for all columns.
For example:
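A sketch, with illustrative column-to-strategy mappings:

```json
{
  "upsertConfig": {
    "mode": "PARTIAL",
    "defaultPartialUpsertStrategy": "OVERWRITE",
    "partialUpsertStrategies": {
      "rsvp_count": "INCREMENT",
      "group_name": "UNION"
    }
  }
}
```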
Pinot supports the following partial upsert strategies:
OVERWRITE
Overwrite the column of the last record
INCREMENT
Add the new value to the existing values
APPEND
Add the new item to the Pinot unordered set
UNION
Add the new item to the Pinot unordered set if not exists
IGNORE
Ignore the new value, keep the existing value (v0.10.0+)
MAX
Keep the maximum value between the existing value and new value (v0.12.0+)
MIN
Keep the minimum value between the existing value and new value (v0.12.0+)
None upserts
If the mode is set to NONE
, upsert is disabled.
By default, Pinot uses the value in the time column (timeColumn
in tableConfig) to determine the latest record. That means, for two records with the same primary key, the record with the larger value of the time column is picked as the latest update. However, there are cases when users need to use another column to determine the order. In such case, you can use option comparisonColumn
to override the column used for comparison. For example,
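A minimal sketch, assuming a hypothetical lastUpdatedMs column:

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "comparisonColumn": "lastUpdatedMs"
  }
}
```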
For a partial upsert table, out-of-order events won't be consumed and indexed. For example, for two records with the same primary key, if the record with the smaller value of the comparison column arrives later than the other record, it will be skipped.
In some cases, especially where partial upsert might be employed, there may be multiple producers of data each writing to a mutually exclusive set of columns, sharing only the primary key. In such a case, it may be helpful to use one comparison column per producer group so that each group can manage its own specific versioning semantics without the need to coordinate versioning across other producer groups.
Documents written to Pinot are expected to have exactly 1 non-null value out of the set of comparisonColumns; if more than 1 of the columns contains a value, the document will be rejected. When new documents are written, whichever comparison column is non-null will be compared against only that same comparison column seen in prior documents with the same primary key. Consider the following examples, where the documents are assumed to arrive in the order specified in the array.
The following would occur:
orderReceived: 1
Result: persisted
Reason: first doc seen for primary key "aa"
orderReceived: 2
Result: persisted (replacing orderReceived: 1
)
Reason: comparison column (secondsSinceEpoch
) larger than that previously seen
orderReceived: 3
Result: rejected
Reason: comparison column (secondsSinceEpoch
) smaller than that previously seen
orderReceived: 4
Result: persisted (replacing orderReceived: 2
)
Reason: comparison column (otherComparisonColumn
) larger than previously seen (never seen previously), despite the value being smaller than that seen for secondsSinceEpoch
orderReceived: 5
Result: rejected
Reason: comparison column (otherComparisonColumn
) smaller than that previously seen
orderReceived: 6
Result: persisted (replacing orderReceived: 4
)
Reason: comparison column (otherComparisonColumn
) larger than that previously seen
In Pinot, the metadata map is stored in heap memory. To decrease in-memory data and improve performance, minimize the time primary key entries are stored in the metadata map (metadata time-to-live (TTL)). Limiting the TTL is especially useful for primary keys with high cardinality and frequent updates.
Since the metadata TTL is applied on the first comparison column, the time unit of upsert TTL is the same as the first comparison column.
To configure how long primary keys are stored in metadata, specify the length of time in upsertTTL.
For example:
In this example, Pinot will retain primary keys in metadata for 1 day.
Note that enabling the upsert snapshot is required for metadata TTL, since it is used for in-memory validDocIds recovery.
An upsert-enabled Pinot table can support soft-deletes of primary keys. This requires the incoming record to contain a dedicated single-value boolean column that serves as a delete marker for a primary key. Once the real-time engine encounters a record with the delete column set to true
, the primary key will no longer be part of the queryable set of documents. This means the primary key will not be visible in the queries, unless explicitly requested via query option skipUpsert=true
.
Note that the delete
column has to be a single-value boolean column.
A deleted primary key can be revived by ingesting a record with the same primary key but with higher comparison column value(s).
Note that when reviving a primary key in a partial upsert table, the revived record will be treated as the source of truth for all columns. This means any previous updates to the columns will be ignored and overwritten with the new record's values.
The above config deleteRecordColumn
only soft-deletes the primary key. To decrease in-memory data and improve performance, minimize the time deleted-primary-key entries are stored in the metadata map (deletedKeys time-to-live (TTL)). Limiting the TTL is especially useful for deleted-primary-keys where there are no future updates foreseen.
To configure how long deleted primary keys are retained in metadata, specify the length of time in deletedKeysTTL
For example:
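A hedged sketch, assuming a hypothetical deleted marker column and a one-day TTL (in the unit of the first comparison column):

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "deleteRecordColumn": "deleted",
    "deletedKeysTTL": 86400
  }
}
```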
In this example, Pinot will retain the deleted-primary-keys in metadata for 1 day.
When using deletedKeysTTL
together with UpsertCompactionTask
, there can be a scenario where a segment containing deleted-record (where deleteRecordColumn
= true was set for the primary key) gets compacted first while a previous old record is not yet compacted. During a server restart, the old record is then added to the metadata manager map and treated as non-deleted. To prevent data inconsistencies in this scenario, we have added a new config enableDeletedKeysCompactionConsistency
which, when set to true, ensures that deleted records are not compacted until all the previous records from all other segments have been compacted for the deleted primary key.
Upserts in Pinot enable real-time updates and ensure that queries always retrieve the latest version of a record, making them a powerful feature for managing mutable data efficiently. However, in applications with extremely high QPS and high ingestion rates, queries and upserts happening concurrently can sometimes lead to inconsistencies in query results.
For example, consider a table with 1 million primary keys. A distinct count query should always return 1 million, regardless of how new records are ingested and older records are invalidated. However, at high ingestion and query rates, the query may occasionally return a count slightly above or below 1 million. This happens because queries determine valid records by acquiring validDocIds bitmaps from multiple segments, which indicate which documents are currently valid. Since acquiring these bitmaps is not atomic with respect to ongoing upserts, a query may capture an inconsistent view of the data, leading to overcounting or undercounting of valid records.
This is a classic concurrency issue where reads and writes happen simultaneously, leading to temporary inconsistencies. Typically, such issues are resolved using locks or snapshots to maintain a stable view of the data during query execution. To address this, two new consistency modes - SYNC and SNAPSHOT - have been introduced for upsert enabled tables to ensure consistent query results even when queries and upserts occur concurrently and at very high throughput.
By default, the consistency mode is NONE, meaning the system operates as before. The SYNC mode ensures consistency by blocking upserts while queries execute, guaranteeing that queries always see a stable upserted data view. However, this can introduce write latency. Alternatively, the SNAPSHOT mode creates a consistent snapshot of validDocIds bitmaps for queries to use. This allows upserts to continue without blocking queries, making it more suitable for workloads with both high query and write rates. These new consistency modes provide flexibility, allowing applications to balance consistency guarantees against performance trade-offs based on their specific requirements.
Using the implicit partitioned replica-group assignment from the low-level consumer won't persist the instance assignment (mapping from partition to servers) to ZooKeeper, and newly added servers will be automatically included without explicitly reassigning instances (usually through a rebalance). This can cause new segments of the same partition to be assigned to a different server, breaking the requirements of upsert.
Upsert snapshot support is also added in release-0.12.0
. To enable the snapshot, set the enableSnapshot
to true
. For example:
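A minimal sketch:

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "enableSnapshot": true
  }
}
```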
Upsert maintains metadata in memory containing which docIds are valid in a particular segment (ValidDocIndexes). This metadata gets lost during server restarts and needs to be recreated again. ValidDocIndexes cannot be recovered easily after out-of-TTL primary keys get removed. Enabling snapshots addresses this problem by adding functions to store and recover validDocIds snapshots for immutable segments.
The snapshots are taken on every segment commit to ensure that they are consistent with the persisted data in case of abrupt shutdown. We recommend that you enable this feature so as to speed up server boot times during restarts.
Upsert preload feature can make it faster to restore the upsert states when server restarts. To enable the preload feature, set the enablePreload
to true
. To enable preloading, enableSnapshot: true
should also be set in the table config. For example:
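A minimal sketch:

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "enableSnapshot": true,
    "enablePreload": true
  }
}
```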
Under the hood, it uses the validDocIds snapshots to identify the valid docs and restore their upsert metadata quickly instead of performing a whole upsert comparison flow. The flow is triggered before the server is marked as ready, after which the server starts to load the remaining segments without snapshots (hence the name preload).
The feature also requires you to specify pinot.server.instance.max.segment.preload.threads: N
in the server config where N should be replaced with the number of threads that should be used for preload. It's 0 by default to disable the preloading feature.
A bug was introduced in v1.2.0: when the enablePreload and enableSnapshot flags are set to true but max.segment.preload.threads is left as 0, the preloading mechanism is still enabled but segments fail to load because there are no threads for preloading. This was fixed in newer versions, but for v1.2.0, if enablePreload and enableSnapshot are set to true, remember to set max.segment.preload.threads to a positive value as well. A server restart is needed for the max.segment.preload.threads config change to take effect.
There are two configs related to handling out-of-order events.
To enable dropping of out-of-order record, set the dropOutOfOrderRecord
to true
. For example:
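A minimal sketch:

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "dropOutOfOrderRecord": true
  }
}
```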
This feature doesn't persist any out-of-order event to the consuming segment. If not specified, the default value is false
.
When false
, the out-of-order record gets persisted to the consuming segment, but the MetadataManager mapping is not updated thus this record is not referenced in query or in any future updates. You can still see the records when using skipUpsert
query option.
When true
, the out-of-order record doesn't get persisted at all and the MetadataManager mapping is not updated so this record is not referenced in query or in any future updates. You cannot see the records when using skipUpsert
query option.
This is to identify out-of-order events programmatically. To enable this config, add a boolean field in your table schema, say isOutOfOrder
and enable via this config. For example:
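A hedged sketch, assuming the isOutOfOrder schema field mentioned above:

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "outOfOrderRecordColumn": "isOutOfOrder"
  }
}
```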
This feature persists a true
/ false
value to the isOutOfOrder
field based on the orderness of the event. You can filter out out-of-order events while using skipUpsert
to avoid any confusion. For example:
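For instance, a query along these lines (table and column names are illustrative):

```sql
select * from meetupRSVP
where isOutOfOrder = false
option(skipUpsert=true)
```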
Pinot supports custom PartitionUpsertMetadataManager that handle records and segments updates.
You can add custom PartitionUpsertMetadataManager as follows:
Create a new java project. Make sure you keep the package name as org.apache.pinot.segment.local.upsert.xxx
In your java project include the dependency
Add your custom partition manager that implements PartitionUpsertMetadataManager interface
Add your custom TableUpsertMetadataManager that implements BaseTableUpsertMetadataManager interface
Place the compiled JAR in the /plugins
directory in Pinot. You will need to restart all Pinot instances if they are already running.
Now, you can use the custom upsert manager in table configs as follows:
There are some limitations for the upsert Pinot tables.
The upsert feature is supported for Real-time tables only, and not for Hybrid or Offline tables.
The high-level consumer is not allowed for the input stream ingestion, which means stream.[consumerName].consumer.type
must always be lowLevel
.
The star-tree index cannot be used for indexing, as the star-tree index performs pre-aggregation during the ingestion.
Unlike append-only tables, out-of-order events (with a comparison value in the incoming record less than the latest available value) won't be consumed and indexed by a Pinot partial upsert table; these late events will be skipped.
Unlike other real-time tables, an upsert table takes up more memory resources as it needs to bookkeep the record locations in memory. As a result, it's important to plan the capacity beforehand and monitor resource usage. Here are some recommended practices for using upsert tables.
The number of partitions in the input stream determines the number of partitions of the Pinot table. The more partitions you have in the input topic/stream, the more Pinot servers you can distribute the Pinot table across, and the more you can scale the table horizontally. Note that you can't increase the number of partitions later for upsert-enabled tables, so start with enough partitions (at least 2-3x the number of Pinot servers).
An upsert table maintains an in-memory map from the primary key to the record location, so it's recommended to use a simple primary key type and avoid composite primary keys to save memory. Beware when using a JSON
column as the primary key: the same key-values in a different order would be considered different primary keys. In addition, consider the hashFunction
config in the Upsert config, which can be MD5
or MURMUR3
, to store the 128-bit hashcode of the primary key instead. This is useful when your primary key takes more space. But keep in mind, this hash may introduce collisions, though the chance is very low.
Set up a dashboard over the metric pinot.server.upsertPrimaryKeysCount.tableName
to watch the number of primary keys in a table partition. It's useful for tracking its growth, which is proportional to the memory usage growth. The total memory usage by upsert is roughly (primaryKeysCount * (sizeOfKeyInBytes + 24))
It's useful to plan the capacity beforehand to ensure you will not run into resource constraints later. A simple way is to measure the rate of the primary keys in the input stream per partition and extrapolate the data to a specific time period (based on table retention) to approximate the memory usage. A heap dump is also useful to check the memory usage so far on an upsert table instance.
Putting these together, you can find the table configurations of the quick start examples as follows:
To illustrate how the full upsert works, the Pinot binary comes with a quick start example. Use the following command to create a real-time upsert table meetupRSVP
.
You can also run a partial upsert demo with the following command:
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the real-time data.
For partial upsert, you can see that only the values from the configured columns changed based on the specified partial upsert strategy.
An example for partial upsert is shown below: each event_id remains unique during ingestion, while the value of rsvp_count is incremented.
To see the difference from the non-upsert table, you can use a query option skipUpsert
to skip the upsert effect in the query result.
Can I change primary key columns in existing upsert table?
Yes, you can add or delete columns from the primary keys as long as the input stream is partitioned on one of the primary key columns. However, you need to restart all Pinot servers so they can rebuild the primary-key-to-record-location map with the new columns.
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
Let's start by downloading Kafka to our local machine.
Next we'll spin up a Kafka broker:
We're going to generate some JSON messages from the terminal using the following script:
datagen.py
If you run this script (python datagen.py
), you'll see the following output:
Let's now pipe that stream of messages into Kafka, by running the following command:
We can check how many messages have been ingested by running the following command:
Output
And we can print out the messages themselves by running the following command
Output
A schema defines what fields are present in the table along with their data types in JSON format.
Create a file called /tmp/pinot/schema-stream.json
and add the following content to it.
A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The table config defines the table's properties in JSON format.
Create a file called /tmp/pinot/table-config-stream.json
and add the following content to it.
Create the table and schema by running the appropriate command below:
Pinot supports two versions of the Kafka library: kafka-0.9
and kafka-2.x
for low level consumers.
Update table config for low level consumer: stream.kafka.consumer.factory.class.name
from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory
to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory
.
Here is an example config which uses SSL based authentication to talk with Kafka and the schema registry. Notice there are two sets of SSL options: the ones starting with ssl.
are for the Kafka consumer, and the ones with stream.kafka.decoder.prop.schema.registry.
are for SchemaRegistryClient
used by KafkaConfluentSchemaRegistryAvroMessageDecoder
.
The connector with Kafka library 2.0+ supports Kafka transactions. The transaction support is controlled by config kafka.isolation.level
in Kafka stream config, which can be read_committed
or read_uncommitted
(default). Setting it to read_committed
will ingest transactionally committed messages in Kafka stream only.
For example,
Note that the default value of this config read_uncommitted
to read all messages. Also, this config supports low-level consumer only.
Here is an example config which uses SASL_SSL based authentication to talk with Kafka and the schema registry. Notice there are two sets of SSL options: some for the Kafka consumer, and the ones with stream.kafka.decoder.prop.schema.registry.
are for SchemaRegistryClient
used by KafkaConfluentSchemaRegistryAvroMessageDecoder
.
Pinot's Kafka connector supports automatically extracting record headers and metadata into the Pinot table columns. The following table shows the mapping for record header/metadata to Pinot table column names:
In order to enable the metadata extraction in a Kafka table, you can set the stream config metadata.populate
to true
.
In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:
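A hedged sketch of the schema entries (assuming the __key and __metadata$offset column names used by the Kafka connector):

```json
"dimensionFieldSpecs": [
  { "name": "__key", "dataType": "STRING" },
  { "name": "__metadata$offset", "dataType": "STRING" }
]
```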
Once the schema is updated, these columns are similar to any other Pinot column. You can apply ingestion transforms and/or define indexes on them.
To avoid errors like The Avro schema must be provided
, designate the location of the schema in your streamConfigs
section. For example, if your current section contains the following:
Then add this key: "stream.kafka.decoder.prop.schema"
followed by a value that denotes the location of your schema.
This page has a collection of frequently asked questions about Pinot on Kubernetes with answers from the community.
The following is an example using Amazon Elastic Kubernetes Service (Amazon EKS).
In the Kubernetes (k8s) cluster, check the storage class: in Amazon EKS, it should be gp2
.
Then update StorageClass to ensure:
Once StorageClass is updated, it should look like this:
Once the storage class is updated, then we can update the PersistentVolumeClaim (PVC) for the server disk size.
Now we want to double the disk size for pinot-server-3
.
The following is an example of current disks:
The following is the output of data-pinot-server-3
:
Now, let's change the PVC size to 2T
by editing the server PVC.
Once updated, the specification's PVC size is updated to 2T
, but the status's PVC size is still 1T
.
Restart the pinot-server-3
pod:
Recheck the PVC size:
This page lists options for importing data into Apache Pinot™ with links to detailed instructions with examples.
Pinot supports multiple file input formats without needing to change anything other than the file name. Each example imports a ready-made dataset so you can see how things work without needing to find or create your own dataset.
These guides show you how to import data from popular big data platforms.
This guide shows you how to import data using stream ingestion from Apache Kafka topics.
This guide shows you how to import data using stream ingestion with upsert.
This guide shows you how to import data using stream ingestion with deduplication.
This guide shows you how to import data using stream ingestion with CLP.
These guides show you how to import data and persist it in these file systems.
This guide shows you how to import data from various Pinot-supported input formats.
This guide shows you how to handle the complex type in the ingested data, such as map and array.
This guide shows additional examples on how to work with complex types.
This guide shows you how to handle records with dynamic schemas, like JSON log events.
This guide shows you how to reload Pinot segments from your deep store.
This guide shows you how to upload Pinot segments from an old, closed Pinot instance.
This section contains a collection of short guides to show you how to import data from a Pinot-supported file system.
FileSystem is an abstraction provided by Pinot to access data stored in distributed file systems (DFS).
Pinot uses distributed file systems for the following purposes:
Batch ingestion job: To read the input data (CSV, Avro, Thrift, etc.) and to write generated segments to DFS.
Controller: When a segment is uploaded to the controller, the controller saves it in the configured DFS.
Server: When a server is notified of a new segment, the server copies the segment from the remote DFS to its local node using the DFS abstraction.
Pinot lets you choose a distributed file system provider. The following file systems are supported by Pinot:
To use a distributed file system, you need to enable plugins. To do that, specify the plugin directory and include the required plugins:
You can change the file system in the controller
and server
configuration. In the following configuration example, the URI is s3://bucket/path/to/file
and scheme
refers to the file system URI prefix s3
.
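A hedged sketch for the s3 scheme (S3PinotFS is the usual plugin class; verify the class name for your version):

```
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
```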
You can also change the file system during ingestion. In the ingestion job spec, specify the file system with the following configuration:
This page has a collection of frequently asked questions about operations with answers from the community.
Typically, Apache Pinot components try to use as much off-heap memory (MMAP/DirectMemory) as possible. For example, Pinot servers load segments in memory-mapped files in MMAP mode (recommended), or in direct memory in HEAP mode. Heap memory is used mostly for query execution and storing some metadata. We have seen production deployments with high throughput and low latency work well with just 16 GB of heap for Pinot servers and brokers. The Pinot controller may also cache some metadata (table configurations, etc.) in heap, so if there are just a few tables in the Pinot cluster, a few GB of heap should suffice.
Pinot relies on deep storage for storing a backup copy of segments (offline as well as real-time). It relies on Zookeeper to store metadata (table configurations, schema, cluster state, and so on). It does not explicitly provide tools to back up or restore this data, but relies on the deep storage (ADLS/S3/GCP/etc.) and ZK to persist this data/metadata.
Changing a column name or data type is considered a backward-incompatible change. While Pinot does support schema evolution for backward-compatible changes, it does not support backward-incompatible changes like changing the name or data type of a column.
Note that if you are using replica groups, it's expected these configurations equal numReplicaGroups
. If they do not match, Pinot will use numReplicaGroups.
retentionTimeUnit
retentionTimeValue
Updating the retention value in the table config should be good enough, there is no need to rebalance the table or reload its segments.
Likely explanation: num partitions * num replicas < num servers.
In real-time tables, segments of the same partition always remain on the same node. This sticky assignment is needed for replica groups and is critical if using upserts. For instance, if you have 3 partitions, 1 replica, and 4 nodes, only 3 of the 4 nodes will be used: all of p0's segments will be on one node, p1's on another, and p2's on another. One server will be unused, and will remain unused through rebalances.
There's nothing we can do about CONSUMING segments; they will continue to use only 3 nodes if you have 3 partitions. But we can rebalance such that completed segments use all nodes. If you want to force the completed segments of the table to use the new server, use this config:
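A hedged sketch using tagOverrideConfig in the tenants section of the table config (tenant names are illustrative):

```json
"tenants": {
  "broker": "DefaultTenant",
  "server": "DefaultTenant",
  "tagOverrideConfig": {
    "realtimeCompleted": "DefaultTenant_OFFLINE"
  }
}
```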
The number of segments generated depends on the number of input files. If you provide only 1 input file, you will get 1 segment. If you break up the input file into multiple files, you will get as many segments as the input files.
This typically happens when the server is unable to load the segment. Possible causes: out-of-memory, no disk space, unable to download segment from deep-store, and similar other errors. Check server logs for more information.
Use the segment reset controller REST API to reset the segment:
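A minimal sketch of the call, assuming a controller at localhost:9000 and illustrative table/segment names:

```bash
curl -X POST "http://localhost:9000/segments/myTable_REALTIME/mySegmentName/reset"
```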
Reset: Gets a segment in `ERROR` state back to `ONLINE` or `CONSUMING` state. Behind the scenes, the Pinot controller takes the segment to the `OFFLINE` state, waits for the external view to stabilize, and then moves it back to `ONLINE` or `CONSUMING` state, effectively resetting segments or consumers in error states.
Refresh: Replaces the segment with a new one, with the same name but often different data. Under the hood, the Pinot controller sets new segment metadata in Zookeeper and notifies brokers and servers to check their local state for this segment and update accordingly. Servers also download the new segment to replace the old one when the two have different checksums. There is no separate REST API for refreshing; it is done as part of the SegmentUpload API.
Reload: Loads the segment again, often to generate a new index as updated in the table configuration. Under the hood, the Pinot server gets the new table configuration from Zookeeper and uses it to guide the segment reloading. In fact, the last step of `REFRESH` as explained above is to load the segment into memory to serve queries. There is a dedicated REST API for reloading. By default, it doesn't download segments, but an option is provided to force the server to download the segment to replace the local one cleanly.
In addition, `RESET` brings the segment `OFFLINE` temporarily, while `REFRESH` and `RELOAD` swap the segment on the server atomically without bringing down the segment or affecting ongoing queries.
Set this property in your controller.conf file:
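The property in question disables default-tenant isolation:

```
cluster.tenant.isolation.enable=false
```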
Now your brokers and servers should join the cluster as `broker_untagged` and `server_untagged`. You can then directly use the `POST /tenants` API to create the tenants you want, as in the following:
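For example, to create a broker tenant (the name and instance count are illustrative):

```bash
curl -X POST "http://localhost:9000/tenants" \
  -H "Content-Type: application/json" \
  -d '{"tenantRole": "BROKER", "tenantName": "foo", "numberOfInstances": 1}'
```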
There are two task configurations, set as part of the cluster configuration, as in the following example. One controls the task's overall timeout (1 hour by default) and one sets how many tasks can run on a single minion worker (1 by default). The `<taskType>` is the task to tune, such as `MergeRollupTask` or `RealtimeToOfflineSegmentsTask`.
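A sketch of setting both knobs through the cluster config API (the values and task type are illustrative):

```bash
curl -X POST "http://localhost:9000/cluster/configs" \
  -H "Content-Type: application/json" \
  -d '{
    "RealtimeToOfflineSegmentsTask.timeoutMs": "600000",
    "RealtimeToOfflineSegmentsTask.numConcurrentTasksPerInstance": "4"
  }'
```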
Yes, replica groups work for real-time tables. There are two parts to enabling replica groups:
Replica groups segment assignment.
Replica group query routing.
Replica group segment assignment
Replica group segment assignment is achieved in real-time tables if the number of servers is a multiple of the number of replicas. The partitions get uniformly sprayed across the servers, creating replica groups. For example, suppose we have 6 partitions, 2 replicas, and 4 servers.
Replica group query routing
1. Wait for the pause status to change to success.
2. Update the credential in the table config.
3. Resume the consumption.
Pinot supports Apache Spark as a processor to create and push segment files to the database. The Pinot distribution is bundled with Spark code to process, convert, and upload your files to Pinot.
You can follow the instructions to build Pinot from source. The resulting JAR file can be found in `pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar`.
For more details on this MapReduce job, refer to the documentation.
The Pinot distribution contains an Apache Flink sink that can be used as part of an Apache Flink application (streaming or batch) to write directly into a designated Pinot database.
Here is an example code snippet showing how to use the sink in a Flink streaming application:
As shown in the example above, the only required information from the Pinot side is the table schema and the table config.
For a more detailed executable example, refer to the documentation.
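A rough sketch only: the class names below come from the pinot-connector-flink module and may differ across versions, and the source is hypothetical.

```java
// Build a Flink streaming job that writes Rows into a Pinot table.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> srcRows = env.addSource(new MyRowSource()); // hypothetical source
srcRows.addSink(new PinotSinkFunction<>(
    new FlinkRowGenericRowConverter(rowTypeInfo), // converts Flink Rows to Pinot GenericRows
    tableConfig,                                  // the Pinot table config
    schema));                                     // the Pinot schema
env.execute();
```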
An important requirement for Pinot upsert tables is that the input stream be partitioned by the primary key. For Kafka messages, this means the producer must set the key in the send API. If the original stream is not partitioned, then a stream processing job (such as with Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.
An upsert Pinot table can use only the low-level consumer for input streams. As a result, it implicitly uses the partitioned replica-group assignment for the segments. Moreover, upsert poses the additional requirement that all segments of the same partition must be served from the same server to ensure data consistency across the segments. Accordingly, it requires `strictReplicaGroup` as the routing strategy. To use that, configure `instanceSelectorType` in `routing` as follows:
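A minimal sketch of the routing section:

```json
"routing": {
  "instanceSelectorType": "strictReplicaGroup"
}
```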
To prevent this, we recommend using explicit instance assignment to ensure the assignment is persisted. Note that `numInstancesPerPartition` should always be `1` in `replicaGroupPartitionConfig`.
The upsert manager class name is case-insensitive as well.
Learn how to ingest data from Kafka, a stream processing platform. You should have a local cluster up and running, following the instructions in the getting started guide.
To pull down the latest Docker image, run the following command:
Download Kafka and then extract it:
Note: The --network pinot-demo flag is optional and assumes that you have a Docker network named pinot-demo that you want to connect the Kafka container to.
In one terminal window, run this command to start Zookeeper:
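Assuming you are in the extracted Kafka directory, a typical invocation is:

```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
```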
In another terminal window, run this command to start the Kafka broker:
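Again assuming the extracted Kafka directory:

```bash
bin/kafka-server-start.sh config/server.properties
```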
Navigate to the Query Console and click on the `events` table to run a query that shows the first 10 rows in this table.
Querying the events table
This connector is also suitable for Kafka lib versions higher than `2.0.0`. Changing `kafka.lib.version` from `2.0.0` to `2.1.1` will make this connector work with Kafka 2.1.1.
Remember to follow the schema evolution guidelines when updating the schema of an existing table!
There is a standalone utility to generate the schema from an Avro file. See the guide on inferring the Pinot schema from the Avro schema and JSON data for details.
This is a list of questions frequently asked in our troubleshooting channel on Slack. To contribute additional questions and answers, open a pull request.
There are multiple options for importing data into Apache Pinot™. The pages in this section provide step-by-step instructions for importing records into Pinot, supported by our plugin architecture. The intent is to get you up and running with imported data as quickly as possible.
By default, Pinot does not come with a storage layer, so the data sent won't be stored in case of a system crash. To persistently store generated segments, change the controller and server configs to add deep storage. See the file systems documentation for all the related configs.
This is a list of questions frequently asked in our troubleshooting channel on Slack. To contribute additional questions and answers, open a pull request.
You can change the number of replicas by updating the `segmentsConfig` section of the table configuration. Make sure you have at least as many servers as the replication.
For offline tables, update `replication`:
For real-time tables, update `replicasPerPartition`:
After changing the replication, run a table rebalance.
By default, there is no retention set for a table in Apache Pinot. You may, however, set retention with the following properties in the `segmentsConfig` section inside the table config:
As you can see, the set (S0, S2) contains r1 of every partition, and (S1, S3) contains r2 of every partition. The query will only be routed to one of the sets, and will not span every server. If you are adding or removing servers from an existing table setup, you have to run a rebalance for segment assignment changes to take effect.
Once replica group segment assignment is in effect, query routing can take advantage of it. For replica-group-based query routing, set the following in the table config's `routing` section, and then restart the brokers:
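A minimal sketch of that routing section:

```json
"routing": {
  "instanceSelectorType": "replicaGroup"
}
```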
When using tiered storage, you may want different encoding and indexing types for a column in different tiers to balance query latency and cost savings more flexibly. For example, segments in the hot tier can use dictionary encoding, bloom filters, and all relevant index types for very fast query execution. For segments in the cold tier, where cost savings matter more than low query latency, you may want raw values and bloom filters only.
The following two examples show how to overwrite encoding types and index configs for tiers.
Overwriting single-column index configs using `fieldConfigList`: all top-level fields in `fieldConfigList` can be overwritten, and fields not overwritten are kept intact.
Overwriting star-tree index configurations using `tableIndexConfig`: the `starTreeIndexConfigs` is overwritten as a whole. In fact, all top-level fields defined in `tableIndexConfig` can be overwritten, so single-column index configs defined in `tableIndexConfig` can also be overwritten, but doing so is less clear than using `fieldConfigList`.
| Kafka record component | Pinot table column | Column type |
| --- | --- | --- |
| Record key: any type `<K>` | `__key` | String (for simplicity of design, we assume that the record key is always a UTF-8 encoded string) |
| Record headers: `Map<String, String>` | `__header$HeaderKeyName` (each header key becomes a separate column) | String (for simplicity of design, we directly map the string headers from the Kafka record to Pinot table columns) |
| Record metadata - offset: `long` | `__metadata$offset` | String |
| Record metadata - partition: `int` | `__metadata$partition` | String |
| Record metadata - recordTimestamp: `long` | `__metadata$recordTimestamp` | String |
| Partition | Replica 1 | Replica 2 |
| --- | --- | --- |
| p1 | S0 | S1 |
| p2 | S2 | S3 |
| p3 | S0 | S1 |
| p4 | S2 | S3 |
| p5 | S0 | S1 |
| p6 | S2 | S3 |
Batch ingestion of data into Apache Pinot.
With batch ingestion you create a table using data already present in a file system such as S3. This is particularly useful when you want to use Pinot to query across large data with minimal latency or to test out new features using a simple data file.
To ingest data from a filesystem, perform the following steps, which are described in more detail in this page:
Create schema configuration
Create table configuration
Upload schema and table configs
Upload data
Batch ingestion currently supports the following mechanisms to upload the data:
Standalone
Here's an example using standalone local processing.
First, create a table using the following CSV data.
In our data, the only column on which aggregations can be performed is `score`, and `timestampInEpoch` is the only timestamp column. So, in our schema, we keep `score` as a metric and `timestampInEpoch` as the timestamp column.
Here, we have also defined two extra fields: format and granularity. The format specifies the formatting of our timestamp column in the data source. Currently, it's in milliseconds, so we've specified `1:MILLISECONDS:EPOCH`.
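A sketch of the corresponding `dateTimeFieldSpecs` entry (the granularity shown is an assumption):

```json
"dateTimeFieldSpecs": [
  {
    "name": "timestampInEpoch",
    "dataType": "LONG",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }
]
```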
We define a table `transcript` and map the schema created in the previous step to the table. For batch data, we keep the `tableType` as `OFFLINE`.
Now that we have both the configs, upload them and create a table by running the following command:
Check out the table config and schema in the Rest API to make sure they were successfully uploaded.
We now have an empty table in Pinot. Next, upload the CSV file to this empty table.
A table is composed of multiple segments. The segments can be created in the following three ways:
- Minion-based ingestion
- Upload API
- Ingestion jobs
There are 2 controller APIs that can be used for a quick ingestion test using a small file. When these APIs are invoked, the controller has to download the file and build the segment locally. Hence, these APIs are NOT meant for production environments or for large input files.
This API creates a segment using the given file and pushes it to Pinot. All steps happen on the controller.
Example usage:
To upload a JSON file `data.json` to a table called `foo_OFFLINE`, use the command below. Note that query params need to be URL-encoded. For example, `{"inputFormat":"json"}` needs to be converted to `%7B%22inputFormat%22%3A%22json%22%7D`.
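A hedged example invocation (the host, port, and exact set of query params may vary by version):

```bash
curl -X POST -F file=@data.json \
  "http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&batchConfigMapStr=%7B%22inputFormat%22%3A%22json%22%7D"
```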
The `batchConfigMapStr` can be used to pass in additional properties needed for decoding the file. For example, in the case of CSV, you may need to provide the delimiter.
This API creates a segment using file at the given URI and pushes it to Pinot. Properties to access the FS need to be provided in the batchConfigMap. All steps happen on the controller. Example usage:
Segments can be created and uploaded using tasks known as `DataIngestionJobs`. A job also needs a config of its own. We call this config the `JobSpec`.
For our CSV file and table, the `JobSpec` should look like this:
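A sketch of a standalone job spec (the paths are illustrative; adjust the record reader to your input format):

```yaml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
```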
Now that we have the job spec for our table `transcript`, we can trigger the job using the following command:
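For example (the job spec path is illustrative):

```bash
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yml
```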
Once the job successfully finishes, head over to the query console and start playing with the data.
There are 3 ways to upload a Pinot segment:
Segment tar push
Segment URI push
Segment metadata push
This is the original and default push mechanism.
Tar push requires the segment to be stored locally or to be openable as an InputStream on PinotFS, so we can stream the entire segment tar file to the controller.
The push job will:
Upload the entire segment tar file to the Pinot controller.
Pinot controller will:
Save the segment into the controller segment directory (local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
This push mechanism requires the segment tar file stored on a deep store with a globally accessible segment tar URI.
URI push is lightweight on the client side, while the controller side requires the same work as the tar push.
The push job will:
POST this segment tar URI to the Pinot controller.
Pinot controller will:
Download segment from the URI and save it to controller segment directory (local or any PinotFS).
Extract segment metadata.
Add the segment to the table.
This push mechanism also requires the segment tar file stored on a deep store with a globally accessible segment tar URI.
Metadata push is lightweight on the controller side; no deep store download is involved on the controller side.
The push job will:
Download the segment based on URI.
Extract metadata.
Upload metadata to the Pinot Controller.
Pinot Controller will:
Add the segment to the table based on the metadata.
4. Segment Metadata Push with copyToDeepStore
This extends the original segment metadata push for cases where segments are pushed to a location that is not used as the deep store. The ingestion job can still do a metadata push but ask the Pinot controller to copy the segments into the deep store. These use cases usually arise when the ingestion job doesn't have direct access to the deep store but still wants to use metadata push for its efficiency, so it uses a staging location to keep the segments temporarily.
NOTE: the staging location and the deep store must use the same storage scheme, e.g. both on S3. This is because the copy is done via the PinotFS.copyDir interface, which assumes so; it also means the copy happens on the storage system side, so segments don't need to go through the Pinot controller at all.
To make this work, grant Pinot controllers access to the staging location. For example on AWS, this may require adding an access policy like this example for the controller EC2 instances:
Then use metadata push, adding one extra config like this one:
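A sketch of the job spec fragment:

```yaml
pushJobSpec:
  copyToDeepStoreForMetadataPush: true
```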
Pinot supports atomic updates only at the segment level. When data consisting of multiple segments is pushed to a table, segments are replaced one at a time, so queries to the broker during this upload phase may produce inconsistent results due to interleaving of old and new data.
See Consistent Push and Rollback for how to enable this feature.
When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push them to the Pinot controller and server:
Push segment to other systems and implement your own segment fetcher to pull data from those systems.
Since Pinot is written in Java, you can set the following basic Java configurations to tune the segment runner job:
- Log4j2 file location with `-Dlog4j2.configurationFile`
- Plugin directory location with `-Dplugins.dir=/opt/pinot/plugins`
- JVM props, like `-Xmx8g -Xms4G`
If you are using Docker, you can set these under the `JAVA_OPTS` variable.
You can set `-D mapreduce.map.memory.mb=8192` to set the mapper memory size when submitting the Hadoop job. You can add the config `spark.executor.memory` to tune the memory usage for segment creation when submitting the Spark job.
Batch ingestion of data into Apache Pinot using dimension tables.
Dimension tables are replicated on all the hosts for a given tenant to allow faster lookups. When a table is marked as a dimension table, it will be replicated on all the hosts, which means that these tables must be small in size.
Configure dimension tables using the following properties in the table configuration (a combined sketch follows the list):
- `isDimTable`: Set to `true`.
- `ingestionConfig.batchIngestionConfig.segmentIngestionType`: Set to `REFRESH`.
- `dimensionTableConfig.disablePreload`: By default, dimension tables are preloaded to allow for fast lookups. Set to `true` to trade speed for memory: only the segment reference and docID are stored, instead of the whole row in the dimension table hash map.
- `controller.dimTable.maxSize`: Determines the maximum size quota for a dimension table in a cluster. Table creation will fail if the storage quota exceeds this maximum size.
If no indexes are applied to the columns in a Pinot segment, the query engine needs to scan through every document, checking whether that document meets the filter criteria provided in a query. This can be a slow process if there are a lot of documents to scan.
When indexes are applied, the query engine can more quickly work out which documents satisfy the filter criteria, reducing the time it takes to execute the query.
By default, Pinot creates a forward index for every column. The forward index generally stores documents in insertion order.
However, before flushing the segment, Pinot does a single pass over every column to see whether the data is sorted. If the data is sorted, Pinot creates a sorted forward index for that column instead of the default forward index.
For real-time tables you can also explicitly tell Pinot that one of the columns should be sorted. For more details, see the [Sorted Index Documentation](https://docs.pinot.apache.org/basics/indexing/forward-index#real-time-tables).
For filtering documents within a segment, Pinot supports the following indexing techniques:
Inverted index: Used for exact lookups.
Range index: Used for range queries.
Text index: Used for phrase, term, boolean, prefix, or regex queries.
Geospatial index: Based on H3, a hexagon-based hierarchical gridding. Used for finding points that exist within a certain distance from another point.
JSON index: Used for querying columns in JSON documents.
Star-tree index: Pre-aggregates results across multiple columns.
Let's see how we can apply these indexing techniques to our data. To recap, the `events` table has the following fields:
We might want to write queries that filter on the `ts` and `uuid` columns, so these are the columns on which we would want to configure indexes.
Since the data we're ingesting into the Kafka topic is implicitly ordered by timestamp, the `ts` column already has a sorted index. This means that any queries that filter on this column are already optimized.
So that leaves us with the `uuid` column.
We're going to add an inverted index to the `uuid` column so that queries that filter on that column return quicker. We need to add the following line to the `tableIndexConfig` section:
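A sketch of the relevant fragment:

```json
"tableIndexConfig": {
  "invertedIndexColumns": ["uuid"]
}
```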
Copy the following table config to `/tmp/pinot/table-config-stream.json`:
Once you've done that, you'll need to click Reload All Segments and then Yes to apply the indexing change to all segments.
The following query will return the indexes defined on the `uuid` column:
Output
We can see from the `inverted-index` property that the index has been applied.
You can now run some queries that filter on the `uuid` column, as shown below:
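For example (the UUID literal is a placeholder):

```sql
SELECT ts, uuid, count
FROM events
WHERE uuid = 'replace-me-with-a-real-uuid'
LIMIT 10
```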
You'll need to change the `uuid` value to one that exists in your database, because the UUIDs are generated randomly by our script.
Push segments to a shared NFS and let Pinot pull segment files from that NFS location.
Push segments to a web server and let Pinot pull segment files from the web server via an HTTP/HTTPS link.
Push segments to PinotFS (HDFS/S3/GCS/ADLS) and let Pinot pull segment files from the PinotFS URI.
The first three options are supported out of the box within the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for PinotFS, you'll need to provide the configuration and proper Hadoop dependencies.
By default, Pinot does not come with a storage layer, so the data sent won't be stored in case of a system crash. To persistently store generated segments, change the controller and server configs to add deep storage. Check out the file systems documentation for all the related configs.
Dimension tables are a special kind of offline table from which data can be looked up via the lookup UDF, providing join-like functionality.
A dimension table cannot be part of a hybrid table.
`dimensionFieldSpecs`: To look up dimension values, dimension tables need a primary key. For details, see the schema documentation.
Learn how to apply indexes to a Pinot table. This guide assumes that you have followed the Kafka ingestion guide.
Pinot supports a series of different indexes that can be used to optimize query performance. In this guide, we'll learn how to add indexes to the `events` table that we set up in that guide.
Navigate to the table page in the Pinot UI, click on Edit Table, paste the next table config, and then click Save.
We can check that the index has been applied to all our segments by querying Pinot's REST API. You can find Swagger documentation on the Pinot controller.
We extract just the fields that we're interested in.
- `ts`
- `uuid`
- `count`
Deduplication support in Apache Pinot.
Pinot provides native support for deduplication (dedup) during the real-time ingestion (v0.11.0+).
To enable dedup on a Pinot table, make the following table configuration and schema changes:
To be able to dedup records, a primary key is needed to uniquely identify a given record. To define a primary key, add the field `primaryKeyColumns` to the schema definition. Note that this field expects a list of columns, as the primary key can be composite.
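For example, a sketch with a single-column key (the column name is illustrative):

```json
"primaryKeyColumns": ["event_id"]
```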
While ingesting a record, if its primary key is found to be already present, the record will be dropped.
To prevent this, we recommend using explicit instance assignment to ensure the assignment is persisted. Note that `numInstancesPerPartition` should always be `1` in `replicaGroupPartitionConfig`.
The high-level consumer is not allowed for input stream ingestion, which means `stream.kafka.consumer.type` must be `lowLevel`.
The incoming stream must be partitioned by the primary key such that all records with a given primary key are consumed by the same Pinot server instance.
To enable dedup for a REALTIME table, add the following to the table config.
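A minimal sketch:

```json
"dedupConfig": {
  "dedupEnabled": true,
  "hashFunction": "NONE"
}
```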
Supported values for `hashFunction` are `NONE`, `MD5`, and `MURMUR3`, with the default being `NONE`.
The server stores existing primary keys in a dedup metadata map kept on the JVM heap. As the dedup metadata grows, heap memory pressure increases, which may affect ingestion and query performance. You can set a positive metadata TTL to keep the metadata size bounded. By default, the table's time column is used as the dedup time column, and the TTL uses the same time unit as that column. The TTL should be set long enough that new records can be deduplicated before their primary keys are removed.
When ingesting new records, the server has to read the metadata map to check for duplicates. But when a server restarts, the documents in existing segments are all unique, as ensured by the dedup logic during real-time ingestion, so write-only operations can be used to bootstrap the metadata map faster.
Unlike other real-time tables, a dedup table takes up more memory resources, as it needs to keep the primary key and its corresponding segment reference in memory. As a result, it's important to plan capacity beforehand and monitor resource usage. Here are some recommended practices for using dedup tables.
Create the Kafka topic with more partitions. The number of Kafka partitions determines the number of partitions of the Pinot table. The more partitions you have in the Kafka topic, the more Pinot servers you can distribute the Pinot table to, and the more you can scale the table horizontally.
A dedup table maintains an in-memory map from the primary key to the segment reference, so it's recommended to use a simple primary key type and avoid composite primary keys to save memory. In addition, consider the `hashFunction` config in the dedup config, which can be set to `MD5` or `MURMUR3` to store the 128-bit hashcode of the primary key instead. This is useful when your primary key takes more space. But keep in mind that this hash may introduce collisions, though the chance is very low.
Monitoring: Set up a dashboard over the metric `pinot.server.dedupPrimaryKeysCount.tableName` to watch the number of primary keys in a table partition. It's useful for tracking growth, which is proportional to memory usage growth.
Capacity planning: It's useful to plan capacity beforehand to ensure you won't run into resource constraints later. A simple way is to measure the rate of primary keys in the Kafka throughput per partition and multiply by the per-key space cost to approximate the memory usage. A heap dump is also useful for checking the memory usage so far on a dedup table instance.
Batch ingestion of backfill data into Apache Pinot.
Pinot batch ingestion involves two parts: the routine ingestion job (hourly/daily) and backfill. Here are some examples to show how routine batch ingestion works for a Pinot offline table:
High-level description
Organize raw data into buckets (e.g., /var/pinot/airlineStats/rawdata/2014/01/01). Each bucket typically contains several files (e.g., /var/pinot/airlineStats/rawdata/2014/01/01/airlineStats_data_2014-01-01_0.avro).
Run a Pinot batch ingestion job that points to a specific date folder, like /var/pinot/airlineStats/rawdata/2014/01/01. The segment generation job will convert each such Avro file into a Pinot segment for that day and give it a unique name.
Run a Pinot segment push job to upload those segments with those unique names via a controller API.
This newly uploaded data can now be queried in Pinot. However, sometimes users will make changes to the raw data which need to be reflected in Pinot. This process is known as 'Backfill'.
Pinot supports data modification only at the segment level, which means you must update entire segments to do backfills. The high-level idea is to repeat steps 2 (segment generation) and 3 (segment upload) above:
Backfill jobs must run at the same granularity as the daily job. E.g., if you need to backfill data for 2014/01/01, specify that input folder for your backfill job (e.g.: ‘/var/pinot/airlineStats/rawdata/2014/01/01’)
The backfill job will then generate segments with the same name as the original job (with the new data).
When uploading those segments to Pinot, the controller will replace the old segments with the new ones (segment names act like primary keys within Pinot) one by one.
Backfill jobs expect the same number of (or more) data files on the backfill date. So the segment generation job will create the same number of (or more) segments than the original run.
For example, assume table airlineStats has 2 segments (airlineStats_2014-01-01_2014-01-01_0, airlineStats_2014-01-01_2014-01-01_1) on date 2014/01/01 and the backfill input directory contains only 1 input file. Then the segment generation job will create just one segment: airlineStats_2014-01-01_2014-01-01_0. After the segment push job, only segment airlineStats_2014-01-01_2014-01-01_0 is replaced, and stale data in segment airlineStats_2014-01-01_2014-01-01_1 remains.
If the raw data is modified in such a way that the original time bucket has fewer input files than the first ingestion run, backfill will fail.
Additional examples that demonstrate handling of complex types.
The Pinot schema for this example would look as follows.
The Pinot table configuration for this schema would look as follows.
Post ingestion, the student records appear as separate records in Pinot. Note that the nested field `scores` is captured as a JSON field.
In this example, we would look at un-nesting the sibling collections "student" and "teacher".
In this example, we would look at un-nesting the nested collection "students.grades".
In this example, we would look at un-nesting the array "finalExam" which is located within the array "students".
In this example, the inner collection "grades" is converted into a multi-value string column.
In this example, the array of primitives "extra_curricular" is converted to a JSON string.
This guide shows you how to import data from files stored in Amazon S3.
You can configure the S3 file system using the following options:
Each of these properties should be prefixed by `pinot.[node].storage.factory.s3.`, where `node` is either `controller` or `server` depending on the config, e.g.:
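For example (the region is illustrative):

```
pinot.controller.storage.factory.s3.region=us-west-2
```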
1. Environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` (recommended since they are recognized by all AWS SDKs and the CLI except for .NET), or `AWS_ACCESS_KEY` and `AWS_SECRET_KEY` (only recognized by the Java SDK)
2. Java system properties: `aws.accessKeyId` and `aws.secretKey`
3. Web identity token credentials from the environment or container
4. Credential profiles file at the default location (`~/.aws/credentials`) shared by all AWS SDKs and the AWS CLI
5. Credentials delivered through the Amazon EC2 container service, if the `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` environment variable is set and the security manager has permission to access the variable
6. Instance profile credentials delivered through the Amazon EC2 metadata service
You can also specify the accessKey and secretKey using properties. However, this method is not secure and should only be used for POC setups.
Support for encoding fields with CLP during ingestion.
This is an experimental feature. Configuration options and usage may change frequently until it is stabilized.
CLP is a compressor designed to encode unstructured log messages in a way that makes them more compressible while retaining the ability to search them. It does this by decomposing the message into three fields:
the message's static text, called a log type;
repetitive variable values, called dictionary variables; and
non-repetitive variable values (called encoded variables since we encode them specially if possible).
Searches are similarly decomposed into queries on the individual fields.
For example, consider this JSON record:
If the user specifies that the fields `message` and `logPath` should be encoded with CLP, then the StreamMessageDecoder will output:
In the fields with the `_logtype` suffix, `\x11` is a placeholder for an integer variable, `\x12` is a placeholder for a dictionary variable, and `\x13` is a placeholder for a float variable. In `message_encodedVars`, the float variable `0.335` is encoded as an integer using CLP's custom encoding.
All remaining fields are processed in the same way as they are in `org.apache.pinot.plugin.inputformat.json.JSONRecordExtractor`. Specifically, fields in the table's schema are extracted from each record, and any remaining fields are dropped.
Assuming the user wants to encode `message` and `logPath` as in the example, they should change/add the following settings to their `tableIndexConfig` (we omit irrelevant settings for brevity):
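A hedged sketch of the stream configs fragment (other required stream settings are omitted):

```json
"tableIndexConfig": {
  "streamConfigs": {
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.clplog.CLPLogMessageDecoder",
    "stream.kafka.decoder.prop.fieldsForClpEncoding": "message,logPath"
  }
}
```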
`stream.kafka.decoder.prop.fieldsForClpEncoding` is a comma-separated list of names for fields that should be encoded with CLP.
For the table's schema, users should configure the CLP-encoded fields as follows (we omit irrelevant settings for brevity):
We use the maximum possible length for the logtype and dictionary variable columns.
The dictionary and encoded variable columns are multi-valued columns.
To search CLP-encoded fields, you can combine `CLPDECODE` with `LIKE`. Note that this may decrease performance when querying a large number of rows.
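A sketch of such a query, assuming the decoder's column-naming convention produces `message_logtype`, `message_dictionaryVars`, and `message_encodedVars` (verify the exact names and argument order for your version):

```sql
SELECT CLPDECODE(message_logtype, message_dictionaryVars, message_encodedVars) AS message
FROM myTable
WHERE CLPDECODE(message_logtype, message_dictionaryVars, message_encodedVars) LIKE '%timed out%'
LIMIT 10
```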
The forward index is the mechanism Pinot employs to store the values of each column. At a conceptual level, the forward index can be thought of as a mapping from document IDs (also known as row indices) to the actual column values of each row.
How forward indexes are implemented depends on the index encoding and whether the column is sorted.
In the case of `DICTIONARY` encoding, the forward index doesn't store the actual row values but instead stores dictionary IDs. This introduces an additional level of indirection when reading values, but it allows for a more efficient physical layout when the number of unique values in the column is significantly smaller than the number of rows.
When working out whether a column should use dictionary encoded or raw value encoding, the following comparison table may help:
In this approach, each unique value in a column is assigned an ID, and a dictionary is constructed to map these IDs back to their corresponding values. Instead of storing the actual values, the default forward index stores these bit-compressed IDs. This method is particularly effective when dealing with columns containing few unique values, as it significantly improves space efficiency.
The diagram below illustrates dictionary encoding for two columns with different data types (integer and string). For `colA`, dictionary encoding leads to significant space savings due to duplicated values. However, for `colB`, which contains mostly unique values, the compression effect is limited, and padding overhead may be high.
When using the dictionary-encoded forward index for a multi-value column, you can further compress repeated multi-value entries by enabling the `MV_ENTRY_DICT` compression type, which adds another level of dictionary encoding on the multi-value entries. This may be useful, for example, when you pre-join a fact table with a dimension table, where the multi-value entries in the dimension table are repeated after joining with the fact table. It can be enabled with the following parameter:
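A sketch of the field config (the column name is illustrative):

```json
"fieldConfigList": [
  {
    "name": "myMvColumn",
    "encodingType": "DICTIONARY",
    "indexes": {
      "forward": {
        "dictIdCompressionType": "MV_ENTRY_DICT"
      }
    }
  }
]
```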
When a column is physically sorted, Pinot employs a sorted forward index with run-length encoding, which builds upon dictionary encoding. Instead of storing dictionary IDs for each document ID, this approach stores pairs of start and end document IDs for each unique value.
(For simplicity, this diagram does not include the dictionary encoding layer.)
When dealing with multiple segments, it's crucial to ensure that data is sorted within each segment. Sorting across segments is not necessary.
To guarantee that a segment is sorted by a particular column, follow these steps:
For real-time tables, use the `tableIndexConfig.sortedColumn` property. If there is exactly one column specified in that array, Pinot will sort the segment by that column upon committing.
For offline tables, you must pre-sort the data by the specified column before ingesting it into Pinot.
It's crucial to note that for offline tables, the `tableIndexConfig.sortedColumn` property is ignored. Additionally, for real-time tables, even though this property is specified as a JSON array, at most one column should be included. Using an array with more than one column is incorrect and will not result in segments being sorted by all the listed columns.
When a real-time segment is committed, rows will be sorted by the sorting column and it will be transformed into an offline segment.
During the creation of an offline segment, which also applies when a real-time segment is committed, Pinot scans the data in each column. If it detects that all values within a column are sorted in ascending order, Pinot concludes that the segment is sorted based on that particular column. If this happens for more than one column, all of them are considered sorting columns. Consequently, whether a segment is sorted by a column or not depends solely on the actual data distribution within the segment and entirely disregards the value of the `sortedColumn` property. This also implies that two segments belonging to the same table may have a different number of sorting columns. In the extreme scenario where a segment contains only one row, Pinot will consider all columns within that segment as sorting columns.
Here is an example of a table configuration that illustrates these concepts:
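A hedged sketch (the table and column names are illustrative):

```json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "tableIndexConfig": {
    "sortedColumn": ["ts"]
  }
}
```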
You can check the sorted status of a column in a segment by running the following:
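One way is to inspect the segment metadata on disk (the path is illustrative):

```bash
grep isSorted /path/to/segment/v3/metadata.properties
```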
The raw value forward index stores actual values instead of IDs. This means that it eliminates the need for dictionary lookups when fetching values, which can result in improved query performance. Raw forward index is particularly effective for columns with a large number of unique values, where dictionary encoding doesn't provide significant compression benefits.
As shown in the diagram below, dictionary encoding can lead to numerous random memory accesses for dictionary lookups. In contrast, the raw value forward index allows for sequential value scanning, which can enhance query performance when applied appropriately.
Note: The raw value forward index currently does not support the inverted index (all others, such as JSON/text/range, are supported). Also, since reading a value from this index requires reading the entire chunk into memory and decompressing it, it is not suitable for heavy random reads.
The raw format is used in two scenarios:
When using the raw format, you can configure the following parameters:
The `compressionCodec` parameter has the following valid values:
- `PASS_THROUGH`
- `SNAPPY`
- `ZSTANDARD`
- `LZ4`
- `GZIP` (introduced in release 1.2.0)
- `null` (the JSON null value, not `"null"`), which is the default. In this case, `PASS_THROUGH` will be used for metrics and `LZ4` for other columns.
`deriveNumDocsPerChunk` is only used when the datatype may have a variable length, such as with `string`, `big decimal`, or `bytes`. By default, Pinot uses a fixed number of elements that was chosen empirically. If changed to true, Pinot will use a heuristic value that depends on the column data.
`rawIndexWriterVersion` changes the algorithm used to create the index. This changes the actual data layout, but modern versions of Pinot can read indexes written in older versions. The latest version right now is 4.
`targetDocsPerChunk` changes the target number of docs to store in a chunk. For `rawIndexWriterVersion` versions 2 and 3, this will store exactly `targetDocsPerChunk` per chunk. For `rawIndexWriterVersion` version 4, this config is used in conjunction with `targetMaxChunkSize`, and the chunk size is determined by the formula `min(lengthOfLongestDocumentInSegment * targetDocsPerChunk, targetMaxChunkSize)`. A negative value disables dynamic chunk sizing and uses the static `targetMaxChunkSize`.
`targetMaxChunkSize` changes the target maximum chunk size. For `rawIndexWriterVersion` versions 2 and 3, this can only be used with `deriveNumDocsPerChunk`. For `rawIndexWriterVersion` version 4, this sets the upper bound for a dynamically calculated chunk size. Documents larger than `targetMaxChunkSize` will be given their own 'huge' chunk; therefore, it is recommended to size this such that huge chunks are avoided.
The recommended way to configure the forward index using the raw format is by including the parameters explained above in the `indexes.forward` object. For example:
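A sketch (the column name and parameter values are illustrative):

```json
"fieldConfigList": [
  {
    "name": "playerName",
    "encodingType": "RAW",
    "indexes": {
      "forward": {
        "compressionCodec": "LZ4",
        "deriveNumDocsPerChunk": false,
        "rawIndexWriterVersion": 2
      }
    }
  }
]
```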
Deprecated
An alternative method to configure the raw format parameters is available. This older approach can still be used, although it is not recommended. Here are the details of this older method:
`chunkCompressionType`: This parameter can be defined as a sibling of `name` and `encodingType` in the `fieldConfigList` section.
`deriveNumDocsPerChunk`: You can configure this parameter with the property `deriveNumDocsPerChunkForRawIndex`. Note that in `properties`, all values must be strings, so valid values for this property are `"true"` and `"false"`.
`rawIndexWriterVersion`: This parameter can be configured using the property `rawIndexWriterVersion`. Again, in `properties`, all values must be strings, so valid values for this property are `"2"`, `"3"`, and so on.
For example:
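A sketch of the older style (the column name and values are illustrative):

```json
"fieldConfigList": [
  {
    "name": "playerName",
    "encodingType": "RAW",
    "chunkCompressionType": "SNAPPY",
    "properties": {
      "deriveNumDocsPerChunkForRawIndex": "true",
      "rawIndexWriterVersion": "3"
    }
  }
]
```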
While this older method is still supported, it is not the recommended way to configure these parameters. There are no plans to remove support for it, but keep in mind that any new parameters added in the future may only be configurable in the `forward` JSON object.
Traditionally the forward index has been a mandatory index for all columns in the on-disk segment file format.
However, certain columns may only be used as filters in the `WHERE` clause for all queries. In such scenarios, the forward index is not necessary, as other indexes and structures in the segments can provide the required SQL query functionality. The forward index then just takes up extra storage space that can ideally be freed up.
Thus, to provide users an option to save storage space, a knob to disable the forward index is now available.
The forward index on one or more columns in your Pinot table can be disabled, with the following limitations:
Only supported for immutable (offline) segments.
MV columns with duplicates within a row will lose the duplicated entries on forward index regeneration. The ordering of data within an MV row may also change on regeneration. A backfill is required in such scenarios (to preserve duplicates or ordering).
To support forward index regeneration on reload (i.e., re-enabling the forward index for a column where it was disabled), the dictionary and inverted index must be enabled on that particular column.
Sorted columns will allow the forward index to be disabled, but this operation will be treated as a no-op and the index (which acts as both a forward index and inverted index) will be created.
The older way to do so is still supported, but not recommended.
The forward index can also be regenerated for a column where it is disabled by enabling the index and reloading the segment. The forward index can only be regenerated if the dictionary and inverted index have been enabled for the column. If either has been disabled, then the only way to get the forward index back is to regenerate the segments via the offline jobs and re-push / refresh the data.
Warning:
For multi-value (MV) columns the following invariants cannot be maintained after regenerating the forward index for a forward index disabled column:
Ordering guarantees of the MV values within a row
If entries within an MV row are duplicated, the duplicates will be lost. Regenerate the segments via your offline jobs and re-push / refresh the data to get back the original MV data with duplicates.
We will work on removing the second invariant in the future.
Examples of queries that will fail after disabling the forward index for an example column, `columnA`, can be found below:
Forward index disabled columns cannot be present in the `SELECT` clause, even if filters are added on them.
Forward index disabled columns cannot be present in the `GROUP BY` and `ORDER BY` clauses. They also cannot be part of the `HAVING` clause.
A subset of the aggregation functions do work when the forward index is disabled, such as `MIN`, `MAX`, `DISTINCTCOUNT`, `DISTINCTCOUNTHLL`, and more. Other aggregation functions will not work, such as the below:
Forward index disabled columns cannot be present in the `SELECT DISTINCT` clause.
To run queries on single-value columns where the filter clause contains operators such as `>`, `<`, `>=`, `<=`, a version 2 range index must be present. Without the range index, such queries will fail, as shown below:
The FST index supports regex queries on text and decreases the on-disk index size by 4-6 times. It has the following limitations:
- Only supports regex queries.
- Only supported on stored or completed Pinot segments (not consuming segments).
- Only supported on dictionary-encoded columns.
- Works better for prefix queries.
To enable the FST index on a dictionary-encoded column, include the following configuration:
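A sketch (the column name is illustrative):

```json
"fieldConfigList": [
  {
    "name": "text_col",
    "encodingType": "DICTIONARY",
    "indexTypes": ["FST"]
  }
]
```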
The FST index generates one FST index file (`.lucene.fst`). If the inverted index is enabled, the FST index is further able to take advantage of it.
This page talks about geospatial support in Pinot.
Geospatial data types, such as point, line, and polygon;
Geospatial functions, for querying spatial properties and relationships;
Geospatial indexing, used for efficient processing of spatial operations.
Geospatial data types abstract and encapsulate spatial structures such as boundary and dimension. In many respects, spatial data types can be understood simply as shapes. Pinot supports the Well-Known Text (WKT) and Well-Known Binary (WKB) forms of geospatial objects, for example:
POINT (0 0)
LINESTRING (0 0, 1 1, 2 1, 2 2)
POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (1 1, 1 2, 2 2, 2 1, 1 1))
MULTIPOINT (0 0, 1 2)
MULTILINESTRING ((0 0, 1 1, 1 2), (2 3, 3 2, 5 4))
MULTIPOLYGON (((0 0, 4 0, 4 4, 0 4, 0 0), (1 1, 2 1, 2 2, 1 2, 1 1)), ((-1 -1, -1 -2, -2 -2, -2 -1, -1 -1)))
GEOMETRYCOLLECTION(POINT(2 0),POLYGON((0 0, 1 0, 1 1, 0 1, 0 0)))
It is common to have data in which the coordinates are geographic, i.e. latitude/longitude.
Unlike coordinates in Mercator or UTM, geographic coordinates are not Cartesian coordinates.
Geographic coordinates do not represent a linear distance from an origin as plotted on a plane. Rather, these spherical coordinates describe angular coordinates on a globe.
Spherical coordinates specify a point by the angle of rotation from a reference meridian (longitude), and the angle from the equator (latitude).
You can treat geographic coordinates as approximate Cartesian coordinates and continue to do spatial calculations. However, measurements of distance, length and area will be nonsensical. Since spherical coordinates measure angular distance, the units are in degrees.
For manipulating geospatial data, Pinot provides a set of functions for analyzing geometric components, determining spatial relationships, and manipulating geometries. In particular, geospatial functions that begin with the `ST_` prefix support the SQL/MM specification.
The following geospatial functions are available out of the box in Pinot:
ST_Area(Geometry/Geography g) → double For geometry type, it returns the 2D Euclidean area of a geometry. For geography, returns the area of a polygon or multi-polygon in square meters using a spherical model for Earth.
ST_Equals(Geometry, Geometry) → boolean Returns true if the given geometries represent the same geometry/geography.
ST_Within(Geometry, Geometry) → boolean Returns true if first geometry is completely inside second geometry.
A given geospatial location (longitude, latitude) can map to one hexagon (represented as H3Index). And its neighbors in H3 can be approximated by a ring of hexagons. To quickly identify the distance between any given two geospatial locations, we can convert the two locations in the H3Index, and then check the H3 distance between them. H3 distance is measured as the number of hexagons.
Note the use of `transformFunction`, which converts the created point into `SphericalGeography` format, as needed by the `ST_Distance` function.
Enable the H3 index.
It is recommended to do so by using the `indexes` section:
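A hedged sketch (the column name and resolution are illustrative; verify the exact key names for your version):

```json
"fieldConfigList": [
  {
    "name": "location_st_point",
    "encodingType": "RAW",
    "indexes": {
      "h3": {
        "resolutions": [13]
      }
    }
  }
]
```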
Alternatively, the older way to configure H3 indexes is still supported:
The query below will use the geoindex to filter the Starbucks stores within 5km of the given point in the bay area.
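A sketch of such a query (the table, column, and coordinates are illustrative):

```sql
SELECT address, ST_Distance(location_st_point, ST_Point(-122.0007, 37.0007, 1)) AS distance
FROM starbucksStores
WHERE ST_Distance(location_st_point, ST_Point(-122.0007, 37.0007, 1)) < 5000
LIMIT 10
```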
The Pinot geoindex accelerates query evaluation while maintaining accuracy. Currently, the geoindex supports the `ST_Distance` function in the `WHERE` clause.
At a high level, the geoindex is used to retrieve the records within the nearby hexagons of the given location, and then `ST_Distance` is used to accurately filter the matched results.
As in the example diagram above, if we want to find all relevant points within a given distance around San Francisco (area within the red circle), then the algorithm with geoindex will:
First, find the H3 distance `x` that contains the range (for example, the red circle).
Finally, for the points contained in the hexagons of `kRing(x)` at the outer edge of the red circle, filter them by evaluating the condition `ST_Distance(loc1, loc2) < x` to find only those that are within the circle.
This page describes configuring the Bloom filter for Apache Pinot
When a column is configured to use this filter, Pinot creates one Bloom filter per segment. The Bloom filter helps prune segments that do not contain any record matching an EQUALITY or IN predicate.
This is useful for query patterns like the one below, where the Bloom filter is defined on the playerID column of the table:
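For instance (the table name and value are illustrative):

```sql
SELECT COUNT(*)
FROM baseballStats
WHERE playerID = 'aardsda01'
```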
A Bloom filter is a probabilistic data structure that can definitively determine that an element is not present in a dataset, but cannot determine that an element is present. This is because Bloom filters may produce false positives but never false negatives.
An intriguing aspect of these filters is the existence of a mathematical formula that establishes a relationship between their size, the cardinality of the dataset they index, and the rate of false positives.
In Pinot, this cardinality corresponds to the number of unique values expected within each segment. If necessary, the false positive rate and the index size can be configured.
There are 3 optional parameters to configure the Bloom filter:
The lower the `fpp` (false positive probability), the greater the accuracy of the Bloom filter, but this reduction in `fpp` will also increase the index size. Note that `maxSizeInBytes` takes precedence over `fpp`. If `maxSizeInBytes` is set to a value greater than 0 and the calculated size of the Bloom filter, based on the specified `fpp`, exceeds this limit, Pinot will adjust the `fpp` to keep the Bloom filter size within the limit.
Similar to other indexes, a Bloom filter can be explicitly deactivated by setting the special parameter `disabled` to true.
For example, the following table config enables the Bloom filter on the playerID column using the default values:
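A minimal sketch:

```json
"fieldConfigList": [
  {
    "name": "playerID",
    "indexes": {
      "bloom": {}
    }
  }
]
```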
If some parameters need to be customized, they can be included in `fieldConfigList.indexes.bloom`. Although the following example customizes all parameters, you can modify only the ones you need.
An important requirement for Pinot dedup tables is that the input stream be partitioned by the primary key. For Kafka messages, this means the producer must set the key in the send API. If the original stream is not partitioned, then a stream processing job (e.g., with Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.
A dedup Pinot table can use only the low-level consumer for input streams. As a result, it uses the partitioned replica-group assignment for the segments. Moreover, dedup poses the additional requirement that all segments of the same partition must be served from the same server to ensure data consistency across the segments. Accordingly, it requires `strictReplicaGroup` as the routing strategy. To use that, configure `instanceSelectorType` in `routing` as follows:
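A minimal sketch of the routing section:

```json
"routing": {
  "instanceSelectorType": "strictReplicaGroup"
}
```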
The feature also requires you to specify `pinot.server.instance.max.segment.preload.threads: N` in the server config, where N should be replaced with the number of threads to use for preloading. It's 0 by default, which disables preloading. This preloading thread pool is shared with upsert preloading.
In this example, we look at un-nesting JSON records that are batched together under a single key at the root level. We use the complex-type handling configs to persist the individual student records as separate rows in Pinot.
Enable the file system backend by including the `pinot-s3` plugin. In the controller or server configuration, add the config:
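For example, when starting the component (the paths are illustrative):

```
-Dplugins.dir=/opt/pinot/plugins -Dplugins.include=pinot-s3
```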
The S3 file system supports authentication using the AWS default credential provider chain. The credential provider looks for credentials in the following order:
When performing stream ingestion of JSON records from Kafka, users can encode specific fields with CLP by using a CLP-specific StreamMessageDecoder.
We use a variable-length encoding for the logtype and dictionary variables, since their length can vary significantly.
To decode CLP-encoded fields, use `CLPDECODE`.
We are working to integrate efficient searches on CLP-encoded columns as another UDF. The development of this feature is being tracked in the issue tracker.
Forward indexes are enabled by default, meaning that columns will have a forward index unless it is explicitly disabled. Disabling the forward index can save storage space when other indexes sufficiently cover the required data patterns. For information on how to disable the forward index and its implications, refer to the section on disabling the forward index.
When the encoding is set to `RAW`, the forward index is implemented as an array, where the indices correspond to document IDs and the values represent the actual row values. For more details, refer to the raw value forward index section.
The `DICTIONARY` encoding can be even more efficient if the segment is sorted by the indexed column. You can learn more about dictionary encoding and the sorted forward index in their respective sections.
To learn more about dictionary encoding, see the dictionary encoding section.
Sorted forward indexes offer the benefits of efficient compression and data locality and can also serve as an inverted index. They are active when two conditions are met: the segment is sorted by the column, and the dictionary is enabled for that column. Refer to the dictionary documentation for details on enabling the dictionary.
Alternatively, for offline tables and for committed segments in real-time tables, you can retrieve the sorted status from the getServerMetadata endpoint. The following example is based on the quick start:
- When the dictionary is disabled for a column, as specified in the dictionary config.
- When the encoding is set to `RAW` in the field config.
If the column has a range index, then the column must be of single-value type and use range index version 2.
To disable the forward index, set the `disabled` property to `true` under `fieldConfigList`, as shown below:
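A sketch (the column name is illustrative):

```json
"fieldConfigList": [
  {
    "name": "columnA",
    "indexes": {
      "forward": {
        "disabled": true
      }
    }
  }
]
```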
A table reload operation must be performed for the above config to take effect. Enabling or disabling other indexes on the column can be done via the usual table config options.
For more information on the FST construction and code, see the Lucene documentation.
For more information about enabling the FST index, see the configuration section above.
Pinot supports SQL/MM geospatial data and is compliant with the Open Geospatial Consortium (OGC) OpenGIS specification. This includes:
Pinot supports both geometry and geography types, which can be constructed by the corresponding functions shown below. For the geography types, measurement functions such as `ST_Distance` and `ST_Area` calculate the spherical distance and area on Earth, respectively.
This aggregate function returns a MULTI geometry or NON-MULTI geometry from a set of geometries. It ignores NULL geometries.
Returns a geometry type object from WKT representation, with the optional spatial system reference.
Returns a geometry type object from WKB representation.
Returns a geometry type point object with the given coordinate values.
Returns a geometry type polygon object from WKT representation.
Creates a geography instance from a WKT representation.
Returns a specified geography value from WKB representation.
For geometry type, returns the 2-dimensional cartesian minimum distance (based on spatial ref) between two geometries in projected units. For geography, returns the great-circle distance in meters between two SphericalGeography points. Note that g1, g2 shall have the same type.
Returns the type of the geometry as a string, e.g. `ST_Linestring`, `ST_Polygon`, `ST_MultiPolygon`, etc.
Returns the WKB representation of the geometry.
Returns the WKT representation of the geometry/geography.
Converts a Geometry object to a spherical geography object.
Converts a spherical geographical object to a Geometry object.
Returns true if and only if no points of the second geometry/geography lie in the exterior of the first geometry/geography, and at least one point of the interior of the first geometry lies in the interior of the second geometry. Warning: ST_Contains on geography gives only a close approximation.
Geospatial functions are typically expensive to evaluate, and using the geoindex can greatly accelerate query evaluation. Geoindexing in Pinot is based on Uber's H3, a hexagon-based hierarchical gridding library.
For example, in the diagram below, the red hexagons are within distance 1 of the central hexagon. The size of the hexagon is determined by the resolution of the indexing. Check the H3 resolution table for the level of resolution and the corresponding precision (measured in km).
To use the geoindex, first declare the geolocation field as bytes in the schema, as in the example.
Next, declare the geospatial index in the table config. To do so, you need to:
Verify the dictionary is disabled (see how to disable the dictionary).
Then, for the points within the H3 distance (those covered by hexagons completely within the circle), directly accept those points without filtering.
Bloom filters are deactivated by default, meaning that columns will not have one unless it is explicitly configured in the table config.