# Spark-Pinot Connector

Spark-pinot connector to read data from Pinot.

Detailed read model documentation is here: [Spark Pinot Connector Read Model](/release-1.4.0/integrations/spark-pinot-connector/spark-pinot-connector-read-model.md)

The write model is Experimental and the documentation is here: [Spark Pinot Connector Write Model](/release-1.4.0/integrations/spark-pinot-connector/spark-pinot-connector-write-model.md)

### Features

* Query realtime, offline or hybrid tables
* Distributed, parallel scan
* Streaming reads using gRPC (optional)
* SQL support instead of PQL
* Column and filter push down to optimize performance
* Overlap between realtime and offline segments is queried exactly once for hybrid tables
* Schema discovery
  * Dynamic inference
  * Static analysis of case class
* Supports query options
* HTTPS/TLS support for secure connections

### Quick Start

```
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
      .builder()
      .appName("spark-pinot-connector-test")
      .master("local")
      .getOrCreate()

import spark.implicits._

val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .load()
  .filter($"DestStateName" === "Florida")

data.show(100)
```

### Security Configuration

{% hint style="warning" %}
This feature is supported after 1.4.0 release, please use current master or wait for 1.5.0
{% endhint %}

You can secure both HTTP and gRPC using a unified switch or explicit flags.

* Unified: set `secureMode=true` to enable HTTPS and gRPC TLS together (recommended)
* Explicit: set `useHttps` for REST and `grpc.use-plain-text=false` for gRPC

#### Quick examples

```
// Unified secure mode (enables HTTPS + gRPC TLS by default)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("secureMode", "true")
  .load()

// Explicit HTTPS only (gRPC remains plaintext by default)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("useHttps", "true")
  .load()

// Explicit gRPC TLS only (REST remains HTTP by default)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("grpc.use-plain-text", "false")
  .load()
```

#### HTTPS Configuration

When HTTPS is enabled (either via `secureMode=true` or `useHttps=true`), you can configure keystore/truststore as needed:

```
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("useHttps", "true")
  .option("keystorePath", "/path/to/keystore.jks")
  .option("keystorePassword", "keystorePassword")
  .option("truststorePath", "/path/to/truststore.jks")
  .option("truststorePassword", "truststorePassword")
  .load()
```

#### HTTPS Configuration Options

| Option               | Description                                                | Required | Default |
| -------------------- | ---------------------------------------------------------- | -------- | ------- |
| `secureMode`         | Unified switch to enable HTTPS and gRPC TLS                | No       | `false` |
| `useHttps`           | Enable HTTPS connections (overrides `secureMode` for REST) | No       | `false` |
| `keystorePath`       | Path to client keystore file (JKS format)                  | No       | None    |
| `keystorePassword`   | Password for the keystore                                  | No       | None    |
| `truststorePath`     | Path to truststore file (JKS format)                       | No       | None    |
| `truststorePassword` | Password for the truststore                                | No       | None    |

**Note:** If no truststore is provided when HTTPS is enabled, the connector will trust all certificates (not recommended for production use).

### Authentication Support

{% hint style="warning" %}
This feature is supported after 1.4.0 release, please use current master or wait for 1.5.0
{% endhint %}

The connector supports custom authentication headers for secure access to Pinot clusters:

```
// Using Bearer token authentication
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authToken", "my-jwt-token")  // Automatically adds "Authorization: Bearer my-jwt-token"
  .load()

// Using custom authentication header
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authHeader", "Authorization")
  .option("authToken", "Bearer my-custom-token")
  .load()

// Using API key authentication
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("authHeader", "X-API-Key")
  .option("authToken", "my-api-key")
  .load()
```

#### Authentication Configuration Options

{% hint style="warning" %}
This feature is supported after 1.4.0 release, please use current master or wait for 1.5.0
{% endhint %}

| Option       | Description                       | Required | Default                                        |
| ------------ | --------------------------------- | -------- | ---------------------------------------------- |
| `authHeader` | Custom authentication header name | No       | `Authorization` (when `authToken` is provided) |
| `authToken`  | Authentication token/value        | No       | None                                           |

**Note:** If only `authToken` is provided without `authHeader`, the connector will automatically use `Authorization: Bearer <token>`.

### Pinot Proxy Support

{% hint style="warning" %}
This feature is supported after 1.4.0 release, please use current master or wait for 1.5.0
{% endhint %}

The connector supports Pinot Proxy for secure cluster access where the proxy is the only exposed endpoint. When proxy is enabled, all HTTP requests to controllers/brokers and gRPC requests to servers are routed through the proxy.

#### Proxy Configuration Examples

```
// Basic proxy configuration
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("controller", "pinot-proxy:8080")  // Proxy endpoint
  .option("proxy.enabled", "true")
  .load()

// Proxy with authentication
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("controller", "pinot-proxy:8080")
  .option("proxy.enabled", "true")
  .option("authToken", "my-proxy-token")
  .load()

// Proxy with gRPC configuration
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("controller", "pinot-proxy:8080")
  .option("proxy.enabled", "true")
  .option("grpc.proxy-uri", "pinot-proxy:8094")  // gRPC proxy endpoint
  .load()
```

#### Proxy Configuration Options

| Option          | Description                                        | Required | Default |
| --------------- | -------------------------------------------------- | -------- | ------- |
| `proxy.enabled` | Use Pinot Proxy for controller and broker requests | No       | `false` |

**Note:** When proxy is enabled, the connector adds `FORWARD_HOST` and `FORWARD_PORT` headers to route requests to the actual Pinot services.

### gRPC Configuration

{% hint style="warning" %}
This feature is supported after 1.4.0 release, please use current master or wait for 1.5.0
{% endhint %}

The connector supports comprehensive gRPC configuration for secure and optimized communication with Pinot servers.

#### gRPC Configuration Examples

```
// Basic gRPC configuration
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("grpc.port", "8091")
  .option("grpc.max-inbound-message-size", "256000000")  // 256MB
  .load()

// gRPC with TLS (explicit)
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("grpc.use-plain-text", "false")
  .option("grpc.tls.keystore-path", "/path/to/grpc-keystore.jks")
  .option("grpc.tls.keystore-password", "keystore-password")
  .option("grpc.tls.truststore-path", "/path/to/grpc-truststore.jks")
  .option("grpc.tls.truststore-password", "truststore-password")
  .load()

// gRPC with proxy
val data = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("proxy.enabled", "true")
  .option("grpc.proxy-uri", "pinot-proxy:8094")
  .load()
```

#### gRPC Configuration Options

| Option                          | Description                                                             | Required | Default |
| ------------------------------- | ----------------------------------------------------------------------- | -------- | ------- |
| `grpc.port`                     | Pinot gRPC port                                                         | No       | `8090`  |
| `grpc.max-inbound-message-size` | Max inbound message bytes when init gRPC client                         | No       | `128MB` |
| `grpc.use-plain-text`           | Use plain text for gRPC communication (overrides `secureMode` for gRPC) | No       | `true`  |
| `grpc.tls.keystore-type`        | TLS keystore type for gRPC connection                                   | No       | `JKS`   |
| `grpc.tls.keystore-path`        | TLS keystore file location for gRPC connection                          | No       | None    |
| `grpc.tls.keystore-password`    | TLS keystore password                                                   | No       | None    |
| `grpc.tls.truststore-type`      | TLS truststore type for gRPC connection                                 | No       | `JKS`   |
| `grpc.tls.truststore-path`      | TLS truststore file location for gRPC connection                        | No       | None    |
| `grpc.tls.truststore-password`  | TLS truststore password                                                 | No       | None    |
| `grpc.tls.ssl-provider`         | SSL provider                                                            | No       | `JDK`   |
| `grpc.proxy-uri`                | Pinot Rest Proxy gRPC endpoint URI                                      | No       | None    |

**Note:** When using gRPC with proxy, the connector automatically adds `FORWARD_HOST` and `FORWARD_PORT` metadata headers for proper request routing.

### Example run with spark-shell

There are examples under <https://github.com/apache/pinot/tree/master/pinot-connectors/pinot-spark-3-connector/examples> .&#x20;

#### Prerequisites

* Apache Spark 3.x installed and `spark-shell` available in your PATH.
* Setup PINOT\_HOME env variable:

  ```
  export PINOT_HOME=/path/to/pinot
  ```
* The Pinot Spark 3 Connector shaded JAR built and available at:

  ```
  $PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar
  ```
* Example Scala script located at:

  ```
  $PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala
  ```

#### Scala Script to read data from Pinot Proxy

{% code title="read\_pinot\_from\_proxy\_with\_auth\_token.scala" lineNumbers="true" %}

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-pinot-airlineStats").master("local[*]").getOrCreate()

val df = spark.read.
  format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
  option("table", "myTable").
  option("tableType", "offline").
  option("controller", "pinot-proxy:8080").
  option("secureMode", "true").
  option("authToken", "st-xxxxxxx").
  option("proxy.enabled", "true").
  option("grpc.proxy-uri", "pinot-proxy:8094").
  option("useGrpcServer", "true").
  load()

println("Schema:")
df.printSchema()

println("Sample rows:")
df.show(10, truncate = false)

println(s"Total rows: ${df.count()}")

spark.stop()
```

{% endcode %}

#### Run with spark-shell

Launch the example in `spark-shell` with the following command:

```bash
spark-shell 
    --master 'local[*]' \
    --name read-pinot \
    --jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"
```

#### Sample output

```
spark-shell --master 'local[*]' --name read-pinot --jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"

25/09/04 07:59:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://xiang-mac-home.wyvern-sun.ts.net:4040
Spark context available as 'sc' (master = local[*], app id = local-1756997971428).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 17.0.15)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala>

scala> val spark = SparkSession.builder().appName("read-pinot-table").master("local[*]").getOrCreate()
25/09/04 07:59:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c377641

scala>

scala> val df = spark.read.
     |   format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
     |   option("table", "api_gateway_agg_monthly").
     |   option("tableType", "REALTIME").
     |   option("controller", "pinot.xxx.yyy.startree.cloud").
     |   option("broker", "broker.pinot.xxx.yyy.startree.cloud").
     |   option("secureMode", "true").
     |   option("authToken", "st-xxx-yyy").
     |   option("proxy.enabled", "true").
     |   option("grpc.proxy-uri", "proxy-grpc.pinot.xxx.yyy.startree.cloud").
     |   option("useGrpcServer", "true").
     |   load()
25/09/04 07:59:35 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
df: org.apache.spark.sql.DataFrame = [api_calls_count: bigint, developer_account_id: string ... 1 more field]

scala>

scala> println("Schema:")
Schema:

scala> df.printSchema()
root
 |-- api_calls_count: long (nullable = true)
 |-- developer_account_id: string (nullable = true)
 |-- monthsSinceEpoch: long (nullable = true)


scala>

scala> println("Sample rows:")
Sample rows:

scala> df.show(10, truncate = false)
25/09/04 07:59:39 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
+---------------+------------------------------------+----------------+
|api_calls_count|developer_account_id                |monthsSinceEpoch|
+---------------+------------------------------------+----------------+
|276            |000e2e63-12ef-e353-af76-6fe98d2e8747|1748736000000   |
|287            |00101768-41ba-0f01-36da-2dfee58f4703|1748736000000   |
|287            |00121c0f-3825-364e-18e4-dac8dd0ea8d1|1748736000000   |
|290            |00124c8f-2a3d-fe73-3090-482c3757af09|1748736000000   |
|299            |00128fa3-a508-5e39-0ab9-c3f2039a878a|1748736000000   |
|275            |0014e7b0-9d48-2ef9-9111-e830e74f32c4|1748736000000   |
|293            |0017b314-b083-9240-0117-515519c22fd4|1748736000000   |
|306            |0026de58-e04a-ec08-fe94-beb363324f30|1748736000000   |
|297            |002dff23-dbd7-8ef3-a55f-f225523519b4|1748736000000   |
|277            |002f40b0-409c-b3e8-bb69-049b3e321589|1748736000000   |
+---------------+------------------------------------+----------------+
only showing top 10 rows


scala>

scala> println(s"Total rows: ${df.count()}")
25/09/04 08:00:38 WARN HttpUtils: No truststore configured, trusting all certificates (not recommended for production)
Total rows: 60000

scala>

scala> spark.stop()

scala> :quit
```

### Example run with spark-submit

You can run the examples locally (e.g. using your IDE) in a standalone mode by starting a local Pinot cluster. See: <https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally>

You can also run the tests in *cluster mode* using following command:

```
export SPARK_CLUSTER=<YOUR_YARN_OR_SPARK_CLUSTER>

# Edit the ExampleSparkPinotConnectorTest to get rid of `.master("local")` and rebuild the jar before running this command
spark-submit \
    --class org.apache.pinot.connector.spark.v3.datasource.ExampleSparkPinotConnectorTest \
    --jars ./target/pinot-spark-3-connector-1.3.0-shaded.jar \
    --master $SPARK_CLUSTER \
    --deploy-mode cluster \
  ./target/pinot-spark-3-connector-1.3.0-tests.jar
```

This example demonstrates how to use the Pinot Spark 3 Connector to read data from a Pinot cluster via a proxy with authentication token support.

### Security Best Practices

{% hint style="warning" %}
This feature is supported after 1.4.0 release, please use current master or wait for 1.5.0
{% endhint %}

#### Production HTTPS Configuration

* Always use HTTPS in production environments
* Store certificates in secure locations with appropriate file permissions
* Use proper certificate validation with valid truststore
* Rotate certificates regularly

#### Production Authentication

* Use service accounts with minimal required permissions
* Store authentication tokens securely (environment variables, secret management systems)
* Implement token rotation policies
* Monitor authentication failures

#### Production gRPC Configuration

* Enable TLS for gRPC communication in production
* Use certificate-based authentication when possible
* Configure appropriate message size limits based on your data
* Use connection pooling for high-throughput scenarios

### Future Works

* Add integration tests for read operation
* Add write support(pinot segment write logic will be changed in later versions of pinot)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.pinot.apache.org/release-1.4.0/integrations/spark-pinot-connector.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
