Here you will find a collection of ready-made sample applications and examples for real-world data
A cluster is a set of nodes comprising servers, brokers, controllers, and minions.
Pinot uses Apache Helix for cluster management. Helix is a cluster management framework that manages replicated, partitioned resources in a distributed system. Helix uses Zookeeper to store cluster state and metadata.
Helix divides nodes into logical components based on their responsibilities:
Participant: The nodes that host distributed, partitioned resources.
Pinot Servers are modeled as Participants. For more details about server nodes, see Server.
Spectator: The nodes that observe the current state of each Participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).
Pinot Brokers are modeled as Spectators. For more details about broker nodes, see Broker.
Controller: The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.
Pinot Controllers are modeled as Controllers. For more details about controller nodes, see Controller.
Another way to visualize the cluster is a logical view, wherein a cluster contains tenants, tenants contain tables, and tables contain segments.
Typically, there is only one cluster per environment/data center. There is no need to create multiple Pinot clusters since Pinot supports the concept of tenants. At LinkedIn, the largest Pinot cluster consists of 1000+ nodes.
To set up a cluster, see one of the following guides:
Learn about the various components of Pinot and terminologies used to describe data stored in Pinot
Pinot is designed to deliver low latency queries on large datasets. In order to achieve this performance, Pinot stores data in a columnar format and adds additional indices to perform fast filtering, aggregation and group by.
Pinot uses a variety of terms that can refer to either abstractions that model the storage of data or infrastructure components that drive the functionality of the system.
In contrast to RDBMS schemas, multiple tables in Pinot (real-time or batch) can inherit a single schema definition. Tables are independently configured for concerns such as indexing strategies, partitioning, tenants, data sources, and/or replication.
By default, all tables belong to a default tenant named "default". The concept of tenants is very important, as it satisfies the architectural principle of a "database per service/application" without having to operate many independent data stores. Further, tenants will schedule resources so that segments (shards) are able to restrict a table's data to reside only on a specified set of nodes. Similar to the kind of isolation that is ubiquitously used in Linux containers, compute resources in Pinot can be scheduled to prevent resource contention between tenants.
Auto-scaling is also achievable; however, a fixed number of nodes is recommended to keep QPS consistent when query loads vary suddenly and unpredictably with end-user usage.
A Pinot cluster comprises multiple distributed system components. These components are useful to understand for operators that are monitoring system usage or are debugging an issue with a cluster deployment.
Controller
Server
Broker
Minion (optional)
Helix is a cluster management solution that was designed and created by the authors of Pinot at LinkedIn. Helix drives the state of a Pinot cluster from a transient state to an ideal state, acting as the fault-tolerant distributed state store that guarantees consistency. Helix is embedded as agents that operate within a controller, broker, and server, and does not exist as an independent and horizontally scaled component.
In addition to cluster management, resource allocation, and scheduling, the controller is also the HTTP gateway for REST API administration of a Pinot deployment. A web-based query console is also provided for operators to quickly and easily run SQL/PQL queries.
A real-time and offline server have very different resource usage requirements, where real-time servers are continually consuming new messages from external systems (such as Kafka topics) that are ingested and allocated on segments of a tenant. Because of this, resource isolation can be used to prioritize high-throughput real-time data streams that are ingested and then made available for query through a broker.
Raw data is broken into small data shards and each shard is converted into a unit known as a segment. One or more segments together form a table, which is the logical container for querying Pinot using SQL.
Similar to traditional databases, Pinot has the concept of a table, a logical abstraction that refers to a collection of related data.
As is the case with RDBMS, a table is a construct that consists of columns and rows (documents) that are queried using SQL. A table is associated with a schema that defines the columns in a table as well as their data types.
Pinot has a distributed systems architecture that scales horizontally. Pinot expects the size of a table to grow infinitely over time. In order to achieve this, all data needs to be distributed across multiple nodes. Pinot achieves this by breaking data into smaller chunks known as segments (similar to shards/partitions in HA relational databases). Segments can also be seen as time-based partitions.
In order to support multi-tenancy, Pinot has first-class support for tenants. A table is associated with a tenant. This allows all tables belonging to a particular logical namespace to be grouped under a single tenant name and isolated from other tenants. This isolation between tenants provides different namespaces for applications and teams to prevent sharing tables or schemas. Development teams building applications will never have to operate an independent deployment of Pinot. An organization can operate a single cluster and scale it out as new tenants increase the overall volume of queries. Developers can manage their own schemas and tables without being impacted by any other tenant on a cluster.
Logically, a cluster is simply a group of tenants. As with the classical definition of a cluster, it is also a grouping of a set of compute nodes. Typically, there is only one cluster per environment/data center. There is no need to create multiple clusters since Pinot supports the concept of tenants. At LinkedIn, the largest Pinot cluster consists of 1000+ nodes distributed across a data center. Nodes can be added to a cluster in a way that linearly increases the performance and availability of queries. The number of nodes and the compute resources per node will reliably predict the QPS for a Pinot cluster, and as such, capacity planning can be easily achieved using SLAs that assert performance expectations for end-user applications.
The benefits of scale that make Pinot linearly scalable for an unbounded number of nodes are made possible through its integration with Apache Zookeeper and Apache Helix.
A controller is the core orchestrator that drives the consistency and routing in a Pinot cluster. Controllers are horizontally scaled as an independent component (container) and have visibility into the state of all other components in a cluster. The controller reacts and responds to state changes in the system and schedules the allocation of resources for tables, segments, or nodes. As mentioned earlier, Helix is embedded within the controller as an agent that is a participant responsible for observing and driving state changes that are subscribed to by other components.
A broker receives queries from a client and routes their execution to one or more Pinot servers before returning a consolidated response.
Servers host segments (shards) that are scheduled and allocated across multiple nodes and routed on an assignment to a tenant (there is a single tenant by default). Servers are independent containers that scale horizontally and are notified by Helix through state changes driven by the controller. A server can either be a real-time server or an offline server.
Pinot Minion is an optional component that can be used to run background tasks such as "purge" for GDPR (General Data Protection Regulation). As Pinot is an immutable aggregate store, records containing sensitive private data need to be purged on a request-by-request basis. Minion provides a solution for this purpose that complies with GDPR while optimizing Pinot segments and building additional indices to maintain performance even when data may be deleted. One can also write a custom task that runs on a periodic basis. While it's possible to perform these tasks on the Pinot servers directly, having a separate process (Minion) lessens the overall degradation of query latency as segments are impacted by mutable writes.
For a general overview that ties together all of the reference material in this section, see .
The Pinot Controller is responsible for the following:
Maintaining global metadata (e.g. configs and schemas) of the system with the help of Zookeeper which is used as the persistent metadata store.
Hosting the Helix Controller and managing other Pinot components (brokers, servers, minions)
Maintaining the mapping of which servers are responsible for which segments. This mapping is used by the servers to download the portion of the segments that they are responsible for. This mapping is also used by the broker to decide which servers to route the queries to.
Serving admin endpoints for viewing, creating, updating, and deleting configs, which are used to manage and operate the cluster.
Serving endpoints for segment uploads, which are used in offline data pushes. They are responsible for initializing real-time consumption and coordination of persisting real-time segments into the segment store periodically.
Undertaking other management activities such as managing segment retention and validations.
For redundancy, there can be multiple instances of Pinot controllers. Pinot expects that all controllers are configured with the same back-end storage system so that they have a common view of the segments (e.g. NFS). Pinot can use other storage systems such as HDFS or ADLS.
The controller runs several periodic tasks in the background to perform activities such as management and validation. Each periodic task has its own configs that define its run frequency and default frequency. Each task runs on its own schedule or can also be triggered manually if needed. The task runs on the lead controller for each table.
Here's a list of all the periodic tasks:
This task rebuilds the BrokerResource if the instance set has changed.
controller.broker.resource.validation.frequencyPeriod (default: 1h)
controller.broker.resource.validation.initialDelayInSeconds (default: between 2m and 5m)
This task periodically cleans up stale Pinot broker/server/minion instances.
controller.stale.instances.cleanup.task.frequencyPeriod (default: 1h)
controller.stale.instances.cleanup.task.initialDelaySeconds (default: between 2m and 5m)
controller.stale.instances.cleanup.task.minOfflineTimeBeforeDeletionPeriod (default: 1h)
This task manages the segment ValidationMetrics (missingSegmentCount, offlineSegmentDelayHours, lastPushTimeDelayHours, TotalDocumentCount, NonConsumingPartitionCount, SegmentCount), to ensure that all offline segments are contiguous (no missing segments) and that the offline push delay isn't too high.
controller.offline.segment.interval.checker.frequencyPeriod (default: 24h)
controller.statuschecker.waitForPushTimePeriod (default: 10m)
controller.offlineSegmentIntervalChecker.initialDelayInSeconds (default: between 2m and 5m)
TBD
This task validates the ideal state and segment zk metadata of realtime tables,
fixing any partitions which have stopped consuming
starting consumption from new partitions
uploading segments to deep store if segment download url is missing
This task ensures that the consumption of the realtime tables gets fixed and keeps going when met with erroneous conditions.
This task does not fix consumption stalled due to
CONSUMING segment being deleted
Kafka OOR exceptions
controller.realtime.segment.validation.frequencyPeriod (default: 1h)
controller.realtime.segment.validation.initialDelayInSeconds (default: between 2m and 5m)
This task manages retention of segments for all tables. During the run, it looks at the retentionTimeUnit and retentionTimeValue inside the segmentsConfig of every table, and deletes segments which are older than the retention. The deleted segments are moved to a DeletedSegments folder colocated with the dataDir on the segment store, and permanently deleted from that folder after a configurable number of days.
controller.retention.frequencyPeriod (default: 6h)
controller.retentionManager.initialDelayInSeconds (default: between 2m and 5m)
controller.deleted.segments.retentionInDays (default: 7d)
This task is applicable only if you have tierConfig or tagOverrideConfig. It runs rebalance in the background to
relocate COMPLETED segments to tag overrides
relocate ONLINE segments to tiers if tier configs are set
At most one replica is allowed to be unavailable during rebalance.
controller.segment.relocator.frequencyPeriod (default: 1h)
controller.segmentRelocator.initialDelayInSeconds (default: between 2m and 5m)
This task manages segment status metrics such as realtimeTableCount, offlineTableCount, disableTableCount, numberOfReplicas, percentOfReplicas, percentOfSegments, idealStateZnodeSize, idealStateZnodeByteSize, segmentCount, segmentsInErrorState, tableCompressedSize.
controller.statuschecker.frequencyPeriod (default: 5m)
controller.statusChecker.initialDelayInSeconds (default: between 2m and 5m)
TBD
Use the GET /periodictask/names API to fetch the names of all the periodic tasks running on your Pinot cluster.
To manually run a named periodic task, use the GET /periodictask/run API.
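For example, assuming a controller running at localhost:9000, the calls might look like the sketch below. The taskname query parameter and the example task name are assumptions; the tableName and type parameters are the ones described below.

```bash
# List the names of all periodic tasks registered on the cluster.
curl -X GET "http://localhost:9000/periodictask/names"

# Manually trigger a periodic task for a specific table
# (omit tableName to run against all tables).
curl -X GET "http://localhost:9000/periodictask/run?taskname=SegmentStatusChecker&tableName=myTable&type=OFFLINE"
```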
The Log Request Id (api-09630c07) can be used to search through the pinot-controller log file to see log entries related to the execution of the periodic task that was manually run.
If tableName (and its type OFFLINE or REALTIME) is not provided, the task will run against all tables.
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a controller:
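A minimal sketch, assuming a local Zookeeper at localhost:2181 and the default controller port 9000 (cluster name and image tag are illustrative):

```bash
# Using the launcher scripts from a Pinot distribution:
bin/pinot-admin.sh StartController \
  -zkAddress localhost:2181 \
  -clusterName PinotCluster \
  -controllerPort 9000

# Or, using Docker (network settings are illustrative):
docker run --rm -p 9000:9000 apachepinot/pinot:latest StartController \
  -zkAddress host.docker.internal:2181
```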
A tenant is a logical component defined as a group of server/broker nodes with the same Helix tag.
In order to support multi-tenancy, Pinot has first-class support for tenants. Every table is associated with a server tenant and a broker tenant. This controls the nodes that will be used by this table as servers and brokers. This allows all tables belonging to a particular use case to be grouped under a single tenant name.
The concept of tenants is very important when multiple use cases are using Pinot and there is a need to provide quotas or some sort of isolation across tenants. For example, consider two tables, Table A and Table B, in the same Pinot cluster.
We can configure Table A with server tenant Tenant A and Table B with server tenant Tenant B. We can tag some of the server nodes for Tenant A and some for Tenant B. This ensures that segments of Table A only reside on servers tagged with Tenant A, and segments of Table B only reside on servers tagged with Tenant B. The same isolation can be achieved at the broker level, by configuring broker tenants for the tables.
No need to create separate clusters for every table or use case!
This tenant is defined in the tenants section of the table config.
This section contains two main fields, broker and server, which decide the tenants used for the broker and server components of this table.
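A minimal sketch of what that section can look like; only the tenants block is shown, and the rest of the table config is omitted:

```bash
# Hypothetical fragment of a table config illustrating the tenants section.
cat > /tmp/tenants-section.json <<'EOF'
{
  "tenants": {
    "broker": "brokerTenantName",
    "server": "serverTenantName"
  }
}
EOF
```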
In the above example:
The table will be served by brokers that have been tagged as brokerTenantName_BROKER in Helix.
If this were an offline table, the offline segments for the table will be hosted on Pinot servers tagged in Helix as serverTenantName_OFFLINE.
If this were a real-time table, the real-time segments (both consuming as well as completed ones) will be hosted on Pinot servers tagged in Helix as serverTenantName_REALTIME.
Here's a sample broker tenant config. This will create a broker tenant sampleBrokerTenant by tagging 3 untagged broker nodes as sampleBrokerTenant_BROKER.
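A sketch of that broker tenant definition; the field names follow the controller's tenant payload, so treat the exact shape as an assumption:

```bash
cat > /tmp/sample-broker-tenant.json <<'EOF'
{
  "tenantRole": "BROKER",
  "tenantName": "sampleBrokerTenant",
  "numberOfInstances": 3
}
EOF
```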
To create this tenant, use the following command. The creation will fail if the number of untagged broker nodes is less than numberOfInstances.
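For example, posting the definition above to a controller assumed to be running at localhost:9000:

```bash
# Create the broker tenant via the controller REST API.
curl -X POST "http://localhost:9000/tenants" \
  -H "Content-Type: application/json" \
  -d @/tmp/sample-broker-tenant.json
```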
Follow instructions in Getting Pinot to get Pinot locally, and then
Check out the table config in the Rest API to make sure it was successfully uploaded.
Here's a sample server tenant config. This will create a server tenant sampleServerTenant by tagging 1 untagged server node as sampleServerTenant_OFFLINE and 1 untagged server node as sampleServerTenant_REALTIME.
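A sketch of that server tenant definition, under the same payload assumptions as the broker tenant above:

```bash
cat > /tmp/sample-server-tenant.json <<'EOF'
{
  "tenantRole": "SERVER",
  "tenantName": "sampleServerTenant",
  "offlineInstances": 1,
  "realtimeInstances": 1
}
EOF
```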
To create this tenant, use the following command. The creation will fail if the number of untagged server nodes is less than offlineInstances + realtimeInstances.
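For example, again assuming a controller at localhost:9000:

```bash
# Create the server tenant via the controller REST API.
curl -X POST "http://localhost:9000/tenants" \
  -H "Content-Type: application/json" \
  -d @/tmp/sample-server-tenant.json
```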
Follow instructions in Getting Pinot to get Pinot locally, and then
Check out the table config in the Rest API to make sure it was successfully uploaded.
Apache Pinot is a real-time distributed OLAP datastore, purpose-built for low-latency, high-throughput analytics, and perfect for user-facing analytical workloads.
Join us in our Slack channel for questions, troubleshooting, and feedback. You can request an invite from - https://communityinviter.com/apps/apache-pinot/apache-pinot.
We'd love to hear from you!
Pinot is a real-time distributed OLAP datastore, purpose-built to provide ultra low-latency analytics, even at extremely high throughput. It can ingest directly from streaming data sources - such as Apache Kafka and Amazon Kinesis - and make the events available for querying instantly. It can also ingest from batch data sources such as Hadoop HDFS, Amazon S3, Azure ADLS, and Google Cloud Storage.
At the heart of the system is a columnar store, with several smart indexing and pre-aggregation techniques for low latency. This makes Pinot a perfect fit for user-facing realtime analytics. At the same time, Pinot is also a great choice for other analytical use-cases, such as internal dashboards, anomaly detection, and ad-hoc data exploration.
Pinot was built by engineers at LinkedIn and Uber and is designed to scale up and out with no upper bound. Performance remains consistent based on the size of your cluster and an expected queries-per-second (QPS) threshold.
User-facing analytics, or site-facing analytics, is the analytical tools and applications that you would expose directly to the end-users of your product. In a user-facing analytics application, think of the user-base as ALL end users of an App. This App could be a social networking app, or a food delivery app - anything at all. It’s not just a few analysts doing offline analysis, or a handful of data scientists in a company running ad-hoc queries. This is ALL end-users, receiving personalized analytics on their personal devices (think 100s of 1000s of queries per second). These queries are triggered by apps, and not written by people, and so the scale will be as much as the active users on that App (think millions of events/sec)
And, this is for all the freshest possible data, which touches on the other aspect here - realtime analytics. "Yesterday" might be a long time ago for some businesses and they cannot wait for ETLs and batch jobs. The data needs to be used for analytics, as soon as it is generated (think latencies < 1s).
Wanting such a user-facing analytics application, using realtime events, sounds great. But what does it mean for the underlying infrastructure, to support such an analytical workload?
Such applications require the freshest possible data, and so the system needs to be able to ingest data in real time and make it available for querying, also in real time.
Data for such apps tend to be event data, for a wide range of actions, coming from multiple sources, and so the data comes in at a very high velocity and tends to be highly dimensional.
Queries are triggered by end-users interacting with apps - with queries per second in hundreds of thousands, with arbitrary query patterns, and latencies are expected to be in milliseconds for good user-experience.
And further do all of the above, while being scalable, reliable, highly available, and having a low cost to serve.
This video talks more about user-facing real-time analytics, and how Pinot is used to achieve that.
Here's another great video that goes into the details of how Pinot tackles some of the challenges faced in handling a user-facing analytics workload.
Pinot originated at LinkedIn, which currently has one of the largest deployments, powering more than 50 user-facing applications such as Viewed My Profile, Talent Analytics, Company Analytics, Ad Analytics and many more. At LinkedIn, Pinot also serves as the backend to visualize and monitor 10,000+ business metrics.
With Pinot's growing popularity, several companies are now using it in production to power a variety of analytics use cases. A detailed list of companies using Pinot can be found here.
A column-oriented database with various compression schemes such as Run Length, Fixed Bit Length
Pluggable indexing technologies - Sorted Index, Bitmap Index, Inverted Index, StarTree Index, Bloom Filter, Range Index, Text Search Index (Lucene/FST), JSON Index, Geospatial Index
Ability to optimize query/execution plan based on query and segment metadata
Near real-time ingestion from streams such as Kafka, Kinesis and batch ingestion from sources such as Hadoop, S3, Azure, GCS
SQL-like language that supports selection, aggregation, filtering, group by, order by, distinct queries on data
Support for multi-valued fields
Horizontally scalable and fault-tolerant
Pinot is designed to execute OLAP queries with low latency. It is suited to contexts where fast analytics, such as aggregations, are needed on immutable data, possibly with real-time data ingestion.
User facing Analytics Products
Pinot is the perfect choice for user-facing analytics products. Pinot was originally built at LinkedIn to power rich interactive real-time analytic applications such as Who Viewed Profile, Company Analytics, Talent Insights, and many more. UberEats Restaurant Manager is another example of a customer-facing Analytics App. At LinkedIn, Pinot powers 50+ user-facing products, ingesting millions of events per second and serving 100k+ queries per second at millisecond latency.
Real-time Dashboard for Business Metrics
Pinot can also be used to perform typical analytical operations such as slice and dice, drill down, roll up, and pivot on large scale multi-dimensional data. For instance, at LinkedIn, Pinot powers dashboards for thousands of business metrics. One can connect various BI tools such as Superset, Tableau, or PowerBI to visualize data in Pinot.
Instructions to connect Pinot with Superset can be found here.
Anomaly Detection
In addition to visualizing data in Pinot, one can run Machine Learning Algorithms to detect Anomalies in the data stored in Pinot. See ThirdEye for more information on how to use Pinot for Anomaly Detection and Root Cause Analysis.
While Pinot doesn't match the typical mold of a database product, it is best understood based on your role as either an analyst, data scientist, or application developer.
Enterprise business intelligence
For analysts and data scientists, Pinot is best viewed as a highly-scalable data platform for business intelligence. In this view, Pinot converges big data platforms with the traditional role of a data warehouse, making it a suitable replacement for analysis and reporting.
Enterprise application development
For application developers, Pinot is best viewed as an immutable aggregate store that sources events from streaming data sources, such as Kafka, and makes them available for querying using SQL.
As is the case with a microservice architecture, data encapsulation ends up requiring each application to provide its own data store, as opposed to sharing one OLTP database for reads and writes. In this case, it becomes difficult to query the complete view of a domain because the data is stored in many different databases. This is costly in terms of performance since it requires joins across multiple microservices that expose their data over HTTP under a REST API. To prevent this, Pinot can be used to aggregate all of the data across a microservice architecture into one easily queryable view of the domain.
Pinot tenants prevent any possibility of sharing ownership of database tables across microservice teams. Developers can create their own query models of data from multiple systems of record depending on their use case and needs. As with all aggregate stores, query models are eventually consistent and immutable.
Our documentation is structured to let you quickly get to the content you need and is organized around the different concerns of users, operators, and developers. If you're new to Pinot and want to learn things by example, please take a look at our getting started section.
To start importing data into Pinot, check out our guides on batch import and stream ingestion based on our plugin architecture.
Pinot works very well for querying time series data with many dimensions and metrics over a vast unbounded space of records that scales linearly on a per-node basis. Filters and aggregations are both easy and fast.
Pinot supports SQL for querying read-only data. Learn more about querying Pinot for time series data in our PQL (Pinot Query Language) guide.
Pinot may be deployed to and operated on a cloud provider or a local or virtual machine. You may get started either with a bare-metal installation or a Kubernetes one (either locally or in the cloud). To get immediately started with Pinot, check out these quick start guides for bootstrapping a Pinot cluster using Docker or Kubernetes.
For a high-level overview that explains how Pinot works, please take a look at our basic concepts section.
To understand the distributed systems architecture that explains Pinot's operating model, please take a look at our basic architecture section.
Each table in Pinot is associated with a Schema. A schema defines what fields are present in the table along with the data types.
The schema is stored in Zookeeper, along with the table configuration.
A schema also defines what category a column belongs to. Columns in a Pinot table fall into three categories:
Pinot does not enforce strict rules on which of these categories columns belong to; rather, the categories can be thought of as hints to Pinot to do internal optimizations.
For example, metrics may be stored without a dictionary and can have a different default null value.
The categories are also relevant when doing segment merge and rollups. Pinot uses the dimension and time fields to identify records against which to apply merge/rollups.
Metrics aggregation is another example where Pinot uses the dimension and time columns as the key and automatically aggregates values for the metric columns.
Data types determine the operations that can be performed on a column. Pinot supports the following data types:
BOOLEAN, TIMESTAMP, and JSON were added after release 0.7.1. In release 0.7.1 and older releases, BOOLEAN is equivalent to STRING.
BIG_DECIMAL was added after release 0.10.0.
The lowest granularity the TIMESTAMP type supports is millisecond epoch; nanoseconds are not supported.
There are several built-in virtual columns inside the schema that can be used for debugging purposes:
These virtual columns can be used in queries in a similar way to regular columns.
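For example, assuming a broker at localhost:8099 and a hypothetical table myTable, you could group by virtual columns to see how records are spread across segments and hosts:

```bash
# Query virtual columns just like regular columns via the broker SQL endpoint.
curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT $segmentName, $hostName, COUNT(*) FROM myTable GROUP BY $segmentName, $hostName LIMIT 10"}'
```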
Let's create a schema and put it in a JSON file. For this example, we have created a schema for flight data.
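A minimal sketch of such a schema file; the column names and granularity are illustrative rather than the exact sample shipped with Pinot:

```bash
cat > /tmp/flights-schema.json <<'EOF'
{
  "schemaName": "flights",
  "dimensionFieldSpecs": [
    { "name": "airline", "dataType": "STRING" },
    { "name": "origin", "dataType": "STRING" },
    { "name": "destination", "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "delayMinutes", "dataType": "INT" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "departureTimeMillis",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MINUTES"
    }
  ]
}
EOF
```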
Then, we can upload the sample schema provided above using either a Bash command or REST API call.
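For example, assuming the file above and a controller at localhost:9000 (the multipart form-field name in the REST call is an assumption):

```bash
# Option 1: launcher script
bin/pinot-admin.sh AddSchema \
  -schemaFile /tmp/flights-schema.json \
  -controllerHost localhost -controllerPort 9000 \
  -exec

# Option 2: controller REST API (multipart upload)
curl -F schemaFile=@/tmp/flights-schema.json "http://localhost:9000/schemas"
```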
Pinot supports the following types of table:
The user querying the database does not need to know the type of the table. They only need to specify the table name in the query.
For example, regardless of whether we have an offline table myTable_OFFLINE, a real-time table myTable_REALTIME, or a hybrid table containing both of these, the query will be:
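A sketch of such a query, counting records in the logical table via a broker assumed to be at localhost:8099; the same statement works regardless of the underlying table type:

```bash
curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT COUNT(*) FROM myTable"}'
```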
You can use the following properties to make your tables faster or leaner:
Segment
Indexing
Tenants
For real-time tables, segments are built in a specific interval inside Pinot. You can tune the following for the real-time segments:
The Pinot real-time consumer ingests the data, creates the segment, and then flushes the in-memory segment to disk. Pinot allows you to configure when to flush the segment in the following ways:
Number of consumed rows - After consuming X no. of rows from the stream, Pinot will persist the segment to disk
Number of desired rows per segment - Pinot learns and then estimates the number of rows that need to be consumed so that the persisted segment is approximately the size. The learning phase starts by setting the number of rows to 100,000 (this value can be changed) and adjusts it to reach the desired segment size. The segment size may go significantly over the desired size during the learning phase. Pinot corrects the estimation as it goes along, so it is not guaranteed that the resulting completed segments are of the exact size as configured. You should set this value to optimize the performance of queries.
Max time duration to wait - Pinot consumers wait for the configured time duration after which segments are persisted to the disk.
Replicas
A segment can have multiple replicas to provide higher availability. You can configure the number of replicas for a table's segments in the table config.
However, in certain scenarios, the segment build can get very memory intensive. It might be desirable to have the non-committer servers simply download the segment from the controller instead of building it again. You can do this by setting completionMode: "DOWNLOAD" in the table configuration.
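A sketch of where this setting might sit in a real-time table config, assuming the completionConfig block lives under segmentsConfig; verify the exact nesting against your Pinot version:

```bash
# Hypothetical fragment to merge into the real-time table config.
cat > /tmp/completion-mode-fragment.json <<'EOF'
{
  "segmentsConfig": {
    "completionConfig": {
      "completionMode": "DOWNLOAD"
    }
  }
}
EOF
```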
Download Scheme
A Pinot server may fail to download segments from the deep store (such as HDFS) after segment completion. However, you can configure servers to download these segments from peer servers instead of the deep store. Currently, only HTTP and HTTPS download schemes are supported. More methods such as gRPC/Thrift can be added in the future.
You can create multiple indices on a table to increase the performance of the queries. The following types of indices are supported:
Dictionary-encoded forward index with bit compression
Raw value forward index
Sorted forward index with run-length encoding
Bitmap inverted index
Sorted inverted index
You can aggregate the real-time stream data as it is consumed to reduce segment sizes. We sum the metric column values of all rows that have the same values for all dimension and time columns and create a single row in the segment. This feature is only available on REALTIME tables.
The only supported aggregation is SUM. The columns on which pre-aggregation is to be done need to satisfy the following requirements:
All metrics should be listed in noDictionaryColumns.
There should not be any multi-value dimensions.
All dimension columns are treated as having a dictionary, even if they appear in noDictionaryColumns in the config.
The following table config snippet shows an example of enabling pre-aggregation during real-time ingestion.
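A sketch of such a snippet, assuming the aggregateMetrics flag in tableIndexConfig; the metric column names are hypothetical:

```bash
cat > /tmp/preaggregation-fragment.json <<'EOF'
{
  "tableIndexConfig": {
    "noDictionaryColumns": ["totalClicks", "totalImpressions"],
    "aggregateMetrics": true
  }
}
EOF
```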
You can also override whether a table's segments should move to a server with a different tenant based on segment status.
A tagOverrideConfig can be added under the tenants section for real-time tables, to override tags for consuming and completed segments. For example:
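A sketch of that override, assuming the realtimeConsuming and realtimeCompleted keys; the tag values are illustrative:

```bash
cat > /tmp/tag-override-fragment.json <<'EOF'
{
  "tenants": {
    "broker": "brokerTenantName",
    "server": "serverTenantName",
    "tagOverrideConfig": {
      "realtimeConsuming": "serverTenantName_REALTIME",
      "realtimeCompleted": "serverTenantName_OFFLINE"
    }
  }
}
EOF
```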
A hybrid table is a table composed of two tables, one offline and one real-time, that share the same name. In such a table, offline segments may be pushed periodically. The retention on the offline table can be set to a high value since segments are coming in on a periodic basis, whereas the retention on the real-time part can be small.
Once an offline segment is pushed to cover a recent time period, the brokers automatically switch to using the offline table for segments for that time period and use the real-time table only for data not available in the offline table.
A typical scenario is pushing deduped, cleaned-up data into an offline table every day while consuming real-time data as and when it arrives. The data can be kept in offline tables for even a few years, while the real-time data would be cleaned every few days.
Prerequisites
Sample Console Output
Start Kafka
Create a Kafka Topic
Create a Streaming table
Sample output
Start Kafka-Zookeeper
Start Kafka
Create stream table
Creating a hybrid table has to be done in 2 separate steps of creating an offline and real-time table individually. You don't need to create a separate overlay/hybrid table.
A minion can be attached to an existing Pinot cluster and then execute tasks as provided by the controller. Custom tasks can be plugged via annotations into the cluster. Some typical minion tasks are:
Segment creation
Segment purge
Segment merge
PinotTaskGenerator: defines the APIs for the controller to generate tasks for minions to execute.
PinotTaskExecutorFactory: factory for PinotTaskExecutor, which defines the APIs for the minion to execute the tasks.
MinionEventObserverFactory: factory for MinionEventObserver, which defines the APIs for task event callbacks on the minion.
The PushTask can fetch files from an input folder, e.g. an S3 bucket, and convert them into segments. The PushTask converts one file into one segment and keeps the file name in segment metadata to avoid duplicate ingestion. Below is an example task config to put in the TableConfig to enable this task. The task is scheduled every 10 minutes to keep ingesting remaining files, with at most 10 parallel tasks and 1 file per task.
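A sketch of such a task config inside the table config, assuming the task type is registered as SegmentGenerationAndPushTask and using an illustrative S3 input folder; the per-task file limit key is omitted here:

```bash
cat > /tmp/push-task-fragment.json <<'EOF'
{
  "task": {
    "taskTypeConfigsMap": {
      "SegmentGenerationAndPushTask": {
        "inputDirURI": "s3://my-bucket/input/",
        "inputFormat": "csv",
        "schedule": "0 */10 * * * ?",
        "tableMaxNumTasks": "10"
      }
    }
  }
}
EOF
```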
NOTE: You may want to simply omit "tableMaxNumTasks" due to this caveat: the task generates one segment per file and derives the segment name based on the time column of the file. If two files happen to have the same time range and are ingested by tasks from different schedules, there might be a segment name conflict. To overcome this issue for now, you can omit "tableMaxNumTasks"; by default it is Integer.MAX_VALUE, meaning as many tasks as possible are scheduled to ingest all input files in a single batch. Within one batch, a sequence number suffix is used to ensure no segment name conflict. Because the sequence number suffix is scoped within one batch, tasks from different batches might encounter the segment name conflict issue described above.
Tasks are enabled on a per-table basis. To enable a certain task type (e.g. myTask) on a table, update the table config to include the task type:
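A minimal sketch of the relevant table config section, using the hypothetical myTask type from above:

```bash
cat > /tmp/enable-task-fragment.json <<'EOF'
{
  "task": {
    "taskTypeConfigsMap": {
      "myTask": {}
    }
  }
}
EOF
```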
Under each enabled task type, custom properties can be configured for the task type.
There are also two task configs to be set as part of the cluster configs, as shown below. One controls the task's overall timeout (1 hour by default) and the other controls how many tasks can run on a single minion worker (1 by default).
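A sketch of setting these via the controller's cluster-config endpoint, assuming per-task-type keys of the form <TaskType>.timeoutMs and <TaskType>.numConcurrentTasksPerInstance (key names are assumptions; check your release):

```bash
# Set a 1-hour timeout and allow 4 concurrent sub-tasks per minion for myTask.
curl -X POST "http://localhost:9000/cluster/configs" \
  -H "Content-Type: application/json" \
  -d '{"myTask.timeoutMs": "3600000", "myTask.numConcurrentTasksPerInstance": "4"}'
```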
There are 2 ways to enable task scheduling:
Tasks can be scheduled periodically for all task types on all enabled tables. Enable auto task scheduling by configuring the schedule frequency in the controller config with the key controller.task.frequencyPeriod. This takes period strings as values, e.g. 2h, 30m, 1d.
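For example, to schedule all enabled task types every hour, you might add this to the controller configuration file (the file path is a placeholder):

```bash
# Append to the controller configuration before starting the controller.
cat >> /path/to/controller.conf <<'EOF'
controller.task.frequencyPeriod=1h
EOF
```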
Tasks can also be scheduled based on cron expressions. The cron expression is set in the schedule config for each task type separately. The controller config controller.task.scheduler.enabled should be set to true to enable cron scheduling.
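A sketch of both pieces together; the task type and cron expression are illustrative:

```bash
# Controller config: enable the cron-based task scheduler.
cat >> /path/to/controller.conf <<'EOF'
controller.task.scheduler.enabled=true
EOF

# Table config fragment: run myTask at the first second of every 5th minute.
cat > /tmp/cron-task-fragment.json <<'EOF'
{
  "task": {
    "taskTypeConfigsMap": {
      "myTask": {
        "schedule": "0 */5 * * * ?"
      }
    }
  }
}
EOF
```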
Tasks can be manually scheduled using the following controller rest APIs:
To plug in a custom task, implement PinotTaskGenerator, PinotTaskExecutorFactory, and MinionEventObserverFactory (optional) for the task type (all of them should return the same string for getTaskType()), and annotate them with the following annotations:
After annotating the classes, put them under the package org.apache.pinot.*.plugin.minion.tasks.*, and they will be auto-registered by the controller and minion.
In the Pinot UI, there is a Minion Task Manager tab under the Cluster Manager page. From that tab, one can find a lot of task-related info for troubleshooting. This info is mainly collected from the Pinot controller that schedules tasks, or from Helix, which tracks task runtime status. There are also buttons to schedule tasks in an ad hoc way. Below are brief introductions to some pages under the Minion Task Manager tab.
This page shows which types of minion task have been used, essentially which task types have created their task queues in Helix.
Clicking into a task type, one can see the tables using that task, along with a few buttons to stop the task queue, clean up ended tasks, and so on.
Clicking into any table in this list, one can see how the task is configured for that table, as well as the task metadata if there is any in ZK. For example, MergeRollupTask tracks a watermark in ZK. If the task is cron scheduled, the current and next schedules are also shown on this page.
At the bottom of this page, one can see a list of tasks generated for this table for this specific task type. Here, for example, one MergeRollup task has been generated and completed.
Clicking into a task from that list, we can see its start/end time and the sub-tasks generated for that task (as context, one minion task can have multiple sub-tasks to process data in parallel). In this example, there happens to be one sub-task, and the page shows when it started and stopped and which minion worker it ran on.
Clicking into this sub-task, one can see more details about it, such as the input task configs and error info if the task failed.
There is a controller job that runs every 5 minutes by default and emits metrics about Minion tasks scheduled in Pinot. The following metrics are emitted for each task type:
NumMinionTasksInProgress: Number of running tasks
NumMinionSubtasksRunning: Number of running sub-tasks
NumMinionSubtasksWaiting: Number of waiting sub-tasks (unassigned to a minion as yet)
NumMinionSubtasksError: Number of error sub-tasks (completed with an error/exception)
PercentMinionSubtasksInQueue: Percent of sub-tasks in waiting or running states
PercentMinionSubtasksInError: Percent of sub-tasks in error
The controller also emits metrics about how tasks are cron scheduled:
cronSchedulerJobScheduled: Number of current cron schedules registered to be triggered regularly according to their cron expressions. It's a Gauge.
cronSchedulerJobTrigger: Number of cron schedules triggered, as a Meter.
cronSchedulerJobSkipped: Number of late cron schedules skipped, as a Meter.
cronSchedulerJobExecutionTimeMs: Time used to complete task generation, as a Timer.
For each task, the Minion will emit these metrics:
TASK_QUEUEING: Task queueing time (task_dequeue_time - task_inqueue_time), assuming the time drift between helix controller and pinot minion is minor, otherwise the value may be negative
TASK_EXECUTION: Task execution time, which is the time spent on executing the task
NUMBER_OF_TASKS: number of tasks in progress on that minion. Whenever a Minion starts a task, increase the Gauge by 1, whenever a Minion completes (either succeeded or failed) a task, decrease it by 1
NUMBER_TASKS_EXECUTED: Number of tasks executed, as a Meter.
NUMBER_TASKS_COMPLETED: Number of tasks completed, as a Meter.
NUMBER_TASKS_CANCELLED: Number of tasks cancelled, as a Meter.
NUMBER_TASKS_FAILED: Number of tasks failed, as a Meter. Different from fatal failure, the task encountered an error which can not be recovered from this run, but it may still succeed by retrying the task.
NUMBER_TASKS_FATAL_FAILED: Number of tasks fatal failed, as a Meter. Different from failure, the task encountered an error, which will not be recoverable even with retrying the task.
This page covers everything you need to know about how queries are computed in Pinot's distributed systems architecture.
This page will introduce you to the guiding principles behind the design of Apache Pinot. Here you will learn the distributed systems architecture that allows Pinot to scale the performance of queries linearly based on the number of nodes in a cluster. You'll also be introduced to the two different types of tables used to ingest and query data in offline (batch) or real-time (stream) mode.
Pinot was designed by engineers at LinkedIn and Uber to scale query performance based on the number of nodes in a cluster. As you add more nodes, query performance will always improve based on the expected query volume per second quota. To achieve horizontal scalability to an unbounded number of nodes and data storage, without performance degradation, the following guiding design principles were established.
Highly available: Pinot is built to serve low latency analytical queries for customer facing applications. By design, there is no single point of failure in Pinot. The system continues to serve queries when a node goes down.
Horizontally scalable: Ability to scale by adding new nodes as a workload changes.
Latency vs Storage: Pinot is built to provide low latency even at high-throughput. Features such as segment assignment strategy, routing strategy, star-tree indexing were developed to achieve this.
Immutable data: Pinot assumes that all data stored is immutable. For GDPR compliance, we provide an add-on solution for purging data while maintaining performance guarantees.
Dynamic configuration changes: Operations such as adding new tables, expanding a cluster, ingesting data, modifying indexing config, and re-balancing must be performed without impacting query availability or performance.
Helix divides nodes into three logical components based on their responsibilities:
Participant: These are the nodes in the cluster that actually host the distributed storage resources.
Spectator: These nodes observe the current state of each participant and route requests accordingly. Routers, for example, need to know the instance on which a partition is hosted and its state in order to route the request to the appropriate endpoint. Routing is continually being changed to optimize cluster performance as storage primitives are added and changed.
Controller: This node observes and manages the state of the participant nodes, coordinating state transitions in the cluster.
Helix uses Zookeeper to maintain cluster state. Each component in a Pinot cluster takes a Zookeeper address as a startup parameter. The various components that are distributed in a Pinot cluster will watch Zookeeper notifications and issue updates via its embedded Helix-defined agent.
Helix agents use Zookeeper to store and update configurations, as well as for distributed coordination. Zookeeper stores the following information about the cluster:
Knowing the ZNode layout structure in Zookeeper for Helix agents in a cluster is useful for operations and/or troubleshooting cluster state and health.
To achieve fault tolerance, one can start multiple controllers (typically three) and one of them will act as a leader. If the leader crashes or dies, another leader is automatically elected. Leader election is achieved using Apache Helix. Having at least one controller is required to perform any DDL-equivalent operation on the cluster, such as adding a table or a segment.
The controller does not interfere with query execution. Query execution is not impacted even when all controller nodes are offline. If all controller nodes are offline, the state of the cluster will stay as it was when the last leader went down. When a new leader comes online, the cluster resumes rebalancing activity and can accept new tables or segments.
Brokers need three key things to start.
Cluster name
Zookeeper address
Broker instance name
At the start, a broker registers as a Helix Participant and awaits notifications from other Helix agents. These notifications will be handled for table creation, a new segment being loaded, or a server starting up/or going down, in addition to any configuration changes.
Service Discovery/Routing Table
Irrespective of the kind of notification, the key responsibility of a broker is to maintain the query routing table. The query routing table is simply a mapping between segments and the servers that a segment resides on. Typically, a segment resides on more than one server. The broker computes multiple routing tables depending on the configured routing strategy for a table. The default strategy is to balance the query load across all available servers.
There are advanced routing strategies available such as ReplicaAware routing, partition-based routing, and minimal server selection routing. These strategies are meant for special or generic cases that serve very high throughput queries.
Query processing
For every query, a cluster's broker performs the following:
Scatter-Gather: sends the requests to each server and gathers the responses.
Merge: merges the query results returned from each server.
Sends the query result to the client.
Fault tolerance
Broker instances scale horizontally without an upper bound. In a majority of cases, only three brokers are required. If most query results that are returned to a client are <1MB in size per query, one can run a broker and servers inside the same instance container. This lowers the overall footprint of a cluster deployment for use cases that do not need to guarantee a strict SLA on query performance in production.
In theory, a server can host both real-time segments and offline segments. However, in practice, we use different types of machine SKUs for real-time servers and offline servers. The advantage of separating real-time servers and offline servers is to allow each to scale independently.
Offline servers
Real-time servers
The two types of tables also scale differently.
Real-time tables have a smaller retention period and scale query performance based on the ingestion rate.
Offline tables have larger retention and scale performance based on the size of stored data.
Tables for real-time and offline can be configured differently depending on usage requirements. For example, you can choose to enable star-tree indexing for an offline table, while the real-time table with the same schema may not need it.
At table creation, a controller creates a new entry in Zookeeper for the consuming segment. Helix notices the new segment and notifies the real-time server, which starts consuming data from the streaming source. The broker, which watches for changes, detects the new segments and adds them to the list of segments to query (segment-to-server routing table).
Whenever the segment is complete (i.e. full), the real-time server notifies the Controller, which checks with all replicas and picks a winner to commit the segment to. The winner commits the segment and uploads it to the cluster's segment store, updating the state of the segment from "consuming" to "online". The controller then prepares a new segment in a "consuming" state.
Queries are received by brokers, which check the request against the segment-to-server routing table and scatter the request between real-time and offline servers.
The two tables then process the request by filtering and aggregating the queried data, which is then returned back to the broker. Finally, the broker gathers together all of the pieces of the query response and responds back to the client with the result.
Learn about the deep store that stores a compressed copy of segment files in Pinot.
Note: Deep Store by itself is not sufficient for restore operations. Pinot stores metadata such as table config, schema, segment metadata in Zookeeper. For restore operations, both Deep Store as well as Zookeeper metadata are required.
There are several different ways that segments are persisted in the deep store.
For offline tables, the batch ingestion job writes the segment directly into the deep store, as shown in the diagram below:
The ingestion job then sends a notification about the new segment to the controller, which in turn notifies the appropriate server to pull down that segment.
For real-time tables, by default, a segment is first built in memory by the server. It is then uploaded to the lead controller (as part of the Segment Completion Protocol sequence), which writes the segment into the deep store, as shown in the diagram below:
When using this configuration the server will directly write a completed segment to the deep store, as shown in the diagram below:
For hands-on examples of how to configure the deep store, see the following tutorials:
Pinot also supports columns that contain lists or arrays of items, but there isn't an explicit data type to represent these lists or arrays. Instead, you can indicate that a dimension column accepts multiple values. For more information, see the Schema configuration reference.
Since Pinot doesn't have dedicated DATETIME datatype support, you need to input time in either STRING, LONG, or INT format. However, Pinot needs to convert the date into an understandable format, such as an epoch timestamp, to do operations. Refer to the schema reference for more details on supported formats.
First, make sure your cluster is up and running.
For more details on constructing a schema file, see the .
Check out the schema in the Rest API to make sure it was successfully uploaded.
A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The columns, data types, and other metadata related to the table are defined using a schema.
Pinot breaks a table into multiple segments and stores these segments in a deep store such as HDFS as well as on Pinot servers.
In the Pinot cluster, a table is modeled as a Helix resource and each segment of a table is modeled as a Helix partition.
A table config is used to define the table properties, such as name, type, indexing, routing, retention etc. It is written in JSON format and is stored in Zookeeper, along with the table schema.
A table is composed of small chunks of data known as segments. To learn more about how Pinot creates and manages segments, see the Segment section.
For offline tables, segments are built outside of Pinot and uploaded using a distributed executor such as Spark or Hadoop. For more details, see .
Completion Mode
By default, if the in-memory segment on the non-winner server is equivalent to the committed segment, then the non-winner server builds and replaces the segment. If the available segment is not equivalent to the committed segment, the server simply downloads the committed segment from the controller.
For more details on why this is needed, see
For more details about peer segment download during real-time ingestion, please refer to this design doc on
For more details on each indexing mechanism and corresponding configurations, see .
You can also set up bloom filters on columns to make queries faster. Further, you can also keep segments in off-heap instead of on-heap memory for faster queries.
Each table is associated with a tenant. A segment resides on servers that have the same tenant as the table. For more details on how tenants work, see .
In the above example, the consuming segments will still be assigned to serverTenantName_REALTIME hosts, but once they are completed, the segments will be moved to serverTenantName_OFFLINE. It is possible to specify the full name of any tag in this section (so, for example, you could decide that completed segments for this table should be on Pinot servers tagged as allTables_COMPLETED). To learn more about this config, see the section.
To understand how time boundary works in the case of a hybrid table, see .
Create a table config for your data, or see for all possible batch/streaming tables.
Check out the table config in the Rest API to make sure it was successfully uploaded.
Check out the table config in the Rest API to make sure it was successfully uploaded.
A Minion is a standby component that leverages the Helix Task Framework to offload computationally intensive tasks from other components.
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a minion:
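A minimal sketch, assuming a local Zookeeper at localhost:2181 and the same illustrative cluster name used for the controller above:

```bash
bin/pinot-admin.sh StartMinion \
  -zkAddress localhost:2181 \
  -clusterName PinotCluster
```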
See for details.
See for details.
As shown below, the RealtimeToOfflineSegmentsTask will be scheduled at the first second of every minute (following the syntax ).
See where the TestTask is plugged in.
It's recommended that you read to better understand the terms used in this guide.
As described in the concepts section, Pinot has multiple distributed system components: controllers, brokers, servers, and minions.
Pinot uses Apache Helix for cluster management. Helix is embedded as an agent within the different components and uses Apache Zookeeper for coordination and maintaining the overall cluster state and health.
All Pinot servers and brokers are managed by Helix. Helix is a generic cluster management framework to manage partitions and replicas in a distributed system. It's helpful to think of Helix as an event-driven discovery service with push and pull notifications that drives the state of a cluster to an ideal configuration. A finite-state machine maintains a contract of stateful operations that drives the health of the cluster towards its optimal configuration. Query load is optimized as Helix updates routing configurations between nodes based on where data is stored in the cluster.
Controller: The controller observes and manages the state of participant nodes. The controller is responsible for coordinating all state transitions in the cluster and ensures that state constraints are satisfied while maintaining cluster stability.
Pinot's controller acts as the driver of the cluster's overall state and health. Because of its role as a Helix participant and spectator, which drives the state of other components, it is the first component that is typically started after Zookeeper. Two parameters are required for starting a controller: the Zookeeper address and the cluster name. The controller will automatically create a cluster via Helix if it does not yet exist.
The controller provides a REST interface to perform CRUD operations on all logical storage resources (servers, brokers, tables, and segments).
See for more information on the web-based admin tool.
The responsibility of the broker is to route a given query to an appropriate server instance. A broker will collect and merge the responses from all servers into a final result and send it back to the requesting client. The broker provides HTTP endpoints that accept SQL queries and returns the response in JSON format.
Fetches the routes that are computed for a query based on the routing strategy defined in a configuration.
Computes the list of segments to query on each server.
Servers host segments and do most of the heavy lifting during query processing. Though the architecture shows that there are two kinds of servers, real-time and offline, a server does not really know if it's going to be a real-time server or an offline server. The responsibility of a server depends on the assignment strategy.
Offline servers typically host segments that are immutable. In this case, segments are created outside of a cluster and uploaded via a shell-based request. Based on the replication factor and the segment assignment strategy, the controller picks one or more servers to host the segment. Servers are notified via Helix about the new segments. Servers fetch the segments from deep store and load them before being ready to serve query requests. At this point, the cluster's broker detects that new segments are available and starts including them in query responses.
Real-time servers are different from the offline servers. Real-time nodes ingest data from streaming sources, such as Kafka, and generate the indexed segments in memory (flushing segments to disk periodically). In-memory segments are also known as consuming segments. These consuming segments get flushed periodically based on a completion threshold (based on the number of rows, time, or segment size). At this point, they are known as completed segments. Completed segments are similar to the offline server's segments. Queries go over the in-flight (consuming) segments and the completed segments.
Minion is an optional component and is not required to get started with Pinot. Minion is used for purging data from a Pinot cluster (for reasons such as GDPR compliance in the UK).
Within Pinot, a logical table is modeled as one of two types of physical tables: offline or real-time. The reason for having two types of tables is because each one follows a different state model.
A real-time and offline table provide different configuration options for indexing and, in the case of real-time, the connector properties for the stream data source (i.e. Kafka). Table types also allow users to use different containers for real-time and offline nodes. For instance, offline servers might use virtual machines with larger storage capacity where real-time servers might need higher system memory and/or more CPU cores.
There are a few things to keep in mind when configuring the different types of tables for your workloads. When ingesting data from the same source, you can have two tables, configured differently for real-time and offline queries, that ingest the same data. Even though the two tables contain the same data, performance will scale differently based on your query requirements. In this scenario, the real-time and offline tables must share the same schema.
In batch mode, data is ingested into Pinot via an ingestion job. An ingestion job transforms a raw data source (such as a CSV file) into segments. Once segments are generated for the imported data, the ingestion job stores them in the cluster's segment store (also known as the deep store) and notifies the controller. The notification is processed, and as a result the Helix agent on the controller updates the ideal state configuration in Zookeeper. Helix then notifies the offline server that new segments are available. In response to the notification from the controller, the offline server downloads the newly created segments directly from the cluster's segment store. The cluster's broker, which watches for state changes in Helix, detects the new segments and adds them to the list of segments to query (the segment-to-server routing table).
The deep store (or deep storage) is the permanent store for segment files.
It is used for backup and restore operations. New nodes in a cluster will pull down a copy of segment files from the deep store. If the local segment files on a server get damaged in some way (or accidentally deleted), a new copy will be pulled down from the deep store on server restart.
The deep store stores a compressed version of the segment files, and it typically won't include any indexes. These compressed files can be stored on a local file system or on a variety of other file systems. For more details, see the list of supported file systems.
Having all segments go through the controller can become a system bottleneck under heavy load, in which case you can use the peer download policy, as described in Decoupling Controller from the Data Path.
Dimension
Dimension columns are typically used in slice and dice operations for answering business queries. Some operations for which dimension columns are used:
GROUP BY - group by one or more dimension columns along with aggregations on one or more metric columns
Filter clauses such as WHERE
Metric
These columns represent the quantitative data of the table. Such columns are used for aggregation. In data warehouse terminology, these can also be referred to as fact or measure columns.
Some operations for which metric columns are used:
Aggregation - SUM, MIN, MAX, COUNT, AVG, etc.
Filter clauses such as WHERE
DateTime
This column represents time columns in the data. There can be multiple time columns in a table, but only one of them can be treated as primary. The primary time column is the one that is present in the segment config.
The primary time column is used by Pinot to maintain the time boundary between offline and real-time data in a hybrid table and for retention management. A primary time column is mandatory if the table's push type is APPEND and optional if the push type is REFRESH.
Common operations that can be done on time column:
GROUP BY
Filter clauses such as WHERE
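As a minimal sketch (the table and field names are hypothetical), a primary time column is declared in the schema's dateTimeFieldSpecs:

```bash
cat > /tmp/myTable-schema.json <<'EOF'
{
  "schemaName": "myTable",
  "dimensionFieldSpecs": [{"name": "country", "dataType": "STRING"}],
  "metricFieldSpecs":    [{"name": "clicks", "dataType": "LONG"}],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "LONG",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
EOF
```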
Data Type - Default Null Value
INT - 0
LONG - 0
FLOAT - 0.0
DOUBLE - 0.0
BIG_DECIMAL - Not supported for dimension columns; 0.0 for metric columns
BOOLEAN - 0 (false); N/A for metric columns
TIMESTAMP - 0 (1970-01-01 00:00:00 UTC); N/A for metric columns
STRING - "null"; N/A for metric columns
JSON - "null"; N/A for metric columns
BYTES - byte array of length 0
Pinot also defines a few built-in virtual columns that exist in every table:
$hostName (Dimension, STRING) - Name of the server hosting the data
$segmentName (Dimension, STRING) - Name of the segment containing the record
$docId (Dimension, INT) - Document id of the record within the segment
Offline - Offline tables ingest pre-built Pinot segments from external data stores. This is generally used for batch ingestion.
Realtime - Realtime tables ingest data from streams (such as Kafka) and build segments from the consumed data.
Hybrid - A hybrid Pinot table has both realtime and offline tables under the hood. By default, all tables in Pinot are hybrid in nature.
POST /tasks/schedule - Schedule tasks for all task types on all enabled tables
POST /tasks/schedule?taskType=myTask - Schedule tasks for the given task type on all enabled tables
POST /tasks/schedule?tableName=myTable_OFFLINE - Schedule tasks for all task types on the given table
POST /tasks/schedule?taskType=myTask&tableName=myTable_OFFLINE - Schedule tasks for the given task type on the given table
To plug a custom task into the Minion framework, implement the following interfaces and register each implementation with the corresponding annotation:
PinotTaskGenerator - @TaskGenerator
PinotTaskExecutorFactory - @TaskExecutorFactory
MinionEventObserverFactory - @EventObserverFactory
Component - Helix Mapping
Segment - Modeled as a Helix Partition. Each segment can have multiple copies, referred to as replicas.
Table - Modeled as a Helix Resource. Multiple segments are grouped into a table. All segments belonging to a Pinot table have the same schema.
Controller - Embeds the Helix agent that drives the overall state of the cluster.
Server - Modeled as a Helix Participant that hosts segments.
Broker - Modeled as a Helix Spectator that observes the cluster for changes in the state of segments and servers. In order to support multi-tenancy, brokers are also modeled as Helix Participants.
Minion - Pinot Minion is modeled as a Helix Participant.
Resource - Stored Properties
Controller - The controller that is assigned as the current leader.
Servers/Brokers - A list of servers/brokers and their configuration; health status.
Tables - List of tables; table configurations; table schema information; list of segments within a table.
Segment - Exact server location(s) of a segment (routing table); state of each segment (online/offline/error/consuming); metadata about each segment.
This section describes quick start commands that launch all Pinot components in a single process.
Pinot ships with QuickStart commands that launch Pinot components in a single process and import pre-built datasets. These QuickStarts are a good place to start if you're new to Pinot.
Prerequisites
You will need to have installed Pinot locally or have Docker installed if you want to use the Pinot Docker image.
macOS Monterey Users
By default, the AirPlay Receiver runs on port 7000, which is also the port used by the Pinot Server in the quick start. You may see the following error when running these examples:
If you disable the AirPlay Receiver and try again, you shouldn't see this error message anymore.
This example demonstrates how to do batch processing with Pinot. The command:
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the baseballStats table
Launches a standalone data ingestion job that builds one segment for a given CSV data file for the baseballStats table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
This example demonstrates how to import and query JSON documents in Pinot. The command:
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the githubEvents table
Launches a standalone data ingestion job that builds one segment for a given JSON data file for the githubEvents table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
This example demonstrates how to do batch processing in Pinot where the data items have complex fields that need to be unnested. The command:
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the githubEvents table
Launches a standalone data ingestion job that builds one segment for a given JSON data file for the githubEvents table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
This example demonstrates how to do stream processing with Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing with JSON documents in Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing in Pinot with RealtimeToOfflineSegmentsTask and MergeRollupTask minion tasks continuously optimizing segments as data gets ingested. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, Pinot Minion, and Pinot Server.
Creates the githubEvents table
Launches a GitHub events stream
Publishes data to a Kafka topic githubEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing in Pinot where the stream contains items that have complex fields that need to be unnested. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, Pinot Minion, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing with upsert with Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do stream processing with upsert with JSON documents in Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the meetupRsvp table
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do hybrid stream and batch processing with Pinot. The command:
Starts Apache Kafka, Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server.
Creates the airlineStats table
Launches a standalone data ingestion job that builds segments under a given directory of Avro files for the airlineStats table and pushes the segments to the Pinot Controller.
Launches a stream of flights stats
Publishes data to a Kafka topic airlineStatsEvents that is subscribed to by Pinot.
Issues sample queries to Pinot
This example demonstrates how to do joins in Pinot using the Lookup UDF. The command:
Starts Apache Zookeeper, Pinot Controller, Pinot Broker, and Pinot Server in the same container.
Creates the baseballStats table
Launches a data ingestion job that builds one segment for a given CSV data file for the baseballStats table and pushes the segment to the Pinot Controller.
Creates the dimBaseballTeams table
Launches a data ingestion job that builds one segment for a given CSV data file for the dimBaseballTeams table and pushes the segment to the Pinot Controller.
Issues sample queries to Pinot
This section contains quick start guides to help you get up and running with Pinot.
To simplify the getting started experience, Pinot ships with quick start guides that launch Pinot components in a single process and import pre-built datasets.
For a full list of these guides, see Quick Start Examples.
Getting data into Pinot is easy. Take a look at these two quick start guides which will help you get up and running with sample data for offline and real-time tables.
Explore the data on our Pinot cluster
Once you have set up the Cluster, you can start exploring the data and the APIs using the Pinot Data Explorer.
Navigate to http://localhost:9000 in your browser to open the controller UI.
The first screen that you'll see when you open the Pinot Data Explorer is the Cluster Manager. The Cluster Manager provides a UI to operate and manage your cluster.
If you want to view the contents of a server, click on its instance name. You'll then see the following:
To view the baseballStats table, click on its name, which will show the following screen:
From this screen, we can edit or delete the table, edit or adjust its schema, and perform several other operations.
For example, if we want to add yearID to the list of inverted indexes, click on Edit Table, add the extra column, and click Save:
Let us run some queries on the data in the Pinot cluster. Head over to Query Console to see the querying interface.
We can see our baseballStats table listed on the left (you will see meetupRsvp or airlineStats if you used the streaming or the hybrid quick start). Click on the table name to display the names and data types of the table's columns.
You can also execute a sample query select * from baseballStats limit 10 by typing it in the text box and clicking the Run Query button. Cmd + Enter can also be used to run the query when focused on the console.
You can also try out the following queries:
Pinot supports a subset of standard SQL. For more information, see Pinot Query Language.
The Pinot Admin UI contains all the APIs that you will need to operate and manage your cluster. It provides a set of APIs for Pinot cluster management, including health checks, instance management, schema and table management, and data segment management.
Let's check out the tables in this cluster by going to Table -> List all tables in cluster, click Try it out, and then click Execute. We can see the baseballStats table listed here. We can also see the exact cURL call made to the controller API.
You can look at the configuration of this table by going to Tables -> Get/Enable/Disable/Drop a table, click Try it out, type baseballStats in the table name, and then click Execute.
Let's check out the schemas in the cluster by going to Schema -> List all schemas in the cluster, click Try it out, and then click Execute. We can see a schema called baseballStats in this list.
Take a look at the schema by going to Schema -> Get a schema, click Try it out, type baseballStats in the schema name, and then click Execute.
Finally, let's check out the data segments in the cluster by going to Segment -> List all segments, click Try it out, type baseballStats in the table name, and then click Execute. There is one segment for this table, called baseballStats_OFFLINE_0.
To learn how to upload your own data and schema, see Batch Ingestion or Stream ingestion.
This guide will show you how to run a Pinot cluster using Docker.
In this guide we will learn about running Pinot in Docker.
This guide assumes that you have installed Docker and have configured it with enough memory. A sample config is shown below:
The latest Pinot Docker image is published at apachepinot/pinot:latest, and you can see a list of all published tags on Docker Hub.
You can pull the Docker image onto your machine by running the following command:
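For example, to pull the latest tag referenced above:

```bash
docker pull apachepinot/pinot:latest
```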
Or if you want to use a specific version:
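For instance, with a released tag (the version shown is only an example; pick any tag from Docker Hub):

```bash
docker pull apachepinot/pinot:1.2.0
```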
Now that we've downloaded the Pinot Docker image, it's time to set up a cluster. There are two ways to do this:
Pinot comes with quick-start commands that launch instances of Pinot components in the same process and import pre-built datasets.
For example, the following quick-start launches Pinot with a baseball dataset pre-loaded:
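A sketch of that command, assuming the image's default entrypoint is the pinot-admin.sh script:

```bash
docker run -ti \
  --name pinot-quickstart \
  -p 9000:9000 \
  apachepinot/pinot:latest QuickStart -type batch
```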
For a list of all the available quick starts, see the Quick Start Examples.
The quick start scripts launch Pinot with minimal resources. If you want to play with bigger datasets (more than a few MB), you can launch each of the Pinot components individually.
Create an isolated bridge network in docker
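For example (the network name pinot-demo is an arbitrary choice reused in the commands below):

```bash
docker network create -d bridge pinot-demo
```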
Start Zookeeper in daemon mode. This is a single node zookeeper setup. Zookeeper is the central metadata store for Pinot and should be set up with replication for production use. For more information, see Running Replicated Zookeeper.
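A sketch of a single-node Zookeeper container on that network (the image tag is illustrative):

```bash
docker run -d \
  --network=pinot-demo \
  --name pinot-zookeeper \
  -p 2181:2181 \
  zookeeper:latest
```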
Start the Pinot Controller in daemon mode and connect it to Zookeeper.
The command below expects a 4GB memory container. Tune -Xms and -Xmx if your machine doesn't have enough resources.
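A sketch of the controller container, assuming the Zookeeper container above and that the image honors the JAVA_OPTS environment variable:

```bash
docker run -d \
  --network=pinot-demo \
  --name pinot-controller \
  -p 9000:9000 \
  -e JAVA_OPTS="-Xms1G -Xmx4G" \
  apachepinot/pinot:latest StartController \
  -zkAddress pinot-zookeeper:2181
```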
Start the Pinot Broker in daemon mode and connect it to Zookeeper.
The command below expects a 4GB memory container. Tune -Xms and -Xmx if your machine doesn't have enough resources.
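A sketch of the broker container, under the same assumptions:

```bash
docker run -d \
  --network=pinot-demo \
  --name pinot-broker \
  -p 8099:8099 \
  -e JAVA_OPTS="-Xms1G -Xmx4G" \
  apachepinot/pinot:latest StartBroker \
  -zkAddress pinot-zookeeper:2181
```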
Start the Pinot Server in daemon mode and connect it to Zookeeper.
The command below expects a 16GB memory container. Tune -Xms and -Xmx if your machine doesn't have enough resources.
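A sketch of the server container, under the same assumptions:

```bash
docker run -d \
  --network=pinot-demo \
  --name pinot-server \
  -p 8098:8098 \
  -e JAVA_OPTS="-Xms4G -Xmx16G" \
  apachepinot/pinot:latest StartServer \
  -zkAddress pinot-zookeeper:2181
```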
Optionally, you can also start Kafka for setting up realtime streams. This brings up the Kafka broker on port 9092.
Now all Pinot-related components are started as an empty cluster.
You can run the below command to check container status.
Sample Console Output
Create a file called docker-compose.yml that contains the following:
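A minimal sketch of such a file (the image tags, ports, and service layout are illustrative; adjust them to your environment):

```bash
cat > docker-compose.yml <<'EOF'
version: '3.7'
services:
  zookeeper:
    image: zookeeper:latest
    ports:
      - "2181:2181"
  pinot-controller:
    image: apachepinot/pinot:latest
    command: "StartController -zkAddress zookeeper:2181"
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:latest
    command: "StartBroker -zkAddress zookeeper:2181"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:latest
    command: "StartServer -zkAddress zookeeper:2181"
    ports:
      - "8098:8098"
    depends_on:
      - pinot-broker
EOF
```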
Run the following command to launch all the components:
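For example:

```bash
docker-compose up -d
```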
You can run the below command to check the container status.
Sample Console Output
Once your cluster is up and running, you can head over to Exploring Pinot to learn how to run queries against the data.
If you have minikube or Docker Kubernetes installed, you could also try running the Kubernetes quick start.
Note: These are sample configs to be used as references. For a production setup, you may want to customize them to your needs.
This quick start guide will help you bootstrap a Pinot standalone instance on your local machine.
In this guide, you'll learn how to download and install Apache Pinot as a standalone instance.
First, let's download the Pinot distribution for this tutorial. You can either download a packaged release or build a distribution from the source code.
Prerequisites
Install JDK 11 or higher (JDK 16 is not yet supported). For JDK 8 support, use Pinot 0.7.1 or compile from the source code.
You can build from source or download the distribution:
Download the latest binary release from Apache Pinot, or use this command
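A sketch of the download, assuming the standard Apache download URL layout (set the version variable to the latest release):

```bash
PINOT_VERSION=1.2.0   # example version; use the latest release
wget "https://downloads.apache.org/pinot/apache-pinot-${PINOT_VERSION}/apache-pinot-${PINOT_VERSION}-bin.tar.gz"
```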
Once you have the tar file:
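Extract it and move into the distribution directory, for example (reusing the PINOT_VERSION variable from above):

```bash
tar -zxvf "apache-pinot-${PINOT_VERSION}-bin.tar.gz"
cd "apache-pinot-${PINOT_VERSION}-bin"
```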
You can find older versions of Apache Pinot at https://archive.apache.org/dist/pinot/. For example, if you wanted to download Pinot 0.10.0, you could run the following command:
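Assuming the same file-name layout as the main download site:

```bash
wget https://archive.apache.org/dist/pinot/apache-pinot-0.10.0/apache-pinot-0.10.0-bin.tar.gz
```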
Follow these steps to check out the code from GitHub and build Pinot locally.
Prerequisites
Install Apache Maven 3.6 or higher
Add the Maven option -Djdk.version=8 when building with JDK 8.
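A sketch of the checkout and build (the Maven profile name follows the project's standard build instructions; verify against the repository README for your version):

```bash
git clone https://github.com/apache/pinot.git
cd pinot
mvn install package -DskipTests -Pbin-dist
```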
Note that the Pinot scripts are located under pinot-distribution/target, not the target directory under the root.
Currently, Apache Pinot doesn't provide official binaries for M1 Macs. You can, however, build from source using the steps provided above. In addition to those steps, you will need to add the following to your ~/.m2/settings.xml prior to the build.
Also make sure to install Rosetta: softwareupdate --install-rosetta
Note that some installations of the JDK do not contain the JNI bindings necessary to run all tests. If you see any java.lang.UnsatisfiedLinkError while running tests, you may need to change your JDK. If using Homebrew, you may install AdoptOpenJDK 11 using: brew install --cask adoptopenjdk11
Now that we've downloaded Pinot, it's time to set up a cluster. There are two ways to do this:
Pinot comes with quick-start commands that launch instances of Pinot components in the same process and import pre-built datasets.
For example, the following quick-start launches Pinot with a baseball dataset pre-loaded:
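For example, from the root of the extracted distribution:

```bash
./bin/pinot-admin.sh QuickStart -type batch
```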
For a list of all the available quick starts, see the Quick Start Examples.
If you want to play with bigger datasets (more than a few MB), you can launch all the components individually.
The video below is a step-by-step walk through for launching the individual components of Pinot and scaling them to multiple instances.
You can find the commands that are shown in this video in the github.com/npawar/pinot-tutorial GitHub repository.
The examples below assume that you are using Java 8.
If you are using Java 11+, remove the GC settings inside JAVA_OPTS.
So, for example, instead of JAVA_OPTS that include GC flags (such as -XX:+UseG1GC and GC-log settings), you would keep only the heap settings, e.g. JAVA_OPTS="-Xms4G -Xmx8G".
You can use Zooinspector to browse the Zookeeper instance.
Once your cluster is up and running, you can head over to Exploring Pinot to learn how to run queries against the data.
Starting a Pinot component of interest in IntelliJ using debug mode can be useful for development purposes: you can set breakpoints and inspect variables. Taking the server as an example, one can start zookeeper, controller, and broker using the steps in Manual Cluster, then use the following run configuration (placed under $PROJECT_DIR$/.run) to start the server. This commit is an example of how it can be used. Replace the metrics-core version and cluster name as needed.
The Docker instructions on this page are still WIP
So far, we set up our cluster, ran some queries on the demo tables, and explored the admin endpoints. We also uploaded some sample batch data for the transcript table.
Let's set up a demo Kafka cluster locally and create a sample topic, transcript-topic.
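A sketch using the standard Kafka CLI (the broker address, partition count, and replication factor are illustrative; adjust the port to however Kafka was started in your setup, e.g. 9876 if you used the quick-start Kafka):

```bash
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic transcript-topic \
  --partitions 1 \
  --replication-factor 1
```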
Now that we have our table and schema, let's upload them to the cluster. As soon as the realtime table is created, it will begin ingesting from the Kafka topic.
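For example (the file paths are hypothetical; point them at your schema and realtime table config):

```bash
bin/pinot-admin.sh AddTable \
  -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
  -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
  -exec
```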
Here's a JSON file for transcript table data:
Push sample JSON into Kafka topic, using the Kafka script from the Kafka download
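A sketch, assuming the sample data was saved as transcript.json and a recent Kafka CLI (older releases use --broker-list instead of --bootstrap-server):

```bash
bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic transcript-topic < transcript.json
```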
This starter provides a quick start for running Pinot on Google Cloud Platform (GCP)
For Mac User
Please check kubectl version after installation.
QuickStart scripts are tested under kubectl client version v1.16.3 and server version v1.13.12
For Mac User
Please check helm version after installation.
This QuickStart provides helm supports for helm v3.0.0 and v2.12.1. Please pick the script based on your helm version.
Install Google Cloud SDK
Restart your shell
The script below will create a 3-node cluster named pinot-quickstart in us-west1-b with n1-standard-2 machines for demo purposes.
Please modify the parameters in the example command below:
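A sketch of the command (the zone, node count, and machine type are the example values mentioned above; the project is a placeholder):

```bash
gcloud container clusters create pinot-quickstart \
  --project my-gcp-project \
  --zone us-west1-b \
  --num-nodes 3 \
  --machine-type n1-standard-2
```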
You can monitor the cluster status with the following command:
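For example:

```bash
gcloud container clusters list
```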
Once the cluster is in RUNNING status, it's ready to be used.
Simply run the command below to get the credentials for the pinot-quickstart cluster you just created, or for your existing cluster.
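For example:

```bash
gcloud container clusters get-credentials pinot-quickstart --zone us-west1-b
```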
To verify the connection, you can run:
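For example:

```bash
kubectl get nodes
```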
Step-by-step guide on pushing your own data into the Pinot cluster
Let's gather our data files and put them in pinot-quick-start/rawdata.
Supported file formats are CSV, JSON, AVRO, PARQUET, THRIFT, ORC. If you don't have sample data, you can use this sample CSV.
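A sketch using the column names described in the next paragraph (the data rows are made up for illustration):

```bash
mkdir -p pinot-quick-start/rawdata
cat > pinot-quick-start/rawdata/transcript.csv <<'EOF'
studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
201,Bob,King,Male,English,3.2,1571036400000
202,Nick,Young,Male,Physics,3.6,1571900400000
EOF
```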
Briefly, we categorize our columns into 3 types
For example, in our sample table, the studentID, firstName, lastName, gender, and subject columns are the dimensions, the score column is the metric, and timestampInEpoch is the time column.
Once you have identified the dimensions, metrics and time columns, create a schema for your data, using the reference below.
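A sketch of such a schema for the sample data (the file location and exact field specs are illustrative; adjust the data types to your data):

```bash
cat > pinot-quick-start/transcript-schema.json <<'EOF'
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {"name": "studentID", "dataType": "INT"},
    {"name": "firstName", "dataType": "STRING"},
    {"name": "lastName",  "dataType": "STRING"},
    {"name": "gender",    "dataType": "STRING"},
    {"name": "subject",   "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "score", "dataType": "FLOAT"}
  ],
  "dateTimeFieldSpecs": [
    {"name": "timestampInEpoch", "dataType": "LONG",
     "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"}
  ]
}
EOF
```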
Here's the table config for the sample CSV file. You can use this as a reference to build your own table config. Simply edit the tableName and schemaName.
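A minimal sketch of an offline table config for that schema (the file name and index settings are illustrative):

```bash
cat > pinot-quick-start/transcript-table-offline.json <<'EOF'
{
  "tableName": "transcript",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "schemaName": "transcript",
    "timeColumnName": "timestampInEpoch",
    "replication": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "metadata": {}
}
EOF
```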
Check the directory structure so far
Upload the table config using the following command
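For example, assuming the file names used above and a controller running on localhost:9000:

```bash
bin/pinot-admin.sh AddTable \
  -schemaFile pinot-quick-start/transcript-schema.json \
  -tableConfigFile pinot-quick-start/transcript-table-offline.json \
  -exec
```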
To generate a segment, we first need to create a job spec yaml file. The job spec yaml file has all the information regarding the data format, input data location, and Pinot cluster coordinates. You can just copy over this job spec file. If you're using your own data, be sure to 1) replace transcript with your table name, and 2) set the right recordReaderSpec.
Use the following command to generate a segment and upload it
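For example, assuming the job spec was saved as batch-job-spec.yml:

```bash
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile pinot-quick-start/batch-job-spec.yml
```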
Sample output
Now, it's time to ingest from a sample stream into Pinot. The rest of the instructions assume you're using the setup from the previous sections.
First, we need to set up a stream. Pinot has out-of-the-box realtime ingestion support for Kafka. Other streams can be plugged in; see the stream ingestion plugin documentation for more details.
Start Kafka
Start a Kafka cluster on port 9876, using the same Zookeeper from the quick-start examples.
Create a Kafka topic
Download the latest Kafka release and create the topic.
If you followed the batch quick start, you have already pushed a schema for your sample table. If not, head over to the schema section on that page to learn how to create a schema for your sample data.
If you followed the batch quick start, you learned how to push an offline table and schema. Similar to the offline table config, we will create a realtime table config for the sample. Here's the realtime table config for the transcript table. For a more detailed overview of tables, check out the table documentation.
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the realtime data.
This document provides basic instructions to set up a Kubernetes cluster on Google Kubernetes Engine (GKE).
Follow the official installation instructions to install kubectl.
Follow the official installation instructions to install helm.
Follow the official installation instructions to install the Google Cloud SDK.
Once the cluster is ready, follow the Kubernetes quickstart to deploy your Pinot demo.
So far, we have set up our cluster, ran some queries, and explored the admin endpoints. Now, it's time to get our own data into Pinot. The rest of the instructions assume you're using the setup from the previous sections.
A schema is used to define the columns and data types of the Pinot table. A detailed overview can be found in the schema documentation.
A table config is used to define the config related to the Pinot table. A detailed overview can be found in the table documentation.
Check out the table config and schema via the controller REST API to make sure they were successfully uploaded.
A Pinot table's data is stored as Pinot segments. A detailed overview can be found in the segment documentation.
Check that your segment made it to the table using the controller REST API.
You're all set! You should see your table in the Data Explorer and be able to run queries against it now.
Dimensions - Typically used in filters and group by, for slicing and dicing into data
Metrics - Typically used in aggregations; represents the quantitative data
Time - Optional column; represents the timestamp associated with each row
While Pinot can work with segments of various sizes, for optimal use of Pinot you want your segments sized in the 100MB to 500MB (un-tarred/uncompressed) range. Note that having too many tiny segments (thousands or more) for a single table creates overhead in terms of metadata storage in Zookeeper as well as in the Pinot servers' heap, while having too few very large (multi-GB) segments reduces the parallelism of query execution, since on the server side the thread parallelism of query execution is at the segment level.
Yes. Each table can be independently configured to consume from any given Kafka topic, regardless of whether there are other tables that are also consuming from the same Kafka topic.
Pinot automatically detects new partitions in Kafka topics. It checks for new partitions whenever RealtimeSegmentValidationManager periodic job runs and starts consumers for new partitions.
You can configure the interval for this job using the controller.realtime.segment.validation.frequencyPeriod property in the controller configuration.
Setup partitioner in the Kafka producer: https://docs.confluent.io/current/clients/producer.html
The partitioning logic in the stream should match the partitioning config in Pinot. Kafka uses murmur2, and the equivalent in Pinot is the Murmur function.
Set the partitioning config as shown in the sketch below, using the same column that is used for partitioning in Kafka, and also set the routing config so that the broker prunes segments by partition.
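A sketch of the relevant table-config fragments (the column name and partition count are hypothetical; these keys go inside your table config):

```bash
# Print the table-config fragments for partition-aware routing.
cat <<'EOF'
{
  "tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {"functionName": "Murmur", "numPartitions": 32}
      }
    }
  },
  "routing": {
    "segmentPrunerTypes": ["partition"]
  }
}
EOF
```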
More details about how partitioner works in Pinot here.
For JSON, you can use a hex encoded string to ingest BYTES.
See the json_format(field) function which can store a top level json field as a STRING in Pinot.
Then you can use these json functions during query time, to extract fields from the json string.
NOTE: This works well if some of your fields are nested JSON, but most of your fields are top-level JSON keys. If all of your fields are within a nested JSON key, you will have to store the entire payload as one column, which is not ideal.
Support for flattening during ingestion is on the roadmap: https://github.com/apache/pinot/issues/5264
To use explicit code points, you must double-quote (not single-quote) the string, and escape the code point via "\uHHHH", where HHHH is the four digit hex code for the character. See https://yaml.org/spec/spec.html#escaping/in%20double-quoted%20scalars/ for more details.
By default, Pinot limits the length of a String column to 512 bytes. If you want to overwrite this value, you can set the maxLength attribute in the schema as follows:
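For example (the column name and length are illustrative), the relevant fragment of a schema would look like this:

```bash
# Print a schema fragment with an increased maxLength on a STRING dimension.
cat <<'EOF'
{
  "dimensionFieldSpecs": [
    {"name": "textCol", "dataType": "STRING", "maxLength": 1000}
  ]
}
EOF
```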
Events are available to queries as soon as they are ingested. This is because events are instantly indexed in memory upon ingestion.
The ingestion of events into the real-time table is not transactional, so replicas of the open segment are not immediately consistent. Pinot trades consistency for availability upon network partitioning (CAP theorem) to provide ultra-low ingestion latencies at high throughput.
However, when the open segment is closed and its in-memory indexes are flushed to persistent storage, all its replicas are guaranteed to be consistent, with the commit protocol.
This typically happens if
The consumer is lagging a lot
The consumer was down (server down, cluster down), and the stream moved on, resulting in offset not found when consumer comes back up.
In case of Kafka, to recover, set property "auto.offset.reset":"earliest" in the streamConfigs section and reset the CONSUMING segment. See Realtime table configs for more details about the config.
You can also use the "Resume Consumption" endpoint with the "resumeFrom" parameter set to "smallest" (or "largest" if you want). Refer to Pause Stream Ingestion for more details.
Inverted indexes are set in the tableConfig's tableIndexConfig -> invertedIndexColumns list. For documentation on table config, see Table Config Reference. For an example showing how to configure an inverted index, see Inverted Index.
Applying inverted indexes to a table config will generate an inverted index for all new segments. To apply the inverted indexes to all existing segments, see How to apply an inverted index to existing segments?
Add the columns you wish to index to the tableIndexConfig -> invertedIndexColumns list. To update the table config use the Pinot Swagger API: http://localhost:9000/help#!/Table/updateTableConfig
Invoke the reload API: http://localhost:9000/help#!/Segment/reloadAllSegments
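For example, for a (hypothetical) offline table on a local controller:

```bash
curl -X POST "http://localhost:9000/segments/myTable_OFFLINE/reload"
```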
Once you've done that, you can check whether the index has been applied by querying the segment metadata API at http://localhost:9000/help#/Segment/getServerMetadata. Don't forget to include the names of the columns on which you have applied the index.
The output from this API should look something like the following:
Not all indexes can be retrospectively applied to existing segments.
If you want to add or change the sorted index column or adjust the dictionary encoding of the default forward index you will need to manually re-load any existing segments.
Star-tree indexes are configured in the table config under the tableIndexConfig -> starTreeIndexConfigs (list) and enableDefaultStarTree (boolean). Read more about how to configure star-tree indexes: https://docs.pinot.apache.org/basics/indexing/star-tree-index#index-generation
The new segments will have star-tree indexes generated after applying the star-tree index configs to the table config. Currently, Pinot does not support adding star-tree indexes to the existing segments.
Pinot does not require ordering of event time stamps. Out of order events are still consumed and indexed into the "currently consuming" segment. In a pathological case, if you have a 2 day old event come in "now", it will still be stored in the segment that is open for consumption "now". There is no strict time-based partitioning for segments, but star-indexes and hybrid tables will handle this as appropriate.
See Components > Broker for more details about how hybrid tables handle this. Specifically, the time boundary is computed as max(OfflineTime) - 1 unit of granularity. Pinot does store the min-max time for each segment and uses it for pruning segments, so segments with multiple time intervals may not be perfectly pruned.
When generating star-tree indexes, the time column will be part of the star-tree, so the tree can still be efficiently queried for segments with multiple time intervals.
Why not use strict max(OfflineTime) to determine the time boundary, and instead use an offset?
This lets you have an old event come in without building complex offline pipelines that perfectly partition your events by event timestamps. With this offset, even if your offline data pipeline produces segments with a maximum timestamp, Pinot will not use the offline dataset for that last chunk of segments. The expectation is that if you process the next time range of data offline, your data pipeline will include any late events.
It might seem odd that segments are not strictly time-partitioned, unlike similar systems such as Apache Druid. This allows real-time ingestion to consume out-of-order events. Even though segments are not strictly time-partitioned, Pinot will still index, prune, and query segments intelligently by time intervals for the performance of hybrid tables and time-filtered data.
When generating offline segments, it is recommended that segments are generated such that each segment contains only one time interval and is well partitioned by the time column.
FAQ for general questions around Pinot
When data is pushed into Pinot, Pinot makes a backup copy of the data and stores it on the configured deep storage (S3/GCP/ADLS/NFS/etc.). This copy is stored as tar.gz Pinot segments. Note that Pinot servers keep an (untarred) copy of the segments on their local disk as well, for performance reasons.
Pinot uses Apache Helix for cluster management, which in turn is built on top of Zookeeper. Helix uses Zookeeper to store the cluster state, including Ideal State, External View, Participants, etc. Besides that, Pinot uses Zookeeper to store other information such as Table configs, schema, Segment Metadata, etc.
Please check the JDK version you are using. The release 0.8.0 binary is built with JDK 11. You may be getting this error if you are using JDK 8. In that case, consider using JDK 11, or download the source code for the release and build it locally.
This page has a collection of frequently asked questions with answers from the community.
This is a list of frequent questions most often asked in our troubleshooting channel on Slack. Please feel free to contribute your questions and answers here and make a pull request.
Below is an example of AWS EKS.
In the K8s cluster, check the storage class: in AWS, it should be gp2.
Then update StorageClass to ensure:
Once StorageClass is updated, it should be like:
Once the storage class is updated, then we can update PVC for the server disk size.
Now we want to double the disk size for pinot-server-3.
Below is an example of current disks:
Below is the output of data-pinot-server-3
Now, let's change the PVC size to 2T by editing the server PVC.
Once updated, the spec's PVC size is updated to 2T, but the status's PVC size is still 1T.
Restart pinot-server-3 pod:
Recheck PVC size:
Pinot offers various ways to assist with troubleshooting and debugging problems that might happen. It is recommended to start with the debug API, which may quickly surface some of the commonly occurring problems. The debug API provides information such as tableSize, ingestion status, and any error messages related to state transitions on the server, among other things.
The table debug api can be invoked via the Swagger UI as follows:
It can also be invoked directly by accessing the URL. The API requires the tableName, and can optionally take tableType (offline|realtime) and verbosity level.
Pinot also provides a wide-variety of operational metrics that can be used for creating dashboards, alerting and monitoring. Also, all pinot components log debug information related to error conditions that can be used for troubleshooting.
Please use these steps:
If the query executes, look at the query result. Specifically, look at numEntriesScannedInFilter and numDocsScanned.
If numEntriesScannedInFilter is very high, consider adding indexes for the corresponding columns being used in the filter predicates. You should also think about partitioning the incoming data based on the dimension most heavily used in your filter queries.
If numDocsScanned is very high, that means the selectivity for the query is low and lots of documents need to be processed after the filtering. Consider refining the filter to increase the selectivity of the query.
If the query is not executing, you can extend the query timeout by appending a timeoutMs parameter to the query (e.g. select * from mytable limit 10 option(timeoutMs=60000)). Then you can repeat step 1.
You can also look at GC stats for the corresponding Pinot servers. If a particular server seems to be running full GC all the time, you can do a couple of things such as
Increase JVM heap (Xmx)
Consider using off-heap memory for segments
Decrease the total number of segments per server (by partitioning the data in a better way)
Pinot supports Apache Spark as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Spark code to process your files, convert them to segments, and upload them to Pinot.
We support both Spark 2.X and 3.X
If you build Pinot from source, you should consider opting in to the build-shaded-jar profile with -Pbuild-shaded-jar. While Pinot does not bundle Spark into its jar, it does bundle certain Hadoop libraries.
You can check out the sample job spec here.
To run Spark ingestion, you need the following jars in your classpath:
pinot-batch-ingestion-spark plugin jar - available in the plugins-external directory in the package
pinot-all jar - available in the lib directory in the package
These jars can be specified using spark.driver.extraClassPath or any other option.
For loading any other plugins that you want to use, you can use -
The complete spark-submit command should look as follows
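A rough sketch for Spark 2.4 in local mode, assuming the ingestion command class and jar layout described above (verify the class name and jar paths against your Pinot version and distribution layout):

```bash
export PINOT_VERSION=0.10.0   # example version
export PINOT_ROOT_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin

spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.driver.extraClassPath=${PINOT_ROOT_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_ROOT_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  "${PINOT_ROOT_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  -jobSpecFile /path/to/spark-job-spec.yaml
```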
Please ensure the environment variables PINOT_ROOT_DIR and PINOT_VERSION are set properly.
Note: You should change the master to yarn and deploy-mode to cluster for production environments.
If you want to run the spark job in cluster mode on YARN/EMR cluster, the following needs to be done -
Build Pinot from source with option -DuseProvidedHadoop
Copy Pinot binaries to S3, HDFS or any other distributed storage that is accessible from all nodes.
Copy the ingestion spec YAML file to S3, HDFS, or any other distributed storage. Mention this path as part of the --files argument in the command.
Add --jars options that contain the s3/hdfs paths to all the required plugin and pinot-all jars.
Point classPath to the Spark working directory. Generally, just specifying the jar names without any paths works. The same should be done for the main jar as well as the spec YAML file.
Example
For Spark 3.x, replace pinot-batch-ingestion-spark-2.4 with pinot-batch-ingestion-spark-3.2 in all places in the commands.
Also, ensure the classpath in the ingestion spec is changed from org.apache.pinot.plugin.ingestion.batch.spark. to org.apache.pinot.plugin.ingestion.batch.spark3.
Q - I am getting the following exception - Class has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
Q - I am not able to find the pinot-batch-ingestion-spark jar.
For Pinot versions prior to 0.10.0, the Spark plugin is located in the plugins dir of the binary distribution. For 0.10.0 and later, it is located in the plugins-external dir.
Q - Spark is not able to find the jars, leading to java.nio.file.NoSuchFileException.
This means the classpath for the Spark job has not been configured properly. If you are running Spark in a distributed environment such as YARN or k8s, make sure both spark.driver.classpath and spark.executor.classpath are set. Also, the jars in driver.classpath should be added to the --jars argument in spark-submit so that Spark can distribute those jars to all the nodes in your cluster. You also need to provide the appropriate scheme with the file path when running the jar. In this doc we have used local://, but it can be different depending on your cluster setup.
Q - Spark job failing while pushing the segments.
This can be because of a misconfigured controllerURI in the job spec yaml file. If the controllerURI is correct, make sure it is accessible from all the nodes of your YARN or k8s cluster.
Q - My data gets overwritten during ingestion.
Q - I am getting java.lang.RuntimeException: java.io.IOException: Failed to create directory: pinot-plugins-dir-0/plugins/*
Removing -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins from spark.driver.extraJavaOptions should fix this. As long as plugins are mentioned in the classpath and the --jars argument, it should not be an issue.
Q - Getting a Class not found: exception
Check whether the extraClassPath arguments contain all the plugin jars for both driver and executors, and whether all the plugin jars are mentioned in the --jars argument. If both of these are correct, check whether the extraClassPath contains local filesystem classpaths and not s3, hdfs, or other distributed file system classpaths.
You can follow the steps above to build the Pinot distribution from source. The resulting JAR file can be found in pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar. Next, you need to change the execution config in the job spec accordingly.
We have stopped including the spark-core dependency in our jars post the 0.10.0 release. Users can try 0.11.0-SNAPSHOT and later versions of pinot-batch-ingestion-spark in case of any runtime issues. You can either build from source or download the latest master build jars.
Since the 0.8.0 release, Pinot binaries are compiled with JDK 11. If you are using Spark along with Hadoop 2.7+, you need to use the Java 8 version of Pinot, which currently has to be built from source.
Set segmentPushType to APPEND in the tableConfig.
If it is already set to APPEND, this is likely due to a missing timeColumnName in your table config. If you can't provide a time column, use the segment name generation configs in the ingestion spec; generally, using the inputFile segment name generator should fix your issue.