Flink

Batch ingestion of data into Apache Pinot using Apache Flink.

Apache Pinot supports using Apache Flink as a processing framework to generate and upload segments. The Pinot distribution includes a PinotSinkFunction that can be integrated into Flink applications (streaming or batch) to write data directly into Pinot tables as segments.

The PinotSinkFunction supports offline tables, realtime tables, and upsert tables (full upsert only). Data is buffered in memory and flushed as segments when the configured threshold is reached, then uploaded to the Pinot cluster.
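As a rough mental model of the buffer-then-flush behavior described above, the following sketch collects records in memory and hands a full batch to a callback once a record-count threshold is reached. It is not the connector's code: the class name `ThresholdBuffer`, the `maxRecords` parameter, and the `onFlush` callback (standing in for "build a segment and upload it") are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of threshold-based flushing; not the connector's actual implementation. */
class ThresholdBuffer<T> {
    private final int maxRecords;              // assumed record-count threshold
    private final Consumer<List<T>> onFlush;   // stands in for "build segment and upload"
    private final List<T> buffer = new ArrayList<>();

    ThresholdBuffer(int maxRecords, Consumer<List<T>> onFlush) {
        if (maxRecords <= 0) {
            throw new IllegalArgumentException("maxRecords must be positive");
        }
        this.maxRecords = maxRecords;
        this.onFlush = onFlush;
    }

    /** Buffers one record and flushes as soon as the threshold is reached. */
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= maxRecords) {
            flush();
        }
    }

    /** Emits whatever is buffered as one batch; a no-op when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        onFlush.accept(new ArrayList<>(buffer)); // copy so the callback owns its batch
        buffer.clear();
    }
}
```

In this model a final explicit `flush()` is needed at the end of the input so that a partially filled buffer is not lost; a real sink would typically do the equivalent when the job finishes.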

Maven Dependency

To use the Pinot Flink Connector in your Flink job, add the following dependency to your pom.xml:

<dependency>
  <groupId>org.apache.pinot</groupId>
  <artifactId>pinot-flink-connector</artifactId>
  <version>1.5.0-SNAPSHOT</version>
</dependency>

Replace 1.5.0-SNAPSHOT with the Pinot version you're using. For the latest stable version, check the Apache Pinot releases page.

Note: The connector transitively includes dependencies for:

  • pinot-controller - For controller client APIs

  • pinot-segment-writer-file-based - For segment generation

  • flink-streaming-java and flink-java - Flink core dependencies

Offline Table Ingestion

Quick Start Example

Table Configuration

The PinotSinkFunction uses the TableConfig to determine batch ingestion settings for segment generation and upload. Here's an example table configuration:

Required configurations:

  • outputDirURI - Directory where segments are written before upload

  • push.controllerUri - Pinot controller URL for segment upload
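Because a missing setting usually surfaces only when the job tries to write or upload a segment, it can help to check the two required keys before submitting the Flink job. The sketch below does that on a plain `Map<String, String>`; the class `BatchConfigCheck` and its method are hypothetical helpers, not part of Pinot, and how you obtain the map from your TableConfig is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for the batch config keys listed above. */
class BatchConfigCheck {
    // The two keys this page lists as required.
    static final List<String> REQUIRED_KEYS = List.of("outputDirURI", "push.controllerUri");

    /** Returns the required keys that are absent or blank in the given map. */
    static List<String> missingKeys(Map<String, String> batchConfig) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = batchConfig.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

For example, a map containing only `outputDirURI` (say, a placeholder such as `file:///tmp/pinot/flink-segments`) would be reported as missing `push.controllerUri`.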

For a complete executable example, refer to FlinkQuickStart.java.

Realtime Table Ingestion

Non-Upsert Realtime Tables

For standard realtime tables without upsert, use the same approach as offline tables, but specify REALTIME as the table type:

Upsert Tables

Full Upsert Tables

The Flink connector supports backfilling full upsert tables, where each record contains all columns. The uploaded segments participate in upsert semantics based on the comparison column value.

Requirements:

  1. Partitioning: Data must be partitioned using the same strategy as the upstream stream (e.g., Kafka)

  2. Parallelism: Flink job parallelism must match the number of upstream stream/table partitions

  3. Comparison Column: The values of the comparison column must have ordering consistent with the upstream stream. This ensures that Pinot can correctly resolve which record is the latest for a given key. See the Pinot upsert comparison column docs for important considerations.
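To see why the third requirement matters, consider a simplified model of upsert resolution in which, for each primary key, the record with the largest comparison value wins regardless of whether it arrived through an uploaded segment or through the stream. The sketch below implements only that rule. The class and field names are hypothetical, and the tie-breaking choice (an equal comparison value lets the later arrival win) is an assumption of this sketch rather than a statement about Pinot's internals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of "latest record per key wins". */
class UpsertResolutionSketch {
    /** One record: primary key, comparison value (e.g. an event timestamp), and where it came from. */
    static final class Rec {
        final String key;
        final long comparisonValue;
        final String source;

        Rec(String key, long comparisonValue, String source) {
            this.key = key;
            this.comparisonValue = comparisonValue;
            this.source = source;
        }
    }

    /** Keeps, per key, the record with the largest comparison value (assumption: ties go to the later arrival). */
    static Map<String, Rec> resolve(List<Rec> arrivals) {
        Map<String, Rec> winners = new HashMap<>();
        for (Rec r : arrivals) {
            winners.merge(r.key, r, (current, incoming) ->
                    incoming.comparisonValue >= current.comparisonValue ? incoming : current);
        }
        return winners;
    }
}
```

If the backfilled data used a comparison value on a different scale from the stream (for instance seconds versus milliseconds), this rule would pick the wrong winner, which is the kind of inconsistency the requirement is meant to prevent.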

Example:

How Partitioning Works:

When uploading segments for upsert tables, Pinot uses a special segment naming convention, UploadedRealtimeSegmentName, that encodes the partition ID. The format is:

Example: flink__myTable__0__1724045187__1

Each Flink subtask generates segments for a specific partition based on its subtask index. The segments are then assigned to the same server instances that handle that partition for stream-consumed segments, ensuring correct upsert behavior across all segments.
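The example name above consists of five fields separated by double underscores. The sketch below builds and parses names of that shape. The field interpretation (prefix, table name, partition ID, a time value, and a trailing number) is inferred from the example and is an assumption here, as are the class and method names; consult the UploadedRealtimeSegmentName class for the authoritative definition.

```java
/** Hypothetical helper for names shaped like flink__myTable__0__1724045187__1. */
class UploadedSegmentNameSketch {
    static final String SEPARATOR = "__";

    /** Joins the five fields; partitionId would be the subtask's partition in the description above. */
    static String build(String prefix, String tableName, int partitionId, long timeValue, int suffix) {
        return String.join(SEPARATOR,
                prefix,
                tableName,
                Integer.toString(partitionId),
                Long.toString(timeValue),
                Integer.toString(suffix));
    }

    /** Extracts the third field, assumed to be the partition ID. */
    static int partitionIdOf(String segmentName) {
        String[] parts = segmentName.split(SEPARATOR);
        if (parts.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but found " + parts.length + ": " + segmentName);
        }
        return Integer.parseInt(parts[2]);
    }
}
```

Under these assumptions, a subtask handling partition 0 would produce names whose third field is 0, which is what allows segments to be matched to the servers handling that partition. Note that this simple split would misparse a table name that itself contains a double underscore.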

Configuration Options:

You can customize segment generation using additional constructor parameters:

Partial Upsert Tables

WARNING: Flink-based upload is not recommended for partial upsert tables.

In partial upsert tables, uploaded segments may contain only a subset of columns or an intermediate row for a primary key. If the uploaded row is not in its final state and subsequent updates arrive via the stream, the partial upsert merger may produce inconsistent results between replicas. This can lead to data inconsistency that is difficult to detect and resolve.

For partial upsert tables, either use stream-based ingestion only or ensure that uploaded data represents the final state for each primary key.

Advanced Configuration

Segment Flush Control

Control when segments are flushed and uploaded:

Segment Naming

Customize segment naming and upload time for better organization:
