Recipes

Here you will find a collection of ready-made sample applications and examples for real-world data.

GitHub Events Stream

Steps for setting up a Pinot cluster and a real-time table which consumes from the GitHub events stream.

In this recipe you will set up an Apache Pinot cluster and a real-time table which consumes data flowing from a GitHub events stream. The stream is based on GitHub pull requests and uses Kafka.

In this recipe you will perform the following steps:

  1. Set up a Pinot cluster by completing the following steps:

    a. Start ZooKeeper.

    b. Start the controller.

    c. Start the broker.

    d. Start the server.

  2. Set up a Kafka cluster.

  3. Create a Kafka topic, which will be called pullRequestMergedEvents.

  4. Create a real-time table called pullRequestMergedEvents and a schema.

  5. Start a task which reads from the GitHub events API and publishes events about merged pull requests to the topic.

  6. Query the real-time data.

Steps

Use either Docker images or launcher scripts

Pull the Docker image

Get the latest Docker image.

Long version

Set up the Pinot cluster

Follow the instructions in Advanced Pinot Setup to set up a Pinot cluster with the components:

Kubernetes cluster

If you already have a Kubernetes cluster with Pinot and Kafka (see Running Pinot in Kubernetes), first create the topic, then set up the table and streaming using:

Query

Browse to the Query Console to view the data.
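
The same data can also be queried over HTTP instead of through the browser. The sketch below assumes the broker's SQL endpoint is reachable at http://localhost:8099 (Pinot's default broker port, which may not be published in your Docker or Kubernetes setup). The names pinot_query_payload, run_pinot_query, and PINOT_BROKER_URL are illustrative and not part of Pinot.

```shell
# Hypothetical helper: wrap a single-line SQL string in the JSON body
# that the broker's /query/sql endpoint expects.
# Backslashes and double quotes are escaped so the result stays valid JSON.
pinot_query_payload() {
  escaped=$(printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g')
  printf '{"sql":"%s"}' "$escaped"
}

# Hypothetical helper: POST the query to the broker.
# PINOT_BROKER_URL is an assumed variable; it defaults to localhost:8099.
run_pinot_query() {
  curl -s -X POST \
    -H 'Content-Type: application/json' \
    -d "$(pinot_query_payload "$1")" \
    "${PINOT_BROKER_URL:-http://localhost:8099}/query/sql"
}

# Example (requires a running cluster):
#   run_pinot_query "SELECT COUNT(*) FROM pullRequestMergedEvents"
```

Once events have been ingested, the example call should return a JSON response from the broker; before that, the count is expected to be zero.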

Visualize with SuperSet

You can use SuperSet to visualize this data. Some of the interesting insights we captured were:

List the most active organizations during the lockdown

Repositories by number of commits in the Apache organization
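
Queries along the following lines could back charts like these. This is only a sketch: the column names come from the schema in this recipe, but the time window, the organization value, and the helper names most_active_orgs_sql and repos_by_commits_sql are assumptions, and the original dashboards may have used different queries.

```shell
TABLE=pullRequestMergedEvents

# Organizations ranked by number of merged pull requests since a given
# epoch-millisecond timestamp ($1). $2 is the row limit (default 10).
most_active_orgs_sql() {
  printf "SELECT organization, COUNT(*) AS mergedPullRequests FROM %s WHERE mergedTimeMillis >= %s GROUP BY organization ORDER BY COUNT(*) DESC LIMIT %s" \
    "$TABLE" "$1" "${2:-10}"
}

# Repositories of one organization ($1) ranked by total commits in merged
# pull requests. $2 is the row limit (default 10).
repos_by_commits_sql() {
  printf "SELECT repo, SUM(numCommits) AS commits FROM %s WHERE organization = '%s' GROUP BY repo ORDER BY SUM(numCommits) DESC LIMIT %s" \
    "$TABLE" "$1" "${2:-10}"
}

# Example: print a query to paste into the Query Console.
#   repos_by_commits_sql apache 20
```

The helpers only print SQL text, so the output can be pasted into the Query Console or used as the query of a SuperSet chart. The exact spelling of organization values (for example apache) depends on what the stream publishes.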

To integrate with SuperSet, see the SuperSet Integrations page.

  • Zookeeper

  • Controller

  • Broker

  • Server

  • Kafka

Create a Kafka topic

Create a Kafka topic called pullRequestMergedEvents for the demo.

Add a Pinot table and schema

The schema is present at examples/stream/githubEvents/pullRequestMergedEvents_schema.json and is also pasted below.

The table config is present at examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json and is also pasted below.

Note: If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.
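
One way to adjust these two properties without editing the file by hand is to rewrite them with sed. The following is a minimal sketch, assuming the table config keeps each property on its own line as in the config shown in this recipe. The function name set_kafka_endpoints and the example host names are hypothetical.

```shell
# Hypothetical helper: print a copy of a table config with the two Kafka
# connection properties replaced.
#   $1 = path to the table config JSON
#   $2 = new value for stream.kafka.zk.broker.url
#   $3 = new value for stream.kafka.broker.list
# The values must not contain '|' or '&', which are special in this sed call.
set_kafka_endpoints() {
  config_file=$1
  zk_url=$2
  broker_list=$3
  sed \
    -e "s|\(\"stream.kafka.zk.broker.url\": *\)\"[^\"]*\"|\1\"${zk_url}\"|" \
    -e "s|\(\"stream.kafka.broker.list\": *\)\"[^\"]*\"|\1\"${broker_list}\"|" \
    "$config_file"
}

# Example (host names are placeholders for your own cluster):
#   set_kafka_endpoints pullRequestMergedEvents_realtime_table_config.json \
#     my-zookeeper:2181/kafka my-kafka:9092 > my_table_config.json
```

The function writes to standard output and leaves the original file untouched, so the patched copy can be reviewed before it is passed to AddTable.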

Add the table and schema using the following command:

Publish events

Start streaming GitHub events into the Kafka topic:

Prerequisites

Generate a personal access token on GitHub.
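
Before starting the stream, it can help to confirm that the token is set and accepted. The sketch below assumes the token is kept in an environment variable named GITHUB_TOKEN (a name chosen here for illustration) and checks it against the public events endpoint of the GitHub REST API. The helper names are hypothetical.

```shell
# Hypothetical helper: fail early when the token variable is empty or unset.
require_github_token() {
  if [ -z "${GITHUB_TOKEN:-}" ]; then
    echo "GITHUB_TOKEN is not set; generate a personal access token first" >&2
    return 1
  fi
}

# Hypothetical helper: request the public events feed with the token and
# print only the HTTP status code of the response.
check_github_token() {
  require_github_token || return 1
  curl -s -o /dev/null -w '%{http_code}\n' \
    -H "Authorization: token ${GITHUB_TOKEN}" \
    https://api.github.com/events
}

# Example (requires network access):
#   export GITHUB_TOKEN=<your_github_personal_access_token>
#   check_github_token
```

A status of 200 suggests the token was accepted, while 401 indicates that GitHub rejected it. The same variable can then be passed as the -personalAccessToken argument.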

Short version

The short method of setting things up is to use the following command. Make sure to stop any previously running Pinot services.

Get Pinot

Follow the instructions in Build from source to get the latest Pinot code.

Long version

Set up the Pinot cluster

Follow the instructions in Advanced Pinot Setup to set up the Pinot cluster with the components:

  • Zookeeper

  • Controller

  • Broker

  • Server

  • Kafka

Create a Kafka topic

Download Apache Kafka.

Create a Kafka topic called pullRequestMergedEvents for the demo.

Add a Pinot table and schema

The schema can be found at /examples/stream/githubEvents/ in the release, and is also pasted below.

The table config can be found at /examples/stream/githubEvents/ in the release, and is also pasted below.


Note

If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.

Add the table and schema using the command:

Publish events

Start streaming GitHub events into the Kafka topic:

Prerequisites

Generate a personal access token on GitHub.

Short version

To set up all of the above steps with a single command, run:

GitHub events API
Advanced Pinot Setup
Running Pinot in Kubernetes
Query Console
SuperSet Integrations
export PINOT_VERSION=latest
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}
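docker exec \
  -t kafka \
  /opt/kafka/bin/kafka-topics.sh \
  --zookeeper pinot-zookeeper:2181/kafka \
  --partitions=1 --replication-factor=1 \
  --create --topic pullRequestMergedEvents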
$ cd kubernetes/helm
$ kubectl apply -f pinot-github-realtime-events.yml
pullRequestMergedEvents_schema.json
{
  "schemaName": "pullRequestMergedEvents",
  "dimensionFieldSpecs": [
    {
      "name": "title",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "labels",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "userId",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "userType",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "authorAssociation",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "mergedBy",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "assignees",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "authors",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "committers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedReviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedTeams",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "reviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "commenters",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "repo",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "organization",
      "dataType": "STRING",
      "defaultNullValue": ""
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "LONG",
      "defaultNullValue": 1
    },
    {
      "name": "numComments",
      "dataType": "LONG"
    },
    {
      "name": "numReviewComments",
      "dataType": "LONG"
    },
    {
      "name": "numCommits",
      "dataType": "LONG"
    },
    {
      "name": "numLinesAdded",
      "dataType": "LONG"
    },
    {
      "name": "numLinesDeleted",
      "dataType": "LONG"
    },
    {
      "name": "numFilesChanged",
      "dataType": "LONG"
    },
    {
      "name": "numAuthors",
      "dataType": "LONG"
    },
    {
      "name": "numCommitters",
      "dataType": "LONG"
    },
    {
      "name": "numReviewers",
      "dataType": "LONG"
    },
    {
      "name": "numCommenters",
      "dataType": "LONG"
    },
    {
      "name": "createdTimeMillis",
      "dataType": "LONG"
    },
    {
      "name": "elapsedTimeMillis",
      "dataType": "LONG"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "mergedTimeMillis",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:TIMESTAMP",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
pullRequestMergedEvents_realtime_table_config.json
{
  "tableName": "pullRequestMergedEvents",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "mergedTimeMillis",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "60",
    "schemaName": "pullRequestMergedEvents",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [
      "organization",
      "repo"
    ],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "pullRequestMergedEvents",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "pinot-zookeeper:2181/kafka",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.time": "12h",
      "realtime.segment.flush.threshold.rows": "100000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
$ docker run \
    --network=pinot-demo \
    --name pinot-streaming-table-creation \
    ${PINOT_IMAGE} AddTable \
    -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
    -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 \
    -exec
Executing command: AddTable -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
Sending request: http://pinot-controller:9000/schemas to controller: 20c241022a96, version: Unknown
{"status":"Table pullRequestMergedEvents_REALTIME succesfully added"}
$ docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-github-events-into-kafka \
    -d ${PINOT_IMAGE} StreamGitHubEvents \
    -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
    -topic pullRequestMergedEvents \
    -personalAccessToken <your_github_personal_access_token> \
    -kafkaBrokerList kafka:9092
$ docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-github-events-quick-start \
    ${PINOT_IMAGE} GitHubEventsQuickStart \
    -personalAccessToken <your_github_personal_access_token>
Server
  • Kafka

  • /opt/kafka/bin/kafka-topics.sh \
    --zookeeper pinot-zookeeper:2181/kafka \
    --partitions=1 --replication-factor=1 \
    --create --topic pullRequestMergedEvents
    Apache Kafka
    personal access token
    $ bin/kafka-topics.sh \
      --create \
      --bootstrap-server localhost:19092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic pullRequestMergedEvents
    {
      "schemaName": "pullRequestMergedEvents",
      "dimensionFieldSpecs": [
        {
          "name": "title",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "labels",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "userId",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "userType",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "authorAssociation",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "mergedBy",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "assignees",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "authors",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "committers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedReviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedTeams",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "reviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "commenters",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "repo",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "organization",
          "dataType": "STRING",
          "defaultNullValue": ""
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "count",
          "dataType": "LONG",
          "defaultNullValue": 1
        },
        {
          "name": "numComments",
          "dataType": "LONG"
        },
        {
          "name": "numReviewComments",
          "dataType": "LONG"
        },
        {
          "name": "numCommits",
          "dataType": "LONG"
        },
        {
          "name": "numLinesAdded",
          "dataType": "LONG"
        },
        {
          "name": "numLinesDeleted",
          "dataType": "LONG"
        },
        {
          "name": "numFilesChanged",
          "dataType": "LONG"
        },
        {
          "name": "numAuthors",
          "dataType": "LONG"
        },
        {
          "name": "numCommitters",
          "dataType": "LONG"
        },
        {
          "name": "numReviewers",
          "dataType": "LONG"
        },
        {
          "name": "numCommenters",
          "dataType": "LONG"
        },
        {
          "name": "createdTimeMillis",
          "dataType": "LONG"
        },
        {
          "name": "elapsedTimeMillis",
          "dataType": "LONG"
        }
      ],
      "timeFieldSpec": {
        "incomingGranularitySpec": {
          "timeType": "MILLISECONDS",
          "timeFormat": "EPOCH",
          "dataType": "LONG",
          "name": "mergedTimeMillis"
        }
      }
    }
    {
      "tableName": "pullRequestMergedEvents",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mergedTimeMillis",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "60",
        "schemaName": "pullRequestMergedEvents",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "invertedIndexColumns": [
          "organization",
          "repo"
        ],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "simple",
          "stream.kafka.topic.name": "pullRequestMergedEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
          "stream.kafka.broker.list": "localhost:19092",
          "realtime.segment.flush.threshold.time": "12h",
          "realtime.segment.flush.threshold.rows": "100000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    
    $ bin/pinot-admin.sh AddTable \
      -tableConfigFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
      -exec
    $ bin/pinot-admin.sh StreamGitHubEvents \
      -topic pullRequestMergedEvents \
      -personalAccessToken <your_github_personal_access_token> \
      -kafkaBrokerList localhost:19092 \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json
    $ bin/pinot-admin.sh GitHubEventsQuickStart \
      -personalAccessToken <your_github_personal_access_token>