Spark-Pinot Connector

Use the Spark-Pinot connector to read data from and write data to Pinot.

Detailed read model documentation is here: Spark Pinot Connector Read Model

The write model is experimental; its documentation is here: Spark Pinot Connector Write Model

Features

  • Query realtime, offline or hybrid tables

  • Distributed, parallel scan

  • Streaming reads using gRPC (optional)

  • SQL support instead of PQL

  • Column and filter pushdown to optimize performance

  • Overlap between realtime and offline segments is queried exactly once for hybrid tables

  • Schema discovery

    • Dynamic inference

    • Static analysis of a case class

  • Supports query options (an illustrative sketch follows this list)

  • HTTPS/TLS support for secure connections
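
As the last few feature bullets note, the connector can infer the schema dynamically, take a static schema derived from a case class, and forward query options to Pinot. The sketch below is illustrative rather than authoritative: Encoders.product is the standard Spark way to turn a case class into a schema, the field names follow the airlineStats quickstart table, and the queryOptions key is an assumption here; check the Spark Pinot Connector Read Model documentation for the exact option names.

import org.apache.spark.sql.{Encoders, SparkSession}

// Case class mirroring a subset of the airlineStats columns (illustrative).
case class AirlineStats(DestStateName: String, Origin: String, AirTime: Int)

val spark: SparkSession = SparkSession
  .builder()
  .appName("pinot-read-sketch")
  .master("local")
  .getOrCreate()

val staticDf = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("queryOptions", "timeoutMs=30000")      // assumed key for passing Pinot query options
  .schema(Encoders.product[AirlineStats].schema)  // static schema derived from the case class
  .load()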

Quick Start

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
  .builder()
  .appName("spark-pinot-connector-test")
  .master("local")
  .getOrCreate()

import spark.implicits._

val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .load()
  .filter($"DestStateName" === "Florida")

data.show(100)

Security Configuration

You can secure both HTTP and gRPC using a unified switch or explicit flags.

  • Unified: set secureMode=true to enable HTTPS and gRPC TLS together (recommended)

  • Explicit: set useHttps=true for REST and grpc.use-plain-text=false for gRPC

Quick examples

// Unified secure mode (enables HTTPS + gRPC TLS by default)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("secureMode", "true")
  .load()

// Explicit HTTPS only (gRPC remains plaintext by default)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("useHttps", "true")
  .load()

// Explicit gRPC TLS only (REST remains HTTP by default)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("grpc.use-plain-text", "false")
  .load()

HTTPS Configuration

When HTTPS is enabled (either via secureMode=true or useHttps=true), you can configure keystore/truststore as needed:

val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("useHttps", "true")
  .option("keystorePath", "/path/to/keystore.jks")
  .option("keystorePassword", "keystorePassword")
  .option("truststorePath", "/path/to/truststore.jks")
  .option("truststorePassword", "truststorePassword")
  .load()

HTTPS Configuration Options

| Option             | Description                                               | Required | Default |
|--------------------|-----------------------------------------------------------|----------|---------|
| secureMode         | Unified switch to enable HTTPS and gRPC TLS               | No       | false   |
| useHttps           | Enable HTTPS connections (overrides secureMode for REST)  | No       | false   |
| keystorePath       | Path to the client keystore file (JKS format)             | No       | None    |
| keystorePassword   | Password for the keystore                                 | No       | None    |
| truststorePath     | Path to the truststore file (JKS format)                  | No       | None    |
| truststorePassword | Password for the truststore                               | No       | None    |

Note: If no truststore is provided when HTTPS is enabled, the connector will trust all certificates (not recommended for production use).

Authentication Support

The connector supports custom authentication headers for secure access to Pinot clusters:

// Using Bearer token authentication
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authToken", "my-jwt-token")  // Automatically adds "Authorization: Bearer my-jwt-token"
  .load()

// Using custom authentication header
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authHeader", "Authorization")
  .option("authToken", "Bearer my-custom-token")
  .load()

// Using API key authentication
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authHeader", "X-API-Key")
  .option("authToken", "my-api-key")
  .load()

Authentication Configuration Options

| Option     | Description                       | Required | Default                                    |
|------------|-----------------------------------|----------|--------------------------------------------|
| authHeader | Custom authentication header name | No       | Authorization (when authToken is provided) |
| authToken  | Authentication token/value        | No       | None                                       |

Note: If only authToken is provided without authHeader, the connector will automatically use Authorization: Bearer <token>.

Pinot Proxy Support

The connector supports Pinot Proxy for secure cluster access where the proxy is the only exposed endpoint. When proxy is enabled, all HTTP requests to controllers/brokers and gRPC requests to servers are routed through the proxy.

Proxy Configuration Examples

// Basic proxy configuration
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("controller", "pinot-proxy:8080")  // Proxy endpoint
  .option("proxy.enabled", "true")
  .load()

// Proxy with authentication
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("controller", "pinot-proxy:8080")
  .option("proxy.enabled", "true")
  .option("authToken", "my-proxy-token")
  .load()

// Proxy with gRPC configuration
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("controller", "pinot-proxy:8080")
  .option("proxy.enabled", "true")
  .option("grpc.proxy-uri", "pinot-proxy:8094")  // gRPC proxy endpoint
  .load()

Proxy Configuration Options

| Option        | Description                                        | Required | Default |
|---------------|----------------------------------------------------|----------|---------|
| proxy.enabled | Use Pinot Proxy for controller and broker requests | No       | false   |

Note: When proxy is enabled, the connector adds FORWARD_HOST and FORWARD_PORT headers to route requests to the actual Pinot services.

gRPC Configuration

The connector supports comprehensive gRPC configuration for secure and optimized communication with Pinot servers.

gRPC Configuration Examples

// Basic gRPC configuration
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("grpc.port", "8091")
  .option("grpc.max-inbound-message-size", "256000000")  // 256MB
  .load()

// gRPC with TLS (explicit)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("grpc.use-plain-text", "false")
  .option("grpc.tls.keystore-path", "/path/to/grpc-keystore.jks")
  .option("grpc.tls.keystore-password", "keystore-password")
  .option("grpc.tls.truststore-path", "/path/to/grpc-truststore.jks")
  .option("grpc.tls.truststore-password", "truststore-password")
  .load()

// gRPC with proxy
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("proxy.enabled", "true")
  .option("grpc.proxy-uri", "pinot-proxy:8094")
  .load()
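
Streaming reads are served from the Pinot server gRPC endpoint when useGrpcServer is enabled, an option that also appears in the proxy example scripts later on this page. A minimal sketch combining it with the options above:

// Streaming read over gRPC; assumes a Pinot server gRPC endpoint on the
// default port listed in the table below.
val grpcData = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("useGrpcServer", "true")
  .option("grpc.port", "8090")
  .load()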

gRPC Configuration Options

| Option                        | Description                                                            | Required | Default |
|-------------------------------|------------------------------------------------------------------------|----------|---------|
| grpc.port                     | Pinot gRPC port                                                        | No       | 8090    |
| grpc.max-inbound-message-size | Maximum inbound message size (in bytes) when initializing the gRPC client | No   | 128MB   |
| grpc.use-plain-text           | Use plain text for gRPC communication (overrides secureMode for gRPC)  | No       | true    |
| grpc.tls.keystore-type        | TLS keystore type for the gRPC connection                              | No       | JKS     |
| grpc.tls.keystore-path        | TLS keystore file location for the gRPC connection                     | No       | None    |
| grpc.tls.keystore-password    | TLS keystore password                                                  | No       | None    |
| grpc.tls.truststore-type      | TLS truststore type for the gRPC connection                            | No       | JKS     |
| grpc.tls.truststore-path      | TLS truststore file location for the gRPC connection                   | No       | None    |
| grpc.tls.truststore-password  | TLS truststore password                                                | No       | None    |
| grpc.tls.ssl-provider         | SSL provider                                                           | No       | JDK     |
| grpc.proxy-uri                | Pinot Rest Proxy gRPC endpoint URI                                     | No       | None    |

Note: When using gRPC with proxy, the connector automatically adds FORWARD_HOST and FORWARD_PORT metadata headers for proper request routing.

Example run with spark-shell

There are examples under https://github.com/apache/pinot/tree/master/pinot-connectors/pinot-spark-3-connector/examples.

Prerequisites

  • Apache Spark 3.x installed and spark-shell available in your PATH.

  • Set up the PINOT_HOME environment variable:

    export PINOT_HOME=/path/to/pinot
  • The Pinot Spark 3 Connector shaded JAR built and available at:

    $PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar
  • Example Scala script located at:

    $PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala

Scala script to read data from Pinot Proxy

read_pinot_from_proxy_with_auth_token.scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-pinot-airlineStats").master("local[*]").getOrCreate()

val df = spark.read.
  format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
  option("table", "myTable").
  option("tableType", "offline").
  option("controller", "pinot-proxy:8080").
  option("secureMode", "true").
  option("authToken", "st-xxxxxxx").
  option("proxy.enabled", "true").
  option("grpc.proxy-uri", "pinot-proxy:8094").
  option("useGrpcServer", "true").
  load()

println("Schema:")
df.printSchema()

println("Sample rows:")
df.show(10, truncate = false)

println(s"Total rows: ${df.count()}")

spark.stop()

Run with spark-shell

Launch the example in spark-shell with the following command:

spark-shell \
    --master 'local[*]' \
    --name read-pinot \
    --jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"

Sample output

spark-shell --master 'local[*]' --name read-pinot --jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"

25/09/04 07:59:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://xiang-mac-home.wyvern-sun.ts.net:4040
Spark context available as 'sc' (master = local[*], app id = local-1756997971428).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 17.0.15)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala>

scala> val spark = SparkSession.builder().appName("read-pinot-table").master("local[*]").getOrCreate()
25/09/04 07:59:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c377641

scala>

scala> val df = spark.read.
     |   format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
     |   option("table", "api_gateway_agg_monthly").
     |   option("tableType", "REALTIME").
     |   option("controller", "pinot.xxx.yyy.startree.cloud").
     |   option("broker", "broker.pinot.xxx.yyy.startree.cloud").
     |   option("secureMode", "true").
     |   option("authToken", "st-xxx-yyy").
     |   option("proxy.enabled", "true").
     |   option("grpc.proxy-uri", "proxy-grpc.pinot.xxx.yyy.startree.cloud").
     |   option("useGrpcServer", "true").
     |   load()
25/09/04 07:59:35 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
df: org.apache.spark.sql.DataFrame = [api_calls_count: bigint, developer_account_id: string ... 1 more field]

scala>

scala> println("Schema:")
Schema:

scala> df.printSchema()
root
 |-- api_calls_count: long (nullable = true)
 |-- developer_account_id: string (nullable = true)
 |-- monthsSinceEpoch: long (nullable = true)


scala>

scala> println("Sample rows:")
Sample rows:

scala> df.show(10, truncate = false)
25/09/04 07:59:39 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
+---------------+------------------------------------+----------------+
|api_calls_count|developer_account_id                |monthsSinceEpoch|
+---------------+------------------------------------+----------------+
|276            |000e2e63-12ef-e353-af76-6fe98d2e8747|1748736000000   |
|287            |00101768-41ba-0f01-36da-2dfee58f4703|1748736000000   |
|287            |00121c0f-3825-364e-18e4-dac8dd0ea8d1|1748736000000   |
|290            |00124c8f-2a3d-fe73-3090-482c3757af09|1748736000000   |
|299            |00128fa3-a508-5e39-0ab9-c3f2039a878a|1748736000000   |
|275            |0014e7b0-9d48-2ef9-9111-e830e74f32c4|1748736000000   |
|293            |0017b314-b083-9240-0117-515519c22fd4|1748736000000   |
|306            |0026de58-e04a-ec08-fe94-beb363324f30|1748736000000   |
|297            |002dff23-dbd7-8ef3-a55f-f225523519b4|1748736000000   |
|277            |002f40b0-409c-b3e8-bb69-049b3e321589|1748736000000   |
+---------------+------------------------------------+----------------+
only showing top 10 rows


scala>

scala> println(s"Total rows: ${df.count()}")
25/09/04 08:00:38 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
Total rows: 60000

scala>

scala> spark.stop()

scala> :quit

Example run with spark-submit

You can run the examples locally (e.g. in your IDE) in standalone mode by starting a local Pinot cluster. See: https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally

You can also run the tests in cluster mode using the following command:

export SPARK_CLUSTER=<YOUR_YARN_OR_SPARK_CLUSTER>

# Edit the ExampleSparkPinotConnectorTest to get rid of `.master("local")` and rebuild the jar before running this command
spark-submit \
    --class org.apache.pinot.connector.spark.v3.datasource.ExampleSparkPinotConnectorTest \
    --jars ./target/pinot-spark-3-connector-1.3.0-shaded.jar \
    --master $SPARK_CLUSTER \
    --deploy-mode cluster \
  ./target/pinot-spark-3-connector-1.3.0-tests.jar

The spark-shell example above demonstrates how to use the Pinot Spark 3 Connector to read data from a Pinot cluster via a proxy with authentication token support.

Security Best Practices

Production HTTPS Configuration

  • Always use HTTPS in production environments

  • Store certificates in secure locations with appropriate file permissions

  • Use proper certificate validation with a valid truststore (a sketch follows this list)

  • Rotate certificates regularly
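
A minimal sketch of a read that follows these recommendations, using only options documented above; the truststore path is a placeholder and the environment variable name is an assumption:

// Secure read with proper certificate validation (sketch; placeholder values).
val secureData = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("secureMode", "true")                                        // HTTPS + gRPC TLS together
  .option("truststorePath", "/etc/pinot/tls/truststore.jks")           // placeholder path
  .option("truststorePassword", sys.env("PINOT_TRUSTSTORE_PASSWORD"))  // assumed env variable
  .load()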

Production Authentication

  • Use service accounts with minimal required permissions

  • Store authentication tokens securely (environment variables, secret management systems; a sketch follows this list)

  • Implement token rotation policies

  • Monitor authentication failures
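
For instance, the token can be injected from the environment rather than hardcoded; PINOT_AUTH_TOKEN below is an assumed variable name:

// Read the auth token from an environment variable instead of hardcoding it.
val token = sys.env.getOrElse("PINOT_AUTH_TOKEN",
  throw new IllegalStateException("PINOT_AUTH_TOKEN is not set"))

val authedData = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authToken", token)  // sent as "Authorization: Bearer <token>" by default
  .load()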

Production gRPC Configuration

  • Enable TLS for gRPC communication in production (a combined sketch follows this list)

  • Use certificate-based authentication when possible

  • Configure appropriate message size limits based on your data

  • Use connection pooling for high-throughput scenarios
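
Combining these recommendations with the options documented above, a hardened gRPC read might look like the following sketch (paths, sizes, and the environment variable name are placeholders; connection pooling is managed outside the connector options shown here):

// Hardened gRPC read: TLS enabled, explicit truststore, tuned size limit.
val hardenedData = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("useGrpcServer", "true")
  .option("grpc.use-plain-text", "false")                                    // enable TLS for gRPC
  .option("grpc.tls.truststore-path", "/etc/pinot/tls/grpc-truststore.jks")  // placeholder path
  .option("grpc.tls.truststore-password", sys.env("PINOT_TRUSTSTORE_PASSWORD"))
  .option("grpc.max-inbound-message-size", "256000000")                      // ~256 MB; size to your data
  .load()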

Future Works

  • Add integration tests for the read operation

  • Add write support (Pinot segment write logic will change in later versions of Pinot)
