GitHub Events Stream

Steps for setting up a Pinot cluster and a realtime table which consumes from the GitHub events stream.

Pull Request Merged Events Stream

In this recipe, we will:

  1. Set up a Pinot cluster, in the following steps:

    a. Start zookeeper

    b. Start controller

    c. Start broker

    d. Start server

  2. Set up a Kafka cluster

  3. Create a Kafka topic - pullRequestMergedEvents

  4. Create a realtime table - pullRequestMergedEvents and a schema

  5. Start a task which reads from the GitHub events API and publishes events about merged pull requests to the topic.

  6. Query the realtime data
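
To make step 5 more concrete, here is a minimal sketch of how merged pull requests could be picked out of a batch of GitHub events and how the time columns in the schema could be derived. It is an illustration, not Pinot's implementation. It assumes that the GitHub events API reports a merged pull request as a `PullRequestEvent` whose action is `closed` and whose `pull_request.merged` flag is true, and that `elapsedTimeMillis` is the gap between creation and merge. The helper names are hypothetical.

```python
from datetime import datetime, timezone


def to_epoch_millis(iso_timestamp):
    """Convert a UTC timestamp such as 2020-04-01T00:00:00Z to epoch milliseconds."""
    parsed = datetime.strptime(iso_timestamp, "%Y-%m-%dT%H:%M:%SZ")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def is_merged_pull_request(event):
    """Assumed rule: a merged PR is a closed PullRequestEvent with merged == true."""
    payload = event.get("payload", {})
    return (
        event.get("type") == "PullRequestEvent"
        and payload.get("action") == "closed"
        and payload.get("pull_request", {}).get("merged") is True
    )


def to_time_columns(event):
    """Derive the three time-related columns of the schema from one merged PR event."""
    pull_request = event["payload"]["pull_request"]
    created = to_epoch_millis(pull_request["created_at"])
    merged = to_epoch_millis(pull_request["merged_at"])
    return {
        "createdTimeMillis": created,
        "mergedTimeMillis": merged,
        # Assumption: elapsed time is measured from creation to merge.
        "elapsedTimeMillis": merged - created,
    }
```

A real publisher would also fill in the remaining dimension and metric columns and send each record to the Kafka topic as JSON; that part is left out here.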

Steps

Using Docker images or Launcher Scripts

Pull Docker image

Get the latest Docker image.

Long Version

Kubernetes cluster

If you already have a Kubernetes cluster with Pinot and Kafka (see Running Pinot in Kubernetes), first create the topic, then set up the table and streaming by applying pinot-github-realtime-events.yml from the kubernetes/helm directory.

Query

Head over to the Query Console to check out the data!
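
If you would rather query from a script than from the console, the sketch below sends SQL to the Pinot broker over HTTP. Treat it as a starting point under stated assumptions: it assumes the broker is reachable at `http://localhost:8099` (adjust this for your Docker or Kubernetes networking) and that it accepts `POST /query/sql` with a JSON body of the form `{"sql": "..."}`. The organization value `'apache'` in the sample query is a guess at how the data is populated, and the helper names are hypothetical.

```python
import json
import urllib.request

# Assumed broker address; change it to match your deployment.
DEFAULT_BROKER_URL = "http://localhost:8099"

# Sample query over columns defined in the pullRequestMergedEvents schema.
REPOS_BY_COMMITS_SQL = (
    "SELECT repo, SUM(numCommits) "
    "FROM pullRequestMergedEvents "
    "WHERE organization = 'apache' "  # assumed value, for illustration only
    "GROUP BY repo "
    "ORDER BY SUM(numCommits) DESC "
    "LIMIT 10"
)


def build_query_request(sql, broker_url=DEFAULT_BROKER_URL):
    """Build (but do not send) the HTTP request for a SQL query."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url=f"{broker_url}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(sql, broker_url=DEFAULT_BROKER_URL):
    """Send the query to the broker and return the parsed JSON response."""
    with urllib.request.urlopen(build_query_request(sql, broker_url)) as response:
        return json.load(response)
```

Calling `run_query(REPOS_BY_COMMITS_SQL)` once events have been flowing for a while should return the broker's JSON response for that query.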

Visualizing on SuperSet

You can use SuperSet to visualize this data. Some of the interesting insights we captured were:

Most active organizations during the lockdown

Repositories by number of commits in the Apache organization

To integrate with SuperSet, check out the SuperSet Integrations page.

Recipes

Here you will find a collection of ready-made sample applications and examples for real-world data

Set up the Pinot cluster

Follow the instructions in Advanced Pinot Setup to set up the Pinot cluster with the following components:

  1. Zookeeper

  2. Controller

  3. Broker

  4. Server

  5. Kafka

Create a Kafka topic

Create a Kafka topic called pullRequestMergedEvents for the demo.

Add Pinot table and schema

The schema is present at examples/stream/githubEvents/pullRequestMergedEvents_schema.json and is also pasted below.

The table config is present at examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json and is also pasted below.

Note: If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.
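
One way to apply the note above without editing the JSON by hand is sketched here. It rewrites the two Kafka properties in a table config that has the same layout as the one on this page, with `streamConfigs` nested under `tableIndexConfig`. The function names and the host names in the usage comment are hypothetical; substitute the addresses of your own Kafka cluster.

```python
import copy
import json


def with_kafka_endpoints(table_config, zk_broker_url, broker_list):
    """Return a copy of the table config pointing at the given Kafka cluster."""
    updated = copy.deepcopy(table_config)  # leave the caller's dict untouched
    stream_configs = updated["tableIndexConfig"]["streamConfigs"]
    stream_configs["stream.kafka.zk.broker.url"] = zk_broker_url
    stream_configs["stream.kafka.broker.list"] = broker_list
    return updated


def rewrite_config_file(source_path, target_path, zk_broker_url, broker_list):
    """Read a table config file, replace the Kafka endpoints, and write a new file."""
    with open(source_path, encoding="utf-8") as source:
        table_config = json.load(source)
    updated = with_kafka_endpoints(table_config, zk_broker_url, broker_list)
    with open(target_path, "w", encoding="utf-8") as target:
        json.dump(updated, target, indent=2)


# Example usage (hypothetical host names):
# rewrite_config_file(
#     "pullRequestMergedEvents_realtime_table_config.json",
#     "my_table_config.json",
#     zk_broker_url="my-zookeeper:2181/kafka",
#     broker_list="my-kafka:9092",
# )
```

Writing to a separate file keeps the example config shipped with Pinot unchanged, so you can still compare against it later.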

Add the table and schema using the following command

Publish events

Start streaming GitHub events into the Kafka topic.

Prerequisites

Generate a personal access token on GitHub.

Short Version

To run all of the above steps with a single command, use the following command. Make sure to stop any previously running Pinot services.

Get Pinot

Follow the instructions in Build from source to get the latest Pinot code.

Long Version

Set up the Pinot cluster

Follow the instructions in Advanced Pinot Setup to set up the Pinot cluster with the following components:

  1. Zookeeper

  2. Controller

  3. Broker

  4. Server

  5. Kafka

Create a Kafka topic

Download the Apache Kafka release.

Create a Kafka topic called pullRequestMergedEvents for the demo.

Add Pinot table and schema

Schema can be found at /examples/stream/githubevents/ in the release, and is also pasted below:

Table config can be found at /examples/stream/githubevents/ in the release, and is also pasted below.
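
Before adding the table, it can help to confirm that the schema and the table config agree with each other. The sketch below is a small sanity check, not a Pinot tool. It accepts both ways of declaring the time column that appear on this page (`dateTimeFieldSpecs` and the older `timeFieldSpec`) and checks that the table's `schemaName`, `timeColumnName`, and `invertedIndexColumns` all refer to things the schema defines. The function names are hypothetical.

```python
def schema_time_columns(schema):
    """Collect time column names from either style of time field declaration."""
    names = [spec["name"] for spec in schema.get("dateTimeFieldSpecs", [])]
    legacy = schema.get("timeFieldSpec", {}).get("incomingGranularitySpec")
    if legacy:
        names.append(legacy["name"])
    return names


def schema_columns(schema):
    """Return the set of all column names declared in the schema."""
    columns = set(schema_time_columns(schema))
    for key in ("dimensionFieldSpecs", "metricFieldSpecs"):
        columns.update(spec["name"] for spec in schema.get(key, []))
    return columns


def check_table_against_schema(schema, table_config):
    """Return a list of human-readable problems; an empty list means no mismatch found."""
    problems = []
    segments = table_config.get("segmentsConfig", {})
    if segments.get("schemaName") != schema.get("schemaName"):
        problems.append("schemaName in table config does not match the schema")
    time_column = segments.get("timeColumnName")
    if time_column not in schema_time_columns(schema):
        problems.append(f"time column {time_column!r} is not a time field in the schema")
    known = schema_columns(schema)
    indexed = table_config.get("tableIndexConfig", {}).get("invertedIndexColumns", [])
    for column in indexed:
        if column not in known:
            problems.append(f"inverted index column {column!r} is not in the schema")
    return problems
```

Load the two JSON files with `json.load` and pass the resulting dictionaries in; any returned messages point at the field to fix.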

Note

If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.

Add the table and schema using the command

Publish events

Start streaming GitHub events into the Kafka topic.

Prerequisites

Generate a personal access token on GitHub.

Short Version

Use a single command to run all of the above steps.

export PINOT_VERSION=latest
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}
$ cd kubernetes/helm
$ kubectl apply -f pinot-github-realtime-events.yml
pullRequestMergedEvents_schema.json
{
  "schemaName": "pullRequestMergedEvents",
  "dimensionFieldSpecs": [
    {
      "name": "title",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "labels",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "userId",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "userType",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "authorAssociation",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "mergedBy",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "assignees",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "authors",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "committers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedReviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedTeams",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "reviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "commenters",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "repo",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "organization",
      "dataType": "STRING",
      "defaultNullValue": ""
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "LONG",
      "defaultNullValue": 1
    },
    {
      "name": "numComments",
      "dataType": "LONG"
    },
    {
      "name": "numReviewComments",
      "dataType": "LONG"
    },
    {
      "name": "numCommits",
      "dataType": "LONG"
    },
    {
      "name": "numLinesAdded",
      "dataType": "LONG"
    },
    {
      "name": "numLinesDeleted",
      "dataType": "LONG"
    },
    {
      "name": "numFilesChanged",
      "dataType": "LONG"
    },
    {
      "name": "numAuthors",
      "dataType": "LONG"
    },
    {
      "name": "numCommitters",
      "dataType": "LONG"
    },
    {
      "name": "numReviewers",
      "dataType": "LONG"
    },
    {
      "name": "numCommenters",
      "dataType": "LONG"
    },
    {
      "name": "createdTimeMillis",
      "dataType": "LONG"
    },
    {
      "name": "elapsedTimeMillis",
      "dataType": "LONG"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "mergedTimeMillis",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:TIMESTAMP",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
pullRequestMergedEvents_realtime_table_config.json
{
  "tableName": "pullRequestMergedEvents",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "mergedTimeMillis",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "60",
    "schemaName": "pullRequestMergedEvents",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [
      "organization",
      "repo"
    ],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "pullRequestMergedEvents",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "pinot-zookeeper:2181/kafka",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.time": "12h",
      "realtime.segment.flush.threshold.rows": "100000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
$ docker run \
    --network=pinot-demo \
    --name pinot-streaming-table-creation \
    ${PINOT_IMAGE} AddTable \
    -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
    -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 \
    -exec
Executing command: AddTable -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
Sending request: http://pinot-controller:9000/schemas to controller: 20c241022a96, version: Unknown
{"status":"Table pullRequestMergedEvents_REALTIME succesfully added"}
$ docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-github-events-into-kafka \
    -d ${PINOT_IMAGE} StreamGitHubEvents \
    -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
    -topic pullRequestMergedEvents \
    -personalAccessToken <your_github_personal_access_token> \
    -kafkaBrokerList kafka:9092
$ docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-github-events-quick-start \
    ${PINOT_IMAGE} GitHubEventsQuickStart \
    -personalAccessToken <your_github_personal_access_token>
$ docker exec \
    -t kafka \
    /opt/kafka/bin/kafka-topics.sh \
    --zookeeper pinot-zookeeper:2181/kafka \
    --partitions=1 --replication-factor=1 \
    --create --topic pullRequestMergedEvents
    $ bin/kafka-topics.sh \
      --create \
      --bootstrap-server localhost:19092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic pullRequestMergedEvents
    {
      "schemaName": "pullRequestMergedEvents",
      "dimensionFieldSpecs": [
        {
          "name": "title",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "labels",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "userId",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "userType",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "authorAssociation",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "mergedBy",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "assignees",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "authors",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "committers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedReviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedTeams",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "reviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "commenters",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "repo",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "organization",
          "dataType": "STRING",
          "defaultNullValue": ""
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "count",
          "dataType": "LONG",
          "defaultNullValue": 1
        },
        {
          "name": "numComments",
          "dataType": "LONG"
        },
        {
          "name": "numReviewComments",
          "dataType": "LONG"
        },
        {
          "name": "numCommits",
          "dataType": "LONG"
        },
        {
          "name": "numLinesAdded",
          "dataType": "LONG"
        },
        {
          "name": "numLinesDeleted",
          "dataType": "LONG"
        },
        {
          "name": "numFilesChanged",
          "dataType": "LONG"
        },
        {
          "name": "numAuthors",
          "dataType": "LONG"
        },
        {
          "name": "numCommitters",
          "dataType": "LONG"
        },
        {
          "name": "numReviewers",
          "dataType": "LONG"
        },
        {
          "name": "numCommenters",
          "dataType": "LONG"
        },
        {
          "name": "createdTimeMillis",
          "dataType": "LONG"
        },
        {
          "name": "elapsedTimeMillis",
          "dataType": "LONG"
        }
      ],
      "timeFieldSpec": {
        "incomingGranularitySpec": {
          "timeType": "MILLISECONDS",
          "timeFormat": "EPOCH",
          "dataType": "LONG",
          "name": "mergedTimeMillis"
        }
      }
    }
    {
      "tableName": "pullRequestMergedEvents",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mergedTimeMillis",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "60",
        "schemaName": "pullRequestMergedEvents",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "invertedIndexColumns": [
          "organization",
          "repo"
        ],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "simple",
          "stream.kafka.topic.name": "pullRequestMergedEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
          "stream.kafka.broker.list": "localhost:19092",
          "realtime.segment.flush.threshold.time": "12h",
          "realtime.segment.flush.threshold.rows": "100000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    
    $ bin/pinot-admin.sh AddTable \
      -tableConfigFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
      -exec
    $ bin/pinot-admin.sh StreamGitHubEvents \
      -topic pullRequestMergedEvents \
      -personalAccessToken <your_github_personal_access_token> \
      -kafkaBrokerList localhost:19092 \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json
    $ bin/pinot-admin.sh GitHubEventsQuickStart \
      -personalAccessToken <your_github_personal_access_token>