This guide shows you how to ingest a stream of records from an Apache Pulsar topic into a Pinot table.
Pinot supports consuming data from Apache Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that the Pulsar-specific libraries are present in the classpath.
Enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar
The pinot-pulsar plugin is not part of the official 0.10.0 binary. You can download the plugin from our external repository and add it to the libs or plugins directory in Pinot.
Here is a sample Pulsar stream config. You can use the streamConfigs section from this sample and adapt it for your own table.
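As an illustration, the Python sketch below assembles a minimal streamConfigs section and prints it as JSON. It uses only the property names listed in the properties table of this guide. The topic name and broker URL are placeholders, the helper name build_stream_configs is hypothetical, and a real table needs further stream settings (see the table configuration reference) that are not shown here.

```python
import json


def build_stream_configs(topic, brokers, populate_metadata=False, metadata_fields=()):
    """Return a streamConfigs dict built from the Pulsar properties in this guide."""
    configs = {
        "streamType": "pulsar",
        "stream.pulsar.topic.name": topic,
        # The broker list is a single comma-separated string.
        "stream.pulsar.bootstrap.servers": ",".join(brokers),
    }
    if populate_metadata:
        # Stream config values are strings, so the flag is "true", not True.
        configs["stream.pulsar.metadata.populate"] = "true"
        if metadata_fields:
            configs["stream.pulsar.metadata.fields"] = ",".join(metadata_fields)
    return configs


stream_configs = build_stream_configs(
    topic="my-topic",  # placeholder topic name
    brokers=["pulsar://localhost:6650"],
)
print(json.dumps({"streamConfigs": stream_configs}, indent=2))
```

The printed object can be merged into the table config by hand, which keeps the Pulsar-specific keys in one place.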
You can change the following Pulsar-specific configurations for your tables:
The Pinot-Pulsar connector supports authentication using security tokens. To generate a token, follow the instructions in the Pulsar documentation. Once generated, add the following property to streamConfigs to add an authentication token for each request:
The Pinot-Pulsar connector supports authentication using OAuth2, for example, if connecting to a StreamNative Pulsar cluster. For more information, see how to Configure OAuth2 authentication in Pulsar clients. Once configured, you can add the following properties to streamConfigs:
The Pinot-Pulsar connector also supports TLS for encrypted connections. You can follow the official Pulsar documentation to enable TLS on your Pulsar cluster. Once done, you can enable TLS in the Pulsar connector by providing the location of the trust certificate file generated in the previous step.
Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used.
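The scheme change can be applied mechanically when a table lists several brokers. The following is a minimal sketch, assuming the broker list is the comma-separated string used for stream.pulsar.bootstrap.servers. The function name to_tls_broker_urls is hypothetical, and the sketch only rewrites the URL scheme; it does not set the trust certificate property.

```python
def to_tls_broker_urls(bootstrap_servers):
    """Rewrite each pulsar:// URL in a comma-separated list to pulsar+ssl://."""
    plain, secure = "pulsar://", "pulsar+ssl://"
    urls = []
    for url in bootstrap_servers.split(","):
        url = url.strip()
        if url.startswith(plain):
            # Swap only the scheme; host and port are left untouched.
            url = secure + url[len(plain):]
        urls.append(url)
    return ",".join(urls)


print(to_tls_broker_urls("pulsar://localhost:6650"))  # pulsar+ssl://localhost:6650
```

URLs that already use pulsar+ssl:// pass through unchanged, so the function is safe to run on a partially migrated list.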
For other table and stream configurations, you can head over to the Table configuration Reference.
Pinot currently relies on Pulsar client version 2.7.2. Make sure the Pulsar broker is compatible with this client version.
Pinot's Pulsar connector supports automatically extracting record headers and metadata into Pinot table columns. Pulsar supports a large amount of per-record metadata. Refer to the official Pulsar documentation for the meaning of the metadata fields.
The following table shows the mapping from record headers and metadata to Pinot table column names:
To enable metadata extraction in a Pulsar table, set the stream config metadata.populate to true. The fields eventTime, publishTime, brokerPublishTime, and key are populated by default. If you would like to extract additional fields from the Pulsar Message, populate the metadataFields config with a comma-separated list of fields to populate. The fields are referenced by the field name in the Pulsar Message. For example, setting:
will make the __metadata$messageId, __metadata$messageBytes, __metadata$eventTime, and __metadata$topicName fields available for mapping to columns in the Pinot schema.
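The naming rule behind these columns can be sketched in a few lines of Python. The function below follows the mapping table in this guide: key becomes __key, each entry in properties becomes __header$ plus the header key, and other message fields become __metadata$ plus the field name. The function name pinot_column_for is hypothetical, and the bytes form of messageId (column __metadata$messageBytes) is deliberately not covered because its config field name is not stated here.

```python
def pinot_column_for(field, header_key=None):
    """Map a Pulsar message field name to its Pinot column name."""
    if field == "key":
        return "__key"
    if field == "properties":
        # Each header key becomes its own column.
        if header_key is None:
            raise ValueError("a header key is required for 'properties'")
        return "__header$" + header_key
    return "__metadata$" + field


for name in ["key", "eventTime", "topicName"]:
    print(name, "->", pinot_column_for(name))
```

A lookup like this is handy when generating schema entries from the same list of fields that goes into the stream config.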
In addition, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.
For example, if you want to add only the offset and key as dimension columns in your Pinot table, they can be listed in the schema as follows:
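A schema fragment of this kind can be sketched in Python as below. The schema name pulsarTable is a placeholder, and treating __metadata$messageId as the offset column is an assumption, since the mapping table has no column literally named offset. Both columns are declared as STRING, matching the column types in the mapping table. A real schema would also list the table's regular columns.

```python
import json


def dimension_specs(columns):
    """Build dimensionFieldSpecs entries, one STRING dimension per column."""
    return [{"name": column, "dataType": "STRING"} for column in columns]


schema = {
    "schemaName": "pulsarTable",  # placeholder schema name
    # Assumption: __metadata$messageId stands in for the offset.
    "dimensionFieldSpecs": dimension_specs(["__key", "__metadata$messageId"]),
}
print(json.dumps(schema, indent=2))
```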
Once the schema is updated, these columns behave like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.
Remember to follow the schema evolution guidelines when updating the schema of an existing table!
Pulsar stream configuration properties:

Property | Description |
---|---|
streamType | This should be set to "pulsar" |
stream.pulsar.topic.name | Your Pulsar topic name |
stream.pulsar.bootstrap.servers | Comma-separated broker list for Apache Pulsar |
stream.pulsar.metadata.populate | Set to true to populate metadata |
stream.pulsar.metadata.fields | Set to a comma-separated list of metadata fields |

Pulsar message field to Pinot column mapping:

Pulsar Message | Pinot table Column | Comments | Available By Default |
---|---|---|---|
key : String | __key : String | | Yes |
properties : Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | | Yes |
publishTime : Long | __metadata$publishTime : String | Publish time as determined by the producer | Yes |
brokerPublishTime: Optional | __metadata$brokerPublishTime : String | Publish time as determined by the broker | Yes |
eventTime : Long | __metadata$eventTime : String | | Yes |
messageId : MessageId -> String | __metadata$messageId : String | String representation of the MessageId field. The format is ledgerId:entryId:partitionIndex | |
messageId : MessageId -> bytes | __metadata$messageBytes : String | Base64 encoded version of the bytes returned from calling MessageId.toByteArray() | |
producerName : String | __metadata$producerName : String | | |
schemaVersion : byte[] | __metadata$schemaVersion : String | Base64 encoded value | |
sequenceId : Long | __metadata$sequenceId : String | | |
orderingKey : byte[] | __metadata$orderingKey : String | Base64 encoded value | |
size : Integer | __metadata$size : String | | |
topicName : String | __metadata$topicName : String | | |
index : String | __metadata$index : String | | |
redeliveryCount : Integer | __metadata$redeliveryCount : String | | |