Flink

Pinot supports Apache Flink as a processing framework to push segment files to the database.

The Pinot distribution contains an Apache Flink SinkFunction that can be used as part of an Apache Flink application (streaming or batch) to write directly into a designated Pinot database.

Example

Here is an example code snippet showing how to use the PinotSinkFunction in a Flink streaming application:

// some environment setup
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> srcRows = execEnv.addSource(new FlinkKafkaConsumer<Row>(...));
RowTypeInfo typeInfo = new RowTypeInfo(
  new TypeInformation[]{Types.FLOAT, Types.FLOAT, Types.STRING, Types.STRING},
  new String[]{"lon", "lat", "address", "name"});


// add processing logic for the data stream, for example:
DataStream<Row> processedRows = srcRows.keyBy(r -> r.getField(0));
...

// configurations for PinotSinkFunction
Schema pinotSchema = ...
TableConfig pinotTableConfig = ...
processedRows.addSink(new PinotSinkFunction<>(
  new FlinkRowGenericRowConverter(typeInfo),
  pinotTableConfig,
  pinotSchema));

// execute the program
execEnv.execute();

As shown in the example above, the only required information from the Pinot side is the table schema and the table config.
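
The snippet above leaves the construction of the schema and table config elided. As one possibility, both can be loaded from their JSON definitions using Pinot's SPI utilities; the sketch below assumes local JSON files whose paths are placeholders.

import java.io.File;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;

// Load the Pinot schema and table config from JSON files (placeholder paths).
Schema pinotSchema = Schema.fromFile(new File("path/to/schema.json"));
TableConfig pinotTableConfig = JsonUtils.fileToObject(
    new File("path/to/tableConfig.json"), TableConfig.class);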

For a more detailed, executable example, please refer to the quick start example.

Table Config

PinotSinkFunction mostly uses the TableConfig object to infer the batch ingestion configuration, which it needs to start a SegmentWriter and a SegmentUploader to communicate with the Pinot cluster.

Note that even though the Flink application in the example above runs in streaming mode, the data is still batched together and flushed/uploaded to Pinot once the flush threshold is reached. It is not a direct streaming write into Pinot.
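
If you need finer control over when segments are flushed, the connector (depending on the version) also exposes an overloaded PinotSinkFunction constructor that takes a segment flush threshold and an executor pool size. The snippet below is a hedged sketch built on the earlier example; the constructor signature and the values shown are assumptions, not required settings.

// Assumed overload: flush threshold and upload pool size are passed explicitly.
processedRows.addSink(new PinotSinkFunction<>(
  new FlinkRowGenericRowConverter(typeInfo),
  pinotTableConfig,
  pinotSchema,
  500000L,  // segmentFlushMaxNumRecords: build and upload a segment after this many records (placeholder value)
  5));      // executorPoolSize: threads used for segment build/upload (placeholder value)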

Here is an example table config:

{
  "tableName" : "tbl_OFFLINE",
  "tableType" : "OFFLINE",
  "segmentsConfig" : {
    // ...
  },
  "tenants" : {
    // ...
  },
  "tableIndexConfig" : {
    // ....
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "HOURLY", 
      "batchConfigMaps": [
        {
          "outputDirURI": "file://path/to/flink/segmentwriter/output/dir",
          "overwriteOutput": "false",
          "push.controllerUri": "https://target.pinot.cluster.controller.url"
        }
      ]
    }
  }
}

The only required configurations are:

  • "outputDirURI": where PinotSinkFunction should write the constructed segment file to

  • "push.controllerUri": which Pinot cluster (controller) URL PinotSinkFunction should communicate with.

The rest of the configurations are standard for any Pinot table.
