You can also implement your own segment fetchers for other file systems and load them into Pinot with an external jar. All you need to do is implement a class that implements the SegmentFetcher interface and provide the config to the Pinot Controller and Server as follows:
or
You can also provide other configs to your fetcher under config-root pinot.server.segment.fetcher.<protocol>
In your Pinot controller/server configuration, you will need to provide the following configs:
or
This path should point to the local folder containing the core-site.xml and hdfs-site.xml files from your Hadoop installation.
or
These two configs should hold the corresponding Kerberos configuration if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.
You will also need to provide the proper Hadoop dependency jars from your Hadoop installation to your Pinot startup scripts.
To push HDFS segment files to Pinot controller, you just need to ensure you have proper Hadoop configuration as we mentioned in the previous part. Then your remote segment creation/push job can send the HDFS path of your newly created segment files to the Pinot Controller and let it download the files.
For example, the following curl requests to Controller will notify it to download segment files to the proper table:
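As a rough sketch of such a request, the snippet below assembles a curl command that asks the controller to download a segment from a URI. The header names UPLOAD_TYPE and DOWNLOAD_URI, the /segments endpoint, the controller address and the HDFS path are all assumptions made for illustration; check them against your Pinot version before use.

```python
import shlex


def build_segment_uri_push_command(controller_url, segment_uri):
    """Return the argv of a curl call that asks the controller to download a segment."""
    return [
        "curl", "-X", "POST",
        "-H", "UPLOAD_TYPE:URI",              # assumed header: push by URI, not by file upload
        "-H", f"DOWNLOAD_URI:{segment_uri}",  # assumed header: location the controller downloads from
        "-H", "content-type:application/json",
        "-d", "",                             # empty body; the segment location travels in the headers
        f"{controller_url}/segments",         # assumed endpoint path
    ]


command = build_segment_uri_push_command(
    "localhost:9000",                                   # hypothetical controller address
    "hdfs://nameservice1/path/to/segment/file.tar.gz",  # hypothetical HDFS location
)
print(shlex.join(command))
```

Building the argument list first and quoting it with shlex.join keeps the HDFS path safe to paste into a shell.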
This page describes how to write your own stream plug-in for Pinot. Two modes are available: high level and low level.
The stream should provide the following guarantees:
Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.
(Optionally) support mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered exactly to one host out of set of hosts.
Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.
The checkpoints should be recorded only when Pinot makes a call to do so.
The consumer should be able to start consumption from one of:
latest available data
earliest available data
last saved checkpoint
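The checkpoint and start-position requirements above can be sketched as follows. This is only an illustration under stated assumptions: the names StartFrom and resolve_start_offsets are hypothetical and not part of Pinot, and falling back to the earliest offset for a partition with no saved checkpoint is a choice made for this sketch.

```python
from enum import Enum


class StartFrom(Enum):
    LATEST = "latest"
    EARLIEST = "earliest"
    CHECKPOINT = "checkpoint"


def resolve_start_offsets(start_from, earliest, latest, checkpoint=None):
    """Return {partition: offset} at which consumption should begin.

    earliest, latest and checkpoint each map a partition number to an offset,
    so a checkpoint for a partitioned stream is a vector of per-partition values.
    """
    if start_from is StartFrom.EARLIEST:
        return dict(earliest)
    if start_from is StartFrom.LATEST:
        return dict(latest)
    if checkpoint is None:
        raise ValueError("no checkpoint has been saved yet")
    # Assumption of this sketch: partitions without a checkpoint start from their earliest offset.
    return {p: checkpoint.get(p, earliest[p]) for p in earliest}


earliest = {0: 100, 1: 250}
latest = {0: 180, 1: 400}
saved = {0: 150}  # only partition 0 has been checkpointed so far
start = resolve_start_offsets(StartFrom.CHECKPOINT, earliest, latest, saved)
print(start)
```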
While consuming rows at a partition level, the stream should support the following properties:
Stream should provide a mechanism to get the current number of partitions.
Each event in a partition should have a unique offset that is not more than 64 bits long.
Refer to a partition as a number not exceeding 32 bits long.
Stream should provide the following mechanisms to get an offset for a given partition of the stream:
get the offset of the oldest event available (assuming events are aged out periodically) in the partition.
get the offset of the most recent event published in the partition
(optionally) get the offset of an event that was published at a specified time
Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.
Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.
In addition, we have an operational requirement that the number of partitions should not be reduced over time.
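The partition-level rules above can be expressed as a few checks. The sketch below is hypothetical helper code, not Pinot's own validation; it assumes offsets fit in a signed 64-bit integer and partition numbers in a signed 32-bit integer, which is one reading of the size limits stated above.

```python
MAX_OFFSET = 2**63 - 1     # assumption: offsets are stored as signed 64-bit integers
MAX_PARTITION = 2**31 - 1  # assumption: partition numbers are stored as signed 32-bit integers


def check_partition_events(partition, offsets):
    """Raise ValueError if a run of offsets from one partition breaks the rules above."""
    if not 0 <= partition <= MAX_PARTITION:
        raise ValueError(f"partition {partition} does not fit in 32 bits")
    previous = None
    for offset in offsets:
        if not 0 <= offset <= MAX_OFFSET:
            raise ValueError(f"offset {offset} does not fit in 64 bits")
        if previous is not None and offset <= previous:
            # Each following event must satisfy o2 > o1.
            raise ValueError(f"offset {offset} does not increase after {previous}")
        previous = offset


def check_partition_count(old_count, new_count):
    """Raise ValueError if the number of partitions was reduced."""
    if new_count < old_count:
        raise ValueError(f"partition count shrank from {old_count} to {new_count}")


check_partition_events(3, [10, 11, 42])  # gaps are fine, only the ordering matters
check_partition_count(8, 16)             # growing the partition count is allowed
```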
In order to add a new type of stream (say, Foo), implement the following classes:
FooConsumerFactory extends StreamConsumerFactory
FooPartitionLevelConsumer implements PartitionLevelConsumer
FooStreamLevelConsumer implements StreamLevelConsumer
FooMetadataProvider implements StreamMetadataProvider
FooMessageDecoder implements StreamMessageDecoder
Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.
The properties for the stream implementation are to be set in the table configuration, inside streamConfigs section.
Use the streamType property to define the stream type. For example, for the implementation of stream foo, set the property "streamType" : "foo".
The rest of the configuration properties for your stream should be set with the prefix "stream.foo". Be sure to use the same suffix for the following (see examples below):
topic
consumer type
stream consumer factory
offset
decoder class name
decoder properties
connection timeout
fetch timeout
All values should be strings. For example:
You can have additional properties that are specific to your stream. For example:
In addition to these properties, you can define thresholds for the consuming segments:
rows threshold
time threshold
The properties for the thresholds are as follows:
An example of this implementation can be found in the KafkaConsumerFactory, which is an implementation for the kafka stream.
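To make the naming convention above concrete, here is a minimal sketch that builds a streamConfigs map for a stream called foo. The helper build_stream_configs, the suffixes passed to it, the topic name and the factory class name are hypothetical examples; only the streamType property and the stream.foo prefix come from the text above.

```python
import json


def build_stream_configs(stream_type, settings):
    """Prefix every setting with 'stream.<type>.' and store all values as strings."""
    prefix = f"stream.{stream_type}."
    configs = {"streamType": stream_type}
    for suffix, value in settings.items():
        configs[prefix + suffix] = str(value)  # all values should be strings
    return configs


stream_configs = build_stream_configs(
    "foo",
    {
        # Hypothetical suffixes and values, chosen only to illustrate the pattern.
        "topic.name": "SomeTopic",
        "consumer.type": "LowLevel",
        "consumer.factory.class.name": "fully.qualified.name.of.FooConsumerFactory",
        "connection.timeout.millis": 10000,
        "fetch.timeout.millis": 30000,
    },
)
print(json.dumps(stream_configs, indent=2))
```

Converting each value with str in one place avoids mixing numbers and strings in the table configuration.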
To consume in realtime, we simply need to create a table with the same name as the schema and point to the Kafka topic to consume from, using a table definition such as this one:
First, we’ll start a local instance of Kafka and start streaming data into it:
This will stream one event per second from the Avro file to the Kafka topic. Then, we’ll create a realtime table, which will start consuming from the Kafka topic.
We can then query the table with the following query to see the events stream in:
Repeating the query multiple times should show the events slowly being streamed into the table.
When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push that data to the Pinot Controller and Server:
Push the segment to a shared NFS and let Pinot pull the segment files from that NFS location.
Push the segment to a web server and let Pinot pull the segment files from the web server with an http/https link.
Push the segment to HDFS and let Pinot pull the segment files from HDFS with an hdfs location URI.
Push the segment to another system and implement your own segment fetcher to pull data from that system.
The first two options are supported out of the box by the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI of the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for HDFS, you will need to provide Pinot with a Hadoop configuration and the proper Hadoop dependencies.
Pinot segments can be created offline on Hadoop, or via the command line from data files. The controller REST endpoint can then be used to add the segment to the table to which it belongs. Pinot segments can also be created by ingesting data from realtime sources (such as Kafka).
Offline Pinot workflow
To create Pinot segments on Hadoop, a workflow can be created to complete the following steps:
Pre-aggregate, clean up and prepare the data, writing it as Avro format files in a single HDFS directory
Create segments
Upload segments to the Pinot cluster
Step one can be done using your favorite tool (such as Pig, Hive or Spark). Pinot provides two MapReduce jobs to do steps two and three.
Create a job properties configuration file, such as one below:
The Pinot Hadoop module contains a job that you can incorporate into your workflow to generate Pinot segments.
You can then use the SegmentTarPush job to push segments via the controller REST API.
Here is how you can create Pinot segments from standard formats like CSV/JSON/AVRO.
Follow the steps described in the section on Compiling the code to build Pinot. Locate pinot-admin.sh in pinot-tools/target/pinot-tools-pkg/bin/pinot-admin.sh.
Create a top level directory containing all the CSV/JSON/AVRO files that need to be converted into segments.
The file name extensions are expected to be the same as the format name (i.e., .csv, .json or .avro), and are case insensitive. Note that the converter expects the .csv extension even if the data is delimited using tabs or spaces instead.
Prepare a schema file describing the schema of the input data. The schema needs to be in JSON format. See example later in this section.
Specifically for CSV format, an optional csv config file can be provided (also in JSON format). This is used to configure parameters like the delimiter/header for the CSV file etc. A detailed description of this follows below.
Run the pinot-admin command to generate the segments. The command can be invoked as follows. Options within “[ ]” are optional. For -format, the default value is AVRO.
To configure various parameters for CSV, a config file in JSON format can be provided. This file is optional, as is each of its parameters. The parameters, and the default values used when they are not provided, are described below:
fileFormat: Specify one of the following. Default is EXCEL.
EXCEL
MYSQL
RFC4180
TDF
header: If the input CSV file does not contain a header, it can be specified using this field. Note that if this is specified, the input file is expected to not contain the header row; otherwise a parse error will result. The columns in the header must be delimited by the same delimiter character as the rest of the CSV file.
delimiter: Use this to specify a delimiter character. The default value is “,”.
multiValueDelimiter: Use this to specify a delimiter character for each value in multi-valued columns. The default value is “;”.
Below is a sample config file.
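As a minimal sketch, a config using the four parameters described above could be generated like this. The helper make_csv_config and the header column names are hypothetical; the key names and default values are the ones listed above.

```python
import json

ALLOWED_FORMATS = ("EXCEL", "MYSQL", "RFC4180", "TDF")


def make_csv_config(file_format="EXCEL", delimiter=",", multi_value_delimiter=";", header=None):
    """Build the optional CSV config, starting from the documented defaults."""
    if file_format not in ALLOWED_FORMATS:
        raise ValueError(f"fileFormat must be one of {ALLOWED_FORMATS}")
    config = {
        "fileFormat": file_format,
        "delimiter": delimiter,
        "multiValueDelimiter": multi_value_delimiter,
    }
    if header is not None:
        # Only set a header when the input file has no header row of its own.
        config["header"] = header
    return config


# Hypothetical pipe-delimited input; the header uses the same delimiter as the data.
csv_config = make_csv_config(delimiter="|", header="id|name|tags")
print(json.dumps(csv_config, indent=2))
```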
Sample Schema:
You can use curl to push a segment to Pinot:
Alternatively you can use the pinot-admin.sh utility to upload one or more segments:
The command uploads all the segments found in segmentDirectoryPath. The segments could be either tar-compressed (in which case it is a file under segmentDirectoryPath) or uncompressed (in which case it is a directory under segmentDirectoryPath).
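The file-versus-directory distinction described above can be illustrated with a short sketch that lists what an upload would find. The function list_segments and the segment names are hypothetical and only mirror the rule stated here; they are not the utility's actual code.

```python
import tempfile
from pathlib import Path


def list_segments(segment_directory_path):
    """Split the entries of a segment directory into compressed and uncompressed segments."""
    compressed, uncompressed = [], []
    for entry in sorted(Path(segment_directory_path).iterdir()):
        if entry.is_dir():
            uncompressed.append(entry.name)  # a directory is an uncompressed segment
        elif entry.is_file():
            compressed.append(entry.name)    # a file is a tar-compressed segment
    return compressed, uncompressed


with tempfile.TemporaryDirectory() as root:
    (Path(root) / "seg_0.tar.gz").write_bytes(b"")  # stand-in for a compressed segment
    (Path(root) / "seg_1").mkdir()                  # stand-in for an uncompressed segment
    compressed, uncompressed = list_segments(root)

print("compressed:", compressed)
print("uncompressed:", uncompressed)
```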
This page describes how to connect Kafka to Pinot.
Pinot provides stream plugin support for Kafka 2.x. Although the version used in this implementation is Kafka 2.0.0, it is possible to compile it with a higher Kafka library version, e.g. 2.1.1.
Use Kafka Stream(High) Level Consumer
Below is a sample streamConfigs used to create a realtime table with Kafka Stream(High) level consumer.
Kafka 2.x HLC consumer uses org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory in config stream.kafka.consumer.factory.class.name.
Use Kafka Partition(Low) Level Consumer
Below is a sample table config used to create a realtime table with Kafka Partition(Low) level consumer:
Please note:
Config replicasPerPartition under segmentsConfig is required to specify table replication.
Config stream.kafka.consumer.type should be specified as LowLevel to use the partition level consumer. (The use of simple instead of LowLevel is deprecated.)
Configs stream.kafka.zk.broker.url and stream.kafka.broker.list are required under tableIndexConfig.streamConfigs to provide Kafka-related information.
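The three notes above translate into a small checklist. The sketch below is a hypothetical validator, not part of Pinot; the config keys are the ones named in the notes, while the sample values are made up for illustration.

```python
def check_low_level_table_config(table_config):
    """Return a list of problems found against the notes above (empty if none)."""
    problems = []
    if "replicasPerPartition" not in table_config.get("segmentsConfig", {}):
        problems.append("segmentsConfig.replicasPerPartition is missing")

    stream_configs = table_config.get("tableIndexConfig", {}).get("streamConfigs", {})
    consumer_type = stream_configs.get("stream.kafka.consumer.type")
    if consumer_type == "simple":
        problems.append("stream.kafka.consumer.type 'simple' is deprecated; use 'LowLevel'")
    elif consumer_type != "LowLevel":
        problems.append("stream.kafka.consumer.type must be 'LowLevel'")

    for key in ("stream.kafka.zk.broker.url", "stream.kafka.broker.list"):
        if key not in stream_configs:
            problems.append(f"tableIndexConfig.streamConfigs.{key} is missing")
    return problems


sample = {
    "segmentsConfig": {"replicasPerPartition": "1"},
    "tableIndexConfig": {
        "streamConfigs": {
            "stream.kafka.consumer.type": "LowLevel",
            "stream.kafka.zk.broker.url": "localhost:2181/kafka",  # hypothetical value
            "stream.kafka.broker.list": "localhost:9092",          # hypothetical value
        }
    },
}
print(check_low_level_table_config(sample))
```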
Update table config for both high level and low level consumer: Update config stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
If using Stream(High) level consumer: Please also add config stream.kafka.hlc.bootstrap.server into tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092.
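The two table config updates above can be sketched as a small migration step. The function upgrade_table_config is hypothetical and works on a table config already loaded as a dict; the keys and class names are the ones given above.

```python
import copy

FACTORY_KEY = "stream.kafka.consumer.factory.class.name"
OLD_FACTORY = "org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory"
NEW_FACTORY = "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory"
BOOTSTRAP_KEY = "stream.kafka.hlc.bootstrap.server"


def upgrade_table_config(table_config, high_level, bootstrap_server=None):
    """Return a copy of the table config switched to the Kafka 2.x consumer factory."""
    upgraded = copy.deepcopy(table_config)  # leave the caller's config untouched
    stream_configs = upgraded["tableIndexConfig"]["streamConfigs"]
    if stream_configs.get(FACTORY_KEY) == OLD_FACTORY:
        stream_configs[FACTORY_KEY] = NEW_FACTORY
    if high_level:
        if not bootstrap_server:
            raise ValueError("a bootstrap server is required for the high level consumer")
        stream_configs[BOOTSTRAP_KEY] = bootstrap_server
    return upgraded


old_config = {"tableIndexConfig": {"streamConfigs": {FACTORY_KEY: OLD_FACTORY}}}
new_config = upgrade_table_config(old_config, high_level=True, bootstrap_server="localhost:9092")
print(new_config["tableIndexConfig"]["streamConfigs"])
```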
This connector is also suitable for Kafka library versions higher than 2.0.0. In pinot-connector-kafka-2.0/pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 makes this connector work with Kafka 2.1.1.
Note
This section is a pre-read if you are planning to develop plug-ins for streams other than Kafka. Pinot supports Kafka out of the box.
Prior to commit ba9f2d, Pinot could only consume from Kafka streams.
Pinot now enables its users to write plug-ins to consume from pub-sub streams other than Kafka. (Please refer to Issue #2583)
Some of the streams for which plug-ins can be added are:
You may encounter some limitations either in Pinot or in the stream system while developing plug-ins. Please feel free to get in touch with us when you start writing a stream plug-in, and we can help you out. We are open to receiving PRs in order to improve these abstractions if they do not work for a certain stream implementation.
Refer to Consuming and Indexing rows in Realtime for details on how Pinot consumes streaming data.