Spark-Pinot Connector
Use the Spark-Pinot connector to read data from and write data to Pinot.
Detailed read model documentation is here: Spark Pinot Connector Read Model
The write model is experimental; its documentation is here: Spark Pinot Connector Write Model
Features
- Query realtime, offline, or hybrid tables
- Distributed, parallel scan
- Streaming reads using gRPC (optional)
- SQL support instead of PQL
- Column and filter push-down to optimize performance
- Overlap between realtime and offline segments is queried exactly once for hybrid tables
- Schema discovery
  - Dynamic inference
  - Static analysis of a case class (see the sketch after this list)
- Supports query options
- HTTPS/TLS support for secure connections
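For the static path, a schema can be derived from a case class and passed explicitly through Spark's standard DataFrameReader.schema mechanism. A minimal sketch, where the case class name and fields are illustrative and must match the Pinot table's column names:

import org.apache.spark.sql.{Encoders, SparkSession}

// Illustrative case class; field names must match Pinot column names
case class AirlineStats(DestStateName: String, OriginStateName: String, AirTime: Int)

val spark = SparkSession.builder().appName("schema-example").master("local").getOrCreate()

// Pass the schema explicitly instead of relying on dynamic inference
val typedData = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .schema(Encoders.product[AirlineStats].schema)
  .load()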
Quick Start
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("spark-pinot-connector-test")
.master("local")
.getOrCreate()
import spark.implicits._
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.load()
.filter($"DestStateName" === "Florida")
data.show(100)
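The same API reads realtime tables, and streaming reads over gRPC can be enabled with useGrpcServer (both options appear in the full examples later on this page):

val realtimeData = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "realtime")
  .option("useGrpcServer", "true") // optional streaming reads via gRPC
  .load()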
Security Configuration
This feature is supported after the 1.4.0 release; use the current master build or wait for the 1.5.0 release.
You can secure both HTTP and gRPC using a unified switch or explicit flags.
- Unified: set secureMode=true to enable HTTPS and gRPC TLS together (recommended)
- Explicit: set useHttps=true for REST and grpc.use-plain-text=false for gRPC
Quick examples
// Unified secure mode (enables HTTPS + gRPC TLS by default)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("secureMode", "true")
.load()
// Explicit HTTPS only (gRPC remains plaintext by default)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("useHttps", "true")
.load()
// Explicit gRPC TLS only (REST remains HTTP by default)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("grpc.use-plain-text", "false")
.load()
HTTPS Configuration
When HTTPS is enabled (either via secureMode=true or useHttps=true), you can configure the keystore/truststore as needed:
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("useHttps", "true")
.option("keystorePath", "/path/to/keystore.jks")
.option("keystorePassword", "keystorePassword")
.option("truststorePath", "/path/to/truststore.jks")
.option("truststorePassword", "truststorePassword")
.load()
HTTPS Configuration Options

| Option | Description | Required | Default |
| --- | --- | --- | --- |
| secureMode | Unified switch to enable HTTPS and gRPC TLS | No | false |
| useHttps | Enable HTTPS connections (overrides secureMode for REST) | No | false |
| keystorePath | Path to client keystore file (JKS format) | No | None |
| keystorePassword | Password for the keystore | No | None |
| truststorePath | Path to truststore file (JKS format) | No | None |
| truststorePassword | Password for the truststore | No | None |
Note: If no truststore is provided when HTTPS is enabled, the connector will trust all certificates (not recommended for production use).
Authentication Support
This feature is supported after the 1.4.0 release; use the current master build or wait for the 1.5.0 release.
The connector supports custom authentication headers for secure access to Pinot clusters:
// Using Bearer token authentication
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("authToken", "my-jwt-token") // Automatically adds "Authorization: Bearer my-jwt-token"
.load()
// Using custom authentication header
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("authHeader", "Authorization")
.option("authToken", "Bearer my-custom-token")
.load()
// Using API key authentication
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("authHeader", "X-API-Key")
.option("authToken", "my-api-key")
.load()
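Rather than hard-coding tokens (see Security Best Practices below), a token can be injected from the environment; a minimal sketch in plain Scala, where the variable name PINOT_AUTH_TOKEN is illustrative:

// Fail fast if the illustrative PINOT_AUTH_TOKEN variable is not set
val token = sys.env.getOrElse("PINOT_AUTH_TOKEN", sys.error("PINOT_AUTH_TOKEN is not set"))

val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authToken", token) // sent as "Authorization: Bearer <token>" by default
  .load()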
Authentication Configuration Options
| Option | Description | Required | Default |
| --- | --- | --- | --- |
| authHeader | Custom authentication header name | No | Authorization (when authToken is provided) |
| authToken | Authentication token/value | No | None |

Note: If only authToken is provided without authHeader, the connector will automatically use Authorization: Bearer <token>.
Pinot Proxy Support
This feature is supported after the 1.4.0 release; use the current master build or wait for the 1.5.0 release.
The connector supports Pinot Proxy for secure cluster access where the proxy is the only exposed endpoint. When proxy is enabled, all HTTP requests to controllers/brokers and gRPC requests to servers are routed through the proxy.
Proxy Configuration Examples
// Basic proxy configuration
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("controller", "pinot-proxy:8080") // Proxy endpoint
.option("proxy.enabled", "true")
.load()
// Proxy with authentication
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("controller", "pinot-proxy:8080")
.option("proxy.enabled", "true")
.option("authToken", "my-proxy-token")
.load()
// Proxy with gRPC configuration
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("controller", "pinot-proxy:8080")
.option("proxy.enabled", "true")
.option("grpc.proxy-uri", "pinot-proxy:8094") // gRPC proxy endpoint
.load()
Proxy Configuration Options
| Option | Description | Required | Default |
| --- | --- | --- | --- |
| proxy.enabled | Use Pinot Proxy for controller and broker requests | No | false |

Note: When proxy is enabled, the connector adds FORWARD_HOST and FORWARD_PORT headers to route requests to the actual Pinot services.
gRPC Configuration
This feature is supported after the 1.4.0 release; use the current master build or wait for the 1.5.0 release.
The connector supports comprehensive gRPC configuration for secure and optimized communication with Pinot servers.
gRPC Configuration Examples
// Basic gRPC configuration
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("grpc.port", "8091")
.option("grpc.max-inbound-message-size", "256000000") // 256MB
.load()
// gRPC with TLS (explicit)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("grpc.use-plain-text", "false")
.option("grpc.tls.keystore-path", "/path/to/grpc-keystore.jks")
.option("grpc.tls.keystore-password", "keystore-password")
.option("grpc.tls.truststore-path", "/path/to/grpc-truststore.jks")
.option("grpc.tls.truststore-password", "truststore-password")
.load()
// gRPC with proxy
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("proxy.enabled", "true")
.option("grpc.proxy-uri", "pinot-proxy:8094")
.load()
gRPC Configuration Options
| Option | Description | Required | Default |
| --- | --- | --- | --- |
| grpc.port | Pinot gRPC port | No | 8090 |
| grpc.max-inbound-message-size | Maximum inbound message size in bytes for the gRPC client | No | 128MB |
| grpc.use-plain-text | Use plain text for gRPC communication (overrides secureMode for gRPC) | No | true |
| grpc.tls.keystore-type | TLS keystore type for the gRPC connection | No | JKS |
| grpc.tls.keystore-path | TLS keystore file location for the gRPC connection | No | None |
| grpc.tls.keystore-password | TLS keystore password | No | None |
| grpc.tls.truststore-type | TLS truststore type for the gRPC connection | No | JKS |
| grpc.tls.truststore-path | TLS truststore file location for the gRPC connection | No | None |
| grpc.tls.truststore-password | TLS truststore password | No | None |
| grpc.tls.ssl-provider | SSL provider | No | JDK |
| grpc.proxy-uri | Pinot Rest Proxy gRPC endpoint URI | No | None |

Note: When using gRPC with proxy, the connector automatically adds FORWARD_HOST and FORWARD_PORT metadata headers for proper request routing.
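For illustration only, the sketch below shows how such metadata headers are attached with the standard gRPC Java API (io.grpc); this is not the connector's internal code, and the proxy address and server host/port are placeholders:

import io.grpc.{ManagedChannelBuilder, Metadata}
import io.grpc.stub.MetadataUtils

// Metadata naming the actual Pinot server behind the proxy
val headers = new Metadata()
headers.put(Metadata.Key.of("FORWARD_HOST", Metadata.ASCII_STRING_MARSHALLER), "pinot-server-0")
headers.put(Metadata.Key.of("FORWARD_PORT", Metadata.ASCII_STRING_MARSHALLER), "8090")

// Attach the headers to every call made through this channel
val builder = ManagedChannelBuilder.forAddress("pinot-proxy", 8094)
builder.usePlaintext() // plaintext for brevity; production should use TLS (grpc.use-plain-text=false)
builder.intercept(MetadataUtils.newAttachHeadersInterceptor(headers))
val channel = builder.build()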
Example run with spark-shell
There are examples under https://github.com/apache/pinot/tree/master/pinot-connectors/pinot-spark-3-connector/examples.
Prerequisites
- Apache Spark 3.x installed and spark-shell available in your PATH.
- The PINOT_HOME environment variable set:
  export PINOT_HOME=/path/to/pinot
- The Pinot Spark 3 Connector shaded JAR built and available at:
  $PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar
- The example Scala script located at:
  $PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala
Scala Script to read data from Pinot Proxy
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("read-pinot-airlineStats").master("local[*]").getOrCreate()
val df = spark.read.
format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
option("table", "myTable").
option("tableType", "offline").
option("controller", "pinot-proxy:8080").
option("secureMode", "true").
option("authToken", "st-xxxxxxx").
option("proxy.enabled", "true").
option("grpc.proxy-uri", "pinot-proxy:8094").
option("useGrpcServer", "true").
load()
println("Schema:")
df.printSchema()
println("Sample rows:")
df.show(10, truncate = false)
println(s"Total rows: ${df.count()}")
spark.stop()
Run with spark-shell
Launch the example in spark-shell with the following command:
spark-shell \
--master 'local[*]' \
--name read-pinot \
--jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"
Sample output
spark-shell --master 'local[*]' --name read-pinot --jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"
25/09/04 07:59:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://xiang-mac-home.wyvern-sun.ts.net:4040
Spark context available as 'sc' (master = local[*], app id = local-1756997971428).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.1
/_/
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 17.0.15)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala>
scala> val spark = SparkSession.builder().appName("read-pinot-table").master("local[*]").getOrCreate()
25/09/04 07:59:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c377641
scala>
scala> val df = spark.read.
| format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
| option("table", "api_gateway_agg_monthly").
| option("tableType", "REALTIME").
| option("controller", "pinot.xxx.yyy.startree.cloud").
| option("broker", "broker.pinot.xxx.yyy.startree.cloud").
| option("secureMode", "true").
| option("authToken", "st-xxx-yyy").
| option("proxy.enabled", "true").
| option("grpc.proxy-uri", "proxy-grpc.pinot.xxx.yyy.startree.cloud").
| option("useGrpcServer", "true").
| load()
25/09/04 07:59:35 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
df: org.apache.spark.sql.DataFrame = [api_calls_count: bigint, developer_account_id: string ... 1 more field]
scala>
scala> println("Schema:")
Schema:
scala> df.printSchema()
root
|-- api_calls_count: long (nullable = true)
|-- developer_account_id: string (nullable = true)
|-- monthsSinceEpoch: long (nullable = true)
scala>
scala> println("Sample rows:")
Sample rows:
scala> df.show(10, truncate = false)
25/09/04 07:59:39 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
+---------------+------------------------------------+----------------+
|api_calls_count|developer_account_id |monthsSinceEpoch|
+---------------+------------------------------------+----------------+
|276 |000e2e63-12ef-e353-af76-6fe98d2e8747|1748736000000 |
|287 |00101768-41ba-0f01-36da-2dfee58f4703|1748736000000 |
|287 |00121c0f-3825-364e-18e4-dac8dd0ea8d1|1748736000000 |
|290 |00124c8f-2a3d-fe73-3090-482c3757af09|1748736000000 |
|299 |00128fa3-a508-5e39-0ab9-c3f2039a878a|1748736000000 |
|275 |0014e7b0-9d48-2ef9-9111-e830e74f32c4|1748736000000 |
|293 |0017b314-b083-9240-0117-515519c22fd4|1748736000000 |
|306 |0026de58-e04a-ec08-fe94-beb363324f30|1748736000000 |
|297 |002dff23-dbd7-8ef3-a55f-f225523519b4|1748736000000 |
|277 |002f40b0-409c-b3e8-bb69-049b3e321589|1748736000000 |
+---------------+------------------------------------+----------------+
only showing top 10 rows
scala>
scala> println(s"Total rows: ${df.count()}")
25/09/04 08:00:38 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
Total rows: 60000
scala>
scala> spark.stop()
scala> :quit
Example run with spark-submit
You can run the examples locally (e.g. using your IDE) in a standalone mode by starting a local Pinot cluster. See: https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally
You can also run the tests in cluster mode using the following command:
export SPARK_CLUSTER=<YOUR_YARN_OR_SPARK_CLUSTER>
# Edit the ExampleSparkPinotConnectorTest to get rid of `.master("local")` and rebuild the jar before running this command
spark-submit \
--class org.apache.pinot.connector.spark.v3.datasource.ExampleSparkPinotConnectorTest \
--jars ./target/pinot-spark-3-connector-1.3.0-shaded.jar \
--master $SPARK_CLUSTER \
--deploy-mode cluster \
./target/pinot-spark-3-connector-1.3.0-tests.jar
This example demonstrates how to use the Pinot Spark 3 Connector to read data from a Pinot cluster via a proxy with authentication token support.
Security Best Practices
This feature is supported after the 1.4.0 release; use the current master build or wait for the 1.5.0 release.
Production HTTPS Configuration
- Always use HTTPS in production environments
- Store certificates in secure locations with appropriate file permissions
- Use proper certificate validation with a valid truststore
- Rotate certificates regularly
Production Authentication
- Use service accounts with minimal required permissions
- Store authentication tokens securely (environment variables, secret management systems)
- Implement token rotation policies
- Monitor authentication failures
Production gRPC Configuration
- Enable TLS for gRPC communication in production
- Use certificate-based authentication when possible
- Configure appropriate message size limits based on your data
- Use connection pooling for high-throughput scenarios
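Putting these practices together, a hardened read configuration can combine the options documented on this page; a sketch where paths, hostnames, and environment variable names are placeholders:

val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("secureMode", "true") // HTTPS + gRPC TLS together
  .option("truststorePath", "/etc/pki/pinot-truststore.jks") // proper certificate validation
  .option("truststorePassword", sys.env("PINOT_TRUSTSTORE_PASSWORD"))
  .option("grpc.tls.truststore-path", "/etc/pki/pinot-truststore.jks")
  .option("grpc.tls.truststore-password", sys.env("PINOT_TRUSTSTORE_PASSWORD"))
  .option("grpc.max-inbound-message-size", "256000000") // tune to your data volume
  .option("authToken", sys.env("PINOT_AUTH_TOKEN")) // token from a secret store / env
  .load()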
Future Work
- Add integration tests for read operation
- Add write support (Pinot segment-write logic will change in later versions of Pinot)