Flink
Batch ingestion of data into Apache Pinot using Apache Flink.
Apache Pinot supports using Apache Flink as a processing framework to generate and upload segments. The Pinot distribution includes a PinotSinkFunction that can be integrated into Flink applications (streaming or batch) to directly write data as segments into Pinot tables.
The PinotSinkFunction supports offline tables, realtime tables, and upsert tables (full upsert only). Data is buffered in memory and flushed as segments when the configured threshold is reached, then uploaded to the Pinot cluster.
Maven Dependency
To use the Pinot Flink Connector in your Flink job, add the following dependency to your pom.xml:
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-flink-connector</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>

Replace 1.5.0-SNAPSHOT with the Pinot version you're using. For the latest stable version, check the Apache Pinot releases.
Note: The connector transitively includes dependencies for:
pinot-controller - For controller client APIs
pinot-segment-writer-file-based - For segment generation
flink-streaming-java and flink-java - Flink core dependencies
Offline Table Ingestion
Quick Start Example
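The following is a hedged sketch of wiring the sink into a batch-mode Flink job. PinotSinkFunction and FlinkRowGenericRowConverter come from the pinot-flink-connector module; loadTableConfig() and loadSchema() are hypothetical helpers standing in for however your job obtains the table config and schema (for example, fetched from the controller or loaded from JSON files). Column names and sample rows are illustrative.

```java
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.pinot.connector.flink.common.FlinkRowGenericRowConverter;
import org.apache.pinot.connector.flink.sink.PinotSinkFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;

public class OfflineBackfillJob {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded job: remaining rows flush at end of input

    // Hypothetical helpers: load the OFFLINE table config and schema however your job obtains them.
    TableConfig tableConfig = loadTableConfig();
    Schema schema = loadSchema();

    RowTypeInfo typeInfo = new RowTypeInfo(
        new TypeInformation<?>[] {Types.STRING, Types.LONG},
        new String[] {"playerName", "score"});

    DataStream<Row> rows = env.fromCollection(
        Arrays.asList(Row.of("Alice", 100L), Row.of("Bob", 95L)), typeInfo);

    // Buffers rows per subtask, then writes a segment and uploads it when the flush threshold is hit.
    rows.addSink(new PinotSinkFunction<>(
        new FlinkRowGenericRowConverter(typeInfo), tableConfig, schema));

    env.execute("pinot-offline-backfill");
  }

  private static TableConfig loadTableConfig() { throw new UnsupportedOperationException("stub"); }

  private static Schema loadSchema() { throw new UnsupportedOperationException("stub"); }
}
```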
Table Configuration
The PinotSinkFunction uses the TableConfig to determine batch ingestion settings for segment generation and upload. Here's an example table configuration:
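A sketch of such a table config is shown below; the batch ingestion settings live under ingestionConfig.batchIngestionConfig.batchConfigMaps, and all values (table name, paths, controller URL) are illustrative:

```json
{
  "tableName": "myTable",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "schemaName": "myTable",
    "timeColumnName": "ts",
    "replication": "1"
  },
  "tenants": {},
  "tableIndexConfig": {},
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY",
      "batchConfigMaps": [
        {
          "outputDirURI": "/tmp/flink-segments",
          "overwriteOutput": "true",
          "push.controllerUri": "http://localhost:9000"
        }
      ]
    }
  },
  "metadata": {}
}
```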
Required configurations:
outputDirURI - Directory where segments are written before upload
push.controllerUri - Pinot controller URL for segment upload
For a complete executable example, refer to FlinkQuickStart.java.
Realtime Table Ingestion
Non-Upsert Realtime Tables
For standard realtime tables without upsert, use the same approach as offline tables, but specify REALTIME as the table type:
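For example, the relevant portion of the table config changes only in the tableType field (values are illustrative; stream configs for regular consumption are omitted here):

```json
{
  "tableName": "myTable",
  "tableType": "REALTIME",
  "ingestionConfig": {
    "batchIngestionConfig": {
      "batchConfigMaps": [
        {
          "outputDirURI": "/tmp/flink-segments",
          "push.controllerUri": "http://localhost:9000"
        }
      ]
    }
  }
}
```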
Upsert Tables
Full Upsert Tables
The Flink connector supports backfilling full upsert tables, where each record contains all columns. Uploaded segments participate correctly in upsert semantics based on the comparison column value.
Requirements:
Partitioning: Data must be partitioned using the same strategy as the upstream stream (e.g., Kafka)
Parallelism: Flink job parallelism must match the number of upstream stream/table partitions
Comparison Column: The values of the comparison column must have ordering consistent with the upstream stream. This ensures that Pinot can correctly resolve which record is the latest for a given key. See Pinot upsert comparison column docs for important considerations.
Example:
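The sketch below shows the shape of an upsert backfill job. It assumes typeInfo, tableConfig, schema, and rows are set up as in the offline quick start; murmur2() is a hypothetical helper that must reproduce whatever partitioner the upstream Kafka producer used, and numPartitions is illustrative:

```java
// Must equal the number of partitions of the upstream Kafka topic.
final int numPartitions = 4;
env.setParallelism(numPartitions); // one Flink subtask per upstream partition

DataStream<Row> partitioned = rows.partitionCustom(
    // The partitioner MUST mirror the upstream producer's partitioning;
    // murmur2-of-key here is only an illustration (hypothetical helper).
    (String key, int parts) -> (murmur2(key) & 0x7fffffff) % parts,
    row -> (String) row.getField(0)); // primary-key column

partitioned.addSink(new PinotSinkFunction<>(
    new FlinkRowGenericRowConverter(typeInfo), tableConfig, schema));
```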
How Partitioning Works:
When uploading segments for upsert tables, Pinot uses a special segment naming convention, UploadedRealtimeSegmentName, that encodes the partition ID. The format is {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}.
Example: flink__myTable__0__1724045187__1
Each Flink subtask generates segments for a specific partition based on its subtask index. The segments are then assigned to the same server instances that handle that partition for stream-consumed segments, ensuring correct upsert behavior across all segments.
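To make the encoding concrete, here is a small self-contained sketch that splits the example name above into its components. The field meanings follow the pattern described; treat the exact layout as version-dependent and verify against your Pinot release.

```java
public class UploadedSegmentNameDemo {

  /** Splits a name like "flink__myTable__0__1724045187__1" on the "__" separator. */
  static String[] parse(String segmentName) {
    return segmentName.split("__");
  }

  public static void main(String[] args) {
    String[] parts = parse("flink__myTable__0__1724045187__1");
    String prefix = parts[0];                      // "flink"
    String tableName = parts[1];                   // "myTable"
    int partitionId = Integer.parseInt(parts[2]);  // 0 -> served by the subtask/servers for partition 0
    long creationTime = Long.parseLong(parts[3]);  // 1724045187
    String suffix = parts[4];                      // "1"
    System.out.println(tableName + " partition=" + partitionId); // prints "myTable partition=0"
  }
}
```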
Configuration Options:
You can customize segment generation using additional constructor parameters:
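For example, an extended constructor lets you tune how many rows each subtask buffers before flushing a segment and how many threads build and upload segments. The argument list below is a sketch; verify the exact signature against the connector's javadoc for your Pinot version:

```java
// Sketch only: check PinotSinkFunction's javadoc for the exact signature in your version.
PinotSinkFunction<Row> sink = new PinotSinkFunction<>(
    new FlinkRowGenericRowConverter(typeInfo), // Flink Row -> Pinot GenericRow converter
    tableConfig,
    schema,
    500_000L, // segmentFlushMaxNumRecords: rows buffered per subtask before a segment is flushed
    5);       // executorPoolSize: threads used to build and upload segments
```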
Partial Upsert Tables
WARNING: Flink-based upload is not recommended for partial upsert tables.
In partial upsert tables, uploaded segments may contain only a subset of columns or an intermediate (non-final) row for a primary key. If the uploaded row is not in its final state and subsequent updates arrive via the stream, the partial upsert merger may produce inconsistent results between replicas. This can lead to data inconsistency that is difficult to detect and resolve.
For partial upsert tables, prefer stream-based ingestion only or ensure uploaded data represents the final state for each primary key.
Advanced Configuration
Segment Flush Control
Control when segments are flushed and uploaded:
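As a rough sizing sketch, you can derive the flush threshold from a per-subtask memory budget and an estimated serialized row size; the result is what you would pass as the sink's flush threshold (e.g., segmentFlushMaxNumRecords). Both numbers below are assumptions to measure for your own workload:

```java
public class FlushSizing {
  public static void main(String[] args) {
    // Assumptions: tune these for your workload.
    long memoryBudgetBytes = 256L * 1024 * 1024; // heap allowed for buffered rows per subtask
    long avgRowBytes = 512;                      // estimated serialized size of one row

    // Rows to buffer before flushing a segment; pass this as the sink's flush threshold.
    long flushMaxNumRecords = memoryBudgetBytes / avgRowBytes;
    System.out.println(flushMaxNumRecords); // prints 524288
  }
}
```

Smaller thresholds reduce memory pressure but produce more, smaller segments; larger thresholds do the opposite.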
Segment Naming
Customize segment naming and upload time for better organization:
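One way to do this is through the batch config maps in the table config. The keys below follow Pinot's batch ingestion configuration properties, but treat both the keys and values as illustrative and confirm them against the batch ingestion docs for your version:

```json
"ingestionConfig": {
  "batchIngestionConfig": {
    "batchConfigMaps": [
      {
        "outputDirURI": "/tmp/flink-segments",
        "push.controllerUri": "http://localhost:9000",
        "segmentNameGenerator.type": "simple",
        "segmentNameGenerator.configs.segment.name.prefix": "flink_backfill"
      }
    ]
  }
}
```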
Additional Resources
Design Proposal - Original design motivation
PR #13107 - Externally partitioned segments for upsert tables
PR #13837 - Flink connector enhancements for upsert backfill