GitHub Events Stream

Steps for setting up a Pinot cluster and a real-time table which consumes from the GitHub events stream.

In this recipe you will set up an Apache Pinot cluster and a real-time table which consumes data flowing from a GitHub events stream. The stream is based on GitHub pull requests and uses Kafka.

In this recipe you will perform the following steps:

  1. Set up a Pinot cluster by completing these steps:

    a. Start Zookeeper.

    b. Start the controller.

    c. Start the broker.

    d. Start the server.

  2. Set up a Kafka cluster.

  3. Create a Kafka topic, which will be called pullRequestMergedEvents.

  4. Create a real-time table called pullRequestMergedEvents and a schema.

  5. Start a task which reads from the GitHub events API and publishes events about merged pull requests to the topic.

  6. Query the real-time data.

Steps

Use either Docker images or launcher scripts

Pull the Docker image

Get the latest Docker image.

Long version

Set up the Pinot cluster

Follow the instructions in Advanced Pinot Setup to set up a Pinot cluster with the following components: Zookeeper, Controller, Broker, Server, and Kafka.

Kubernetes cluster

If you already have a Kubernetes cluster with Pinot and Kafka (see Running Pinot in Kubernetes), first create the topic, then set up the table and streaming using the pinot-github-realtime-events.yml manifest in kubernetes/helm.

Query

Browse to the Query Console to view the data.
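The same data can also be fetched programmatically. The following is a minimal sketch, assuming the Pinot broker is reachable at `http://localhost:8099` (an assumption; use the address of your own broker) and accepts SQL as JSON on its `/query/sql` endpoint. The helper names `build_query_request` and `run_query` are illustrative and not part of Pinot.

```python
import json
import urllib.request

# Assumption: the broker listens on localhost:8099. Adjust for your cluster.
BROKER_URL = "http://localhost:8099"


def build_query_request(sql, broker_url=BROKER_URL):
    """Build a POST request that carries the SQL text as a JSON body."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url=f"{broker_url}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(sql, broker_url=BROKER_URL):
    """Send the query to the broker and return the decoded JSON response."""
    request = build_query_request(sql, broker_url)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the cluster running and events flowing, calling `run_query("SELECT COUNT(*) FROM pullRequestMergedEvents")` should return a JSON document describing the result; nothing is sent until `run_query` is called.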

Visualize with SuperSet

You can use SuperSet to visualize this data. Some of the interesting insights we captured were:

List the most active organizations during the lockdown

Repositories by number of commits in the Apache organization
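Queries along the following lines could back charts like these two. This is a sketch only: the column names come from the pullRequestMergedEvents schema, but the exact queries behind the original charts are not given here, and the time window, organization value, and function names are hypothetical.

```python
def most_active_organizations_sql(start_millis, end_millis, limit=10):
    """SQL counting merged pull requests per organization in a time window."""
    return (
        "SELECT organization, COUNT(*) AS mergedPullRequests "
        "FROM pullRequestMergedEvents "
        f"WHERE mergedTimeMillis BETWEEN {int(start_millis)} AND {int(end_millis)} "
        "GROUP BY organization "
        "ORDER BY COUNT(*) DESC "
        f"LIMIT {int(limit)}"
    )


def repos_by_commits_sql(organization, limit=10):
    """SQL summing commits per repository within one organization."""
    # Escape single quotes so the value stays inside the string literal.
    org = organization.replace("'", "''")
    return (
        "SELECT repo, SUM(numCommits) AS totalCommits "
        "FROM pullRequestMergedEvents "
        f"WHERE organization = '{org}' "
        "GROUP BY repo "
        "ORDER BY SUM(numCommits) DESC "
        f"LIMIT {int(limit)}"
    )


# Hypothetical inputs: an epoch-millisecond window and an organization name.
print(most_active_organizations_sql(1584000000000, 1590000000000))
print(repos_by_commits_sql("apache"))
```

The generated statements can be pasted into the query console or used as the SQL behind a SuperSet chart.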

To integrate with SuperSet, see the SuperSet Integrations page.

    Create a Kafka topic

    Create a Kafka topic called pullRequestMergedEvents for the demo.

    Add a Pinot table and schema

    The schema is present at examples/stream/githubEvents/pullRequestMergedEvents_schema.json and is also pasted below.

    The table config is present at examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json and is also pasted below.

    Note: If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.
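As an illustration of that adjustment, the sketch below overrides the two properties in a table config loaded as a dictionary. It assumes the config keeps `streamConfigs` under `tableIndexConfig`, as in the config on this page; the config is trimmed for brevity, and the replacement endpoints and the helper name `with_kafka_endpoints` are hypothetical.

```python
import copy
import json


def with_kafka_endpoints(table_config, zk_broker_url, broker_list):
    """Return a copy of the table config pointing at the given Kafka cluster."""
    updated = copy.deepcopy(table_config)  # leave the caller's config untouched
    stream_configs = updated["tableIndexConfig"]["streamConfigs"]
    stream_configs["stream.kafka.zk.broker.url"] = zk_broker_url
    stream_configs["stream.kafka.broker.list"] = broker_list
    return updated


# Trimmed version of the table config shown on this page.
table_config = {
    "tableName": "pullRequestMergedEvents",
    "tableType": "REALTIME",
    "tableIndexConfig": {
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "pullRequestMergedEvents",
            "stream.kafka.zk.broker.url": "pinot-zookeeper:2181/kafka",
            "stream.kafka.broker.list": "kafka:9092",
        }
    },
}

# Hypothetical endpoints for a pre-configured cluster.
updated = with_kafka_endpoints(
    table_config, "my-zookeeper:2181/kafka", "my-kafka:9092"
)
print(json.dumps(updated["tableIndexConfig"]["streamConfigs"], indent=2))
```

Writing the updated dictionary back to a JSON file gives a config that can be passed to the AddTable command.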

    Add the table and schema using the following command:

    Publish events

    Start streaming GitHub events into the Kafka topic:

    Prerequisites

    Generate a personal access token on GitHub.

    Short version

    The quickest way to set everything up is to run the following command. Stop any previously running Pinot services first.

    Get Pinot

    Follow the instructions in Build from source to get the latest Pinot code.

    Long version

    Set up the Pinot cluster

    Follow the instructions in Advanced Pinot Setup to set up the Pinot cluster with the components:

    • Zookeeper

    • Controller

    • Broker

    • Server

    • Kafka

    Create a Kafka topic

    Download Apache Kafka.

    Create a Kafka topic called pullRequestMergedEvents for the demo.

    Add a Pinot table and schema

    The schema can be found at /examples/stream/githubEvents/ in the release, and is also pasted below.

    The table config can be found at /examples/stream/githubEvents/ in the release, and is also pasted below.
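Because the table config refers to columns defined in the schema, it can help to check that the two files agree before adding the table. The following is a minimal sketch of such a check; it is not a Pinot tool, the function name is hypothetical, and the sample schema and config are trimmed versions of the ones on this page.

```python
def check_table_against_schema(schema, table_config):
    """Return a list of problems found; an empty list means none were found."""
    columns = set()
    for key in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs"):
        columns.update(spec["name"] for spec in schema.get(key, []))
    # Schemas that use timeFieldSpec keep the time column name one level down.
    time_spec = schema.get("timeFieldSpec", {}).get("incomingGranularitySpec")
    if time_spec:
        columns.add(time_spec["name"])

    problems = []
    segments = table_config.get("segmentsConfig", {})
    if segments.get("schemaName") != schema.get("schemaName"):
        problems.append("schemaName does not match the schema")
    time_column = segments.get("timeColumnName")
    if time_column not in columns:
        problems.append(f"time column {time_column!r} is not in the schema")
    index_config = table_config.get("tableIndexConfig", {})
    for column in index_config.get("invertedIndexColumns", []):
        if column not in columns:
            problems.append(f"inverted index column {column!r} is not in the schema")
    return problems


# Trimmed samples based on the schema and table config on this page.
schema = {
    "schemaName": "pullRequestMergedEvents",
    "dimensionFieldSpecs": [
        {"name": "organization", "dataType": "STRING"},
        {"name": "repo", "dataType": "STRING"},
    ],
    "metricFieldSpecs": [{"name": "numCommits", "dataType": "LONG"}],
    "timeFieldSpec": {
        "incomingGranularitySpec": {
            "timeType": "MILLISECONDS",
            "timeFormat": "EPOCH",
            "dataType": "LONG",
            "name": "mergedTimeMillis",
        }
    },
}
table_config = {
    "tableName": "pullRequestMergedEvents",
    "segmentsConfig": {
        "timeColumnName": "mergedTimeMillis",
        "schemaName": "pullRequestMergedEvents",
    },
    "tableIndexConfig": {"invertedIndexColumns": ["organization", "repo"]},
}

print(check_table_against_schema(schema, table_config))
```

The check covers only the schema name, the time column, and the inverted index columns; the controller performs its own validation when the table is added.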

    Note

    If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.

    Add the table and schema using the command:

    Publish events

    Start streaming GitHub events into the Kafka topic:

    Prerequisites

    Generate a personal access token on GitHub.
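To give a feel for what "events about merged pull requests" means, the sketch below filters events shaped like those returned by the GitHub events API. It is not the code the StreamGitHubEvents task runs; the function name is hypothetical, and the field names (`type`, `payload.action`, `payload.pull_request.merged`) are assumed from GitHub's public PullRequestEvent payload.

```python
def is_merged_pull_request(event):
    """Return True if the event describes a pull request that was merged."""
    if event.get("type") != "PullRequestEvent":
        return False
    payload = event.get("payload") or {}
    pull_request = payload.get("pull_request") or {}
    # A merged pull request is reported as closed with merged set to true.
    return payload.get("action") == "closed" and pull_request.get("merged") is True


# Hand-written sample events, heavily trimmed for illustration.
events = [
    {"type": "PushEvent", "payload": {}},
    {
        "type": "PullRequestEvent",
        "payload": {"action": "closed", "pull_request": {"merged": False}},
    },
    {
        "type": "PullRequestEvent",
        "payload": {"action": "closed", "pull_request": {"merged": True}},
    },
]

merged = [event for event in events if is_merged_pull_request(event)]
print(len(merged))
```

Only the last sample event passes the filter, since it is both closed and merged.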

    Short version

    To perform all of the above steps with a single command, run the following:

    export PINOT_VERSION=latest
    export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
    docker pull ${PINOT_IMAGE}
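    docker exec \
      -t kafka \
      /opt/kafka/bin/kafka-topics.sh \
      --zookeeper pinot-zookeeper:2181/kafka \
      --partitions=1 --replication-factor=1 \
      --create --topic pullRequestMergedEvents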
    $ cd kubernetes/helm
    $ kubectl apply -f pinot-github-realtime-events.yml
    pullRequestMergedEvents_schema.json
    {
      "schemaName": "pullRequestMergedEvents",
      "dimensionFieldSpecs": [
        {
          "name": "title",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "labels",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "userId",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "userType",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "authorAssociation",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "mergedBy",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "assignees",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "authors",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "committers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedReviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedTeams",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "reviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "commenters",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "repo",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "organization",
          "dataType": "STRING",
          "defaultNullValue": ""
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "count",
          "dataType": "LONG",
          "defaultNullValue": 1
        },
        {
          "name": "numComments",
          "dataType": "LONG"
        },
        {
          "name": "numReviewComments",
          "dataType": "LONG"
        },
        {
          "name": "numCommits",
          "dataType": "LONG"
        },
        {
          "name": "numLinesAdded",
          "dataType": "LONG"
        },
        {
          "name": "numLinesDeleted",
          "dataType": "LONG"
        },
        {
          "name": "numFilesChanged",
          "dataType": "LONG"
        },
        {
          "name": "numAuthors",
          "dataType": "LONG"
        },
        {
          "name": "numCommitters",
          "dataType": "LONG"
        },
        {
          "name": "numReviewers",
          "dataType": "LONG"
        },
        {
          "name": "numCommenters",
          "dataType": "LONG"
        },
        {
          "name": "createdTimeMillis",
          "dataType": "LONG"
        },
        {
          "name": "elapsedTimeMillis",
          "dataType": "LONG"
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "mergedTimeMillis",
          "dataType": "TIMESTAMP",
          "format": "1:MILLISECONDS:TIMESTAMP",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }
    pullRequestMergedEvents_realtime_table_config.json
    {
      "tableName": "pullRequestMergedEvents",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mergedTimeMillis",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "60",
        "schemaName": "pullRequestMergedEvents",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "invertedIndexColumns": [
          "organization",
          "repo"
        ],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "simple",
          "stream.kafka.topic.name": "pullRequestMergedEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "pinot-zookeeper:2181/kafka",
          "stream.kafka.broker.list": "kafka:9092",
          "realtime.segment.flush.threshold.time": "12h",
          "realtime.segment.flush.threshold.rows": "100000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    $ docker run \
        --network=pinot-demo \
        --name pinot-streaming-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
        -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: 20c241022a96, version: Unknown
    {"status":"Table pullRequestMergedEvents_REALTIME succesfully added"}
    $ docker run --rm -ti \
        --network=pinot-demo \
        --name pinot-github-events-into-kafka \
        -d ${PINOT_IMAGE} StreamGitHubEvents \
        -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
        -topic pullRequestMergedEvents \
        -personalAccessToken <your_github_personal_access_token> \
        -kafkaBrokerList kafka:9092
    $ docker run --rm -ti \
        --network=pinot-demo \
        --name pinot-github-events-quick-start \
         ${PINOT_IMAGE} GitHubEventsQuickStart \
        -personalAccessToken <your_github_personal_access_token> 
    /opt/kafka/bin/kafka-topics.sh \
    --zookeeper pinot-zookeeper:2181/kafka \
    --partitions=1 --replication-factor=1 \
    --create --topic pullRequestMergedEvents
    $ bin/kafka-topics.sh \
      --create \
      --bootstrap-server localhost:19092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic pullRequestMergedEvents
    {
      "schemaName": "pullRequestMergedEvents",
      "dimensionFieldSpecs": [
        {
          "name": "title",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "labels",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "userId",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "userType",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "authorAssociation",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "mergedBy",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "assignees",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "authors",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "committers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedReviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedTeams",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "reviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "commenters",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "repo",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "organization",
          "dataType": "STRING",
          "defaultNullValue": ""
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "count",
          "dataType": "LONG",
          "defaultNullValue": 1
        },
        {
          "name": "numComments",
          "dataType": "LONG"
        },
        {
          "name": "numReviewComments",
          "dataType": "LONG"
        },
        {
          "name": "numCommits",
          "dataType": "LONG"
        },
        {
          "name": "numLinesAdded",
          "dataType": "LONG"
        },
        {
          "name": "numLinesDeleted",
          "dataType": "LONG"
        },
        {
          "name": "numFilesChanged",
          "dataType": "LONG"
        },
        {
          "name": "numAuthors",
          "dataType": "LONG"
        },
        {
          "name": "numCommitters",
          "dataType": "LONG"
        },
        {
          "name": "numReviewers",
          "dataType": "LONG"
        },
        {
          "name": "numCommenters",
          "dataType": "LONG"
        },
        {
          "name": "createdTimeMillis",
          "dataType": "LONG"
        },
        {
          "name": "elapsedTimeMillis",
          "dataType": "LONG"
        }
      ],
      "timeFieldSpec": {
        "incomingGranularitySpec": {
          "timeType": "MILLISECONDS",
          "timeFormat": "EPOCH",
          "dataType": "LONG",
          "name": "mergedTimeMillis"
        }
      }
    }
    {
      "tableName": "pullRequestMergedEvents",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mergedTimeMillis",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "60",
        "schemaName": "pullRequestMergedEvents",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "invertedIndexColumns": [
          "organization",
          "repo"
        ],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "simple",
          "stream.kafka.topic.name": "pullRequestMergedEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
          "stream.kafka.broker.list": "localhost:19092",
          "realtime.segment.flush.threshold.time": "12h",
          "realtime.segment.flush.threshold.rows": "100000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    
    $ bin/pinot-admin.sh AddTable \
      -tableConfigFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
      -exec
    $ bin/pinot-admin.sh StreamGitHubEvents \
      -topic pullRequestMergedEvents \
      -personalAccessToken <your_github_personal_access_token> \
      -kafkaBrokerList localhost:19092 \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json
    $ bin/pinot-admin.sh GitHubEventsQuickStart \
      -personalAccessToken <your_github_personal_access_token>