Write Custom Plugins

Pinot allows users to easily create and use new plugins to support custom needs. The developer needs to implement a few standard interfaces, specific to the type of plugin, in Java.

Once the code is ready, you can put the complete JAR file in Pinot's /plugins directory. All the classes in the plugins directory are loaded at Pinot's startup. We also encourage users to shade commonly used dependencies such as Guava and Jackson.

Input Format Plugin

Pinot supports multiple input formats out of the box for batch ingestion. For real-time ingestion, currently only JSON is supported. However, due to the pluggable architecture of Pinot, you can easily use any format by implementing the standard interfaces.

Batch Record Reader Plugin

All the batch input formats supported by Pinot use the RecordReader interface to deserialize the data. You can implement the RecordReader and RecordExtractor interfaces to add support for your own file formats.

To index a file into a Pinot segment, simply implement the interface and plug it into the index engine, SegmentCreationDriverImpl. We use a two-pass algorithm to index the file into a Pinot segment, hence the rewind() method is required for the record reader.
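
As a rough illustration, the sketch below shows what a record reader for a hypothetical "two comma-separated columns per line" format could look like. The package, class name, and column names are invented for this example, and the exact init(...) signature of RecordReader has changed across Pinot releases, so treat this as a starting point to adapt against the interface shipped with your version:

    package com.example.pinot;  // hypothetical package

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Set;

    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.data.readers.RecordReader;
    import org.apache.pinot.spi.data.readers.RecordReaderConfig;

    // Sketch of a record reader for a made-up "name,count" line format.
    public class MyFormatRecordReader implements RecordReader {
      private File _dataFile;
      private BufferedReader _reader;
      private String _nextLine;

      @Override
      public void init(File dataFile, Set<String> fieldsToRead, RecordReaderConfig config)
          throws IOException {
        _dataFile = dataFile;
        rewind();
      }

      @Override
      public boolean hasNext() {
        return _nextLine != null;
      }

      @Override
      public GenericRow next() throws IOException {
        return next(new GenericRow());
      }

      @Override
      public GenericRow next(GenericRow reuse) throws IOException {
        reuse.clear();
        // Hypothetical parsing: two comma-separated columns, "name" and "count".
        String[] parts = _nextLine.split(",");
        reuse.putValue("name", parts[0]);
        reuse.putValue("count", Integer.parseInt(parts[1]));
        _nextLine = _reader.readLine();
        return reuse;
      }

      // Required because segment creation reads the input twice (see the note above).
      @Override
      public void rewind() throws IOException {
        if (_reader != null) {
          _reader.close();
        }
        _reader = new BufferedReader(new FileReader(_dataFile));
        _nextLine = _reader.readLine();
      }

      @Override
      public void close() throws IOException {
        _reader.close();
      }
    }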

Generic Row

GenericRow is the record abstraction which the index engine can read and index with. It is a map from column name (String) to column value (Object). For a multi-valued column, the value should be an object array (Object[]).
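
For instance, a reader or decoder fills in a GenericRow roughly like this (a minimal sketch; putValue is the setter in recent releases, while older releases expose a similar putField method):

    import org.apache.pinot.spi.data.readers.GenericRow;

    public class GenericRowExample {
      public static GenericRow buildRow() {
        GenericRow row = new GenericRow();
        row.putValue("studentId", 1001);                             // single-valued INT column
        row.putValue("name", "Jane");                                // single-valued STRING column
        row.putValue("subjects", new Object[]{"maths", "physics"});  // multi-valued column -> Object[]
        return row;
      }
    }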

Contracts for Record Reader

There are several contracts for record readers that developers should follow when implementing their own record readers:

  • The output GenericRow should follow the table schema provided, in the sense that:

    • All the columns in the schema should be preserved (if a column does not exist in the original record, put a default value instead)

    • Columns not in the schema should not be included

    • Values for the columns should follow the field spec from the schema (data type, single-valued/multi-valued)

  • For the time column (refer to TimeFieldSpec), the record reader should be able to read both incoming and outgoing time (we allow conversion from incoming time - the time value in the original data - to outgoing time - the time value stored in Pinot - during index creation).

    • If the incoming and outgoing time column names are the same, use the incoming time field spec

    • If the incoming and outgoing time column names are different, put both of them as time field specs

    • We keep both incoming and outgoing time columns to handle cases where the input file contains time values that are already converted

Stream Decoder Plugin

Pinot uses decoders to parse data available in real-time streams. Decoders are responsible for converting the binary data in the streams to a GenericRow object.

You can write your own decoder by implementing the StreamMessageDecoder interface. You can also use a RecordExtractor from the batch input formats to extract fields to a GenericRow from the parsed object.
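
Below is a rough sketch of such a decoder for a stream that carries UTF-8 JSON payloads. The package, class name, and JSON handling are invented for illustration, and the init(...) signature of StreamMessageDecoder has varied between Pinot releases, so check the interface in your distribution before adapting this:

    package com.example.pinot;  // hypothetical package

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.Set;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.stream.StreamMessageDecoder;

    // Sketch: decode UTF-8 JSON messages into a GenericRow, keeping only the requested fields.
    public class MyJsonMessageDecoder implements StreamMessageDecoder<byte[]> {
      private final ObjectMapper _mapper = new ObjectMapper();
      private Set<String> _fieldsToRead;

      @Override
      public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) {
        _fieldsToRead = fieldsToRead;
      }

      @Override
      public GenericRow decode(byte[] payload, GenericRow destination) {
        return decode(payload, 0, payload.length, destination);
      }

      @Override
      public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
        try {
          String json = new String(payload, offset, length, StandardCharsets.UTF_8);
          JsonNode node = _mapper.readTree(json);
          for (String field : _fieldsToRead) {
            JsonNode value = node.get(field);
            // Simplified: every field is stored as a String here; a real decoder would
            // convert values according to the schema's data types.
            destination.putValue(field, value == null ? null : value.asText());
          }
          return destination;
        } catch (Exception e) {
          // Returning null is the conventional way to signal an undecodable message.
          return null;
        }
      }
    }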


Filesystem Plugin

Pinot enables its users to write a PinotFS abstraction layer to store data in a data layer of their choice for real-time and offline segments.

Some examples of storage backends (other than local storage) currently supported are:

  • HadoopFS

  • Azure Data Lake

If the above two filesystems do not meet your needs, you can extend the current PinotFS to customize it for your needs.

New Storage Type implementation

In order to add a new type of storage backend such as Amazon S3, we need to implement the PinotFS class. The interface expects common methods related to accessing the file system, such as mkdir, list, etc.

Once the class is ready, you can compile it and put it in the /plugins directory of Pinot.
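
As a rough outline, an implementation could start like the sketch below. The class name and the comments about a backing object store are hypothetical, only a few of the required methods are shown (hence the class is declared abstract), and the exact set of abstract methods, package name, and init(...) parameter type depend on your Pinot version, so use the PinotFS class in your release as the source of truth:

    package com.example.pinot;  // hypothetical package

    import java.io.IOException;
    import java.net.URI;

    import org.apache.commons.configuration.Configuration;
    import org.apache.pinot.filesystem.PinotFS;

    // Partial sketch of a custom PinotFS. The remaining abstract methods (copy, move,
    // exists, listFiles, copyToLocalFile, copyFromLocalFile, ...) must also be implemented;
    // they are omitted here for brevity, which is why this example stays abstract.
    public abstract class MyObjectStorePinotFS extends PinotFS {
      @Override
      public void init(Configuration config) {
        // Read endpoints/credentials for the (hypothetical) object store from the
        // "storage.factory."-prefixed configs described below.
      }

      @Override
      public boolean mkdir(URI uri) throws IOException {
        // Create a directory marker in the object store; return true on success.
        return true;
      }

      @Override
      public boolean delete(URI segmentUri, boolean forceDelete) throws IOException {
        // Delete the object (or the whole prefix when forceDelete is true).
        return true;
      }

      @Override
      public long length(URI fileUri) throws IOException {
        // Return the size of the object in bytes.
        return 0L;
      }
    }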

Using plugin in real-time table

You can set the configuration in real-time or batch tables by using the base scheme of the URI (scheme://host:port/path) as a suffix. For example, for the path hdfs://user/yarn/path/to/dir, the base scheme is hdfs, so all the config keys should have hdfs in them, such as pinot.controller.storage.factory.hdfs.

The example here uses the existing org.apache.pinot.filesystem.HadoopPinotFS to store real-time segments in an HDFS filesystem. In the Pinot controller config, add the following new configs:

    "controller.data.dir": "SET_TO_YOUR_HDFS_ROOT_DIR"
    "controller.local.temp.dir": "SET_TO_A_LOCAL_FILESYSTEM_DIR"
    "pinot.controller.storage.factory.class.hdfs": "org.apache.pinot.filesystem.HadoopPinotFS"
    "pinot.controller.storage.factory.hdfs.hadoop.conf.path": "SET_TO_YOUR_HDFS_CONFIG_DIR"
    "pinot.controller.storage.factory.hdfs.hadoop.kerberos.principle": "SET_IF_YOU_USE_KERBEROS"
    "pinot.controller.storage.factory.hdfs.hadoop.kerberos.keytab": "SET_IF_YOU_USE_KERBEROS"
    "controller.enable.split.commit": "true"
    "pinot.server.instance.enable.split.commit": "true"

Configurations for Offline Tables

These properties for the filesystem implementation are to be set in your controller and server configurations.

In your controller and server configs, set the FS class you would like to support by setting pinot.controller.storage.factory.class.${YOUR_URI_SCHEME} to the full path of the FS class you would like to include.

You also need to configure pinot.controller.local.temp.dir for the local dir on the controller machine.

For filesystem-specific configs, you can pass in the following with either the pinot.controller prefix or the pinot.server prefix.

All the following configs need to be prefixed with storage.factory.

e.g. AzurePinotFS requires the following configs according to your environment:

adl.accountId, adl.authEndpoint, adl.clientId, adl.clientSecret

Sample Controller Config

    "pinot.controller.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
    "pinot.controller.storage.factory.adl.accountId": "xxxx"
    "pinot.controller.storage.factory.adl.authEndpoint": "xxxx"
    "pinot.controller.storage.factory.adl.clientId": "xxxx"
    "pinot.controller.segment.fetcher.protocols": "adl"

Sample Server Config

    "pinot.server.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
    "pinot.server.storage.factory.adl.accountId": "xxxx"
    "pinot.server.storage.factory.adl.authEndpoint": "xxxx"
    "pinot.server.storage.factory.adl.clientId": "xxxx"
    "pinot.server.segment.fetcher.protocols": "adl"

You can find the parameters in your account as follows: https://stackoverflow.com/questions/56349040/what-is-clientid-authtokenendpoint-clientkey-for-accessing-azure-data-lake

Also make sure to set the following config with the value adl:

    "segment.fetcher.protocols" : "adl"

To see how to upload segments to different storage systems, check Batch Segment Fetcher Plugin.

Batch Segment Fetcher Plugin

You can also implement your own segment fetchers for other file systems and load them into the Pinot system with an external JAR.

All you need to do is implement a class that implements the SegmentFetcher interface and provide the following configs to the Pinot Controller and Server:

    pinot.controller.segment.fetcher.<protocol>.class = <class path to your implementation>

or

    pinot.server.segment.fetcher.<protocol>.class = <class path to your implementation>

You can also provide other configs to your fetcher under the config root pinot.server.segment.fetcher.<protocol>
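
For example, registering a hypothetical fetcher for an s3 protocol could look like the following (the protocol name, class name, and the region property are purely illustrative):

    "pinot.server.segment.fetcher.s3.class": "com.example.pinot.S3SegmentFetcher"
    "pinot.server.segment.fetcher.s3.region": "us-west-2"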

Stream Ingestion Plugin

This page describes how to write your own stream ingestion plugin for Pinot.

You can write custom stream ingestion plugins to add support for your own streaming platforms such as Pravega, Kinesis, etc.

Stream Ingestion Plugins can be of two types:

  • High Level - Consume data without control over the partitions

  • Low Level - Consume data from each partition with offset management

Requirements to support Stream Level (High Level) consumers

The stream should provide the following guarantees:

  • Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.

  • (Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered to exactly one host out of a set of hosts.

  • Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.

  • The checkpoints should be recorded only when Pinot makes a call to do so.

  • The consumer should be able to start consumption from one of:

    • latest available data

    • earliest available data

    • last saved checkpoint

Requirements to support Partition Level (Low Level) consumers

While consuming rows at a partition level, the stream should support the following properties:

  • Stream should provide a mechanism to get the current number of partitions.

  • Each event in a partition should have a unique offset that is not more than 64 bits long.

  • Refer to a partition as a number not exceeding 32 bits long.

  • Stream should provide the following mechanisms to get an offset for a given partition of the stream:

    • get the offset of the oldest event available (assuming events are aged out periodically) in the partition.

    • get the offset of the most recent event published in the partition

    • (optionally) get the offset of an event that was published at a specified time

  • Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.

  • Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.

In addition, we have an operational requirement that the number of partitions should not be reduced over time.

Stream plug-in implementation

In order to add a new type of stream (say, Foo), implement the following classes:

1. FooConsumerFactory extends StreamConsumerFactory

2. FooPartitionLevelConsumer implements PartitionLevelConsumer

3. FooStreamLevelConsumer implements StreamLevelConsumer

4. FooMetadataProvider implements StreamMetadataProvider

5. FooMessageDecoder implements StreamMessageDecoder

Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.

The properties for the stream implementation are to be set in the table configuration, inside the streamConfigs section.

Use the streamType property to define the stream type. For example, for the implementation of stream foo, set the property "streamType" : "foo".

The rest of the configuration properties for your stream should be set with the prefix "stream.foo". Be sure to use the same suffix for the following (see examples below):

  • topic

  • consumer type

  • stream consumer factory

  • offset

  • decoder class name

  • decoder properties

  • connection timeout

  • fetch timeout

All values should be strings. For example:

    "streamType" : "foo",
    "stream.foo.topic.name" : "SomeTopic",
    "stream.foo.consumer.type": "LowLevel",
    "stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
    "stream.foo.consumer.prop.auto.offset.reset": "largest",
    "stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
    "stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
    "stream.foo.connection.timeout.millis" : "10000", // default 30_000
    "stream.foo.fetch.timeout.millis" : "10000" // default 5_000

You can have additional properties that are specific to your stream. For example:

    "stream.foo.some.buffer.size" : "24g"

In addition to these properties, you can define thresholds for the consuming segments:

  • rows threshold

  • time threshold

The properties for the thresholds are as follows:

    "realtime.segment.flush.threshold.rows" : "100000"
    "realtime.segment.flush.threshold.time" : "6h"

An example of this implementation can be found in the KafkaConsumerFactory, which is an implementation for the Kafka stream.
