Ingest streaming data from Apache Pulsar

This guide shows you how to ingest a stream of records from an Apache Pulsar topic into a Pinot table.

Pinot supports consuming data from Apache Pulsar via the pinot-pulsar plugin. You need to enable this plugin so that Pulsar-specific libraries are present in the classpath.

Enable the Pulsar plugin with the following config at the time of Pinot setup: -Dplugins.include=pinot-pulsar

The pinot-pulsar plugin is not part of the official 0.10.0 binary. You can download the plugin from our external repository and add it to the libs or plugins directory in Pinot.

Set up Pulsar table

Here is a sample table config for a Pulsar stream. You can copy the streamConfigs section from this sample and adjust it for your own table.

{
  "tableName": "pulsarTable",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "<your pulsar topic name>",
      "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650,pulsar://localhost:6651",
      "stream.pulsar.consumer.prop.auto.offset.reset" : "smallest",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "30000",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "realtime.segment.flush.threshold.rows": "1000000",
      "realtime.segment.flush.threshold.time": "6h"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

Pulsar configuration options

You can change the following Pulsar-specific configurations for your tables:

| Property | Description |
| --- | --- |
| streamType | Set to "pulsar". |
| stream.pulsar.topic.name | Your Pulsar topic name. |
| stream.pulsar.bootstrap.servers | Comma-separated broker list for Apache Pulsar. |
| stream.pulsar.metadata.populate | Set to true to populate metadata. |
| stream.pulsar.metadata.fields | Set to a comma-separated list of metadata fields. |

Authentication

The Pinot-Pulsar connector supports authentication using security tokens. To generate a token, follow the instructions in the Pulsar documentation. Once generated, add the following property to streamConfigs so that the authentication token is sent with each request:

"stream.pulsar.authenticationToken":"your-auth-token"

OAuth2 Authentication

The Pinot-Pulsar connector supports authentication using OAuth2, for example, when connecting to a StreamNative Pulsar cluster. For more information, see how to Configure OAuth2 authentication in Pulsar clients. Once configured, you can add the following properties to streamConfigs:

"stream.pulsar.issuerUrl": "https://auth.streamnative.cloud",
"stream.pulsar.credsFilePath": "file:///path/to/private_creds_file",
"stream.pulsar.audience": "urn:sn:pulsar:test:test-cluster"

TLS support

The Pinot-Pulsar connector also supports TLS for encrypted connections. Follow the official Pulsar documentation to enable TLS on your Pulsar cluster. Once done, enable TLS in the Pulsar connector by providing the location of the trust certificate file generated during that setup.

"stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"

Also, make sure to change the broker URL from pulsar://localhost:6650 to pulsar+ssl://localhost:6650 so that secure connections are used.
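
Taken together, the two TLS changes could look like the streamConfigs sketch below. It only reuses the placeholder topic name, host, port, and certificate path from the examples in this guide, so treat every value as something to replace with your own rather than a recommended setting.

```json
{
  "streamConfigs": {
    "streamType": "pulsar",
    "stream.pulsar.topic.name": "<your pulsar topic name>",
    "stream.pulsar.bootstrap.servers": "pulsar+ssl://localhost:6650",
    "stream.pulsar.tlsTrustCertsFilePath": "/path/to/ca.cert.pem"
  }
}
```

The remaining stream properties (decoder, consumer factory, flush thresholds) are omitted here and stay as in the sample table config.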

For other table and stream configurations, head over to the Table configuration Reference.

Supported Pulsar versions

Pinot currently relies on Pulsar client version 2.7.2. Make sure the Pulsar broker is compatible with this client version.

Extract record headers as Pinot table columns

Pinot's Pulsar connector supports automatically extracting record headers and metadata into Pinot table columns. Pulsar supports a large amount of per-record metadata. Refer to the official Pulsar documentation for the meaning of the metadata fields.

The following table shows the mapping for record header/metadata to Pinot table column names:

| Pulsar Message | Pinot table Column | Comments | Available By Default |
| --- | --- | --- | --- |
| key : String | __key : String | | Yes |
| properties : Map<String, String> | Each header key is listed as a separate column: __header$HeaderKeyName : String | | Yes |
| publishTime : Long | __metadata$publishTime : String | Publish time as determined by the producer | Yes |
| brokerPublishTime: Optional | __metadata$brokerPublishTime : String | Publish time as determined by the broker | Yes |
| eventTime : Long | __metadata$eventTime : String | | Yes |
| messageId : MessageId -> String | __metadata$messageId : String | String representation of the MessageId field. The format is ledgerId:entryId:partitionIndex | |
| messageId : MessageId -> bytes | __metadata$messageBytes : String | Base64 encoded version of the bytes returned from calling MessageId.toByteArray() | |
| producerName : String | __metadata$producerName : String | | |
| schemaVersion : byte[] | __metadata$schemaVersion : String | Base64 encoded value | |
| sequenceId : Long | __metadata$sequenceId : String | | |
| orderingKey : byte[] | __metadata$orderingKey : String | Base64 encoded value | |
| size : Integer | __metadata$size : String | | |
| topicName : String | __metadata$topicName : String | | |
| index : String | __metadata$index : String | | |
| redeliveryCount : Integer | __metadata$redeliveryCount : String | | |

To enable metadata extraction in a Pulsar table, set the stream config stream.pulsar.metadata.populate to true. The fields eventTime, publishTime, brokerPublishTime, and key are populated by default. To extract additional fields from the Pulsar Message, set stream.pulsar.metadata.fields to a comma-separated list of fields to populate. The fields are referenced by the field name in the Pulsar Message. For example, setting:

"streamConfigs": {
  ...
  "stream.pulsar.metadata.populate": "true",
  "stream.pulsar.metadata.fields": "messageId,messageIdBytes,eventTime,topicName",
  ...
}

will make the __metadata$messageId, __metadata$messageBytes, __metadata$eventTime, and __metadata$topicName fields available for mapping to columns in the Pinot schema.

In addition to this, if you want to use any of these columns in your table, you have to list them explicitly in your table's schema.

For example, if you want to add only the message ID and key as dimension columns in your Pinot table, they can be listed in the schema as follows:

  "dimensionFieldSpecs": [
    {
      "name": "__key",
      "dataType": "STRING"
    },
    {
      "name": "__metadata$messageId",
      "dataType": "STRING"
    },
    ...
  ],
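
For context, a complete schema carrying such columns might look like the sketch below. Several parts are assumptions made for illustration only: the schema name is taken to match the sample table name pulsarTable, traceId is a hypothetical header key standing in for whatever properties your producers set, and the timestamp column is assumed to hold epoch milliseconds. The extracted key, header, and metadata columns are declared as STRING, matching the mapping table above.

```json
{
  "schemaName": "pulsarTable",
  "dimensionFieldSpecs": [
    { "name": "__key", "dataType": "STRING" },
    { "name": "__metadata$messageId", "dataType": "STRING" },
    { "name": "__metadata$topicName", "dataType": "STRING" },
    { "name": "__header$traceId", "dataType": "STRING" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```

Columns that are extracted but not declared in the schema are simply not stored, so the schema only needs the fields you intend to query.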

Once the schema is updated, these columns behave like any other Pinot column. You can apply ingestion transforms and/or define indexes on them.

Remember to follow the schema evolution guidelines when updating the schema of an existing table!