This page contains guides related to importing data from Apache Kafka using stream ingestion.
This section contains a collection of short guides to show you how to import from a Pinot supported file system.
This section contains a collection of guides that will show you how to import data from a Pinot supported input format.
This section contains articles that provide technical and implementation details of Pinot features
Here you will find a collection of ready-made sample applications and examples for real-world data
Learn about the various components of Pinot and terminologies used to describe data stored in Pinot
Pinot is designed to deliver low latency queries on large datasets. In order to achieve this performance, Pinot stores data in a columnar format and adds additional indices to perform fast filtering, aggregation and group by.
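To make the idea of columnar storage plus an index concrete, here is a minimal sketch in Python. It is not Pinot's storage format; the column names, the sample rows, and the helper functions are all made up for illustration. It only shows why a per-column layout with an inverted index lets a filter touch just the matching document ids before aggregating.

```python
from collections import defaultdict

# Row-oriented input: each dict is one record (document). Sample data is hypothetical.
rows = [
    {"country": "US", "device": "ios", "clicks": 3},
    {"country": "DE", "device": "android", "clicks": 5},
    {"country": "US", "device": "android", "clicks": 2},
    {"country": "US", "device": "ios", "clicks": 7},
]

# Columnar layout: one list per column, indexed by document id.
columns = {name: [row[name] for row in rows] for name in rows[0]}


def build_inverted_index(values):
    """Map each distinct value to the list of document ids that contain it."""
    index = defaultdict(list)
    for doc_id, value in enumerate(values):
        index[value].append(doc_id)
    return dict(index)


country_index = build_inverted_index(columns["country"])


def sum_clicks_by_device(country):
    """Filter through the inverted index, then aggregate and group by device."""
    totals = defaultdict(int)
    for doc_id in country_index.get(country, []):
        totals[columns["device"][doc_id]] += columns["clicks"][doc_id]
    return dict(totals)


print(sum_clicks_by_device("US"))
```

Only the `device` and `clicks` columns are read for the matching documents; the rest of each record is never touched.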
Raw data is broken into small data shards and each shard is converted into a unit known as a segment. One or more segments together form a table, which is the logical container for querying Pinot using SQL/PQL.
Pinot uses a variety of terms which can refer to either abstractions that model the storage of data or infrastructure components that drive the functionality of the system.
Similar to traditional databases, Pinot has the concept of a table—a logical abstraction to refer to a collection of related data. As is the case with RDBMS, a table is a construct that consists of columns and rows (documents) that are queried using SQL. A table is associated with a schema which defines the columns in a table as well as their data types.
As opposed to RDBMS schemas, multiple tables can be created in Pinot (real-time or batch) that inherit a single schema definition. Tables are independently configured for concerns such as indexing strategies, partitioning, tenants, data sources, and/or replication.
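The relationship between one schema and several independently configured tables can be sketched as follows. This is a simplified model, not Pinot's actual schema or table configuration format; the class names, field names, and values are assumptions chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Schema:
    """Hypothetical stand-in for a schema: column names and their data types."""
    name: str
    columns: dict


@dataclass
class TableConfig:
    """Hypothetical stand-in for per-table settings that reference a schema."""
    name: str
    table_type: str  # "OFFLINE" (batch) or "REALTIME" (stream)
    schema: Schema
    tenant: str = "default"
    replication: int = 1
    indexed_columns: list = field(default_factory=list)


page_views = Schema(
    name="pageViews",
    columns={"country": "STRING", "clicks": "LONG", "ts": "LONG"},
)

# Two tables share one schema definition but are configured independently.
offline = TableConfig("pageViews", "OFFLINE", page_views,
                      replication=3, indexed_columns=["country"])
realtime = TableConfig("pageViews", "REALTIME", page_views, replication=2)

print(offline.schema is realtime.schema)
```

Both tables point at the same schema object, while replication and indexing differ per table.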
Pinot has a distributed systems architecture that scales horizontally. Pinot expects the size of a table to grow infinitely over time. In order to achieve this, all data needs to be distributed across multiple nodes. Pinot achieves this by breaking data into smaller chunks known as segments (this is similar to shards/partitions in HA relational databases). Segments can also be seen as time-based partitions.
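As a rough illustration of segments as time-based partitions, the sketch below groups records by the UTC day of a time column. The table name, the `ts` column, the one-day granularity, and the segment naming are assumptions made for this example and do not reflect how Pinot actually names or sizes segments.

```python
from collections import defaultdict
from datetime import datetime, timezone


def segment_name(table, epoch_seconds):
    """Name a segment after the UTC day the record falls into (hypothetical naming)."""
    day = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%Y-%m-%d")
    return f"{table}_{day}"


def split_into_segments(table, records, time_column="ts"):
    """Group records into segments keyed by day."""
    segments = defaultdict(list)
    for record in records:
        segments[segment_name(table, record[time_column])].append(record)
    return dict(segments)


records = [
    {"ts": 0, "clicks": 1},      # 1970-01-01
    {"ts": 3600, "clicks": 2},   # 1970-01-01
    {"ts": 86400, "clicks": 5},  # 1970-01-02
]
segments = split_into_segments("pageViews", records)
print(sorted(segments))
```

Each resulting segment can then be placed on a different node, which is what allows a table to keep growing.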
In order to support multi-tenancy, Pinot has first class support for tenants. A table is associated with a tenant. This allows all tables belonging to a particular logical namespace to be grouped under a single tenant name and isolated from other tenants. This isolation between tenants provides different namespaces for applications and teams to prevent sharing tables or schemas. Development teams building applications will never have to operate an independent deployment of Pinot. An organization can operate a single cluster and scale it out as new tenants increase the overall volume of queries. Developers can manage their own schemas and tables without being impacted by any other tenant on a cluster.
By default, all tables belong to a default tenant named "default". The concept of tenants is very important, as it satisfies the architectural principle of a "database per service/application" without having to operate many independent data stores. Further, tenants will schedule resources so that segments (shards) are able to restrict a table's data to reside only on a specified set of nodes. Similar to the kind of isolation that is ubiquitously used in Linux containers, compute resources in Pinot can be scheduled to prevent resource contention between tenants.
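The way a tenant restricts a table's segments to a specific set of nodes can be sketched like this. The server names, the tenant names, and the round-robin placement rule are assumptions for illustration; Pinot's real assignment strategies are configurable and more involved.

```python
# Hypothetical cluster: each server belongs to exactly one tenant.
servers = {
    "server-1": "default",
    "server-2": "default",
    "server-3": "ads",
    "server-4": "ads",
}
# Hypothetical tables, each associated with a tenant.
tables = {"pageViews": "default", "adClicks": "ads"}


def eligible_servers(table):
    """Only servers of the table's tenant may host its segments."""
    tenant = tables[table]
    return sorted(name for name, owner in servers.items() if owner == tenant)


def assign_segment(table, segment_index, replication=2):
    """Place replicas round-robin across the tenant's servers only."""
    candidates = eligible_servers(table)
    return [candidates[(segment_index + r) % len(candidates)]
            for r in range(replication)]


print(assign_segment("adClicks", 0))
print(assign_segment("pageViews", 1))
```

No segment of `adClicks` can land on a server of the `default` tenant, so the two workloads do not compete for the same nodes.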
Logically, a cluster is simply a group of tenants. As with the classical definition of a cluster, it is also a grouping of a set of compute nodes. Typically, there is only one cluster per environment/data center. There is no need to create multiple clusters since Pinot supports the concept of tenants. At LinkedIn, the largest Pinot cluster consists of 1000+ nodes distributed across a data center. Nodes can be added to a cluster in a way that linearly increases the performance and availability of queries. The number of nodes and the compute resources per node will reliably predict the QPS for a Pinot cluster, and as such, capacity planning can be easily achieved using SLAs that assert performance expectations for end-user applications.
Auto-scaling is also achievable. However, a fixed number of nodes is recommended to keep QPS consistent when query loads vary suddenly and unpredictably with end-user usage.
A Pinot cluster is comprised of multiple distributed system components. These components are useful to understand for operators that are monitoring system usage or are debugging an issue with a cluster deployment.
Controller
Server
Broker
Minion (optional)
The benefits of scale that make Pinot linearly scalable for an unbounded number of nodes are made possible through its integration with Apache Zookeeper and Apache Helix.
Helix is a cluster management solution that was designed and created by the authors of Pinot at LinkedIn. Helix drives the state of a Pinot cluster from a transient state to an ideal state, acting as the fault-tolerant distributed state store that guarantees consistency. Helix is embedded as agents that operate within a controller, broker, and server, and does not exist as an independent and horizontally scaled component.
A controller is the core orchestrator that drives the consistency and routing in a Pinot cluster. Controllers are horizontally scaled as an independent component (container) and have visibility of the state of all other components in a cluster. The controller reacts and responds to state changes in the system and schedules the allocation of resources for tables, segments, or nodes. As mentioned earlier, Helix is embedded within the controller as an agent that is a participant responsible for observing and driving state changes that are subscribed to by other components.
In addition to cluster management, resource allocation, and scheduling, the controller is also the HTTP gateway for REST API administration of a Pinot deployment. A web-based query console is also provided for operators to quickly and easily run SQL/PQL queries.
A broker receives queries from a client and routes their execution to one or more Pinot servers before returning a consolidated response.
Servers host segments (shards) that are scheduled and allocated across multiple nodes and routed on an assignment to a tenant (there is a single tenant by default). Servers are independent containers that scale horizontally and are notified by Helix through state changes driven by the controller. A server can either be a real-time server or an offline server.
A real-time and offline server have very different resource usage requirements, where real-time servers are continually consuming new messages from external systems (such as Kafka topics) that are ingested and allocated on segments of a tenant. Because of this, resource isolation can be used to prioritize high-throughput real-time data streams that are ingested and then made available for query through a broker.
Pinot minion is an optional component that can be used to run background tasks such as "purge" for GDPR (General Data Protection Regulation). As Pinot is an immutable aggregate store, records containing sensitive private data need to be purged on a request-by-request basis. Minion provides a solution for this purpose that complies with GDPR while optimizing Pinot segments and building additional indices that guarantee performance even when data may be deleted. One can also write a custom task that runs on a periodic basis. While it's possible to perform these tasks on the Pinot servers directly, having a separate process (Minion) lessens the overall degradation of query latency as segments are impacted by mutable writes.
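Because segments are immutable, a purge can be thought of as building a replacement segment rather than editing one in place. The sketch below illustrates that idea only; the record layout, the `user_id` field, and the `purge_segment` helper are hypothetical and are not Minion's actual task API.

```python
def purge_segment(segment, should_purge):
    """Return a new immutable segment without the records matching should_purge."""
    return tuple(record for record in segment if not should_purge(record))


# A segment is modeled as an immutable tuple of records (sample data is made up).
segment_v1 = (
    {"user_id": "u1", "clicks": 3},
    {"user_id": "u2", "clicks": 5},
    {"user_id": "u1", "clicks": 1},
)

# Purge every record that belongs to user "u1".
segment_v2 = purge_segment(segment_v1, lambda record: record["user_id"] == "u1")

print(len(segment_v1), len(segment_v2))
```

The original segment is left untouched, so queries can keep reading it until the rewritten segment replaces it.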
Introduction to Apache Pinot, a real-time distributed OLAP datastore.
Pinot is a real-time distributed OLAP datastore, built to deliver scalable real-time analytics with low latency. It can ingest from batch data sources (such as Hadoop HDFS, Amazon S3, Azure ADLS, Google Cloud Storage) as well as stream data sources (such as Apache Kafka).
Pinot was built by engineers at LinkedIn and Uber and is designed to scale up and out with no upper bound. Performance always remains constant based on the size of your cluster and an expected query per second (QPS) threshold.
Join us in our Slack channel for questions, troubleshooting, and feedback. We'd love to hear from you. https://communityinviter.com/apps/apache-pinot/apache-pinot
Our documentation is structured to let you quickly get to the content you need and is organized around the different concerns of users, operators, and developers. If you're new to Pinot and want to learn things by example, please take a look at our getting started section.
To start importing data into Pinot, check out our guides on batch import and stream ingestion based on our plugin architecture.
Pinot works very well for querying time series data with many dimensions and metrics over a vast unbounded space of records that scales linearly on a per node basis. Filters and aggregations are both easy and fast.
Pinot supports SQL for querying read-only data. Learn more about querying Pinot for time series data in our PQL (Pinot Query Language) guide.
Pinot may be deployed to and operated on a cloud provider or a local or virtual machine. You may get started either with a bare-metal installation or a Kubernetes one (either locally or in the cloud). To get immediately started with Pinot, check out these quick start guides for bootstrapping a Pinot cluster using Docker or Kubernetes.
For a high-level overview that explains how Pinot works, please take a look at our basic concepts section.
To understand the distributed systems architecture that explains Pinot's operating model, please take a look at our basic architecture section.
This section focuses on answering the most frequently asked questions for people exploring the newly evolving category of distributed OLAP engines. Pinot was created by authors at both Uber and LinkedIn and has been hardened and battle tested at the very highest of load and scale.
While Pinot doesn't match the typical mold of a database product, it is best understood based on your role as either an analyst, data scientist, or application developer.
Enterprise business intelligence
For analysts and data scientists, Pinot is best viewed as a highly-scalable data platform for business intelligence. In this view, Pinot converges big data platforms with the traditional role of a data warehouse, making it a suitable replacement for analysis and reporting.
Enterprise application development
For application developers, Pinot is best viewed as an immutable aggregate store that sources events from streaming data sources, such as Kafka, and makes it available for query using SQL.
As is the case with a microservice architecture, data encapsulation ends up requiring each application to provision its own data store, as opposed to sharing one OLTP database for reads and writes. In this case, it becomes difficult to query the complete view of a domain because it becomes stored in many different databases. This is costly in terms of performance, since it requires joins across multiple microservices that expose their data over HTTP under a REST API. To prevent this, Pinot can be used to aggregate all of the data across a microservice architecture into one easily queryable view of the domain.
Pinot tenants prevent any possibility of sharing ownership of database tables across microservice teams. Developers can create their own query models of data from multiple systems of record depending on their use case and needs. As with all aggregate stores, query models are eventually consistent and immutable.
Company
Notes
Pinot originated at LinkedIn, where it powers more than 50 user-facing applications such as Who Viewed My Profile, Talent Analytics, Company Analytics, Ad Analytics and many more. Pinot also serves as the backend to visualize and monitor 10,000+ business metrics.
Pinot runs on 1000+ nodes serving 100k+ queries while ingesting 1.5M+ events per second.
Uber
Microsoft
Microsoft Teams uses Pinot for analytics on Teams product usage data.
Weibo uses Pinot for realtime analytics on CDN & Weibo Video data to make business decisions, optimize service performance and improve user experience.
Factual
A column-oriented database with various compression schemes such as Run Length, Fixed Bit Length
Pluggable indexing technologies - Sorted Index, Bitmap Index, Inverted Index
Ability to optimize query/execution plan based on query and segment metadata
Near real time ingestion from streams and batch ingestion from Hadoop
SQL-like language that supports selection, aggregation, filtering, group by, order by, distinct queries on data
Support for multi-valued fields
Horizontally scalable and fault-tolerant
Pinot is designed to execute OLAP queries with low latency. It is suited to contexts where fast analytics, such as aggregations, are needed on immutable data, possibly with real-time data ingestion.
User facing Analytics Products
Pinot was originally built at LinkedIn to power rich interactive real-time analytic applications such as Who Viewed Profile, Company Analytics, Talent Insights, and many more. UberEats Restaurant Manager is another example of a customer facing Analytics App. At LinkedIn, Pinot powers 50+ user-facing products, ingesting millions of events per second and serving 100k+ queries per second at millisecond latency.
Real-time Dashboard for Business Metrics
Pinot can also be used to perform typical analytical operations such as slice and dice, drill down, roll up, and pivot on large scale multi-dimensional data. For instance, at LinkedIn, Pinot powers dashboards for thousands of business metrics. One can connect various BI tools such as Superset, Tableau, or PowerBI to visualize data in Pinot.
Instructions to connect Pinot with Superset can be found here.
Anomaly Detection
In addition to visualizing data in Pinot, one can run Machine Learning Algorithms to detect Anomalies on the data stored in Pinot. See ThirdEye for more information on how to use Pinot for Anomaly Detection and Root Cause Analysis.
Learn about the different components and logical abstractions
This section is a reference for the definition of major components and logical abstractions used in Pinot. Please visit the Basic Concepts section to get a general overview that ties together all of the reference material in this section.
A cluster is a set of nodes comprising servers, brokers, controllers, and minions.
Pinot leverages Apache Helix for cluster management. Helix is a cluster management framework to manage replicated, partitioned resources in a distributed system. Helix uses Zookeeper to store cluster state and metadata.
Briefly, Helix divides nodes into three logical components based on their responsibilities:
Participant: The nodes that host distributed, partitioned resources.
Spectator: The nodes that observe the current state of each Participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).
Controller: The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.
Pinot Servers are modeled as Participants, more details about server nodes can be found in Server. Pinot Brokers are modeled as Spectators, more details about broker nodes can be found in Broker. Pinot Controllers are modeled as Controllers, more details about controller nodes can be found in Controller.
Another way to visualize the cluster is a logical view, wherein a cluster contains tenants, tenants contain tables, and tables contain segments.
Typically, there is only one cluster per environment/data center. There is no need to create multiple Pinot clusters since Pinot supports the concept of tenants. At LinkedIn, the largest Pinot cluster consists of 1000+ nodes.
To set up a Pinot cluster, we need to first start Zookeeper.
Create an isolated bridge network in docker
Start Zookeeper in daemon mode.
Start ZKUI to browse Zookeeper data at http://localhost:9090.
Download Pinot Distribution using instructions in Download
Install zooinspector to view the data in Zookeeper, and connect to localhost:2181
Once we've started Zookeeper, we can start other components to join this cluster. If you're using docker, pull the latest apachepinot/pinot image.
You can try out pre-built Pinot all-in-one docker image.
(Optional) You can also follow the instructions here to build your own images.
To start other components to join the cluster
Explore your cluster via Pinot Data Explorer
This page covers everything you need to know about how queries are computed in Pinot's distributed systems architecture.
This page will introduce you to the guiding principles behind the design of Apache Pinot. Here you will learn the distributed systems architecture that allows Pinot to scale the performance of queries linearly based on the number of nodes in a cluster. You'll also be introduced to the two different types of tables used to ingest and query data in offline (batch) or real-time (stream) mode.
It's recommended that you read Basic Concepts to better understand the terms used in this guide.
Pinot was designed by engineers at LinkedIn and Uber to scale query performance based on the number of nodes in a cluster. As you add more nodes, query performance will always improve based on the expected query volume per second quota. To achieve horizontal scalability to an unbounded number of nodes and data storage, without performance degradation, the following guiding design principles were established.
Highly available: Pinot is built to serve low latency analytical queries for customer facing applications. By design, there is no single point of failure in Pinot. The system continues to serve queries when a node goes down.
Horizontally scalable: Ability to scale by adding new nodes as a workload changes.
Latency vs Storage: Pinot is built to provide low latency even at high-throughput. Features such as segment assignment strategy, routing strategy, star-tree indexing were developed to achieve this.
Immutable data: Pinot assumes that all data stored is immutable. For GDPR compliance, we provide an add-on solution for purging data while maintaining performance guarantees.
Dynamic configuration changes: Operations such as adding new tables, expanding a cluster, ingesting data, modifying indexing config, and re-balancing must be performed without impacting query availability or performance.
As described in the concepts, Pinot has multiple distributed system components: Controller, Broker, Server, and Minion.
Pinot uses Apache Helix for cluster management. Helix is embedded as an agent within the different components and uses Apache Zookeeper for coordination and maintaining the overall cluster state and health.
All Pinot servers and brokers are managed by Helix. Helix is a generic cluster management framework to manage partitions and replicas in a distributed system. It's helpful to think of Helix as an event-driven discovery service with push and pull notifications that drives the state of a cluster to an ideal configuration. A finite-state machine maintains a contract of stateful operations that drives the health of the cluster towards its optimal configuration. Query load is optimized as Helix updates routing configurations between nodes based on where data is stored in the cluster.
Helix divides nodes into three logical components based on their responsibilities:
Participant: These are the nodes in the cluster that actually host the distributed storage resources.
Spectator: These nodes observe the current state of each participant and route requests accordingly. Routers, for example, need to know the instance on which a partition is hosted and its state in order to route the request to the appropriate endpoint. Routing is continually being changed to optimize cluster performance as storage primitives are added and changed.
Controller: The controller observes and manages the state of participant nodes. The controller is responsible for coordinating all state transitions in the cluster and ensures that state constraints are satisfied while maintaining cluster stability.
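The controller's job of driving participants toward a target configuration can be sketched as a diff between an ideal state and the current state. This is not the Helix API; the state names, the dictionary layout, and the `compute_transitions` helper are assumptions used only to illustrate the idea of converging on an ideal state.

```python
def compute_transitions(ideal_state, current_state):
    """List (partition, participant, from_state, to_state) steps needed to reach ideal_state."""
    transitions = []
    for partition, replicas in sorted(ideal_state.items()):
        current = current_state.get(partition, {})
        # Bring every replica named in the ideal state to its target state.
        for participant, target in sorted(replicas.items()):
            actual = current.get(participant, "OFFLINE")
            if actual != target:
                transitions.append((partition, participant, actual, target))
        # Take replicas offline on participants that should no longer host them.
        for participant, actual in sorted(current.items()):
            if participant not in replicas and actual != "OFFLINE":
                transitions.append((partition, participant, actual, "OFFLINE"))
    return transitions


ideal = {
    "seg_0": {"server-1": "ONLINE", "server-2": "ONLINE"},
    "seg_1": {"server-2": "ONLINE"},
}
current = {
    "seg_0": {"server-1": "ONLINE"},
    "seg_1": {"server-3": "ONLINE"},
}

for step in compute_transitions(ideal, current):
    print(step)
```

A spectator would watch the resulting state to learn where each partition is hosted; when the two states match, no transitions remain.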
Helix uses Zookeeper to maintain cluster state. Each component in a Pinot cluster takes a Zookeeper address as a startup parameter. The various components that are distributed in a Pinot cluster will watch Zookeeper notifications and issue updates via their embedded Helix-defined agents.
Component
Helix Mapping
Segment
Table
Controller
Embeds the Helix agent that drives the overall state of the cluster.
Server
Broker
Broker is modeled as a Helix Spectator that observes the cluster for changes in the state of segments and servers. In order to support multi-tenancy, brokers are also modeled as Helix Participants.
Minion
Pinot Minion is modeled as a Helix Participant.
Helix agents use Zookeeper to store and update configurations, as well as for distributed coordination. Zookeeper stores the following information about the cluster:
Resource: Stored properties
Controller: The controller that is assigned as the current leader
Servers/Brokers: A list of servers/brokers and their configuration; health status
Tables: List of tables; table configurations; table schema information; list of segments within a table
Segment: Exact server location(s) of a segment (routing table); state of each segment (online/offline/error/consuming); metadata about each segment
Knowing the ZNode layout structure in Zookeeper for Helix agents in a cluster is useful for operations and/or troubleshooting cluster state and health.
Pinot's controller acts as the driver of the cluster's overall state and health. Because of its role as a Helix participant and spectator, which drives the state of other components, it is the first component that is typically started after Zookeeper. Two parameters are required for starting a controller: Zookeeper address and cluster name. The controller will automatically create a cluster via Helix if it does not yet exist.
To achieve fault tolerance, one can start multiple controllers (typically three) and one of them will act as a leader. If the leader crashes or dies, another leader is automatically elected. Leader election is achieved using Apache Helix. Having at least one controller is required to perform any DDL equivalent operation on the cluster, such as adding a table or a segment.
The controller does not interfere with query execution. Query execution is not impacted even when all controller nodes are offline. If all controller nodes are offline, the state of the cluster will stay as it was when the last leader went down. When a new leader comes online, a cluster resumes re-balancing activity and can accept new tables or segments.
The controller provides a REST interface to perform CRUD operations on all logical storage resources (servers, brokers, tables, and segments).
See Pinot Data Explorer for more information on the web-based admin tool.
The responsibility of the broker is to route a given query to an appropriate server instance. A broker will collect and merge the responses from all servers into a final result and send it back to the requesting client. The broker provides HTTP endpoints that accept SQL queries and return the response in JSON format.
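As a hedged illustration of sending SQL to a broker over HTTP, the sketch below builds the request with Python's standard library without sending it. The broker address `http://localhost:8099`, the `/query/sql` path, the `{"sql": ...}` body shape, and the `pageViews` table are assumptions not stated on this page; check them against your deployment before relying on them.

```python
import json
import urllib.request


def build_query_request(broker_url, sql):
    """Build (but do not send) a JSON POST carrying a SQL query for a broker."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url=f"{broker_url}/query/sql",  # assumed endpoint path
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request(
    "http://localhost:8099",  # assumed broker host and port
    "SELECT COUNT(*) FROM pageViews",
)
print(request.get_method(), request.full_url)

# To send it against a running broker and parse the JSON response:
# with urllib.request.urlopen(request) as response:
#     result = json.load(response)
```

Keeping request construction separate from sending makes the example usable without a running cluster.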
Brokers need three key things to start.
Cluster name
Zookeeper address
Broker instance name
At startup, a broker registers as a Helix Participant and awaits notifications from other Helix agents. These notifications are handled for table creation, a new segment being loaded, or a server starting up or going down, in addition to any configuration changes.
Service Discovery/Routing Table
Irrespective of the kind of notification, the key responsibility of a broker is to maintain the query routing table. The query routing table is simply a mapping between segments and the servers that a segment resides on. Typically, a segment resides on more than one server. The broker computes multiple routing tables depending on the configured routing strategy for a table. The default strategy is to balance the query load across all available servers.
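A routing table that balances load across replicas can be sketched as follows. The segment and server names are made up, and the least-loaded selection rule is only one plausible way to balance queries; it is not presented as Pinot's actual default routing implementation.

```python
from collections import Counter

# Hypothetical placement: each segment resides on more than one server.
segment_replicas = {
    "seg_0": ["server-1", "server-2"],
    "seg_1": ["server-2", "server-3"],
    "seg_2": ["server-1", "server-3"],
    "seg_3": ["server-1", "server-2"],
}


def build_routing_table(replicas):
    """Pick one server per segment, always choosing the least-loaded replica."""
    load = Counter()
    routing = {}
    for segment in sorted(replicas):
        # Ties on load are broken by server name to keep the result deterministic.
        server = min(replicas[segment], key=lambda name: (load[name], name))
        routing[segment] = server
        load[server] += 1
    return routing


routing_table = build_routing_table(segment_replicas)
print(routing_table)
```

Every segment is served by exactly one of its replicas, so each row is queried once while the work is spread across the available servers.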
Advanced routing strategies are also available, such as ReplicaAware routing, partition-based routing, and minimal server selection routing. These strategies target particular cases that must serve very high throughput queries.
Query processing
For every query, a cluster's broker performs the following:
Fetches the routes that are computed for a query based on the routing strategy defined in a table's configuration.
Computes the list of segments to query from on each server.
Scatter-Gather: sends the requests to each server and gathers the responses.
Merge: merges the query results returned from each server.
Sends the query result to the client.
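The steps above can be sketched as a small scatter-gather simulation. Servers are modeled as in-memory dictionaries and the query is a fixed SUM and COUNT; the data, names, and helper functions are hypothetical, and a thread pool stands in for network calls.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical data: values of one metric column, stored per segment on each server.
server_segments = {
    "server-1": {"seg_0": [3, 4], "seg_3": [1]},
    "server-2": {"seg_1": [10]},
    "server-3": {"seg_2": [2, 2]},
}
# Routes computed for the table: which server answers for which segment.
routing_table = {"seg_0": "server-1", "seg_1": "server-2",
                 "seg_2": "server-3", "seg_3": "server-1"}


def segments_per_server(routing):
    """Compute the list of segments to query on each server."""
    plan = {}
    for segment, server in routing.items():
        plan.setdefault(server, []).append(segment)
    return plan


def execute_on_server(server, segments):
    """Server side: partial SUM and COUNT over the requested segments."""
    values = [v for seg in segments for v in server_segments[server][seg]]
    return {"sum": sum(values), "count": len(values)}


def run_query(routing):
    """Broker side: scatter requests, gather partial results, then merge them."""
    plan = segments_per_server(routing)
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(execute_on_server, server, segs)
                   for server, segs in plan.items()]
        partials = [future.result() for future in futures]
    return {"sum": sum(p["sum"] for p in partials),
            "count": sum(p["count"] for p in partials)}


print(run_query(routing_table))
```

The merge step works because SUM and COUNT can be combined from partial results; other aggregations may need different partial state.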
Fault tolerance
Broker instances scale horizontally without an upper bound. In a majority of cases, only three brokers are required. If most query results that are returned to a client are <1MB in size per query, one can run a broker and servers inside the same instance container. This lowers the overall footprint of a cluster deployment for use cases that do not need to guarantee a strict SLA on query performance in production.
Servers host segments and do most of the heavy lifting during query processing. Though the architecture shows that there are two kinds of servers, real-time and offline, a server does not really know if it's going to be a real-time server or an offline server. The responsibility of a server depends on the table assignment strategy.
In theory, a server can host both real-time segments and offline segments. However, in practice, we use different types of machine SKUs for real-time servers and offline servers. The advantage of separating real-time servers and offline servers is to allow each to scale independently.
Offline servers
Offline servers typically host segments that are immutable. In this case, segments are created outside of a cluster and uploaded via a shell-based curl request. Based on the replication factor and the segment assignment strategy, the controller picks one or more servers to host the segment. Servers are notified via Helix about the new segments. Servers fetch the segments from deep store and load them before being ready to serve query requests. At this point, the cluster's broker detects that new segments are available and starts including them in query responses.
Real-time servers
Real-time servers are different from the offline servers. Real-time server nodes ingest data from streaming sources, such as Kafka, and generate the indexed segments in-memory (flushing segments to disk periodically). In memory segments are also known as consuming segments. These consuming segments get flushed periodically based on completion threshold (based on number of rows, time or segment size). At this point, they are known as completed segments. Completed segments are similar to the offline server's segments. Queries go over the in-flight (consuming) segments and the completed segments.
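The life cycle of a consuming segment can be sketched with two of the thresholds mentioned above, row count and time. The class, the threshold parameters, and the event format are assumptions for illustration and do not correspond to Pinot's configuration keys or internal classes.

```python
class ConsumingSegment:
    """In-memory segment that completes after max_rows rows or max_age_seconds."""

    def __init__(self, max_rows, max_age_seconds, created_at):
        self.max_rows = max_rows
        self.max_age_seconds = max_age_seconds
        self.created_at = created_at
        self.rows = []

    def append(self, row):
        self.rows.append(row)

    def is_complete(self, now):
        return (len(self.rows) >= self.max_rows
                or now - self.created_at >= self.max_age_seconds)


def consume(events, max_rows, max_age_seconds):
    """Consume (timestamp, row) events; return (completed segments, consuming rows)."""
    completed = []
    segment = None
    for now, row in events:
        if segment is None:
            segment = ConsumingSegment(max_rows, max_age_seconds, created_at=now)
        segment.append(row)
        if segment.is_complete(now):
            completed.append(segment.rows)  # flushed: now a completed segment
            segment = None
    return completed, (segment.rows if segment else [])


events = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (20, "e"), (21, "f")]
completed, consuming = consume(events, max_rows=3, max_age_seconds=10)
print(completed, consuming)
```

The first segment completes on the row threshold and the second on the time threshold; a query would read both the completed segments and the rows still being consumed.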
Minion is an optional component and is not required to get started with Pinot. Minion is used for purging data from a Pinot cluster (for reasons such as GDPR compliance).
Within Pinot, a logical table is modeled as one of two types of physical tables: offline or real-time. Two types exist because each one follows a different state model.
Real-time and offline tables provide different configuration options for indexing and, in the case of real-time, the connector properties for the stream data source (e.g. Kafka). Table types also allow users to use different containers for real-time and offline server nodes. For instance, offline servers might use virtual machines with larger storage capacity, whereas real-time servers might need higher system memory and/or more CPU cores.
The two types of tables also scale differently.
Real-time tables have a shorter retention period and scale query performance based on the ingestion rate.
Offline tables have longer retention and scale performance based on the size of stored data.
There are a few things to keep in mind when configuring the different types of tables for your workloads. When ingesting data from the same source, you can have two tables that ingest the same data that are configured differently for real-time and offline queries. Even though the two tables have the same data, performance will scale differently for queries based on your requirements. In this scenario, real-time and offline tables must share the same schema.
Tables for real-time and offline can be configured differently depending on usage requirements. For example, you can choose to enable star-tree indexing for an offline table, while the real-time table with the same schema may not need it.
In batch mode, data is ingested into Pinot via an ingestion job. An ingestion job transforms a raw data source (such as a CSV file) into segments. Once segments are generated for the imported data, an ingestion job stores them into the cluster's segment store (a.k.a deep store) and notifies the controller. The notification is processed and the result is that the Helix agent on the controller updates the ideal state configuration in Zookeeper. Helix will then notify the offline server that there are new segments available. In response to the notification from the controller, the offline server downloads the newly created segments directly from the cluster's segment store. The cluster's broker, which watches for state changes in Helix, detects the new segments and adds them to the list of segments to query (segment-to-server routing table).
At table creation, a controller creates a new entry in Zookeeper for the consuming segment. Helix notices the new segment and notifies the real-time server, which starts consuming data from the streaming source. The broker, which watches for changes, detects the new segments and adds them to the list of segments to query (segment-to-server routing table).
Whenever the segment is complete (i.e. full), the real-time server notifies the Controller, which checks with all replicas and picks a winner to commit the segment. The winner commits the segment and uploads it to the cluster's segment store, updating the state of the segment from "consuming" to "online". The controller then prepares a new segment in a "consuming" state.
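The completion step can be sketched as follows. The rule used here to pick the winner (the replica that reports the highest consumed offset) is an assumption for illustration, since this page does not say how the winner is chosen; the segment names, server names, and helper functions are hypothetical as well.

```python
def pick_winner(reported_offsets):
    """Assumed rule: the replica with the highest offset wins; ties go to the first name."""
    return max(sorted(reported_offsets), key=lambda replica: reported_offsets[replica])


def complete_segment(segment_states, segment, reported_offsets, next_segment):
    """Commit the finished segment and prepare the next consuming segment."""
    winner = pick_winner(reported_offsets)
    segment_states[segment] = "online"          # committed and uploaded by the winner
    segment_states[next_segment] = "consuming"  # new segment prepared by the controller
    return winner


states = {"events_seg_0": "consuming"}
winner = complete_segment(
    states,
    "events_seg_0",
    {"server-1": 120, "server-2": 125},  # offsets reported by each replica
    "events_seg_1",
)
print(winner, states)
```

Only one replica commits the segment, which keeps a single copy in the segment store for the other replicas to align with.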
Queries are received by brokers, which check the request against the segment-to-server routing table and scatter the request between real-time and offline servers.
The two tables then process the request by filtering and aggregating the queried data, which is then returned to the broker. Finally, the broker gathers all of the pieces of the query response and responds to the client with the result.
Servers host the data segments and serve queries off the data they host. There are two types of servers:
Offline Offline servers are responsible for downloading segments from the segment store, to host and serve queries off. When a new segment is uploaded to the controller, the controller decides the servers (as many as replication) that will host the new segment and notifies them to download the segment from the segment store. On receiving this notification, the servers download the segment file and load the segment onto the server, to serve queries off it.
Realtime Real time servers directly ingest from a real time stream (such as Kafka, EventHubs). Periodically, they make segments of the in-memory ingested data, based on certain thresholds. This segment is then persisted onto the segment store.
Pinot Servers are modeled as Helix Participants, hosting Pinot tables (referred to as resources in helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more helix partitions of one or more helix resources (i.e. one or more segments of one or more tables).
Make sure you've set up Zookeeper. If you're using docker, make sure to pull the pinot docker image. To start a server
USAGE
Brokers are the components that handle Pinot queries. They accept queries from clients and forward them to the right servers. They collect results back from the servers and consolidate them into a single response to send back to the client.
Pinot Brokers are modeled as Spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried. The broker ensures that all the rows of the table are queried exactly once so as to return correct, consistent results for a query. The brokers may optimize to prune some of the segments as long as accuracy is not sacrificed. Helix provides the framework by which spectators can learn the location in which each partition of a resource (i.e. participant) resides. The brokers use this mechanism to learn the servers that host specific segments of a table.
In case of hybrid tables, the brokers ensure that the overlap between realtime and offline segment data is queried exactly once, by performing offline and realtime federation. For example, suppose we have realtime data for 5 days, March 23 to March 27, and offline data has been pushed until March 25, which is 2 days behind realtime. The brokers maintain this time boundary.
Suppose we get a query to this table: select sum(metric) from table. The broker will split the query into 2 queries based on this time boundary, one for offline and one for realtime. This query becomes select sum(metric) from table_REALTIME where date >= Mar 25 and select sum(metric) from table_OFFLINE where date < Mar 25.
The broker then merges results from both these queries before returning them to the client.
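To see why the time boundary prevents double counting, here is a minimal Python sketch of the federation step. The function name, the row layout, and the year 2020 are assumptions made for illustration; only the boundary comparison (offline strictly before the boundary, realtime on or after it) comes from the example above.

```python
from datetime import date


def federated_sum(offline_rows, realtime_rows, boundary):
    """Sum a metric across a hybrid table without double counting.

    Each row is a (day, metric) tuple. Rows before the boundary are read
    from the offline table; rows on or after it are read from realtime.
    """
    offline_part = sum(m for day, m in offline_rows if day < boundary)
    realtime_part = sum(m for day, m in realtime_rows if day >= boundary)
    return offline_part + realtime_part


# Realtime holds March 23 to March 27; offline has been pushed through March 25.
# The year and the earlier offline days are arbitrary, chosen only for the demo.
realtime_rows = [(date(2020, 3, d), 1) for d in range(23, 28)]
offline_rows = [(date(2020, 3, d), 1) for d in range(20, 26)]
boundary = date(2020, 3, 25)

total = federated_sum(offline_rows, realtime_rows, boundary)
```

March 23 to March 25 exist in both tables, yet each day contributes exactly once to `total`, because every day falls on exactly one side of the boundary.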
Make sure you've set up Zookeeper. If you're using docker, make sure to pull the pinot docker image. To start a broker
Pinot Minion is a new component which leverages the . It can be attached to an existing Pinot cluster and then execute tasks as provided by the controller. It's a generic and single place for running background jobs. They help offload computationally intensive tasks—such as adding indexes to segments and merging segments—from other components.
A tenant is a logical component, defined as a group of server/broker nodes with the same Helix tag.
In order to support multi-tenancy, Pinot has first class support for tenants. Every table is associated with a server tenant and a broker tenant. This controls the nodes that will be used by this table as servers and brokers. This allows all tables belonging to a particular use case to be grouped under a single tenant name.
The concept of tenants is very important when multiple use cases are using Pinot and there is a need to provide quotas or some sort of isolation across tenants. For example, consider we have two tables, Table A and Table B, in the same Pinot cluster.
We can configure Table A with server tenant Tenant A and Table B with server tenant Tenant B. We can tag some of the server nodes for Tenant A and some for Tenant B. This will ensure that segments of Table A only reside on servers tagged with Tenant A, and segments of Table B only reside on servers tagged with Tenant B. The same isolation can be achieved at the broker level, by configuring broker tenants for the tables.
No need to create separate clusters for every table or use case!
This section contains 2 main fields, broker and server, which decide the tenants used for the broker and server components of this table.
In the above example,
The table will be served by brokers that have been tagged as brokerTenantName_BROKER in Helix.
If this were an offline table, the offline segments for the table will be hosted in pinot servers tagged in helix as serverTenantName_OFFLINE.
If this were a realtime table, the realtime segments (both consuming as well as completed ones) will be hosted in pinot servers tagged in helix as serverTenantName_REALTIME.
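The naming rule above (tenant name plus a role suffix) can be sketched as follows. The dictionary mirrors the broker and server fields of the tenants section; the function name and the table type argument are hypothetical and exist only for this illustration.

```python
def helix_tags(tenants, table_type):
    """Derive the Helix tags a table's brokers and servers must carry.

    tenants    -- dict with "broker" and "server" tenant names
    table_type -- "OFFLINE" or "REALTIME"
    """
    if table_type not in ("OFFLINE", "REALTIME"):
        raise ValueError(f"unknown table type: {table_type!r}")
    return {
        "broker": f"{tenants['broker']}_BROKER",
        "server": f"{tenants['server']}_{table_type}",
    }


tenants = {"broker": "brokerTenantName", "server": "serverTenantName"}
offline_tags = helix_tags(tenants, "OFFLINE")
realtime_tags = helix_tags(tenants, "REALTIME")
```

Both table types share the same broker tag, while the server tag depends on whether the segments are offline or realtime.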
Here's a sample broker tenant config. This will create a broker tenant sampleBrokerTenant by tagging 3 untagged broker nodes as sampleBrokerTenant_BROKER.
To create this tenant use the following command. The creation will fail if the number of untagged broker nodes is less than numberOfInstances.
Here's a sample server tenant config. This will create a server tenant sampleServerTenant by tagging 1 untagged server node as sampleServerTenant_OFFLINE and 1 untagged server node as sampleServerTenant_REALTIME.
To create this tenant use the following command. The creation will fail if the number of untagged server nodes is less than offlineInstances + realtimeInstances.
A table is a logical abstraction to refer to a collection of related data. It consists of columns and rows (documents).
Data in Pinot tables is sharded into segments. A Pinot table is modeled as a Helix resource. Each segment of a table is modeled as a Helix Partition.
A table is typically associated with a schema, which is used to define the names, data types and other information of the columns of the table.
There are 3 types of Pinot table
Note that the query does not know the existence of offline or realtime tables. It only specifies the table name in the query. For example, regardless of whether we have an offline table myTable_OFFLINE, or a realtime table myTable_REALTIME, or a hybrid table containing both of these, the query will simply use myTable, as in select count(*) from myTable.
A table config file is used to define the table properties, such as name, type, indexing, routing, retention etc. It is written in JSON format, and stored in the property store in Zookeeper, along with the table schema.
Here's an example table config for an offline table
We will now discuss each section of the table config in detail.
Here's an example table config for a realtime table. All the fields from the offline table config are valid for the realtime table. Additionally, realtime tables use some extra fields.
We will now discuss the sections that have some behavior differences for realtime tables.
replicasPerPartition The number of replicas per partition for the realtime stream
completionConfig
Holds information related to realtime segment completion. There is just one field in this config as of now, which is the completionMode. The value of completionMode decides how non-committer servers should replace the in-memory segment during realtime segment completion. By default, if the in-memory segment in the non-committer server is equivalent to the committed segment, then the non-committer server builds and replaces the segment; otherwise it downloads the segment from the controller.
Currently, the supported value for completionMode is
DOWNLOAD: In certain scenarios, segment build can get very memory intensive. It might become desirable to force the non-committer servers to just download the segment from the controller, instead of building it again. Setting this completionMode ensures that the non-committer servers always download the segment.
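The decision a non-committer server makes can be summarized in a small Python sketch. The function name, its arguments, and the returned strings are hypothetical; the logic only restates the default and DOWNLOAD behaviors described above.

```python
def non_committer_action(completion_mode, local_matches_committed):
    """Decide how a non-committer server obtains the completed segment.

    completion_mode         -- "DOWNLOAD", or None for the default behavior
    local_matches_committed -- True if the in-memory segment is equivalent
                               to the segment the committer persisted
    """
    if completion_mode == "DOWNLOAD":
        # Never build locally; always fetch the committed copy.
        return "download"
    # Default: build locally only when the local data is equivalent.
    return "build" if local_matches_committed else "download"
```

With the default mode a server avoids the download when it can, while DOWNLOAD trades network transfer for lower memory pressure during the build.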
sortedColumn Indicates the column which should be sorted when creating the realtime segment
aggregateMetrics
Aggregate the realtime stream data as it is consumed, where applicable, in order to reduce segment sizes. We sum the metric column values of all rows that have the same value for dimension columns and create one row in a realtime segment for all such rows. This feature is only available on REALTIME tables. The only supported aggregation right now is sum. Also note that for this to work, all metrics should be listed in noDictionaryColumns and there should not be any multi value dimensions.
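As a rough illustration of what this roll-up does to the data, the following Python sketch sums metric values for rows that share the same dimension values. The function and the column names are made up for the example; it shows the effect on rows, not how Pinot implements it internally.

```python
from collections import defaultdict


def aggregate_metrics(rows, dimensions, metrics):
    """Collapse rows with identical dimension values into one row,
    summing each metric column."""
    rolled = defaultdict(lambda: dict.fromkeys(metrics, 0))
    for row in rows:
        key = tuple(row[d] for d in dimensions)
        for m in metrics:
            rolled[key][m] += row[m]
    return [dict(zip(dimensions, key), **sums) for key, sums in rolled.items()]


# Hypothetical stream events: two share the same dimension values.
events = [
    {"country": "US", "device": "phone", "clicks": 1},
    {"country": "US", "device": "phone", "clicks": 4},
    {"country": "DE", "device": "tablet", "clicks": 2},
]
rolled_up = aggregate_metrics(events, ["country", "device"], ["clicks"])
```

Three incoming events become two stored rows, which is where the reduction in segment size comes from.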
Here is a minimal example of what the streamConfigs section may look like:
There are some configurations that are generic to all stream types, and others that are specific to stream types.
realtime.segment.flush.threshold.size: Maximum number of rows to consume before persisting the consuming segment.
Note that in the example above, it is set to 0. In this case, Pinot automatically computes the row limit using the value of realtime.segment.flush.desired.size described below. If the consumer type is HighLevel, then this value will be the maximum per consuming segment. If the consumer type is LowLevel then this value will be divided across all consumers being hosted on any one pinot-server.
Default is 5000000.
realtime.segment.flush.threshold.time: Maximum elapsed time after which a consuming segment should be persisted.
The value can be set as a human readable string, such as "1d", "4h30m", etc. This value should be set such that it is not below the retention of messages in the underlying stream, but is not so long that it may cause the server to run out of memory. Default is "6h".
realtime.segment.flush.desired.size: Desired size of the completed segments.
This setting is supported only if consumer type is set to LowLevel. This value can be set as a human readable string such as "150M", or "1.1G", etc. This value is used when realtime.segment.flush.threshold.size is set to 0. Pinot learns and then estimates the number of rows that need to be consumed so that the persisted segment is approximately of this size. The learning phase starts by setting the number of rows to 100,000 (can be changed with the setting realtime.segment.flush.autotune.initialRows) and increasing to reach the desired segment size. Segment size may go over the desired size significantly during the learning phase. Pinot corrects the estimation as it goes along, so it is not guaranteed that the resulting completed segments are of the exact size as configured. You should set this value to optimize the performance of queries (i.e. neither too small nor too large). Default is "200M".
realtime.segment.flush.autotune.initialRows: Initial number of rows for learning.
This value is used only if realtime.segment.flush.threshold.size is set to 0 and the consumer type is LowLevel. See realtime.segment.flush.desired.size above. Default is "100K".
All of these configuration items have the prefix stream.<streamType>. In the example above, the prefix is stream.kafka.
Important ones to note here are:
stream.kafka.consumer.type: This should have a value of LowLevel (recommended) or HighLevel.
stream.kafka.topic.name: Name of the topic from which to consume.
stream.kafka.consumer.prop.auto.offset.reset: Indicates where to start consumption from in the stream.
If the consumer type is LowLevel, this configuration is used only when the table is first provisioned. In HighLevel consumer type, it will also be used when new servers are rolled in, or when existing servers are replaced with new ones. You can specify values such as smallest or largest, or even 3d if your stream supports it. If you specify largest, the consumption starts from the most recent events in the data stream. This is the recommended way to create a new table. If you specify smallest then the consumption starts from the earliest event available in the data stream.
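Pulling the generic and Kafka-specific settings described above together, the Python sketch below models a stream configuration as a dictionary and shows how the two flush thresholds interact. The keys are the ones named in this section; the topic name, the helper functions, and the `estimated_rows` stand-in for Pinot's learned row estimate are assumptions for illustration only.

```python
import re

# Keys come from this section; the values are illustrative.
STREAM_CONFIGS = {
    "stream.kafka.consumer.type": "LowLevel",
    "stream.kafka.topic.name": "myTopic",  # hypothetical topic name
    "stream.kafka.consumer.prop.auto.offset.reset": "largest",
    "realtime.segment.flush.threshold.size": "0",
    "realtime.segment.flush.threshold.time": "6h",
    "realtime.segment.flush.desired.size": "200M",
}

_UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}


def parse_period(text):
    """Convert a human readable period such as "4h30m" into seconds."""
    parts = re.findall(r"(\d+)([dhms])", text)
    if not parts or "".join(n + u for n, u in parts) != text:
        raise ValueError(f"unrecognized period: {text!r}")
    return sum(int(n) * _UNIT_SECONDS[u] for n, u in parts)


def row_limit(configs, estimated_rows):
    """Use the explicit row threshold, or fall back to an estimate when it is 0.

    estimated_rows stands in for the row count Pinot would learn from
    realtime.segment.flush.desired.size; it is supplied by the caller here.
    """
    size = int(configs["realtime.segment.flush.threshold.size"])
    return size if size > 0 else estimated_rows


def should_flush(rows_consumed, seconds_elapsed, configs, estimated_rows):
    """A consuming segment is persisted when either threshold is reached."""
    time_limit = parse_period(configs["realtime.segment.flush.threshold.time"])
    return (rows_consumed >= row_limit(configs, estimated_rows)
            or seconds_elapsed >= time_limit)
```

Whichever threshold is hit first triggers the flush, so a slow stream is bounded by time and a fast stream is bounded by rows.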
tagOverrideConfig
A tagOverrideConfig can be added under the tenants section for realtime tables, to override tags for consuming and completed segments. For example:
A hybrid table is simply a table composed of 2 tables, 1 of type offline and 1 of type realtime, which share the same name. In such a table, offline segments may be pushed periodically (say, once a day). The retention on the offline table can be set to a high value (say, a few years) since segments are coming in on a periodic basis, whereas the retention on the realtime part can be small (say, a few days). Once an offline segment is pushed to cover a recent time period, the brokers automatically switch to using the offline table for segments in that time period, and use realtime table only to cover later segments for which offline data may not be available yet.
Here's a sample table config for a hybrid table.
Note that creating a hybrid table has to be done in 2 separate steps of creating an offline and realtime table individually.
Prerequisites
Sample Console Output
Start Kafka
Create a Kafka Topic
Create a Streaming table
Sample output
Start Kafka-Zookeeper
Start Kafka
Create stream table
This section contains quick start guides to help you get up and running with Pinot.
We want your experience getting started with Pinot to be both low effort and high reward. Here you'll find a collection of quick start guides that contain starter distributions of the Pinot platform.
This video will show you a step-by-step walk through for launching the individual components of Pinot and scaling them to multiple instances. This is an excellent resource for developers and operators that want to understand setting up each component and debugging a cluster.
You can find the commands that are shown in this video on GitHub
We also have a step-by-step guide for manually setting up a Pinot cluster using Docker or shell scripts.
Schema is used to define the names, data types and other information for the columns of a Pinot table.
Columns in a Pinot table can be broadly categorized into three categories
A Pinot schema is written in JSON format. Here's an example which shows all the fields of a schema
The Pinot schema is composed of
Below is a detailed description of each type of field spec.
A dimensionFieldSpec is defined for each dimension column. Here's a list of the fields in the dimensionFieldSpec
A metricFieldSpec is defined for each metric column. Here's a list of fields in the metricFieldSpec
A dateTimeFieldSpec is used to define time columns of the table. Here's a list of the fields in a dateTimeFieldSpec
This has been deprecated. Older schemas containing timeFieldSpec will be supported. But for new schemas, use DateTimeFieldSpec instead.
A timeFieldSpec is defined for the time column. A timeFieldSpec is composed of an incomingGranularitySpec and an outgoingGranularitySpec. IncomingGranularitySpec in combination with outgoingGranularitySpec can be used to transform the time column from incoming format to the outgoing format. If both of them are specified, the segment creation process will convert the time column from the incoming format to the outgoing format. If no time column transformation is required, you can specify just the incomingGranularitySpec.
The incoming and outgoing granularitySpec are defined as:
Apart from these, there are some advanced fields. These are common to all field specs.
Transform functions can be defined on columns in the schema. For example:
Currently, we have support for 2 kinds of functions
Groovy functions
Inbuilt functions
Note
Currently, the arguments must be from the source data. They cannot be columns from the Pinot schema which have been created through transformations.
Groovy functions can be defined using the syntax:
Here are some examples of commonly needed functions. Any valid Groovy expression can be used.
Concat firstName and lastName to get fullName
Find max value in array bids
Convert timestamp from MILLISECONDS to HOURS
Simply change name of the column from user_id to userId
If eventType is IMPRESSION set impression to 1. Similar for CLICK.
Store an AVRO Map in Pinot as two multi-value columns. Sort the keys, to maintain the mapping.
1) The keys of the map as map_keys
2) The values of the map as map_values
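To make the intent of these transformations concrete, here is a Python sketch of what each one computes. It is not Groovy and not the syntax used in a Pinot schema; the function names are hypothetical, and the space separator in the name concatenation is an assumption, since the description only says "concat".

```python
def full_name(first_name, last_name):
    # Separator is an assumption; the description only says "concat".
    return f"{first_name} {last_name}"


def max_bid(bids):
    # Largest value in the multi-value bids array.
    return max(bids)


def millis_to_hours(timestamp_ms):
    # Whole hours since epoch, dropping the remainder.
    return timestamp_ms // (1000 * 60 * 60)


def rename_user_id(user_id):
    # A rename is just the identity function written to a new column.
    return user_id


def impression_flag(event_type):
    # 1 for IMPRESSION events, 0 for everything else.
    return 1 if event_type == "IMPRESSION" else 0


def split_map(avro_map):
    # Sort the keys so that keys[i] always lines up with values[i].
    keys = sorted(avro_map)
    return keys, [avro_map[k] for k in keys]
```

Sorting in `split_map` is what keeps the two multi-value columns aligned, since the position in each column is the only link between a key and its value.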
We have several inbuilt functions that can be used directly as ingestion transform functions
These are functions which enable commonly needed time transformations.
toEpochXXX
Converts from epoch milliseconds to a higher granularity.
toEpochXXXRounded
Converts from epoch milliseconds to another granularity, rounding down to the nearest rounding bucket. For example, 1588469352000 (2020-05-03 01:29:12 UTC) is 26474489 minutesSinceEpoch. toEpochMinutesRounded(1588469352000) = 26474480 (2020-05-03 01:20:00 UTC)
fromEpochXXX
Converts from an epoch granularity to milliseconds.
Simple date format
Converts simple date format strings to milliseconds and vice versa, as per the provided pattern string.
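The arithmetic behind the epoch conversions can be checked with a short Python sketch. These functions are stand-ins written for this illustration, not the Pinot functions themselves, and the 10-minute bucket is an assumption chosen because it reproduces the rounded value quoted above.

```python
MILLIS_PER_MINUTE = 60_000


def to_epoch_minutes(millis):
    """Epoch milliseconds -> whole minutes since epoch."""
    return millis // MILLIS_PER_MINUTE


def to_epoch_minutes_rounded(millis, bucket_minutes):
    """Minutes since epoch, rounded down to a multiple of bucket_minutes."""
    minutes = to_epoch_minutes(millis)
    return (minutes // bucket_minutes) * bucket_minutes


def from_epoch_minutes(minutes):
    """Minutes since epoch -> epoch milliseconds."""
    return minutes * MILLIS_PER_MINUTE


minutes = to_epoch_minutes(1588469352000)
rounded = to_epoch_minutes_rounded(1588469352000, 10)
```

Converting back with `from_epoch_minutes` recovers the start of the bucket, not the original timestamp, because the finer detail is discarded by the rounding.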
This page has a collection of frequently asked questions with answers from the community.
This is a list of frequent questions most often asked in our troubleshooting channel on Slack. Please feel free to contribute your questions and answers here and make a pull request.
We have toJsonStr(key) function which can store a top level json field as a STRING in Pinot.
Then you can use jsonExtractScalar(JSON_STRING_FIELD, JSON_PATH, OUTPUT_FORMAT) function during query time to fetch the desired field from the json string. For example
NOTE This works well if some of your fields are nested json, but most of your fields are top level json keys. If all of your fields are within a nested JSON key, you will have to store the entire payload as 1 column, which is not ideal.
Support for flattening during ingestion is on the roadmap:
"timestamp" is a reserved keyword in SQL. Escape timestamp with double quotes.
Other commonly encountered reserved keywords are date, time, table.
For filtering on STRING columns, use single quotes
The fields in the ORDER BY clause must be one of the group by clauses or aggregations, BEFORE applying the alias. Therefore, this will not work
Instead, this will work
A rebalance is run to reassign all the segments of a table to the available servers. This is typically done when capacity changes are done i.e. adding more servers or removing servers from a table.
Offline
Realtime
You can check the status of the rebalance by
Checking the controller logs
Running rebalance again after a while, you should receive status "status": "NO_OP"
Checking the External View of the table, to see the changes in capacity/replicas have taken effect.
Yes, replica groups work for realtime. There are 2 parts to enabling replica groups:
Replica groups segment assignment
Replica group query routing
Replica group segment assignment
Replica group segment assignment is achieved in realtime, if number of servers is a multiple of number of replicas. The partitions get uniformly sprayed across the servers, creating replica groups.
For example, consider we have 6 partitions, 2 replicas, and 4 servers.
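One way to picture how uniform spraying produces replica groups is the round-robin sketch below. The formula is an assumption chosen for illustration and is not confirmed to be Pinot's exact assignment algorithm; servers are identified by index, and both function names are hypothetical.

```python
def assign(num_partitions, num_replicas, num_servers):
    """Spray each partition's replicas across servers in round-robin order.

    Returns {partition: [server index for replica 0, replica 1, ...]}.
    """
    return {
        p: [(p * num_replicas + r) % num_servers for r in range(num_replicas)]
        for p in range(num_partitions)
    }


def replica_groups(assignment, num_replicas):
    """Collect the set of servers that hold replica r of any partition."""
    return [
        sorted({servers[r] for servers in assignment.values()})
        for r in range(num_replicas)
    ]


# 6 partitions, 2 replicas, 4 servers: the server count is a multiple
# of the replica count, so the groups come out disjoint.
groups = replica_groups(assign(6, 2, 4), 2)

# 3 servers is not a multiple of 2 replicas, so the groups overlap.
overlapping = replica_groups(assign(6, 2, 3), 2)
```

With 4 servers, each group holds a complete copy of all 6 partitions on its own pair of servers, so a query can be answered by one group alone. With 3 servers that separation is lost.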
Replica group query routing
This quick start guide will help you bootstrap a Pinot standalone instance on your local machine.
In this guide you'll learn how to download and install Apache Pinot as a standalone instance.
This is a quickstart guide that will show you how to quickly start an example recipe in a standalone instance and is meant for learning. To run Pinot in cluster mode, please take a look at .
First, let's download the Pinot distribution for this tutorial. You can either build the distribution from source or download a packaged release.
Prerequisites
Install JDK8 or higher.
Follow these steps to checkout code from and build Pinot locally
Prerequisites
Install 3.6 or higher
Note that the Pinot scripts are located under pinot-distribution/target, not the target directory under root.
Download the latest binary release from , or use this command
Once you have the tar file,
We'll be using a quick-start script, which does the following:
Sets up the Pinot cluster QuickStartCluster
Creates a sample table and loads sample data
There are 3 kinds of quick start
Batch quick start creates the pinot cluster, creates an offline table baseballStats and pushes sample offline data to the table.
Streaming quick start sets up a Kafka cluster and pushes sample data to a Kafka topic. Then, it creates the Pinot cluster and creates a realtime table meetupRSVP which ingests data from the Kafka topic.
Hybrid quick start sets up a Kafka cluster and pushes sample data to a Kafka topic. Then, it creates the Pinot cluster and creates a hybrid table airlineStats. The realtime table ingests data from the Kafka topic. Lastly, sample data is pushed into the offline table.
This quick start guide will show you how to run a Pinot cluster using Docker.
Prerequisites
Install
You can also try if you already have a local cluster installed or setup.
Create an isolated bridge network in docker
We'll be using our docker image apachepinot/pinot:latest
to run this quick start, which does the following:
Sets up the Pinot cluster
Creates a sample table and loads sample data
There are 3 types of quick start examples.
Batch example
Streaming example
Hybrid example
In this example we demonstrate how to do batch processing with Pinot.
Starts Pinot deployment by starting
Apache Zookeeper
Pinot Controller
Pinot Broker
Pinot Server
Creates a demo table
baseballStats
Launches a standalone data ingestion job
Builds one Pinot segment for a given CSV data file for table baseballStats
Pushes the built segment to the Pinot controller
Issues sample queries to Pinot
Once the Docker container is running, you can view the logs by running the following command.
That's it! We've spun up a Pinot cluster.
It may take a while for all the Pinot components to start and for the sample data to be loaded.
Use the below command to check the status in the container logs.
Your cluster is ready once you see the cluster setup completion messages and sample queries, as demonstrated below.
In this example we demonstrate how to do stream processing with Pinot.
Starts Pinot deployment by starting
Apache Kafka
Apache Zookeeper
Pinot Controller
Pinot Broker
Pinot Server
Creates a demo table
meetupRsvp
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents to be subscribed to by Pinot
Issues sample queries to Pinot
In this example we demonstrate how to do hybrid stream and batch processing with Pinot.
Starts Pinot deployment by starting
Apache Kafka
Apache Zookeeper
Pinot Controller
Pinot Broker
Pinot Server
Creates a demo table
airlineStats
Launches a standalone data ingestion job
Builds Pinot segments under a given directory of Avro files for table airlineStats
Pushes built segments to Pinot controller
Launches a stream of flights stats
Publishes data to a Kafka topic airlineStatsEvents to be subscribed to by Pinot
Issues sample queries to Pinot
The Pinot Controller is responsible for a number of things
Controllers maintain the global metadata (e.g. configs and schemas) of the system with the help of Zookeeper which is used as the persistent metadata store.
Controllers host the Helix Controller and are responsible for managing other pinot components (brokers, servers, minions)
They maintain the mapping of which servers are responsible for which segments. This mapping is used by the servers, to download the portion of the segments that they are responsible for. This mapping is also used by the broker to decide which servers to route the queries to.
Controller has admin endpoints for viewing, creating, updating and deleting configs which help us manage and operate the cluster.
Controllers also have endpoints for segment uploads which are used in offline data pushes. They are responsible for initializing realtime consumption and coordination of persisting the realtime segments into the segment store periodically.
They undertake other management activities such as managing retention of segments, validations.
There can be multiple instances of Pinot controller for redundancy. If there are multiple controllers, Pinot expects that all of them are configured with the same back-end storage system so that they have a common view of the segments (e.g. NFS). Pinot can use other storage systems such as HDFS or .
Make sure you've set up Zookeeper. If you're using docker, make sure to pull the pinot docker image. To start a controller
This page contains multiple quick start guides for deploying Pinot to a public cloud provider.
The following quick start guides will show you how to run an Apache Pinot cluster using Kubernetes on different public cloud providers.
This starter provides a quick start for running Pinot on Google Cloud Platform (GCP)
This document provides the basic instruction to set up a Kubernetes Cluster on
Please follow this link () to install kubectl.
For Mac User
Please check kubectl version after installation.
QuickStart scripts are tested under kubectl client version v1.16.3 and server version v1.13.12
Please follow this link () to install helm.
For Mac User
Please check helm version after installation.
This QuickStart provides Helm support for helm v3.0.0 and v2.12.1. Please pick the script based on your helm version.
Install Google Cloud SDK
Restart your shell
Below script will create a 3 nodes cluster named pinot-quickstart in us-west1-b with n1-standard-2 machines for demo purposes.
Please modify the parameters in the example command below:
You can monitor cluster status by command:
Once the cluster is in RUNNING status, it's ready to be used.
Simply run below command to get the credential for the cluster pinot-quickstart that you just created or your existing cluster.
To verify the connection, you can run:
This starter guide provides a quick start for running Pinot on Microsoft Azure
This document provides the basic instruction to set up a Kubernetes Cluster on
Please follow this link () to install kubectl.
For Mac User
Please check kubectl version after installation.
QuickStart scripts are tested under kubectl client version v1.16.3 and server version v1.13.12
Please follow this link () to install helm.
For Mac User
Please check helm version after installation.
This QuickStart provides Helm support for helm v3.0.0 and v2.12.1. Please pick the script based on your helm version.
For Mac User
Below script will open default browser to sign-in to your Azure Account.
Below script will create a resource group in location eastus.
Below script will create a 3 nodes cluster named pinot-quickstart for demo purposes.
Please modify the parameters in the example command below:
Once the command succeeds, the cluster is ready to be used.
Simply run below command to get the credential for the cluster pinot-quickstart that you just created or your existing cluster.
To verify the connection, you can run:
This guide provides a quick start for running Pinot on Amazon Web Services (AWS).
This document provides the basic instruction to set up a Kubernetes Cluster on
Please follow this link () to install kubectl.
For Mac User
Please check kubectl version after installation.
QuickStart scripts are tested under kubectl client version v1.16.3 and server version v1.13.12
Please follow this link () to install helm.
For Mac User
Please check helm version after installation.
This QuickStart provides Helm support for helm v3.0.0 and v2.12.1. Please pick the script based on your helm version.
For Mac User
For Mac User
Environment variables AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
will override AWS configuration stored in file ~/.aws/credentials
Below script will create a 3 nodes cluster named pinot-quickstart in us-west-2 with t3.small machines for demo purposes.
Please modify the parameters in the example command below:
You can monitor cluster status by command:
Once the cluster is in ACTIVE status, it's ready to be used.
Simply run below command to get the credential for the cluster pinot-quickstart that you just created or your existing cluster.
To verify the connection, you can run:
Pinot quick start in Kubernetes
This quick start assumes the existence of a Kubernetes cluster. Please follow the links below to setup your Kubernetes cluster.
Before continuing, please make sure that you've downloaded Apache Pinot. The scripts for the setup in this guide can be found in our open source project on GitHub.
The scripts can be found in the Pinot source at ./incubator-pinot/kubernetes/helm
Pinot repo has pre-packaged HelmCharts for Pinot and Presto. Helm Repo index file is .
For Helm v2.12.1
If your Kubernetes cluster is recently provisioned, ensure Helm is initialized by running:
Then deploy a new HA Pinot cluster using the following command:
For Helm v3.0.0
Error: Please run the below command if encountering the following issue:
Resolution:
Error: Please run the command below if encountering a permission issue:
Error: release pinot failed: namespaces "pinot-quickstart" is forbidden: User "system:serviceaccount:kube-system:default" cannot get resource "namespaces" in API group "" in the namespace "pinot-quickstart"
Resolution:
Ensure the Kafka deployment is ready before executing the scripts in the next steps.
The scripts below will create two Kafka topics for data ingestion:
The script below will deploy 3 batch jobs.
Ingest 19492 JSON messages to Kafka topic flights-realtime
at a speed of 1 msg/sec
Ingest 19492 Avro messages to Kafka topic flights-realtime-avro
at a speed of 1 msg/sec
Upload Pinot schema airlineStats
Create Pinot table airlineStats
to ingest data from JSON encoded Kafka topic flights-realtime
Create Pinot table airlineStatsAvro
to ingest data from Avro encoded Kafka topic flights-realtime-avro
Please use the script below to perform local port-forwarding, which will also open Pinot query console in your default web browser.
This script can be found in the Pinot source at ./incubator-pinot/kubernetes/helm
You can run below command to navigate superset in your browser with the previous admin credential.
You can open the imported dashboard by clicking Dashboards
banner and then click on AirlineStats
.
You can run the command below to deploy a customized Presto with Pinot plugin installed.
Once Presto is deployed, you can run the command below.
List all catalogs
List All tables
Show schema
Count total documents
This quick start guide will show you how to set up a Pinot cluster manually.
You can try out pre-built Pinot all-in-one docker image.
(Optional) You can also follow the instructions to build your own images.
Create an isolated bridge network in docker
Start Zookeeper in daemon mode. This is a single node zookeeper setup. Zookeeper is the central metadata store for Pinot and should be set up with replication for production use. See for more information.
Start Pinot Controller in daemon and connect to Zookeeper.
Start Pinot Broker in daemon and connect to Zookeeper.
Start Pinot Server in daemon and connect to Zookeeper.
Optionally, you can also start Kafka for setting up realtime streams. This brings up the Kafka broker on port 9092.
Now all Pinot related components are started as an empty cluster.
You can run below command to check container status.
Sample Console Output
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
The Docker instructions on this page are still WIP
So far, we set up our cluster, ran some queries on the demo tables and explored the admin endpoints. We also uploaded some sample batch data for the transcript table.
Now, it's time to ingest from a sample stream into Pinot.
First, we need to set up a stream. Pinot has out-of-the-box realtime ingestion support for Kafka. Other streams can be plugged in, more details in .
Let's set up a demo Kafka cluster locally, and create a sample topic transcript-topic
Start Kafka
Create a Kafka Topic
Start Kafka
Start Kafka cluster on port 9876 using the same Zookeeper from the quick-start examples
Create a Kafka topic
Download the latest . Create a topic
Now that we have our table and schema, let's upload them to the cluster. As soon as the realtime table is created, it will begin ingesting from the Kafka topic.
Here's a JSON file for transcript table data:
Push sample JSON into Kafka topic, using the Kafka script from the Kafka download
This section is an overview of the various options for importing data into Pinot.
There are multiple options for importing data into Pinot. These guides are ready-made examples that show you step-by-step instructions for importing records into Pinot, supported by our . These guides are meant to get you up and running with imported data as quickly as possible. Pinot supports multiple file input formats without needing to change anything other than the file name. Each example imports a ready-made dataset so you can see how things work without needing to bring your own dataset.
These guides will show you how to import data from a supported file system.
These guides will show you how to import data from a Pinot supported input format.
This guide will show you how to import data using stream ingestion from Apache Kafka topics.
This guide shows you how to import data from files stored in Azure Data Lake Storage (ADLS)
Step-by-step guide on pushing your own data into the Pinot cluster
So far, we have set up our cluster, run some queries, and explored the admin endpoints. Now, it's time to get our own data into Pinot.
Let's gather our data files and put them in pinot-quick-start/rawdata.
Supported file formats are CSV, JSON, AVRO, PARQUET, THRIFT, ORC. If you don't have sample data, you can use this sample CSV.
Schema is used to define the columns and data types of the Pinot table. A detailed overview of the schema can be found in .
Briefly, we categorize our columns into 3 types
For example, in our sample table, the playerID, yearID, teamID, league, playerName columns are the dimensions; the playerStint, numberOfgames, numberOfGamesAsBatter, AtBatting, runs, hits, doules, triples, homeRuns, runsBattedIn, stolenBases, caughtStealing, baseOnBalls, strikeouts, intentionalWalks, hitsByPitch, sacrificeHits, sacrificeFlies, groundedIntoDoublePlays, G_old columns are the metrics; and there is no time column.
Once you have identified the dimensions, metrics and time columns, create a schema for your data, using the reference below.
Here's the table config for the sample CSV file. You can use this as a reference to build your own table config. Simply edit the tableName and schemaName.
Check the directory structure so far
Upload the table config using the following command
To generate a segment, we first need to create a job spec YAML file. The job spec contains all the information regarding data format, input data location and Pinot cluster coordinates. You can just copy over this job spec file. If you're using your own data, be sure to 1) replace transcript with your table name and 2) set the right recordReaderSpec.
Use the following command to generate a segment and upload it
Sample output
Pinot has the concept of a table, which is a logical abstraction that refers to a collection of related data. Pinot has a distributed architecture and scales horizontally. Pinot expects the size of a table to grow indefinitely over time. To support this, the data needs to be distributed across multiple nodes. Pinot achieves this by breaking the data into smaller chunks known as segments (similar to shards or partitions in relational databases). Segments can also be seen as time-based partitions.
Thus, a segment is a horizontal shard representing a chunk of table data with some number of rows. The segment stores data for all columns of the table. Each segment packs the data in a columnar fashion, along with the dictionaries and indices for the columns. The segment is laid out in a columnar format so that it can be directly mapped into memory for serving queries.
Columns may be single or multi-valued. Column types may be STRING, INT, LONG, FLOAT, DOUBLE or BYTES. Columns may be declared to be metric or dimension (or specifically as a time dimension) in the schema. Columns can have a default null value. For example, the default null value of an integer column can be 0. Note: The default value of a BYTES column has to be hex-encoded before adding it to the schema.
Pinot uses dictionary encoding to store values as dictionary IDs. Columns may be configured to be “no-dictionary” columns, in which case raw values are stored. Dictionary IDs are encoded using the minimum number of bits for efficient storage (e.g. a column with a cardinality of 3 will use only 2 bits for each dictionary ID).
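The bit-width arithmetic above can be illustrated with a small Python sketch. This is not Pinot's implementation: the function names are hypothetical, the dictionary is assumed to be the sorted list of distinct values, and using at least one bit for a single-valued column is an assumption.

```python
def bits_per_dictionary_id(cardinality: int) -> int:
    """Smallest number of bits that can represent the IDs 0..cardinality-1."""
    if cardinality < 1:
        raise ValueError("cardinality must be at least 1")
    # (n - 1).bit_length() equals ceil(log2(n)) for n >= 1.
    # Keeping a minimum of 1 bit is an assumption made for this sketch.
    return max(1, (cardinality - 1).bit_length())


def dictionary_encode(values):
    """Return (dictionary, ids): sorted distinct values and one ID per input value."""
    dictionary = sorted(set(values))
    id_of = {value: i for i, value in enumerate(dictionary)}
    return dictionary, [id_of[value] for value in values]


# Hypothetical column with three distinct values.
dictionary, ids = dictionary_encode(["AL", "NL", "AL", "AA", "NL"])
print(dictionary, ids, bits_per_dictionary_id(len(dictionary)))
```

With three distinct values the IDs are 0, 1 and 2, which fit in 2 bits, matching the example in the text.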
There is a forward index built for each column and compressed appropriately for efficient memory use. In addition, optional inverted indices can be configured for any set of columns. While inverted indices take up more storage, they offer better query performance. Specialized indexes such as the Star-Tree index are also supported. Check out for more details.
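To make the difference between the two index types concrete, here is a minimal Python sketch. It is only an illustration: plain lists stand in for whatever compressed structures Pinot actually uses, and all function names and sample values are hypothetical.

```python
from collections import defaultdict


def build_forward_index(values):
    """Forward index: for each document (row), the dictionary ID of its value."""
    dictionary = sorted(set(values))
    id_of = {value: i for i, value in enumerate(dictionary)}
    return dictionary, [id_of[value] for value in values]


def build_inverted_index(forward_index):
    """Inverted index: for each dictionary ID, the documents that contain it."""
    inverted = defaultdict(list)
    for doc_id, dict_id in enumerate(forward_index):
        inverted[dict_id].append(doc_id)
    return dict(inverted)


def docs_matching(value, dictionary, inverted):
    """Answer an equality filter without scanning every row."""
    if value not in dictionary:
        return []
    return inverted[dictionary.index(value)]


teams = ["BOS", "NYA", "BOS", "CHN"]  # hypothetical column values
dictionary, forward = build_forward_index(teams)
inverted = build_inverted_index(forward)
print(docs_matching("BOS", dictionary, inverted))
```

The inverted index stores extra data (one document list per distinct value), which is the storage cost the text mentions, in exchange for answering filters without a full scan.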
Once the table is configured, we can load some data. Loading data involves generating Pinot segments from raw data and pushing them to the Pinot cluster. Data can be loaded in batch mode or streaming mode. See page for details.
Below are instructions to generate and push segments to Pinot via standalone scripts. For a production setup, you should use frameworks such as Hadoop or Spark. See this for more details on setting up Data Ingestion Jobs.
To generate a segment, we first need to create a job spec YAML file. The job spec contains all the information regarding data format, input data location and Pinot cluster coordinates. Note that this assumes the controller is RUNNING so that the table config and schema can be fetched from it. If not, you will have to configure the spec to point at their location.
where,
To create and push the segment in one go, use
Sample Console Output
Alternatively, you can create and then push the segment in separate steps, by changing the jobType to SegmentCreation or SegmentTarPush.
The ingestion job spec supports templating with Groovy syntax.
This is convenient for users who want to generate one ingestion job template file and schedule it on a daily basis with extra parameters updated daily.
E.g. users can set inputDirURI with parameters to indicate the date, so that the ingestion job only processes the data for a particular date.
Below is an example that specifies date templating for the input and output paths.
Then specify the values of ${year}, ${month}, ${day} when kicking off the ingestion job with arguments: -values $param=value1 $param2=value2 ...
This ingestion job only generates segments for the date 2014-01-03.
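The effect of date templating can be sketched in Python. This is only an illustration of placeholder substitution, not how Pinot evaluates templates: Pinot uses Groovy, while this sketch uses Python's string.Template, which happens to share the ${name} placeholder syntax. The paths and helper names are hypothetical.

```python
from string import Template


def parse_values(args):
    """Turn ['year=2014', 'month=01'] into {'year': '2014', 'month': '01'}."""
    return dict(arg.split("=", 1) for arg in args)


def render(template_text, values):
    """Fill in ${...} placeholders; raises KeyError if a value is missing."""
    return Template(template_text).substitute(values)


# Hypothetical job spec fragment with date placeholders.
spec = {
    "inputDirURI": "rawdata/${year}/${month}/${day}",
    "outputDirURI": "segments/${year}/${month}/${day}",
}

values = parse_values(["year=2014", "month=01", "day=03"])
rendered = {key: render(text, values) for key, text in spec.items()}
print(rendered["inputDirURI"])
print(rendered["outputDirURI"])
```

A scheduler would pass a different set of values each day, so the same template file keeps pointing at that day's input and output directories.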
Prerequisites
Below is an example of how to publish sample data to your stream. As soon as data is available to the realtime stream, it starts getting consumed by the realtime servers
Run the command below to stream JSON data into the Kafka topic flights-realtime
Pinot powers many internal and external dashboards as well as external site facing analytics applications like .
Insight Product -
Modeled as a Helix Partition. Each can have multiple copies referred to as Replicas.
Modeled as a Helix Resource. Multiple segments are grouped into a . All segments belonging to a Pinot Table have the same schema.
is modeled as a Helix Participant and hosts .
This tenant is defined in the section of the table config.
Follow instructions in to get Pinot locally, and then
Check out the table config in the to make sure it was successfully uploaded.
For more details on why this is needed, check out
streamConfigs
This section is where the bulk of settings specific to the realtime stream and consumption are found. This section is specific to tables of type REALTIME and is ignored if the table type is any other. See section on for an overview of how realtime ingestion works.
The streamType field is mandatory. In this case, it is set to kafka. A streamType of kafka is supported natively in Pinot. You can use the default decoder classes and consumer factory classes. Pinot allows you to use other stream types with their own consumer factory and decoder classes (or even a different decoder and consumer factory for kafka if your installation formats Kafka messages differently). See .
All the configurations that are prefixed with the streamType are expected to be used by the underlying stream. So any configuration that Kafka itself understands can be set using the prefix stream.kafka, and Kafka should pay attention to it.
More options are explained in the section.
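As a rough illustration of the prefix convention described above, the Python sketch below separates the stream-specific settings from the rest. The individual key names (stream.kafka.topic.name, stream.kafka.broker.list) and the helper function are assumptions made for this example; check them against the stream configuration reference before use. The topic name and port are the ones used in the Kafka quick start on this page.

```python
# Hypothetical streamConfigs section, written as a Python dict for illustration.
stream_configs = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "transcript-topic",  # assumed key name
    "stream.kafka.broker.list": "localhost:9876",   # assumed key name
}


def stream_specific_configs(configs):
    """Return only the entries that carry the 'stream.<streamType>.' prefix."""
    stream_type = configs.get("streamType")
    if not stream_type:
        raise ValueError("streamType is mandatory")
    prefix = f"stream.{stream_type}."
    return {key: value for key, value in configs.items() if key.startswith(prefix)}


print(stream_specific_configs(stream_configs))
```

Every key selected this way is intended for the underlying stream, which is why the streamType value doubles as the prefix.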
Similar to the offline table, this section defines the server and broker tenant used for this table. More details about tenant can be found in .
In the above example, the consuming segments will still be assigned to serverTenantName_REALTIME hosts, but once they are completed, the segments will be moved to serverTenantName_OFFLINE. It is possible to specify the full name of any tag in this section (so, for example, you could decide that completed segments for this table should be on Pinot servers tagged as allTables_COMPLETED). Refer to section for learning more about this config.
Create a table config for your data, or see for all possible batch/streaming tables.
Check out the table config in the to make sure it was successfully uploaded.
Getting data into Pinot is easy. Take a look at these two quick start guides which will help you get up and running with sample data for offline and real-time .
Create a schema for your data, or see for examples. Make sure you've
Note: schema can also be created as part of table creation, refer to .
Check out the schema in the to make sure it was successfully uploaded
Inverted indexes are set in the tableConfig's tableIndexConfig -> invertedIndexColumns list. Here's the documentation for tableIndexConfig: along with a sample table that has inverted indexes set on some columns.
Applying inverted indexes to a table config will generate inverted indexes for all new segments. In order to apply the inverted indexes to all existing segments, follow steps in
Add the columns you wish to index to the tableIndexConfig -> invertedIndexColumns list. This sample table config shows inverted indexes set: To update the table config use the Pinot Swagger API:
Invoke the reload API:
Right now, there’s no easy way to confirm that the reload succeeded. One way is to check the index_map file inside the segment metadata; you should see inverted index entries for the new columns. An API for this is coming soon:
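As an illustration of the first step, the Python sketch below adds columns to the invertedIndexColumns list of a table config held as a dict. The helper name and the sample table config are hypothetical; uploading the updated config and invoking the reload API are not shown.

```python
import json


def add_inverted_index_columns(table_config, columns):
    """Add columns to tableIndexConfig -> invertedIndexColumns, skipping duplicates."""
    index_config = table_config.setdefault("tableIndexConfig", {})
    existing = index_config.setdefault("invertedIndexColumns", [])
    for column in columns:
        if column not in existing:
            existing.append(column)
    return table_config


# Hypothetical, heavily trimmed table config.
table_config = {
    "tableName": "baseballStats",
    "tableType": "OFFLINE",
    "tableIndexConfig": {"invertedIndexColumns": ["playerID"]},
}

add_inverted_index_columns(table_config, ["teamID", "playerID"])
print(json.dumps(table_config["tableIndexConfig"], indent=2))
```

The resulting JSON is what would then be sent to the controller as the updated table config.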
Here's the page explaining the Pinot response format:
You can change the number of replicas by updating the table config's section. Make sure you have at least as many servers as the replication.
For OFFLINE table, update
For REALTIME table update
After changing the replication, run a .
Use the rebalance API from the Swagger APIs on the controller , with tableType OFFLINE
Use the rebalance API from the Swagger APIs on the controller , with tableType REALTIME.
A realtime table has two components: the consuming segments and the completed segments. By default, only the completed segments are rebalanced. The consuming segments will pick the right assignment once they complete. You can force the consuming segments to be included in the rebalance by setting the param includeConsuming to true. Note that rebalancing the consuming segments means they will drop the data consumed so far and restart consumption from the last offset, which may lead to a short period of data staleness.
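The Python sketch below only builds the request URL for a rebalance call, to show where the table type and includeConsuming parameters would go. The endpoint path /tables/{tableName}/rebalance, the query parameter name type, and the controller address are assumptions; confirm them in the controller's Swagger UI before relying on them.

```python
from urllib.parse import quote, urlencode


def rebalance_url(controller, table_name, table_type, include_consuming=False):
    """Build a rebalance URL (assumed endpoint shape; verify against Swagger)."""
    if table_type not in ("OFFLINE", "REALTIME"):
        raise ValueError("table_type must be OFFLINE or REALTIME")
    params = {"type": table_type}
    if include_consuming:
        # Consuming segments restart consumption when they are rebalanced.
        params["includeConsuming"] = "true"
    base = controller.rstrip("/")
    return f"{base}/tables/{quote(table_name)}/rebalance?{urlencode(params)}"


# Hypothetical controller address and table name.
print(rebalance_url("http://localhost:9000", "meetupRSVP", "REALTIME", include_consuming=True))
```

Leaving include_consuming at its default mirrors the default behaviour described above, where only completed segments are moved.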
As you can see, the set (S0, S2) contains r1 of every partition, and (S1, S3) contains r2 of every partition. The query will only be routed to one of the sets, and will not span every server. If you are adding/removing servers from an existing table setup, you have to run for segment assignment changes to take effect.
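The routing idea can be sketched in Python as follows. The partition names and the exact placement of partitions on servers are hypothetical, since the original diagram is not reproduced here; the only property taken from the text is that each set of servers holds one full replica and a query goes to a single set.

```python
import random

# Hypothetical layout: each replica group holds one full copy of every partition.
replica_groups = {
    "r1": {"S0": ["p1", "p3"], "S2": ["p2", "p4"]},
    "r2": {"S1": ["p1", "p3"], "S3": ["p2", "p4"]},
}


def pick_servers(groups, rng):
    """Route a query to the servers of exactly one replica group."""
    group_name = rng.choice(sorted(groups))
    return group_name, sorted(groups[group_name])


def partitions_covered(groups, group_name):
    """All partitions reachable through the servers of one replica group."""
    return sorted(p for parts in groups[group_name].values() for p in parts)


group, servers = pick_servers(replica_groups, random.Random())
print(group, servers, partitions_covered(replica_groups, group))
```

Whichever group is picked, the query touches only two of the four servers and still sees every partition.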
Once replica group segment assignment is in effect, the query routing can take advantage of it. For replica group based query routing, set the following in the table config's section, and then restart brokers
That's it! We've spun up a Pinot cluster. You can continue playing with other types of quick start, or simply head on to to check out the data in the baseballStats table.
We now have a Pinot cluster with a realtime table! You can head over to to check out the data in the meetupRSVP table.
Let's head over to to check out the data we pushed to the airlineStats table.
You can head over to to check out the data in the baseballStats table.
Once the cluster is up, you can head over to to check out the data in the meetupRSVPEvents table.
Once the cluster is up, you can head over to to check out the data in the airlineStats table.
Please follow this link to install Google Cloud SDK.
Please follow this to deploy your Pinot Demo.
Please follow this link to install Azure CLI.
Please follow this to deploy your Pinot Demo.
Please follow this link to install AWS CLI.
For first-time AWS users, please register your account at .
Once you have created the account, you can go to to create a user and create access keys under the Security Credential tab.
Please follow this to deploy your Pinot Demo.
Start to browse Zookeeper data at .
Alternately, you can use .
Follow instruction in to get Pinot
You can use to browse the Zookeeper instance.
Now it's time to start adding data to the cluster. Check out some of the or follow the and for instructions on loading your own data.
If you followed the , you have already pushed a schema for your sample table. If not, head over to on that page, to learn how to create a schema for your sample data.
If you followed , you learnt how to push an offline table and schema. Similar to the offline table config, we will create a realtime table config for the sample. Here's the realtime table config for the transcript table. For a more detailed overview of tables, check out .
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the to check out the realtime data.
A table config is used to define the config related to the Pinot table. A detailed overview of the table can be found in .
Check out the table config and schema in the to make sure it was successfully uploaded.
A Pinot table's data is stored as Pinot segments. A detailed overview of the segment can be found in .
Check that your segment made it to the table using the
You're all set! You should see your table in the and be able to run queries against it now.
| Top level field | Description |
| --- | --- |
| tableName | Specifies the name of the table. Should only contain alpha-numeric characters, hyphens ('-'), or underscores ('_'). Using a double underscore ('__') is not allowed; it is reserved for other features within Pinot. |
| tableType | Defines the table type: OFFLINE for an offline table, REALTIME for a realtime table. A hybrid table is essentially 2 table configs, one of each type, with the same table name. |
| quota | This section defines properties related to quotas, such as storage quota and query quota. For more details scroll down to quota |
| routing | This section defines the properties related to configuring how the broker selects the servers to route to, and how segments can be pruned by the broker based on segment metadata. For more details, scroll down to routing |
| segmentsConfig | This section defines the properties related to the segments of the table, such as segment push frequency, type, retention, schema, time column etc. For more details scroll down to segmentsConfig |
| tableIndexConfig | This section helps configure indexing and dictionary encoding related information for the Pinot table. For more details head over to tableIndexConfig |
| tenants | Defines the server and broker tenant used for this table. More details about tenants can be found in Tenant. |
| metadata | This section is for keeping custom configs, which are expressed as key-value pairs. |
quota fields
Description
storage
The maximum storage space the table is allowed to use, before replication. For example, in the above table, the storage is 140G and replication is 3. Therefore, the maximum storage the table is allowed to use is 140*3=420G. The space used by the table is calculated by adding up the sizes of all segments from every server hosting this table. Once this limit is reached, offline segment push throws a 403 exception with the message Quota check failed for segment: segment_0 of table: pinotTable.
maxQueriesPerSecond
The maximum queries per second allowed to execute on this table. If query volume exceeds this, a 429
exception with message,Request 123 exceeds query quota for table:pinotTable, query:select count(*) from pinotTable