GitHub Events Stream
Steps for setting up a Pinot cluster and a realtime table which consumes from the GitHub events stream.

Pull Request Merged Events Stream

In this recipe we will:
    1. Set up a Pinot cluster, in the following steps:
       a. Start Zookeeper
       b. Start the controller
       c. Start the broker
       d. Start the server
    2. Set up a Kafka cluster
    3. Create a Kafka topic called pullRequestMergedEvents
    4. Create a realtime table pullRequestMergedEvents and its schema
    5. Start a task that reads from the GitHub events API and publishes events about merged pull requests to the Kafka topic
    6. Query the realtime data

Steps

Using Docker images or Launcher Scripts

Docker

Pull docker image

Get the latest Docker image.
export PINOT_VERSION=latest
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}

Long Version

Set up the Pinot cluster

Follow the instructions in Advanced Pinot Setup to set up the Pinot cluster with the following components (a rough sketch of the corresponding Docker commands is shown after the list):
    1. Zookeeper
    2. Controller
    3. Broker
    4. Server
    5. Kafka
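If you want a feel for what those steps look like with Docker, here is a minimal sketch. It reuses the pinot-demo network and the container names (pinot-zookeeper, pinot-controller, kafka, ...) that the commands later in this recipe expect; the zookeeper and wurstmeister/kafka images and their environment variables are illustrative assumptions, so treat Advanced Pinot Setup as the source of truth.

# Network shared by all containers in this recipe
docker network create pinot-demo

# Zookeeper (image/tag are an assumption)
docker run --rm -ti --network=pinot-demo --name pinot-zookeeper -d zookeeper:3.5.8

# Pinot controller, broker and server
docker run --rm -ti --network=pinot-demo --name pinot-controller -p 9000:9000 -d ${PINOT_IMAGE} \
    StartController -zkAddress pinot-zookeeper:2181 -controllerPort 9000
docker run --rm -ti --network=pinot-demo --name pinot-broker -d ${PINOT_IMAGE} \
    StartBroker -zkAddress pinot-zookeeper:2181
docker run --rm -ti --network=pinot-demo --name pinot-server -d ${PINOT_IMAGE} \
    StartServer -zkAddress pinot-zookeeper:2181

# Kafka broker registered under pinot-zookeeper:2181/kafka (image and env vars are assumptions)
docker run --rm -ti --network=pinot-demo --name kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
    -e KAFKA_BROKER_ID=0 \
    -e KAFKA_ADVERTISED_HOST_NAME=kafka \
    -d wurstmeister/kafka:latest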

Create a Kafka topic

Create a Kafka topic called pullRequestMergedEvents for the demo.
docker exec \
    -t kafka \
    /opt/kafka/bin/kafka-topics.sh \
    --zookeeper pinot-zookeeper:2181/kafka \
    --partitions=1 --replication-factor=1 \
    --create --topic pullRequestMergedEvents

Add Pinot table and schema

The schema is present at examples/stream/githubEvents/pullRequestMergedEvents_schema.json and is also pasted below.
pullRequestMergedEvents_schema.json
{
  "schemaName": "pullRequestMergedEvents",
  "dimensionFieldSpecs": [
    {
      "name": "title",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "labels",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "userId",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "userType",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "authorAssociation",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "mergedBy",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "assignees",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "authors",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "committers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedReviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedTeams",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "reviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "commenters",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "repo",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "organization",
      "dataType": "STRING",
      "defaultNullValue": ""
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "LONG",
      "defaultNullValue": 1
    },
    {
      "name": "numComments",
      "dataType": "LONG"
    },
    {
      "name": "numReviewComments",
      "dataType": "LONG"
    },
    {
      "name": "numCommits",
      "dataType": "LONG"
    },
    {
      "name": "numLinesAdded",
      "dataType": "LONG"
    },
    {
      "name": "numLinesDeleted",
      "dataType": "LONG"
    },
    {
      "name": "numFilesChanged",
      "dataType": "LONG"
    },
    {
      "name": "numAuthors",
      "dataType": "LONG"
    },
    {
      "name": "numCommitters",
      "dataType": "LONG"
    },
    {
      "name": "numReviewers",
      "dataType": "LONG"
    },
    {
      "name": "numCommenters",
      "dataType": "LONG"
    },
    {
      "name": "createdTimeMillis",
      "dataType": "LONG"
    },
    {
      "name": "elapsedTimeMillis",
      "dataType": "LONG"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "mergedTimeMillis",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:TIMESTAMP",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
The table config is present at examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json and is also pasted below.
Note: If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.
pullRequestMergedEvents_realtime_table_config.json
{
  "tableName": "pullRequestMergedEvents",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "mergedTimeMillis",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "60",
    "schemaName": "pullRequestMergedEvents",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [
      "organization",
      "repo"
    ],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "pullRequestMergedEvents",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "pinot-zookeeper:2181/kafka",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.time": "12h",
      "realtime.segment.flush.threshold.size": "100000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
Add the table and schema using the following command:

$ docker run \
    --network=pinot-demo \
    --name pinot-streaming-table-creation \
    ${PINOT_IMAGE} AddTable \
    -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
    -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 \
    -exec

Executing command: AddTable -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
Sending request: http://pinot-controller:9000/schemas to controller: 20c241022a96, version: Unknown
{"status":"Table pullRequestMergedEvents_REALTIME succesfully added"}

Publish events

Start streaming GitHub events into the Kafka topic.

Prerequisites
Generate a personal access token on GitHub.
$ docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-github-events-into-kafka \
    -d ${PINOT_IMAGE} StreamGitHubEvents \
    -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
    -topic pullRequestMergedEvents \
    -personalAccessToken <your_github_personal_access_token> \
    -kafkaBrokerList kafka:9092
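To sanity-check that merged-PR events are actually landing in the topic, you can tail it with the console consumer shipped inside the Kafka container. This is a quick verification step, not part of the original recipe, and it assumes the kafka container and broker address used above.

docker exec -t kafka \
    /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --topic pullRequestMergedEvents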

Short Version

To set up all the above steps with a single command, use the following. Make sure to stop any previously running Pinot services first.

$ docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-github-events-quick-start \
    ${PINOT_IMAGE} GitHubEventsQuickStart \
    -personalAccessToken <your_github_personal_access_token>

Launcher scripts

Get Pinot

Follow the instructions in Build from source to get the latest Pinot code.
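As a rough sketch, building from source usually boils down to cloning the repo and running a Maven build; the exact profile and output location can change between releases, so treat the flags below as assumptions and follow Build from source for the authoritative steps.

# Clone the Pinot source and build the binary distribution (this can take a while)
git clone https://github.com/apache/pinot.git
cd pinot
mvn clean install -DskipTests -Pbin-dist

The launcher scripts referenced below (bin/pinot-admin.sh and friends) ship with the binary distribution produced by the build.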

Long Version

Set up the Pinot cluster

Follow the instructions in Advanced Pinot Setup to set up the Pinot cluster with the following components (a rough sketch of the corresponding launcher-script commands is shown after the list):
    1. Zookeeper
    2. Controller
    3. Broker
    4. Server
    5. Kafka
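If you just need a reminder of the shape of those commands, here is a minimal launcher-script sketch. It assumes the ports used elsewhere in this recipe (Zookeeper on 2191, Kafka on 19092); Advanced Pinot Setup remains the source of truth.

# Each component typically runs in its own terminal (or is backgrounded)
bin/pinot-admin.sh StartZookeeper -zkPort 2191
bin/pinot-admin.sh StartController -zkAddress localhost:2191 -controllerPort 9000
bin/pinot-admin.sh StartBroker -zkAddress localhost:2191
bin/pinot-admin.sh StartServer -zkAddress localhost:2191
bin/pinot-admin.sh StartKafka -zkAddress localhost:2191/kafka -port 19092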

Create a Kafka topic

Download an Apache Kafka release.
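For example (the version below is purely illustrative; pick whichever release the Kafka downloads page currently lists):

# Hypothetical version, for illustration only
wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.12-2.8.2.tgz
tar -xzf kafka_2.12-2.8.2.tgz
cd kafka_2.12-2.8.2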
Create a Kafka topic called pullRequestMergedEvents for the demo.
$ bin/kafka-topics.sh \
    --create \
    --bootstrap-server localhost:19092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic pullRequestMergedEvents

Add Pinot table and schema

The schema can be found at /examples/stream/githubevents/ in the release, and is also pasted below:
{
  "schemaName": "pullRequestMergedEvents",
  "dimensionFieldSpecs": [
    {
      "name": "title",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "labels",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "userId",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "userType",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "authorAssociation",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "mergedBy",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "assignees",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "authors",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "committers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedReviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "requestedTeams",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "reviewers",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "commenters",
      "dataType": "STRING",
      "singleValueField": false,
      "defaultNullValue": ""
    },
    {
      "name": "repo",
      "dataType": "STRING",
      "defaultNullValue": ""
    },
    {
      "name": "organization",
      "dataType": "STRING",
      "defaultNullValue": ""
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "LONG",
      "defaultNullValue": 1
    },
    {
      "name": "numComments",
      "dataType": "LONG"
    },
    {
      "name": "numReviewComments",
      "dataType": "LONG"
    },
    {
      "name": "numCommits",
      "dataType": "LONG"
    },
    {
      "name": "numLinesAdded",
      "dataType": "LONG"
    },
    {
      "name": "numLinesDeleted",
      "dataType": "LONG"
    },
    {
      "name": "numFilesChanged",
      "dataType": "LONG"
    },
    {
      "name": "numAuthors",
      "dataType": "LONG"
    },
    {
      "name": "numCommitters",
      "dataType": "LONG"
    },
    {
      "name": "numReviewers",
      "dataType": "LONG"
    },
    {
      "name": "numCommenters",
      "dataType": "LONG"
    },
    {
      "name": "createdTimeMillis",
      "dataType": "LONG"
    },
    {
      "name": "elapsedTimeMillis",
      "dataType": "LONG"
    }
  ],
  "timeFieldSpec": {
    "incomingGranularitySpec": {
      "timeType": "MILLISECONDS",
      "timeFormat": "EPOCH",
      "dataType": "LONG",
      "name": "mergedTimeMillis"
    }
  }
}
The table config can be found at /examples/stream/githubevents/ in the release, and is also pasted below.
Note: If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.
{
  "tableName": "pullRequestMergedEvents",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "mergedTimeMillis",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "60",
    "schemaName": "pullRequestMergedEvents",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [
      "organization",
      "repo"
    ],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "pullRequestMergedEvents",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "localhost:2191/kafka",
      "stream.kafka.broker.list": "localhost:19092",
      "realtime.segment.flush.threshold.time": "12h",
      "realtime.segment.flush.threshold.size": "100000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
Add the table and schema using the command:

$ bin/pinot-admin.sh AddTable \
    -tableConfigFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json \
    -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
    -exec

Publish events

Start streaming GitHub events into the Kafka topic.

Prerequisites
Generate a personal access token on GitHub.
$ bin/pinot-admin.sh StreamGitHubEvents \
    -topic pullRequestMergedEvents \
    -personalAccessToken <your_github_personal_access_token> \
    -kafkaBrokerList localhost:19092 \
    -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json

Short Version

To set up all the above steps with a single command, run:

$ bin/pinot-admin.sh GitHubEventsQuickStart \
    -personalAccessToken <your_github_personal_access_token>

Kubernetes cluster

If you already have a Kubernetes cluster with Pinot and Kafka (see Running Pinot in Kubernetes), first create the topic and then set up the table and streaming using:

$ cd kubernetes/helm
$ kubectl apply -f pinot-github-realtime-events.yml

Query

Head over to the Query Console to check out the data!
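For example, an illustrative query like the one below lists the repositories with the most merged pull requests. You can paste the SQL into the Query Console, or POST it to the broker's SQL endpoint, assuming the broker is reachable on localhost:8099:

curl -H "Content-Type: application/json" -X POST \
    -d '{"sql":"SELECT organization, repo, COUNT(*) AS numMergedPRs FROM pullRequestMergedEvents GROUP BY organization, repo ORDER BY numMergedPRs DESC LIMIT 10"}' \
    http://localhost:8099/query/sql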

Visualizing on SuperSet

You can use SuperSet to visualize this data. Some of the interesting insights we captured were:

Most Active organizations during the lockdown

Repositories by number of commits in the Apache organization
To integrate with SuperSet you can check out the SuperSet Integrations page.