Pinot Architecture

Pinot Architecture Overview

Terminology

First, a bit of terminology. Pinot has different components and different ways of representing the data. In particular, data is represented by:

Table

A table is a logical abstraction to refer to a collection of related data. It consists of columns and rows (documents).

Segment

Data in a table is divided into (horizontal) shards referred to as segments.

Pinot Components

Pinot Controller

Manages other Pinot components (brokers, servers) and controls the assignment of tables/segments to servers.

Pinot Server

Hosts one or more segments and serves queries from those segments.

Pinot Broker

Accepts queries from clients, routes them to one or more servers, and returns a consolidated response to the client.

Pinot leverages Apache Helix for cluster management. Helix is a cluster management framework to manage replicated, partitioned resources in a distributed system. Helix uses Zookeeper to store cluster state and metadata.

Briefly, Helix divides nodes into three logical components based on their responsibilities:

Participant

The nodes that host distributed, partitioned resources.

Spectator

The nodes that observe the current state of each Participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).

Controller

The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.

Pinot Controller hosts Helix Controller, in addition to hosting REST APIs for Pinot cluster administration and data ingestion. There can be multiple instances of Pinot controller for redundancy. If there are multiple controllers, Pinot expects that all of them are configured with the same back-end storage system so that they have a common view of the segments (e.g. NFS). Pinot can also use other storage systems such as HDFS or ADLS.

Pinot Servers are modeled as Helix Participants, hosting Pinot tables (referred to as resources in Helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more Helix partitions of one or more Helix resources (i.e. one or more segments of one or more tables).

Pinot Brokers are modeled as Spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried. The broker ensures that all the rows of the table are queried exactly once so as to return correct, consistent results for a query. The brokers (or servers) may optimize to prune some of the segments as long as accuracy is not sacrificed. In case of hybrid tables, the brokers ensure that the overlap between realtime and offline segment data is queried exactly once. Helix provides the framework by which spectators can learn the location (i.e. participant) in which each partition of a resource resides. The brokers use this mechanism to learn the servers that host specific segments of a table.


Batch Tables

Configurations for Batch Tables

These properties for the filesystem implementation are to be set in your controller and server configurations.

In your controller and server configs, set the FS class you would like to support: set pinot.controller.storage.factory.class.${YOUR_URI_SCHEME} to the fully qualified class name of the FS class you would like to include.

You also need to configure pinot.controller.local.temp.dir for the local dir on the controller machine.
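For example (the directory shown is only an illustration; use any writable local directory on the controller host):

pinot.controller.local.temp.dir=/tmp/pinot/controller-tmp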

For filesystem specific configs, you can pass in the following with either the pinot.controller prefix or the pinot.server prefix.

All the following configs need to be prefixed with storage.factory.

AzurePinotFS requires the following configs according to your environment:

adl.accountId, adl.authEndpoint, adl.clientId, adl.clientSecret

Sample Controller Config

"pinot.controller.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.controller.storage.factory.adl.accountId": "xxxx"
"pinot.controller.storage.factory.adl.authEndpoint": "xxxx"
"pinot.controller.storage.factory.adl.clientId": "xxxx"
"pinot.controller.storage.factory.adl.clientSecret": "xxxx"
"pinot.controller.segment.fetcher.protocols": "adl"

Sample Server Config

"pinot.server.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.server.storage.factory.adl.accountId": "xxxx"
"pinot.server.storage.factory.adl.authEndpoint": "xxxx"
"pinot.server.storage.factory.adl.clientId": "xxxx"
"pinot.server.storage.factory.adl.clientSecret": "xxxx"
"pinot.server.segment.fetcher.protocols": "adl"

You can find the parameters in your account as follows: https://stackoverflow.com/questions/56349040/what-is-clientid-authtokenendpoint-clientkey-for-accessing-azure-data-lake

Please also make sure to set the following config with the value “adl”:

"segment.fetcher.protocols" : "adl"

To see how to upload segments to different storage systems, check the Segment Fetchers section.

HadoopPinotFS requires the following configs according to your environment:

hadoop.kerberos.principle, hadoop.kerberos.keytab, hadoop.conf.path

Please make sure to also set the following config with the value “hdfs”:

"segment.fetcher.protocols" : "hdfs"

Batch

Segment Fetchers

When Pinot segment files are created in external systems (Hadoop/Spark/etc.), there are several ways to push the data to the Pinot Controller and Server:

  1. Push the segment to a shared NFS and let Pinot pull the segment files from that NFS location.

  2. Push the segment to a web server and let Pinot pull the segment files from the web server via an http/https link.

  3. Push the segment to HDFS and let Pinot pull the segment files from HDFS using the hdfs location URI.

  4. Push the segment to another system and implement your own segment fetcher to pull data from that system.

The first two options are supported out of the box with the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot Servers and Brokers. To enable Pinot support for HDFS, you will need to provide the Pinot Hadoop configuration and proper Hadoop dependencies.
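For instance, for option 2, a remote job could notify the controller of a segment's HTTP location with a request along these lines (the header names follow the HDFS example later in this document; the controller host/port and segment URL are placeholders, not fixed values):

curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:http://web-server-host/segments/mytable_segment_0.tar.gz" http://controllerHost:controllerPort/segments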


"pinot.controller.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.controller.storage.factory.adl.accountId": "xxxx"
"pinot.controller.storage.factory.adl.authEndpoint": "xxxx"
"pinot.controller.storage.factory.adl.clientId": "xxxx"
"pinot.controller.storage.factory.adl.clientId": "xxxx"
"pinot.controller.segment.fetcher.protocols": "adl"
https://stackoverflow.com/questions/56349040/what-is-clientid-authtokenendpoint-clientkey-for-accessing-azure-data-lakearrow-up-right
"pinot.server.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.server.storage.factory.adl.accountId": "xxxx"
"pinot.server.storage.factory.adl.authEndpoint": "xxxx"
"pinot.server.storage.factory.adl.clientId": "xxxx"
"pinot.server.storage.factory.adl.clientId": "xxxx"
"pinot.server.segment.fetcher.protocols": "adl"
"segment.fetcher.protocols" : "adl"
"segment.fetcher.protocols" : "hdfs"

Ingest Data

How to turn on the water valve

There are two ways to get data ingested into Pinot: Batch and Streaming.

Write your batch

Implement your own segment fetcher for other systems

You can also implement your own segment fetchers for other file systems and load them into Pinot with an external jar. All you need to do is implement a class that extends the SegmentFetcher interface and provide the config to the Pinot Controller and Server as follows:

pinot.controller.segment.fetcher.<protocol>.class = <class path to your implementation>

or

pinot.server.segment.fetcher.<protocol>.class = <class path to your implementation>

You can also provide other configs to your fetcher under the config root pinot.server.segment.fetcher.<protocol>.
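For example, a hypothetical fetcher registered under the protocol s3 could be configured like this (the class name and extra keys are made up for illustration; they are not shipped with Pinot):

pinot.server.segment.fetcher.s3.class = com.mycompany.pinot.fetcher.S3SegmentFetcher
pinot.server.segment.fetcher.s3.region = us-east-1
pinot.server.segment.fetcher.s3.retry.count = 3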


HDFS

HDFS segment fetcher configs

In your Pinot controller/server configuration, you will need to provide the following configs:

pinot.controller.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

or

pinot.server.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

This path should point to the local folder containing core-site.xml and hdfs-site.xml files from your Hadoop installation.

pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

or

pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

These two configs should be the corresponding Kerberos configuration if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.

You will also need to provide proper Hadoop dependencies jars from your Hadoop installation to your Pinot startup scripts.
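For example, one common approach (illustrative only; the exact jar names and the way your Pinot startup scripts build the classpath depend on your Hadoop and Pinot versions) is to prepend the Hadoop client jars to the classpath before starting the controller or server:

export HADOOP_HOME=/path/to/hadoop
export CLASSPATH_PREFIX="${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-client.jar:${HADOOP_HOME}/share/hadoop/common/hadoop-common.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth.jar"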

Push HDFS segment to Pinot Controller

To push HDFS segment files to Pinot controller, you just need to ensure you have proper Hadoop configuration as we mentioned in the previous part. Then your remote segment creation/push job can send the HDFS path of your newly created segment files to the Pinot Controller and let it download the files.

For example, the following curl request to the Controller will notify it to download segment files for the proper table:

curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.

Store Data

Pinot Persistent Layers

By default, Pinot does not come with a storage layer, so the data sent to Pinot will not be stored in case of a system crash. In order to persistently store the generated segments, you will need a storage layer.

Pinot enables its users to write a PinotFS abstraction layer to store data in a data layer of their choice for realtime and offline segments.

Some examples of storage backends (other than local storage) currently supported are:

  • HadoopFS

  • Azure Data Lake

If the above two filesystems do not meet your needs, you can extend the current PinotFS to customize it for your needs.

New Storage Type implementation

In order to add a new type of storage backend (say, Amazon S3), implement the following class:

S3FS extends PinotFS
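A minimal sketch of such a class is shown below. The exact abstract methods on PinotFS vary by Pinot version, so the method names and signatures here (init, mkdir, delete, exists, copyToLocalFile, copyFromLocalFile) are assumptions used to illustrate the shape of the implementation, not the definitive API; a real implementation would delegate each call to an S3 client.

package com.mycompany.pinot;  // hypothetical package

import java.io.File;
import java.net.URI;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.filesystem.PinotFS;

// Illustrative sketch only: method names/signatures are assumed, not the exact PinotFS contract.
public class S3FS extends PinotFS {

  @Override
  public void init(Configuration config) {
    // Read credentials/region from the storage.factory configs and build an S3 client here.
  }

  @Override
  public boolean mkdir(URI uri) {
    // S3 has no real directories; creating the key prefix (or a zero-byte marker object) is enough.
    return true;
  }

  @Override
  public boolean delete(URI segmentUri, boolean forceDelete) {
    // Delete the object, and if forceDelete is set, everything under the prefix.
    return true;
  }

  @Override
  public boolean exists(URI fileUri) {
    // HEAD the object to check whether it exists.
    return true;
  }

  @Override
  public void copyToLocalFile(URI srcUri, File dstFile) {
    // Download the object at srcUri into dstFile.
  }

  @Override
  public void copyFromLocalFile(File srcFile, URI dstUri) {
    // Upload srcFile to the object identified by dstUri.
  }
}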

Kafka

This page describes how to connect Kafka to Pinot

Kafka 2.x Plugin

Pinot provides stream plugin support for Kafka 2.x. Although the version used in this implementation is Kafka 2.0.0, it is possible to compile it with a higher Kafka lib version, e.g. 2.1.1.


Creating Pinot Segments

Realtime segment generation

To consume in realtime, we simply need to create a table with the same name as the schema and point to the Kafka topic to consume from, using a table definition such as this one:

{
  "tableName": "flights",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "7",
    "segmentPushFrequency": "daily",
    "segmentPushType": "APPEND",
    "replication": "1",
    "timeColumnName": "daysSinceEpoch",
    "timeType": "DAYS",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [
      "flightNumber",
      "tags",
      "daysSinceEpoch"
    ],
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "highLevel",
      "stream.kafka.topic.name": "flights-realtime",
      "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.zk.broker.url": "localhost:2181",
      "stream.kafka.hlc.zk.connect.string": "localhost:2181"
    }
  },
  "tenants": {
    "broker": "brokerTenant",
    "server": "serverTenant"
  },
  "metadata": {
  }
}

First, we’ll start a local instance of Kafka and start streaming data into it:

bin/pinot-admin.sh StartKafka &
bin/pinot-admin.sh StreamAvroIntoKafka -avroFile flights-2014.avro -kafkaTopic flights-realtime &

This will stream one event per second from the Avro file to the Kafka topic. Then, we’ll create a realtime table, which will start consuming from the Kafka topic.

bin/pinot-admin.sh AddTable -filePath flights-definition-realtime.json

We can then query the table with the following query to see the events stream in:

SELECT COUNT(*) FROM flights

Repeating the query multiple times should show the events slowly being streamed into the table.

Streaming Tables

Configurations for Streaming Tables

The example here uses the existing org.apache.pinot.filesystem.HadoopPinotFS to store realtime segments in an HDFS filesystem. In the Pinot controller config, add the following new configs:

"controller.data.dir": "SET_TO_YOUR_HDFS_ROOT_DIR"
"controller.local.temp.dir": "SET_TO_A_LOCAL_FILESYSTEM_DIR"
"pinot.controller.storage.factory.class.hdfs": "org.apache.pinot.filesystem.HadoopPinotFS"
"pinot.controller.storage.factory.hdfs.hadoop.conf.path": "SET_TO_YOUR_HDFS_CONFIG_DIR"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.principle": "SET_IF_YOU_USE_KERBEROS"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.keytab": "SET_IF_YOU_USE_KERBEROS"
"controller.enable.split.commit": "true"

In the Pinot server config, add the following new config:

"pinot.server.instance.enable.split.commit": "true"

Note: currently there is a bug in the controller (issue https://github.com/apache/incubator-pinot/issues/3847); for now you can cherry-pick the PR https://github.com/apache/incubator-pinot/pull/3849, which has already been tested, to fix the issue. The PR is under review now.

How to build and release Pinot package with Kafka 2.x connector

mvn clean package -DskipTests -Pbin-dist -Dkafka.version=2.0

How to use Kafka 2.x connector

  • Use Kafka Stream(High) Level Consumer

Below is a sample streamConfigs used to create a realtime table with Kafka Stream(High) level consumer.

Kafka 2.x HLC consumer uses org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory in config stream.kafka.consumer.factory.class.name.

"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "highLevel",
  "stream.kafka.topic.name": "meetupRSVPEvents",
  "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
  "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
  "stream.kafka.zk.broker.url": "localhost:2191/kafka",
  "stream.kafka.hlc.bootstrap.server": "localhost:19092"
}

  • Use Kafka Partition(Low) Level Consumer

Below is a sample table config used to create a realtime table with Kafka Partition(Low) level consumer:

{
  "tableName": "meetupRsvp",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "mtime",
    "timeType": "MILLISECONDS",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "schemaName": "meetupRsvp",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "meetupRSVPEvents",
      "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "localhost:2191/kafka",
      "stream.kafka.broker.list": "localhost:19092"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

Please note:

  1. Config replicasPerPartition under segmentsConfig is required to specify table replication.

  2. Config stream.kafka.consumer.type should be specified as LowLevel to use partition level consumer. (The use of simple instead of LowLevel is deprecated)

  3. Configs stream.kafka.zk.broker.url and stream.kafka.broker.list are required under tableIndexConfig.streamConfigs to provide kafka related information.

Upgrade from Kafka 0.9 connector to Kafka 2.x connector

  • Update table config for both high level and low level consumer: Update config: stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.

  • If using Stream(High) level consumer: Please also add config stream.kafka.hlc.bootstrap.server into tableIndexConfig.streamConfigs. This config should be the URI of Kafka broker lists, e.g. localhost:9092.

How to use this plugin with higher Kafka version?

This connector is also suitable for Kafka lib versions higher than 2.0.0. In pinot-connector-kafka-2.0/pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.
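Assuming kafka.lib.version is defined as a Maven property in that pom.xml (the surrounding XML here is illustrative), the change looks like:

<properties>
  <!-- bump from 2.0.0 to the Kafka client version you want to compile against -->
  <kafka.lib.version>2.1.1</kafka.lib.version>
</properties>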



Streaming

Pluggable Streams

Note

This section is a pre-read if you are planning to develop plug-ins for streams other than Kafka. Pinot supports Kafka out of the box.

Prior to commit ba9f2d, Pinot was only able to support consuming from a Kafka stream.

Pinot now enables its users to write plug-ins to consume from pub-sub streams other than Kafka. (Please refer to Issue #2583)

Some of the streams for which plug-ins can be added are:

  • Amazon Kinesis

  • Azure Event Hubs

  • LogDevice

  • Pravega

  • Pulsar

You may encounter some limitations either in Pinot or in the stream system while developing plug-ins. Please feel free to get in touch with us when you start writing a stream plug-in, and we can help you out. We are open to receiving PRs in order to improve these abstractions if they do not work for a certain stream implementation.

Refer to Consuming and Indexing rows in Realtime for details on how Pinot consumes streaming data.


Creating Pinot Segments

Creating Pinot segments

Pinot segments can be created offline on Hadoop, or via command line from data files. The Controller REST endpoint can then be used to add the segment to the table to which the segment belongs. Pinot segments can also be created by ingesting data from realtime resources (such as Kafka).


Creating segments using Hadoop

Offline Pinot workflow

To create Pinot segments on Hadoop, a workflow can be created to complete the following steps:

  1. Pre-aggregate, clean up and prepare the data, writing it as Avro format files in a single HDFS directory

  2. Create segments

  3. Upload segments to the Pinot cluster

Step one can be done using your favorite tool (such as Pig, Hive or Spark). Pinot provides two MapReduce jobs to do steps two and three.

Configuring the job

Create a job properties configuration file, such as one below:

Executing the job

The Pinot Hadoop module contains a job that you can incorporate into your workflow to generate Pinot segments.

You can then use the SegmentTarPush job to push segments via the controller REST API.

Creating Pinot segments outside of Hadoop

Here is how you can create Pinot segments from standard formats like CSV/JSON/AVRO.

  1. Follow the steps described in the section on Compiling the code to build Pinot. Locate pinot-admin.sh in pinot-tools/target/pinot-tools-pkg/bin/pinot-admin.sh.

  2. Create a top level directory containing all the CSV/JSON/AVRO files that need to be converted into segments.

  3. The file name extensions are expected to be the same as the format name (i.e .csv, .json or .avro), and are case insensitive. Note that the converter expects the .csv extension even if the data is delimited using tabs or spaces instead.

  4. Prepare a schema file describing the schema of the input data. The schema needs to be in JSON format. See example later in this section.

  5. Specifically for CSV format, an optional csv config file can be provided (also in JSON format). This is used to configure parameters like the delimiter/header for the CSV file etc. A detailed description of this follows below.

Run the pinot-admin command to generate the segments. The command can be invoked as follows. Options within “[ ]” are optional. For -format, the default value is AVRO.

To configure various parameters for CSV, a config file in JSON format can be provided. This file is optional, as are each of its parameters. When not provided, the default values used for these parameters are described below:

  1. fileFormat: Specify one of the following. Default is EXCEL.

    1. EXCEL

    2. MYSQL

    3. RFC4180

    4. TDF

  2. header: If the input CSV file does not contain a header, it can be specified using this field. Note, if this is specified, then the input file is expected to not contain the header row, or else it will result in parse error. The columns in the header must be delimited by the same delimiter character as the rest of the CSV file.

  3. delimiter: Use this to specify a delimiter character. The default value is “,”.

  4. multiValueDelimiter: Use this to specify a delimiter character for each value in multi-valued columns. The default value is “;”.

Below is a sample config file.

Sample Schema:

Pushing offline segments to Pinot

You can use curl to push a segment to Pinot:

Alternatively you can use the pinot-admin.sh utility to upload one or more segments:

The command uploads all the segments found in segmentDirectoryPath. The segments could be either tar-compressed (in which case it is a file under segmentDirectoryPath) or uncompressed (in which case it is a directory under segmentDirectoryPath).

# === Index segment creation job config ===

# path.to.input: Input directory containing Avro files
path.to.input=/user/pinot/input/data

# path.to.output: Output directory containing Pinot segments
path.to.output=/user/pinot/output

# path.to.schema: Schema file for the table, stored locally
path.to.schema=flights-schema.json

# segment.table.name: Name of the table for which to generate segments
segment.table.name=flights

# === Segment tar push job config ===

# push.to.hosts: Comma separated list of controllers host names to which to push
push.to.hosts=controller_host_0,controller_host_1

# push.to.port: The port on which the controller runs
push.to.port=8888
mvn clean install -DskipTests -Pbuild-shaded-jar
hadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentCreation job.properties
hadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentTarPush job.properties
bin/pinot-admin.sh CreateSegment -dataDir <input_data_dir> [-format [CSV/JSON/AVRO]] [-readerConfigFile <csv_config_file>] [-generatorConfigFile <generator_config_file>] -segmentName <segment_name> -schemaFile <input_schema_file> -tableName <table_name> -outDir <output_data_dir> [-overwrite]
{
  "fileFormat": "EXCEL",
  "header": "col1,col2,col3,col4",
  "delimiter": "\t",
  "multiValueDelimiter": ","
}
{
  "schemaName": "flights",
  "dimensionFieldSpecs": [
    {
      "name": "flightNumber",
      "dataType": "LONG"
    },
    {
      "name": "tags",
      "dataType": "STRING",
      "singleValueField": false
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "price",
      "dataType": "DOUBLE"
    }
  ],
  "timeFieldSpec": {
    "incomingGranularitySpec": {
      "name": "daysSinceEpoch",
      "dataType": "INT",
      "timeType": "DAYS"
    }
  }
}
curl -X POST -F segment=@<segment-tar-file-path> http://controllerHost:controllerPort/segments
pinot-tools/target/pinot-tools-pkg/bin/pinot-admin.sh UploadSegment -controllerHost <hostname> -controllerPort <port> -segmentDir <segmentDirectoryPath>