Pinot allows users to easily create and use new plugins to support custom needs. The developer needs to implement a few standard interfaces, specific to the type of plugin, in Java.
Once the code is ready, you can put the complete JAR file in Pinot's /plugins directory. All the classes in the plugins directory are loaded at Pinot's startup.
We also encourage users to shade commonly used dependencies such as guava and jackson.
You can also implement your own segment fetchers for other file systems and load them into Pinot with an external JAR.
All you need to do is write a class that implements the SegmentFetcher interface and provide its config to the Pinot Controller and Server.
You can also provide other configs to your fetcher under the config root pinot.server.segment.fetcher.<protocol>
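The rough shape of such a fetcher can be sketched as follows. This is only an illustration: SimpleSegmentFetcher is a hypothetical stand-in, not Pinot's actual SegmentFetcher interface (check the Pinot source for the real method signatures), and the foo protocol and the root.dir config key are made up for the example.

```java
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Hypothetical stand-in for Pinot's SegmentFetcher interface (not the real API).
interface SimpleSegmentFetcher {
  void init(Map<String, String> config);

  void fetchSegmentToLocal(URI uri, File dest) throws IOException;
}

// Fetcher for a made-up "foo" protocol: foo:///a/b resolves to <root.dir>/a/b.
class FooSegmentFetcher implements SimpleSegmentFetcher {
  private Path _root = Paths.get("/");

  @Override
  public void init(Map<String, String> config) {
    // "root.dir" is an illustrative key that would live under the fetcher's config root.
    String root = config.get("root.dir");
    if (root != null) {
      _root = Paths.get(root);
    }
  }

  // Maps a foo:// URI to the file it refers to under the configured root.
  Path resolveSource(URI uri) {
    if (!"foo".equals(uri.getScheme())) {
      throw new IllegalArgumentException("Unsupported scheme: " + uri.getScheme());
    }
    String relative = uri.getPath().replaceFirst("^/+", "");
    return _root.resolve(relative);
  }

  @Override
  public void fetchSegmentToLocal(URI uri, File dest) throws IOException {
    // Copy the segment file to the local destination, overwriting any stale copy.
    Files.copy(resolveSource(uri), dest.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }
}
```

Keeping the URI-to-path mapping in its own method makes the protocol handling easy to check without touching the file system.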
This page describes how to write your own stream ingestion plugin for Pinot.
You can write custom stream ingestion plugins to add support for your own streaming platforms such as Pravega, Kinesis etc.
Stream ingestion plugins can be of two types:
High Level: consume data without control over the partitions
Low Level: consume data from each partition with offset management
The stream should provide the following guarantees:
Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.
(Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered to exactly one host out of a set of hosts.
Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.
The checkpoints should be recorded only when Pinot makes a call to do so.
The consumer should be able to start consumption from one of:
latest available data
earliest available data
last saved checkpoint
While consuming rows at a partition level, the stream should support the following properties:
Stream should provide a mechanism to get the current number of partitions.
Each event in a partition should have a unique offset that is not more than 64 bits long.
Each partition should be identified by a number that is not more than 32 bits long.
Stream should provide the following mechanisms to get an offset for a given partition of the stream:
get the offset of the oldest event available (assuming events are aged out periodically) in the partition.
get the offset of the most recent event published in the partition
(optionally) get the offset of an event that was published at a specified time
Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.
Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.
In addition, we have an operational requirement that the number of partitions should not be reduced over time.
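The partition-level requirements above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not Pinot code: StreamEvent, InMemoryStream and OffsetChecker are hypothetical names, partitions are plain int ids (32 bits) and offsets are long values (64 bits).

```java
import java.util.ArrayList;
import java.util.List;

// One event in a partition, identified by a 64-bit offset.
final class StreamEvent {
  final long offset;
  final String payload;

  StreamEvent(long offset, String payload) {
    this.offset = offset;
    this.payload = payload;
  }
}

// Hypothetical in-memory stream with int partition ids and long offsets.
class InMemoryStream {
  private final List<List<StreamEvent>> _partitions = new ArrayList<>();

  InMemoryStream(int numPartitions) {
    for (int i = 0; i < numPartitions; i++) {
      _partitions.add(new ArrayList<>());
    }
  }

  int getPartitionCount() {
    return _partitions.size();
  }

  // Appends an event; each new offset is larger than the previous one.
  long publish(int partition, String payload) {
    List<StreamEvent> events = _partitions.get(partition);
    long offset = events.isEmpty() ? 0L : events.get(events.size() - 1).offset + 1;
    events.add(new StreamEvent(offset, payload));
    return offset;
  }

  // Offset of the oldest available event, or -1 if the partition is empty.
  long earliestOffset(int partition) {
    List<StreamEvent> events = _partitions.get(partition);
    return events.isEmpty() ? -1L : events.get(0).offset;
  }

  // Offset of the most recently published event, or -1 if the partition is empty.
  long latestOffset(int partition) {
    List<StreamEvent> events = _partitions.get(partition);
    return events.isEmpty() ? -1L : events.get(events.size() - 1).offset;
  }

  // Returns up to maxEvents events with offset >= startOffset, in offset order.
  List<StreamEvent> fetch(int partition, long startOffset, int maxEvents) {
    List<StreamEvent> result = new ArrayList<>();
    for (StreamEvent event : _partitions.get(partition)) {
      if (event.offset >= startOffset && result.size() < maxEvents) {
        result.add(event);
      }
    }
    return result;
  }
}

class OffsetChecker {
  // True if every offset is strictly greater than the one before it (o2 > o1).
  static boolean isMonotonic(List<StreamEvent> events) {
    for (int i = 1; i < events.size(); i++) {
      if (events.get(i).offset <= events.get(i - 1).offset) {
        return false;
      }
    }
    return true;
  }
}
```

A real stream would replace the in-memory lists with calls to the streaming platform, but the same four capabilities (partition count, earliest offset, latest offset, fetch from an offset) are what the consumer relies on.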
In order to add a new type of stream (say, Foo), you need to implement a set of stream-specific classes: a consumer factory, the consumers, a metadata provider and a message decoder.
Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.
The properties for the stream implementation are to be set in the table configuration, inside the streamConfigs section.
Use the streamType property to define the stream type. For example, for the implementation of stream foo, set the property "streamType" : "foo".
The rest of the configuration properties for your stream should be set with the prefix "stream.foo". Be sure to use the same suffix for the following properties (see examples below):
topic
consumer type
stream consumer factory
offset
decoder class name
decoder properties
connection timeout
fetch timeout
All values should be strings. For example:
You can have additional properties that are specific to your stream. For example:
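A minimal sketch of how such a set of properties could be assembled is shown here as a Java map. The suffix names (topic.name, consumer.type and so on) are assumptions modeled on the naming used by Pinot's Kafka stream configs, so verify them against the Pinot version you use; the class names and values are placeholders, and some.important.property is a made-up stream-specific property.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FooStreamConfigs {
  // Builds a key of the form "stream.<streamType>.<suffix>".
  static String key(String streamType, String suffix) {
    return "stream." + streamType + "." + suffix;
  }

  static Map<String, String> build() {
    String type = "foo";
    Map<String, String> configs = new LinkedHashMap<>();
    configs.put("streamType", type);
    // Suffix names below are assumptions; values are placeholders.
    configs.put(key(type, "topic.name"), "SomeTopic");
    configs.put(key(type, "consumer.type"), "LowLevel");
    configs.put(key(type, "consumer.factory.class.name"), "fully.qualified.pkg.FooConsumerFactory");
    configs.put(key(type, "consumer.prop.auto.offset.reset"), "largest");
    configs.put(key(type, "decoder.class.name"), "fully.qualified.pkg.FooMessageDecoder");
    configs.put(key(type, "decoder.prop.a.decoder.property"), "decoderPropValue");
    // Numeric values are still passed as strings.
    configs.put(key(type, "connection.timeout.millis"), "10000");
    configs.put(key(type, "fetch.timeout.millis"), "10000");
    // A hypothetical property that only the foo stream understands.
    configs.put(key(type, "some.important.property"), "importantValue");
    return configs;
  }
}
```

In a table configuration these entries would appear as string key/value pairs inside the streamConfigs section.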
In addition to these properties, you can define thresholds for the consuming segments:
rows threshold
time threshold
The properties for the thresholds are as follows:
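For the Foo stream, the classes to implement are:
FooConsumerFactory extends StreamConsumerFactory
FooPartitionLevelConsumer implements PartitionLevelConsumer
FooStreamLevelConsumer implements StreamLevelConsumer
FooMetadataProvider implements StreamMetadataProvider
FooMessageDecoder implements StreamMessageDecoder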
An example of this implementation can be found in Pinot's implementation for the Kafka stream.
Pinot supports multiple input formats out of the box for batch ingestion. For realtime ingestion, currently only JSON is supported. However, due to the pluggable architecture of Pinot, you can easily use any format by implementing standard interfaces.
All the batch input formats supported by Pinot utilise RecordReader to deserialize the data. You can also implement the RecordReader and RecordExtractor interfaces to add support for your own file formats.
To index the file into a Pinot segment, simply implement the RecordReader interface and plug it into the index engine, SegmentCreationDriverImpl. We use a two-pass algorithm to index the file into a Pinot segment, hence the rewind() method is required for the record reader.
GenericRow is the record abstraction which the index engine can read and index with. It is a map from column name (String) to column value (Object). For a multi-valued column, the value should be an object array (Object[]).
There are several contracts for record readers that developers should follow when implementing their own record readers:
The output GenericRow should follow the table schema provided, in the sense that:
All the columns in the schema should be preserved (if a column does not exist in the original record, put the default value instead)
Columns not in the schema should not be included
Values for the column should follow the field spec from the schema (data type, single-valued/multi-valued)
For the time column (refer to TimeFieldSpec), the record reader should be able to read both incoming and outgoing time. Pinot allows conversion from incoming time (the time value in the original data) to outgoing time (the time value stored in Pinot) during index creation.
If the incoming and outgoing time column names are the same, use the incoming time field spec
If the incoming and outgoing time column names are different, put both of them as time field specs
We keep both the incoming and outgoing time columns to handle cases where the input file contains time values that are already converted
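The schema-related contracts above (keep every schema column, fill defaults, drop unknown columns, use Object[] for multi-valued columns) can be sketched with plain maps. This is an illustration only: SimpleFieldSpec and RowConformer are hypothetical names, and a plain Map stands in for GenericRow; time column handling is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified field spec: a default value and a multi-valued flag.
final class SimpleFieldSpec {
  final Object defaultValue;
  final boolean multiValued;

  SimpleFieldSpec(Object defaultValue, boolean multiValued) {
    this.defaultValue = defaultValue;
    this.multiValued = multiValued;
  }
}

class RowConformer {
  // Builds an output row that contains exactly the columns of the schema.
  static Map<String, Object> conform(Map<String, Object> raw, Map<String, SimpleFieldSpec> schema) {
    Map<String, Object> row = new LinkedHashMap<>();
    // Iterating over the schema (not the record) drops columns that are not in the schema.
    for (Map.Entry<String, SimpleFieldSpec> entry : schema.entrySet()) {
      SimpleFieldSpec spec = entry.getValue();
      Object value = raw.get(entry.getKey());
      if (value == null) {
        // Column missing from the original record: use the default value.
        value = spec.defaultValue;
      }
      if (spec.multiValued && !(value instanceof Object[])) {
        // Multi-valued columns are always represented as Object[].
        value = new Object[] {value};
      }
      row.put(entry.getKey(), value);
    }
    return row;
  }
}
```

Data type conversion per field spec is omitted here; a real record reader would also coerce each value to the column's declared type.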
Pinot uses decoders to parse data available in realtime streams. Decoders are responsible for converting binary data in the streams to a GenericRow object.
You can write your own decoder by implementing the StreamMessageDecoder interface. You can also use the RecordExtractor from the batch input formats to extract fields to GenericRow from the parsed object.
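To make the decoder's role concrete, here is a minimal sketch that turns a binary payload into a row. SimpleMessageDecoder is a hypothetical stand-in for StreamMessageDecoder (whose real signature should be checked in the Pinot source), a plain Map stands in for GenericRow, and the key=value;key=value wire format is invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Pinot's StreamMessageDecoder (not the real API).
interface SimpleMessageDecoder<T> {
  Map<String, Object> decode(T payload);
}

// Decodes UTF-8 payloads of the made-up form "key=value;key=value".
class KeyValueBytesDecoder implements SimpleMessageDecoder<byte[]> {
  @Override
  public Map<String, Object> decode(byte[] payload) {
    Map<String, Object> row = new LinkedHashMap<>();
    String text = new String(payload, StandardCharsets.UTF_8);
    for (String pair : text.split(";")) {
      int eq = pair.indexOf('=');
      if (eq <= 0) {
        continue; // skip empty or malformed fragments
      }
      row.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
    }
    return row;
  }
}
```

All values come out as strings in this sketch; converting them to the schema's data types is where a RecordExtractor-style step would fit.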
Pinot enables its users to write a PinotFS abstraction layer to store data in a data layer of their choice for realtime and offline segments.
Some examples of storage backends (other than local storage) currently supported are HDFS (via HadoopPinotFS) and Azure Data Lake (via AzurePinotFS).
If the above two filesystems do not meet your needs, you can extend the current PinotFS to customize for your needs.
In order to add a new type of storage backend, such as Amazon S3, we need to implement the PinotFS class. The interface expects common methods related to accessing the file system, such as mkdir, list, etc.
Once the class is ready, you can compile it and put it in the /plugins directory of Pinot.
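The kind of methods such a class provides can be sketched against the local file system. SimpleFS below is a hypothetical, heavily reduced stand-in for PinotFS, which has more methods and different signatures; a backend such as S3 would replace the java.io.File calls with calls to its own client library.

```java
import java.io.File;
import java.net.URI;
import java.util.Arrays;

// Hypothetical, reduced stand-in for PinotFS (not the real API).
interface SimpleFS {
  boolean mkdir(URI uri);

  String[] listFiles(URI dirUri);

  boolean exists(URI uri);
}

// Local file system implementation of the stand-in interface.
class LocalSimpleFS implements SimpleFS {
  private static File toFile(URI uri) {
    return new File(uri.getPath());
  }

  @Override
  public boolean mkdir(URI uri) {
    File dir = toFile(uri);
    // Succeeds if the directory already exists or could be created with its parents.
    return dir.isDirectory() || dir.mkdirs();
  }

  @Override
  public String[] listFiles(URI dirUri) {
    String[] names = toFile(dirUri).list();
    if (names == null) {
      return new String[0]; // not a directory, or not readable
    }
    Arrays.sort(names);
    return names;
  }

  @Override
  public boolean exists(URI uri) {
    return toFile(uri).exists();
  }
}
```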
You can set the configuration in realtime or batch tables by using the base scheme of the URI (scheme://host:port/path) as a suffix.
E.g., for the path hdfs://user/yarn/path/to/dir, the base scheme is hdfs and all the config keys should have hdfs in them, such as pinot.controller.storage.factory.hdfs
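As a small illustration of how the scheme ends up in the config key, the sketch below extracts the base scheme from a path and appends it to the storage factory prefix. StorageConfigKeys and its methods are hypothetical helpers, not part of Pinot, and passing the component prefix as a parameter is an assumption made for the example.

```java
import java.net.URI;

class StorageConfigKeys {
  // Returns the base scheme of a path such as hdfs://user/yarn/path/to/dir.
  static String schemeOf(String path) {
    String scheme = URI.create(path).getScheme();
    if (scheme == null) {
      throw new IllegalArgumentException("No scheme in: " + path);
    }
    return scheme;
  }

  // Builds "<prefix>.storage.factory.<scheme>", e.g. with prefix "pinot.controller".
  static String factoryKey(String prefix, String path) {
    return prefix + ".storage.factory." + schemeOf(path);
  }
}
```

For example, `StorageConfigKeys.factoryKey("pinot.controller", "hdfs://user/yarn/path/to/dir")` yields `pinot.controller.storage.factory.hdfs`.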
The example here uses the existing org.apache.pinot.filesystem.HadoopPinotFS to store realtime segments in an HDFS filesystem. In the Pinot controller config, add the following new configs:
These properties for the filesystem implementation are to be set in your controller and server configurations.
In your controller and server configs, set pinot.controller.storage.factory.class.${YOUR_URI_SCHEME} to the fully qualified class name of the FS class you would like to include.
You also need to configure pinot.controller.local.temp.dir for the local dir on the controller machine.
For filesystem specific configs, you can pass in the following with either the pinot.controller prefix or the pinot.server prefix.
All the following configs need to be prefixed with storage.factory.
E.g., AzurePinotFS requires the following configs according to your environment: adl.accountId, adl.authEndpoint, adl.clientId, adl.clientSecret
Sample Controller Config
Sample Server Config
You can find the parameters in your account as follows: https://stackoverflow.com/questions/56349040/what-is-clientid-authtokenendpoint-clientkey-for-accessing-azure-data-lake
Please also make sure to set the following config with the value adl
To see how to upload segments to different storage systems, check Batch Segment Fetcher Plugin.