Visualize data with Redash

  1. Install Redash and start a running instance, following the Docker Based Developer Installation Guide.

  2. Configure Redash to query Pinot, by doing the following:

    1. Add the pinotdb dependency.

    2. Add a Python data source for Pinot.

  3. Create visualizations, by doing the following:

    1. Start Pinot.

    2. Run a query in Redash.

    3. Add a visualization and dashboard in Redash.

Add pinotdb dependency

Apache Pinot provides a Python client library pinotdb to query Pinot from Python applications. Install pinotdb inside the Redash worker instance to make network calls to Pinot.

  1. Navigate to the root directory where you’ve cloned Redash. Run the following command to get the name of the Redash worker container (by default, redash_worker_1):

docker-compose ps

  2. Run the commands shown after this list to open a shell inside the worker container and install pinotdb (change redash_worker_1 to your own Redash worker container name, if applicable).

  3. Restart Docker.
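For example, assuming the default worker container name redash_worker_1:

docker exec -it redash_worker_1 /bin/sh
pip install pinotdb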

Add Python data source for Pinot

  1. In Redash, select Settings > Data Sources.

  2. Select New Data Source, and then select Python from the list.

  3. On the Redash Settings - Data Source page, add Pinot as the name of the data source, and enter pinotdb in the Modules to import prior to running the script field.

  4. Enter the following optional fields as needed:

    • AdditionalModulesPaths: a comma-separated list of absolute paths on the Redash server to Python modules to make available when querying from Redash. Useful for private modules unavailable in pip.

    • AdditionalBuiltins: additional built-in functions as needed. By default, Redash automatically includes 25 Python built-in functions.

  5. Click Save.

Start Pinot

Run the following command in a new terminal to spin up an Apache Pinot Docker container in the quick start mode with a baseball stats dataset built in.
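For example, using the apachepinot/pinot:0.9.3 image (adjust the image tag and port mappings to suit your environment):

docker run \
  --name pinot-quickstart \
  -p 2123:2123 \
  -p 9000:9000 \
  -p 8000:8000 \
  apachepinot/pinot:0.9.3 QuickStart -type batch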

Run a query in Redash

  1. In Redash, select Queries > New Query, and then select the Python data source you created in Add a Python data source for Pinot.

  2. Add Python code to query data. For more information, see the Python query runner documentation.

  3. Click Execute to run the query and view results.

You can also include libraries like Pandas to perform more advanced data manipulation on Pinot’s data and visualize the output with Redash.
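For example, a rough sketch (assuming pandas is installed in the Redash worker, and using the same baseballStats table and connection settings as the example queries below) that filters the aggregated results with pandas before handing them to Redash:

from pinotdb import connect
import pandas as pd

conn = connect(host='host.docker.internal', port=8000, path='/query/sql', scheme='http')
curs = conn.cursor()
curs.execute("""
    select playerName, sum(runs) as total_runs
    from baseballStats
    group by playerName
    order by total_runs desc
    limit 100
""")

# Load the result set into a DataFrame and keep only players with at least 500 total runs
# (the 500-run cutoff is arbitrary, just to show a pandas transformation)
df = pd.DataFrame(curs, columns=[col[0] for col in curs.description])
df = df[df['total_runs'] >= 500]

# Convert the DataFrame into the columns/rows dictionary format Redash expects
result = {
    'columns': [
        {'name': 'playerName', 'type': 'string', 'friendly_name': 'Player'},
        {'name': 'total_runs', 'type': 'integer', 'friendly_name': 'Total Runs'}
    ],
    'rows': df.to_dict(orient='records')
}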

For more information, see Querying in the Redash documentation.

Example Python queries

Query top 10 players by total runs

The following query connects to Pinot and queries the baseballStats table to retrieve the top ten players by total runs. The results are transformed into a dictionary format supported by Redash.

Query top 10 teams by total runs

Query total strikeouts by year

Add a visualization and dashboard in Redash

Add a visualization

In Redash, after you've run your query, click the New Visualization tab, and select the type of visualization you want to create, for example, Bar Chart. The Visualization Editor appears with your chart.

For example, you may want to create a bar chart to view the top 10 players with highest scores.

You may want to create a line chart to view the variation in total strikeouts over time.

For more information, see Visualizations in the Redash documentation.

Add a dashboard

Create a dashboard with one or more visualizations (widgets).

  1. In Redash, go to Dashboards > New Dashboard.

  2. Add the widgets to your dashboard. For example, by adding the three example visualizations from above, you create a Baseball stats dashboard.

For more information, see Dashboards in the Redash documentation.


    docker exec -it redash_worker_1 /bin/sh                                
    pip install pinotdb
    docker run \
      --name pinot-quickstart \
      -p 2123:2123 \
      -p 9000:9000 \
      -p 8000:8000 \
      apachepinot/pinot:0.9.3 QuickStart -type batch
    from pinotdb import connect
    
    conn = connect(host='host.docker.internal', port=8000, path='/query/sql', scheme='http')
    curs = conn.cursor()
    curs.execute("""
        select 
    playerName, sum(runs) as total_runs
    from baseballStats
    group by playerName
    order by total_runs desc
    limit 10
    """)
    
    result = {}
    result['columns'] = [
        {
          "name": "player_name",
          "type": "string",
          "friendly_name": "playerName"
        },
        {
          "name": "total_runs",
          "type": "integer",
          "friendly_name": "total_runs"
        }
      ]
    
    rows = []
    
    for row in curs:
        record = {}
        record['player_name'] = row[0]
        record['total_runs'] = row[1]
    
    
        rows.append(record)
    
    result["rows"] = rows
    from pinotdb import connect
    
    conn = connect(host='host.docker.internal', port=8000, path='/query/sql', scheme='http')
    curs = conn.cursor()
    curs.execute("""
        select 
    teamID, sum(runs) as total_runs
    from baseballStats
    group by teamID
    order by total_runs desc
    limit 10
    """)
    
    result = {}
    result['columns'] = [
        {
          "name": "teamID",
          "type": "string",
          "friendly_name": "Team"
        },
        {
          "name": "total_runs",
          "type": "integer",
          "friendly_name": "Total Runs"
        }
      ]
    
    rows = []
    
    for row in curs:
        record = {}
        record['teamID'] = row[0]
        record['total_runs'] = row[1]
    
    
        rows.append(record)
    
    result["rows"] = rows
    from pinotdb import connect
    
    conn = connect(host='host.docker.internal', port=8000, path='/query/sql', scheme='http')
    curs = conn.cursor()
    curs.execute("""
        select 
    yearID, sum(strikeouts) as total_so
    from baseballStats
    group by yearID
    order by yearID asc
    limit 1000
    """)
    
    result = {}
    result['columns'] = [
        {
          "name": "yearID",
          "type": "integer",
          "friendly_name": "Year"
        },
        {
          "name": "total_so",
          "type": "integer",
          "friendly_name": "Total Strikeouts"
        }
      ]
    
    rows = []
    
    for row in curs:
        record = {}
        record['yearID'] = row[0]
        record['total_so'] = row[1]
    
    
        rows.append(record)
    
    result["rows"] = rows

    Recipes

    Here you will find a collection of ready-made sample applications and examples for real-world data

    Connect to Streamlit

    In this Apache Pinot guide, we'll learn how to visualize data using the Streamlit web framework.

    In this guide you'll learn how to visualize data from Apache Pinot using Streamlit. Streamlit is a Python library that makes it easy to build interactive, data-driven web applications.

    We're going to use Streamlit to build a real-time dashboard to visualize the changes being made to Wikimedia properties.

    Real-Time Dashboard Architecture

    Startup components

    We're going to use the following Docker compose file, which spins up Zookeeper and Kafka, along with a Pinot controller, broker, and server:

    docker-compose.yml

    Run the following command to launch all the components:

    Wikimedia recent changes stream

    Wikimedia provides a continuous stream of structured event data describing changes made to various Wikimedia properties. The events are published over HTTP using the Server-Sent Events (SSE) protocol.

    You can find the endpoint at https://stream.wikimedia.org/v2/stream/recentchange.

    We'll need to install the SSE client library to consume this data:

    Next, create a file called wiki.py that contains the following:

    wiki.py

    The key part of this script is where we connect to the recent changes feed using the SSE client library.

    Let's run this script as shown below:

    We'll see the following (truncated) output:

    Output

    Ingest recent changes into Kafka

    Now we're going to import each of the events into Apache Kafka. First let's create a Kafka topic called wiki_events with 5 partitions:

    Create a new file called wiki_to_kafka.py and import the following libraries:

    wiki_to_kafka.py

    Add these functions:

    wiki_to_kafka.py

    And now let's add the code that calls the recent changes API and imports events into the wiki_events topic:

    wiki_to_kafka.py

    The important parts of this script are where events are produced to the wiki_events topic and where the producer is flushed every 100 events.

    If we run this script:

    We'll see a message every time 100 messages are pushed to Kafka, as shown below:

    Output

    Explore Kafka

    Let's check that the data has made its way into Kafka.

    The following command returns the message offset for each partition in the wiki_events topic:

    Output

    Looks good. We can also stream all the messages in this topic by running the following command:

    Output

    Configure Pinot

    Now let's configure Pinot to consume the data from Kafka.

    We'll have the following schema:

    schema.json

    And the following table config:

    table.json

    The streamConfigs section is how we connect Pinot to the Kafka topic that contains the events. Create the schema and table by running the following command:

    Once you've done that, navigate to the Pinot UI (localhost:9000) and run the following query to check that the data has made its way into Pinot:

    As long as you see some records, everything is working as expected.

    Building a Streamlit Dashboard

    Now let's write some more queries against Pinot and display the results in Streamlit.

    First, install the following libraries:

    Create a file called app.py and import libraries and write a header for the page:

    app.py

    Connect to Pinot and write a query that returns recent changes, along with the users who made the changes, and domains where they were made:

    app.py

    The FILTER clauses in the query count the number of events from the last minute and the minute before that. We then do a similar thing to count the number of unique users and domains.

    Metrics

    Now let's create some metrics based on that data:

    app.py

    Go back to the terminal and run the following command:

    Navigate to localhost:8501 to see the Streamlit app. You should see something like the following:

    Streamlit Metrics

    Changes per minute

    Next, let's add a line chart that shows the number of changes being done to Wikimedia per minute. Add the following code to app.py:

    app.py

    Go back to the web browser and you should see something like this:

    Streamlit Time Series

    Auto Refresh

    At the moment we need to refresh our web browser to update the metrics and line chart, but it would be much better if that happened automatically. Let's now add auto refresh functionality.

    Add the following code just under the header at the top of app.py:

    app.py

    And the following code at the very end:

    app.py

    If we navigate back to our web browser, we'll see the following:

    Streamlit Auto Refresh

    The full script used in this example is shown below:

    app.py

    Summary

    In this guide we've learnt how to publish data into Kafka from Wikimedia's event stream, ingest it from there into Pinot, and finally make sense of the data using SQL queries run from Streamlit.

    version: '3.7'
    services:
      zookeeper:
        image: zookeeper:3.5.6
        container_name: "zookeeper-wiki"
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
      kafka:
        image: wurstmeister/kafka:latest
        restart: unless-stopped
        container_name: "kafka-wiki"
        ports:
          - "9092:9092"
        expose:
          - "9093"
        depends_on:
          - zookeeper
        environment:
          KAFKA_ZOOKEEPER_CONNECT: zookeeper-wiki:2181/kafka
          KAFKA_BROKER_ID: 0
          KAFKA_ADVERTISED_HOST_NAME: kafka-wiki
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-wiki:9093,OUTSIDE://localhost:9092
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      pinot-controller:
        image: apachepinot/pinot:0.10.0
        command: "StartController -zkAddress zookeeper-wiki:2181 -dataDir /data"
        container_name: "pinot-controller-wiki"
        volumes:
          - ./config:/config
          - ./data:/data
        restart: unless-stopped
        ports:
          - "9000:9000"
        depends_on:
          - zookeeper
      pinot-broker:
        image: apachepinot/pinot:0.10.0
        command: "StartBroker -zkAddress zookeeper-wiki:2181"
        restart: unless-stopped
        container_name: "pinot-broker-wiki"
        volumes:
          - ./config:/config
        ports:
          - "8099:8099"
        depends_on:
          - pinot-controller
      pinot-server:
        image: apachepinot/pinot:0.10.0
        command: "StartServer -zkAddress zookeeper-wiki:2181"
        restart: unless-stopped
        container_name: "pinot-server-wiki"
        volumes:
          - ./config:/config
        depends_on:
          - pinot-broker
    docker-compose up
    pip install sseclient-py
    import json
    import pprint
    import sseclient
    import requests
    
    def with_requests(url, headers):
        """Get a streaming response for the given event feed using requests."""    
        return requests.get(url, stream=True, headers=headers)
    
    url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    headers = {'Accept': 'text/event-stream'}
    response = with_requests(url, headers)
    client = sseclient.SSEClient(response)
    
    for event in client.events():
        stream = json.loads(event.data)
        pprint.pprint(stream)
    python wiki.py
    {'$schema': '/mediawiki/recentchange/1.0.0',
     'bot': False,
     'comment': '[[:File:Storemyr-Fagerbakken landskapsvernområde HVASSER '
                'Oslofjorden Norway (Protected coastal forest Recreational area '
                'hiking trails) Rituell-kultisk steinstreng sørøst i skogen (small '
                'archeological stone string) VÃ¥r (spring) 2021-04-24.jpg]] removed '
                'from category',
     'id': 1923506287,
     'meta': {'domain': 'commons.wikimedia.org',
              'dt': '2022-05-12T09:57:00Z',
              'id': '3800228e-43d8-440d-8034-c68977742653',
              'offset': 3855767440,
              'partition': 0,
              'request_id': '930b17cc-f14a-4656-afa1-d15b79a8f666',
              'stream': 'mediawiki.recentchange',
              'topic': 'eqiad.mediawiki.recentchange',
              'uri': 'https://commons.wikimedia.org/wiki/Category:Iron_Age_in_Norway'},
     'namespace': 14,
     'parsedcomment': '<a '
                      'href="/wiki/File:Storemyr-Fagerbakken_landskapsvernomr%C3%A5de_HVASSER_Oslofjorden_Norway_(Protected_coastal_forest_Recreational_area_hiking_trails)_Rituell-kultisk_steinstreng_s%C3%B8r%C3%B8st_i_skogen_(small_archeological_stone_string)_V%C3%A5r_(spring)_2021-04-24.jpg" '
                      'title="File:Storemyr-Fagerbakken landskapsvernområde '
                      'HVASSER Oslofjorden Norway (Protected coastal forest '
                      'Recreational area hiking trails) Rituell-kultisk '
                      'steinstreng sørøst i skogen (small archeological stone '
                      'string) VÃ¥r (spring) '
                      '2021-04-24.jpg">File:Storemyr-Fagerbakken '
                      'landskapsvernområde HVASSER Oslofjorden Norway (Protected '
                      'coastal forest Recreational area hiking trails) '
                      'Rituell-kultisk steinstreng sørøst i skogen (small '
                      'archeological stone string) VÃ¥r (spring) 2021-04-24.jpg</a> '
                      'removed from category',
     'server_name': 'commons.wikimedia.org',
     'server_script_path': '/w',
     'server_url': 'https://commons.wikimedia.org',
     'timestamp': 1652349420,
     'title': 'Category:Iron Age in Norway',
     'type': 'categorize',
     'user': 'Krg',
     'wiki': 'commonswiki'}
    {'$schema': '/mediawiki/recentchange/1.0.0',
     'bot': False,
     'comment': '[[:File:Storemyr-Fagerbakken landskapsvernområde HVASSER '
                'Oslofjorden Norway (Protected coastal forest Recreational area '
                'hiking trails) Rituell-kultisk steinstreng sørøst i skogen (small '
                'archeological stone string) VÃ¥r (spring) 2021-04-24.jpg]] removed '
                'from category',
     'id': 1923506289,
     'meta': {'domain': 'commons.wikimedia.org',
              'dt': '2022-05-12T09:57:00Z',
              'id': '2b819d20-beca-46a5-8ce3-b2f3b73d2cbe',
              'offset': 3855767441,
              'partition': 0,
              'request_id': '930b17cc-f14a-4656-afa1-d15b79a8f666',
              'stream': 'mediawiki.recentchange',
              'topic': 'eqiad.mediawiki.recentchange',
              'uri': 'https://commons.wikimedia.org/wiki/Category:Cultural_heritage_monuments_in_F%C3%A6rder'},
     'namespace': 14,
     'parsedcomment': '<a '
                      'href="/wiki/File:Storemyr-Fagerbakken_landskapsvernomr%C3%A5de_HVASSER_Oslofjorden_Norway_(Protected_coastal_forest_Recreational_area_hiking_trails)_Rituell-kultisk_steinstreng_s%C3%B8r%C3%B8st_i_skogen_(small_archeological_stone_string)_V%C3%A5r_(spring)_2021-04-24.jpg" '
                      'title="File:Storemyr-Fagerbakken landskapsvernområde '
                      'HVASSER Oslofjorden Norway (Protected coastal forest '
                      'Recreational area hiking trails) Rituell-kultisk '
                      'steinstreng sørøst i skogen (small archeological stone '
                      'string) VÃ¥r (spring) '
                      '2021-04-24.jpg">File:Storemyr-Fagerbakken '
                      'landskapsvernområde HVASSER Oslofjorden Norway (Protected '
                      'coastal forest Recreational area hiking trails) '
                      'Rituell-kultisk steinstreng sørøst i skogen (small '
                      'archeological stone string) VÃ¥r (spring) 2021-04-24.jpg</a> '
                      'removed from category',
     'server_name': 'commons.wikimedia.org',
     'server_script_path': '/w',
     'server_url': 'https://commons.wikimedia.org',
     'timestamp': 1652349420,
     'title': 'Category:Cultural heritage monuments in Færder',
     'type': 'categorize',
     'user': 'Krg',
     'wiki': 'commonswiki'}
    docker exec -it kafka-wiki kafka-topics.sh \
      --bootstrap-server localhost:9092 \
      --create \
      --topic wiki_events \
      --partitions 5
    import json
    import sseclient
    import datetime
    import requests
    import time
    from confluent_kafka import Producer
    def with_requests(url, headers):
        """Get a streaming response for the given event feed using requests."""    
        return requests.get(url, stream=True, headers=headers)
    
    def acked(err, msg):
        if err is not None:
            print("Failed to deliver message: {0}: {1}"
                  .format(msg.value(), err.str()))
    
    def json_serializer(obj):
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()
        raise "Type %s not serializable" % type(obj)
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    
    url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    headers = {'Accept': 'text/event-stream'}
    response = with_requests(url, headers) 
    client = sseclient.SSEClient(response)
    
    events_processed = 0
    while True:
        try: 
            for event in client.events():
                stream = json.loads(event.data)
                payload = json.dumps(stream, default=json_serializer, ensure_ascii=False).encode('utf-8')
                producer.produce(topic='wiki_events', 
                  key=str(stream['meta']['id']), value=payload, callback=acked)
    
                events_processed += 1
                if events_processed % 100 == 0:
                    print(f"{str(datetime.datetime.now())} Flushing after {events_processed} events")
                    producer.flush()
        except Exception as ex:
            print(f"{str(datetime.datetime.now())} Got error:" + str(ex))
            response = with_requests(url, headers) 
            client = sseclient.SSEClient(response)
            time.sleep(2)
    python wiki_to_kafka.py
    2022-05-12 10:58:34.449326 Flushing after 100 events
    2022-05-12 10:58:39.151599 Flushing after 200 events
    2022-05-12 10:58:43.399528 Flushing after 300 events
    2022-05-12 10:58:47.350277 Flushing after 400 events
    2022-05-12 10:58:50.847959 Flushing after 500 events
    2022-05-12 10:58:54.768228 Flushing after 600 events
    docker exec -it kafka-wiki kafka-run-class.sh kafka.tools.GetOffsetShell \
      --broker-list localhost:9092 \
      --topic wiki_events
    wiki_events:0:42
    wiki_events:1:61
    wiki_events:2:52
    wiki_events:3:56
    wiki_events:4:58
    docker exec -it kafka-wiki kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --topic wiki_events \
      --from-beginning
    ...
    {"$schema": "/mediawiki/recentchange/1.0.0", "meta": {"uri": "https://en.wikipedia.org/wiki/Super_Wings", "request_id": "6f82e64d-220f-41f4-88c3-2e15f03ae504", "id": "c30cd735-1ead-405e-94d1-49fbe7c40411", "dt": "2022-05-12T10:05:36Z", "domain": "en.wikipedia.org", "stream": "mediawiki.recentchange", "topic": "eqiad.mediawiki.recentchange", "partition": 0, "offset": 3855779703}, "type": "log", "namespace": 0, "title": "Super Wings", "comment": "", "timestamp": 1652349936, "user": "2001:448A:50E0:885B:FD1D:2D04:233E:7647", "bot": false, "log_id": 0, "log_type": "abusefilter", "log_action": "hit", "log_params": {"action": "edit", "filter": "550", "actions": "tag", "log": 32575794}, "log_action_comment": "2001:448A:50E0:885B:FD1D:2D04:233E:7647 triggered [[Special:AbuseFilter/550|filter 550]], performing the action \"edit\" on [[Super Wings]]. Actions taken: Tag ([[Special:AbuseLog/32575794|details]])", "server_url": "https://en.wikipedia.org", "server_name": "en.wikipedia.org", "server_script_path": "/w", "wiki": "enwiki", "parsedcomment": ""}
    {"$schema": "/mediawiki/recentchange/1.0.0", "meta": {"uri": "https://no.wikipedia.org/wiki/Brukerdiskusjon:Haros", "request_id": "a20c9692-f301-4faf-9373-669bebbffff4", "id": "566ee63e-8e86-4a7e-a1f3-562704306509", "dt": "2022-05-12T10:05:36Z", "domain": "no.wikipedia.org", "stream": "mediawiki.recentchange", "topic": "eqiad.mediawiki.recentchange", "partition": 0, "offset": 3855779714}, "id": 84572581, "type": "edit", "namespace": 3, "title": "Brukerdiskusjon:Haros", "comment": "/* Stor forbokstav / ucfirst */", "timestamp": 1652349936, "user": "Asav", "bot": false, "minor": false, "patrolled": true, "length": {"old": 110378, "new": 110380}, "revision": {"old": 22579494, "new": 22579495}, "server_url": "https://no.wikipedia.org", "server_name": "no.wikipedia.org", "server_script_path": "/w", "wiki": "nowiki", "parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\"><a href=\"/wiki/Brukerdiskusjon:Haros#Stor_forbokstav_/_ucfirst\" title=\"Brukerdiskusjon:Haros\">→‎Stor forbokstav / ucfirst</a></span></span>"}
    {"$schema": "/mediawiki/recentchange/1.0.0", "meta": {"uri": "https://es.wikipedia.org/wiki/Campo_de_la_calle_Industria", "request_id": "d45bd9af-3e2c-4aac-ae8f-e16d3340da76", "id": "7fb3956e-9bd2-4fa5-8659-72b266cdb45b", "dt": "2022-05-12T10:05:35Z", "domain": "es.wikipedia.org", "stream": "mediawiki.recentchange", "topic": "eqiad.mediawiki.recentchange", "partition": 0, "offset": 3855779718}, "id": 266270269, "type": "edit", "namespace": 0, "title": "Campo de la calle Industria", "comment": "/* Historia */", "timestamp": 1652349935, "user": "Raimon will", "bot": false, "minor": false, "length": {"old": 7566, "new": 7566}, "revision": {"old": 143485393, "new": 143485422}, "server_url": "https://es.wikipedia.org", "server_name": "es.wikipedia.org", "server_script_path": "/w", "wiki": "eswiki", "parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\"><a href=\"/wiki/Campo_de_la_calle_Industria#Historia\" title=\"Campo de la calle Industria\">→‎Historia</a></span></span>"}
    ^CProcessed a total of 269 messages
    {
        "schemaName": "wikipedia",
        "dimensionFieldSpecs": [
          {
            "name": "id",
            "dataType": "STRING"
          },
          {
            "name": "wiki",
            "dataType": "STRING"
          },
          {
            "name": "user",
            "dataType": "STRING"
          },
          {
            "name": "title",
            "dataType": "STRING"
          },
          {
            "name": "comment",
            "dataType": "STRING"
          },
          {
            "name": "stream",
            "dataType": "STRING"
          },
          {
            "name": "domain",
            "dataType": "STRING"
          },
          {
            "name": "topic",
            "dataType": "STRING"
          },
          {
            "name": "type",
            "dataType": "STRING"
          },
          {
            "name": "uri",
            "dataType": "STRING"
          },
          {
            "name": "bot",
            "dataType": "BOOLEAN"
          },
          {
            "name": "metaJson",
            "dataType": "STRING"
          }
        ],
        "dateTimeFieldSpecs": [
          {
            "name": "ts",
            "dataType": "TIMESTAMP",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS"
          }
        ]
      }
    {
        "tableName": "wikievents",
        "tableType": "REALTIME",
        "segmentsConfig": {
          "timeColumnName": "ts",
          "schemaName": "wikipedia",
          "replication": "1",
          "replicasPerPartition": "1"
        },
    
        "tableIndexConfig": {
          "invertedIndexColumns": [],
          "rangeIndexColumns": [],
          "autoGeneratedInvertedIndex": false,
          "createInvertedIndexDuringSegmentGeneration": false,
          "sortedColumn": [],
          "bloomFilterColumns": [],
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "wiki_events",
            "stream.kafka.broker.list": "kafka-wiki:9093",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "realtime.segment.flush.threshold.rows": "1000",
            "realtime.segment.flush.threshold.time": "24h",
            "realtime.segment.flush.segment.size": "100M"
      },
      "noDictionaryColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
        "metadata": {},
        "quota": {},
        "routing": {},
        "query": {},
        "ingestionConfig": {
          "transformConfigs": [
            {
              "columnName": "metaJson",
              "transformFunction": "JSONFORMAT(meta)"
            },
            {
              "columnName": "id",
              "transformFunction": "JSONPATH(metaJson, '$.id')"
            },
            {
              "columnName": "stream",
              "transformFunction": "JSONPATH(metaJson, '$.stream')"
            },
            {
              "columnName": "domain",
              "transformFunction": "JSONPATH(metaJson, '$.domain')"
            },
            {
              "columnName": "topic",
              "transformFunction": "JSONPATH(metaJson, '$.topic')"
            },
            {
              "columnName": "uri",
              "transformFunction": "JSONPATH(metaJson, '$.uri')"
            },
            {
              "columnName": "ts",
              "transformFunction": "\"timestamp\" * 1000"
            }
          ]
        },
        "isDimTable": false
      }
    docker exec -it pinot-controller-wiki bin/pinot-admin.sh AddTable \
      -tableConfigFile /config/table.json \
      -schemaFile /config/schema.json \
      -exec
    select domain, count(*) 
    from wikievents 
    group by domain
    order by count(*) DESC
    limit 10
    pip install streamlit pinotdb plotly pandas
    import pandas as pd
    import streamlit as st
    from pinotdb import connect
    import plotly.express as px
    
    st.set_page_config(layout="wide")
    st.header("Wikipedia Recent Changes")
    conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
    
    query = """select
      count(*) FILTER(WHERE  ts > ago('PT1M')) AS events1Min,
      count(*) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,     
      distinctcount(user) FILTER(WHERE  ts > ago('PT1M')) AS users1Min,
      distinctcount(user) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
      distinctcount(domain) FILTER(WHERE  ts > ago('PT1M')) AS domains1Min,
      distinctcount(domain) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
    from wikievents 
    where ts > ago('PT2M')
    limit 1
    """
    
    curs = conn.cursor()
    
    curs.execute(query)
    df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    metric1, metric2, metric3 = st.columns(3)
    metric1.metric(label="Changes", value=df_summary['events1Min'].values[0],
        delta=float(df_summary['events1Min'].values[0] - df_summary['events1Min2Min'].values[0]))
    
    metric2.metric(label="Users", value=df_summary['users1Min'].values[0],
        delta=float(df_summary['users1Min'].values[0] - df_summary['users1Min2Min'].values[0]))
    
    metric3.metric(label="Domains", value=df_summary['domains1Min'].values[0],
        delta=float(df_summary['domains1Min'].values[0] - df_summary['domains1Min2Min'].values[0]))
    streamlit run app.py
    query = """
    select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin, count(*) AS changes, 
           distinctcount(user) AS users,
           distinctcount(domain) AS domains
    from wikievents 
    where ts > ago('PT1H')
    group by dateMin
    order by dateMin desc
    LIMIT 30
    """
    
    curs.execute(query)
    df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    
    fig = px.line(df_ts_melt, x='dateMin', y="value", color='variable', color_discrete_sequence =['blue', 'red', 'green'])
    fig['layout'].update(margin=dict(l=0,r=0,b=0,t=40), title="Changes/Users/Domains per minute")
    fig.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
    st.plotly_chart(fig, use_container_width=True)
    if not "sleep_time" in st.session_state:
        st.session_state.sleep_time = 2
    
    if not "auto_refresh" in st.session_state:
        st.session_state.auto_refresh = True
    
    auto_refresh = st.checkbox('Auto Refresh?', st.session_state.auto_refresh)
    
    if auto_refresh:
        number = st.number_input('Refresh rate in seconds', value=st.session_state.sleep_time)
        st.session_state.sleep_time = number
    if auto_refresh:
        time.sleep(number)
        st.experimental_rerun()
    import pandas as pd
    import streamlit as st
    from pinotdb import connect
    from datetime import datetime
    import plotly.express as px
    import time
    
    st.set_page_config(layout="wide")
    
    conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
    
    st.header("Wikipedia Recent Changes")
    
    now = datetime.now()
    dt_string = now.strftime("%d %B %Y %H:%M:%S")
    st.write(f"Last update: {dt_string}")
    
    # Use session state to keep track of whether we need to auto refresh the page and the refresh frequency
    
    if not "sleep_time" in st.session_state:
        st.session_state.sleep_time = 2
    
    if not "auto_refresh" in st.session_state:
        st.session_state.auto_refresh = True
    
    auto_refresh = st.checkbox('Auto Refresh?', st.session_state.auto_refresh)
    
    if auto_refresh:
        number = st.number_input('Refresh rate in seconds', value=st.session_state.sleep_time)
        st.session_state.sleep_time = number
    
    # Find changes that happened in the last 1 minute
    # Find changes that happened between 1 and 2 minutes ago
    
    query = """
    select count(*) FILTER(WHERE  ts > ago('PT1M')) AS events1Min,
            count(*) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
            distinctcount(user) FILTER(WHERE  ts > ago('PT1M')) AS users1Min,
            distinctcount(user) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
            distinctcount(domain) FILTER(WHERE  ts > ago('PT1M')) AS domains1Min,
            distinctcount(domain) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
    from wikievents 
    where ts > ago('PT2M')
    limit 1
    """
    
    curs = conn.cursor()
    
    curs.execute(query)
    df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    
    
    metric1, metric2, metric3 = st.columns(3)
    
    metric1.metric(
        label="Changes",
        value=df_summary['events1Min'].values[0],
        delta=float(df_summary['events1Min'].values[0] - df_summary['events1Min2Min'].values[0])
    )
    
    metric2.metric(
        label="Users",
        value=df_summary['users1Min'].values[0],
        delta=float(df_summary['users1Min'].values[0] - df_summary['users1Min2Min'].values[0])
    )
    
    metric3.metric(
        label="Domains",
        value=df_summary['domains1Min'].values[0],
        delta=float(df_summary['domains1Min'].values[0] - df_summary['domains1Min2Min'].values[0])
    )
    
    # Find all the changes by minute in the last 10 minutes
    
    query = """
    select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin, count(*) AS changes, 
           distinctcount(user) AS users,
           distinctcount(domain) AS domains
    from wikievents 
    where ts > ago('PT10M')
    group by dateMin
    order by dateMin desc
    LIMIT 30
    """
    
    curs.execute(query)
    df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    
    fig = px.line(df_ts_melt, x='dateMin', y="value", color='variable', color_discrete_sequence =['blue', 'red', 'green'])
    fig['layout'].update(margin=dict(l=0,r=0,b=0,t=40), title="Changes/Users/Domains per minute")
    fig.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
    st.plotly_chart(fig, use_container_width=True)
    
    # Refresh the page
    if auto_refresh:
        time.sleep(number)
        st.experimental_rerun()

    GitHub Events Stream

    Steps for setting up a Pinot cluster and a real-time table which consumes from the GitHub events stream.

    In this recipe you will set up an Apache Pinot cluster and a real-time table which consumes data flowing from a GitHub events stream. The stream is based on GitHub pull requests and uses Kafka.

    In this recipe you will perform the following steps:

    1. Set up a Pinot cluster, to do which you will:

      a. Start zookeeper.

      b. Start the controller.

      c. Start the broker.

      d. Start the server.

    2. Set up a Kafka cluster.

    3. Create a Kafka topic, which will be called pullRequestMergedEvents.

    4. Create a real-time table called pullRequestMergedEvents and a schema.

    5. Start a task which reads from the GitHub events API and publishes events about merged pull requests to the Kafka topic.

    6. Query the real-time data.

    Steps

    Use either Docker images or launcher scripts

    Pull the Docker image

    Get the latest Docker image.

    Long version

    Set up the Pinot cluster

    Follow the instructions in Advanced Pinot Setup to set up a Pinot cluster with the following components:

    Kubernetes cluster

    If you already have a Kubernetes cluster with Pinot and Kafka (see Running Pinot in Kubernetes), first create the topic, then set up the table and streaming using the pinot-github-realtime-events.yml example (see the kubectl commands below).

    Query

    Browse to the Query Console to view the data.

    Visualize with SuperSet

    You can use SuperSet to visualize this data. Some of the interesting insights we captured were:

    List the most active organizations during the lockdown

    Repositories by number of commits in the Apache organization

    To integrate with SuperSet, check out the SuperSet Integrations page.

  • Zookeeper

  • Controller

  • Broker

  • Server

  • Kafka

Create a Kafka topic

    Create a Kafka topic called pullRequestMergedEvents for the demo.

    Add a Pinot table and schema

    The schema is present at examples/stream/githubEvents/pullRequestMergedEvents_schema.json and is also pasted below

    The table config is present at examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json and is also pasted below.


    Note If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.

    Add the table and schema using the following command:

    Publish events

    Start streaming GitHub events into the Kafka topic:


    Prerequisites

    Generate a personal access token on GitHub.

    Short version

    The short method of setting things up is to use the following command. Make sure to stop any previously running Pinot services.

    Get Pinot

    Follow the instructions in Build from source to get the latest Pinot code.

    Long version

    Set up the Pinot cluster

    Follow the instructions in Advanced Pinot Setup to set up the Pinot cluster with the components:

    • Zookeeper

    • Controller

    • Broker

    • Server

    • Kafka

    Create a Kafka topic

    Download Apache Kafka.

    Create a Kafka topic called pullRequestMergedEvents for the demo.

    Add a Pinot table and schema

    Schema can be found at /examples/stream/githubevents/ in the release, and is also pasted below:

    The table config can be found at /examples/stream/githubevents/ in the release, and is also pasted below.


    Note

    If you're setting this up on a pre-configured cluster, set the properties stream.kafka.zk.broker.url and stream.kafka.broker.list correctly, depending on the configuration of your Kafka cluster.

    Add the table and schema using the command:

    Publish events

    Start streaming GitHub events into the Kafka topic


    Prerequisites

    Generate a personal access token on GitHub.

    Short version

    To set up all of the above steps with a single command, use GitHubEventsQuickStart:

    export PINOT_VERSION=latest
    export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
    docker pull ${PINOT_IMAGE}
    docker exec \
      -t kafka \
      /opt/kafka/bin/kafka-topics.sh \
      --zookeeper pinot-zookeeper:2181/kafka \
      --partitions=1 --replication-factor=1 \
      --create --topic pullRequestMergedEvents
    $ cd kubernetes/helm
    $ kubectl apply -f pinot-github-realtime-events.yml
    pullRequestMergedEvents_schema.json
    {
      "schemaName": "pullRequestMergedEvents",
      "dimensionFieldSpecs": [
        {
          "name": "title",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "labels",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "userId",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "userType",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "authorAssociation",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "mergedBy",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "assignees",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "authors",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "committers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedReviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedTeams",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "reviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "commenters",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "repo",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "organization",
          "dataType": "STRING",
          "defaultNullValue": ""
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "count",
          "dataType": "LONG",
          "defaultNullValue": 1
        },
        {
          "name": "numComments",
          "dataType": "LONG"
        },
        {
          "name": "numReviewComments",
          "dataType": "LONG"
        },
        {
          "name": "numCommits",
          "dataType": "LONG"
        },
        {
          "name": "numLinesAdded",
          "dataType": "LONG"
        },
        {
          "name": "numLinesDeleted",
          "dataType": "LONG"
        },
        {
          "name": "numFilesChanged",
          "dataType": "LONG"
        },
        {
          "name": "numAuthors",
          "dataType": "LONG"
        },
        {
          "name": "numCommitters",
          "dataType": "LONG"
        },
        {
          "name": "numReviewers",
          "dataType": "LONG"
        },
        {
          "name": "numCommenters",
          "dataType": "LONG"
        },
        {
          "name": "createdTimeMillis",
          "dataType": "LONG"
        },
        {
          "name": "elapsedTimeMillis",
          "dataType": "LONG"
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "mergedTimeMillis",
          "dataType": "TIMESTAMP",
          "format": "1:MILLISECONDS:TIMESTAMP",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }
    pullRequestMergedEvents_realtime_table_config.json
    {
      "tableName": "pullRequestMergedEvents",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mergedTimeMillis",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "60",
        "schemaName": "pullRequestMergedEvents",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "invertedIndexColumns": [
          "organization",
          "repo"
        ],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "simple",
          "stream.kafka.topic.name": "pullRequestMergedEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "pinot-zookeeper:2181/kafka",
          "stream.kafka.broker.list": "kafka:9092",
          "realtime.segment.flush.threshold.time": "12h",
          "realtime.segment.flush.threshold.rows": "100000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    $ docker run \
        --network=pinot-demo \
        --name pinot-streaming-table-creation \
        ${PINOT_IMAGE} AddTable \
        -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
        -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json \
        -controllerHost pinot-controller \
        -controllerPort 9000 \
        -exec
    Executing command: AddTable -tableConfigFile examples/stream/githubEvents/docker/pullRequestMergedEvents_realtime_table_config.json -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
    Sending request: http://pinot-controller:9000/schemas to controller: 20c241022a96, version: Unknown
    {"status":"Table pullRequestMergedEvents_REALTIME succesfully added"}
    $ docker run --rm -ti \
        --network=pinot-demo \
        --name pinot-github-events-into-kafka \
        -d ${PINOT_IMAGE} StreamGitHubEvents \
        -schemaFile examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
        -topic pullRequestMergedEvents \
        -personalAccessToken <your_github_personal_access_token> \
        -kafkaBrokerList kafka:9092
    $ docker run --rm -ti \
        --network=pinot-demo \
        --name pinot-github-events-quick-start \
         ${PINOT_IMAGE} GitHubEventsQuickStart \
        -personalAccessToken <your_github_personal_access_token> 

    Connect to Dash

    In this Apache Pinot guide, we'll learn how to visualize data using the Dash web framework.

    In this guide you'll learn how to visualize data from Apache Pinot using Plotly's Dash web framework. Dash is the most downloaded, trusted Python framework for building ML & data science web apps.

    We're going to use Dash to build a real-time dashboard to visualize the changes being made to Wikimedia properties.

    Real-Time Dashboard Architecture

    Startup components

    We're going to use the following Docker compose file, which spins up Zookeeper and Kafka, along with a Pinot controller, broker, and server:

    docker-compose.yml

    Run the following command to launch all the components:

    Wikimedia recent changes stream

    Wikimedia provides a continuous stream of structured event data describing changes made to various Wikimedia properties. The events are published over HTTP using the Server-Sent Events (SSE) protocol.

    You can find the endpoint at https://stream.wikimedia.org/v2/stream/recentchange.

    We'll need to install the SSE client library to consume this data:

    Next, create a file called wiki.py that contains the following:

    wiki.py

    The key part of this script is where we connect to the recent changes feed using the SSE client library.

    Let's run this script as shown below:

    We'll see the following (truncated) output:

    Output

    Ingest recent changes into Kafka

    Now we're going to import each of the events into Apache Kafka. First let's create a Kafka topic called wiki_events with 5 partitions:

    Create a new file called wiki_to_kafka.py and import the following libraries:

    wiki_to_kafka.py

    Add these functions:

    wiki_to_kafka.py

    And now let's add the code that calls the recent changes API and imports events into the wiki_events topic:

    wiki_to_kafka.py

    The important parts of this script are where events are produced to the wiki_events topic and where the producer is flushed every 100 events.

    If we run this script:

    We'll see a message every time 100 messages are pushed to Kafka, as shown below:

    Output

    Explore Kafka

    Let's check that the data has made its way into Kafka.

    The following command returns the message offset for each partition in the wiki_events topic:

    Output

    Looks good. We can also stream all the messages in this topic by running the following command:

    Output

    Configure Pinot

    Now let's configure Pinot to consume the data from Kafka.

    We'll have the following schema:

    schema.json

    And the following table config:

    table.json

    The streamConfigs section is how we connect Pinot to the Kafka topic that contains the events. Create the schema and table by running the following command:

    Once you've done that, navigate to the Pinot UI (localhost:9000) and run the following query to check that the data has made its way into Pinot:

    As long as you see some records, everything is working as expected.

    Building a Dash Dashboard

    Now let's write some more queries against Pinot and display the results in Dash.

    First, install the following libraries:
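The exact package list isn't shown here; at a minimum you'll likely need Dash, the Pinot Python client, Plotly, and pandas (the package names below are an assumption):

pip install dash pinotdb plotly pandas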

    Create a file called dashboard.py, import the libraries we need, and write a header for the page:

    app.py
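A minimal sketch of what this file might start with (assuming Dash 2.x import style; the app title is illustrative):

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from dash import Dash, html, dcc
from pinotdb import connect

# Create the Dash app and give the page a title
app = Dash(__name__)
app.title = "Wikipedia Recent Changes"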

    Connect to Pinot and write a query that returns recent changes, along with the users who made the changes, and domains where they were made:

    app.py
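The query can be the same summary query used in the Streamlit guide above; a sketch (the query_summary variable name is just for this sketch):

# Connect to the Pinot broker started by docker-compose
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')

query_summary = """
select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
       count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
       distinctcount(user) FILTER(WHERE ts > ago('PT1M')) AS users1Min,
       distinctcount(user) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
       distinctcount(domain) FILTER(WHERE ts > ago('PT1M')) AS domains1Min,
       distinctcount(domain) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
from wikievents
where ts > ago('PT2M')
limit 1
"""

curs = conn.cursor()
curs.execute(query_summary)
df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])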

    The FILTER clauses in the query count the number of events from the last minute and the minute before that. We then do a similar thing to count the number of unique users and domains.

    Metrics

    Now let's create some metrics based on that data.

    First, let's create a couple of helper functions for creating these metrics:

    dash_utils.py
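One plausible sketch of the helper module, using Plotly indicator traces to show each value with a delta against the previous minute (the function names add_delta_trace and add_trace are assumptions):

# dash_utils.py (sketch)
import plotly.graph_objects as go

def add_delta_trace(fig, title, value, last_value, row, column):
    # Indicator showing the current value plus the change versus the previous period;
    # fig is expected to be a make_subplots figure with "indicator"-type cells.
    fig.add_trace(go.Indicator(
        mode="number+delta",
        title={"text": title},
        value=value,
        delta={"reference": last_value, "relative": False},
    ), row=row, col=column)

def add_trace(fig, title, value, row, column):
    # Indicator showing only the current value
    fig.add_trace(go.Indicator(
        mode="number",
        title={"text": title},
        value=value,
    ), row=row, col=column)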

    And now let's add the following import to app.py:

    app.py
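Assuming the helper module above is saved as dash_utils.py next to the app, the import might be:

from dash_utils import add_delta_trace, add_trace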

    And the following code at the end of the file:

    app.py
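A sketch of the metrics section, laying the three indicators out side by side and serving them from the app layout (the layout structure and height value are assumptions):

# Build a 1x3 grid of indicator cells and fill it with the summary metrics
fig = make_subplots(rows=1, cols=3,
                    specs=[[{"type": "indicator"}, {"type": "indicator"}, {"type": "indicator"}]])
add_delta_trace(fig, "Changes", df_summary['events1Min'].values[0],
                df_summary['events1Min2Min'].values[0], 1, 1)
add_delta_trace(fig, "Users", df_summary['users1Min'].values[0],
                df_summary['users1Min2Min'].values[0], 1, 2)
add_delta_trace(fig, "Domains", df_summary['domains1Min'].values[0],
                df_summary['domains1Min2Min'].values[0], 1, 3)
fig.update_layout(height=300)

app.layout = html.Div([
    html.H1("Wikipedia Recent Changes"),
    dcc.Graph(figure=fig),
])

if __name__ == '__main__':
    app.run_server(debug=True)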

    Go back to the terminal and run the following command:
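Assuming the file is named dashboard.py as above:

python dashboard.py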

    Navigate to the Dash app in your web browser (by default, Dash serves on localhost:8050). You should see something like the following:

    Dash Metrics

    Changes per minute

    Next, let's add a line chart that shows the number of changes being done to Wikimedia per minute. Update app.py as follows:

    app.py
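A sketch using the same per-minute query as the Streamlit guide and a Plotly Express line chart (the query_time_series and line_chart names are assumptions):

query_time_series = """
select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin,
       count(*) AS changes,
       distinctcount(user) AS users,
       distinctcount(domain) AS domains
from wikievents
where ts > ago('PT1H')
group by dateMin
order by dateMin desc
limit 30
"""

curs.execute(query_time_series)
df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])

line_chart = px.line(df_ts_melt, x='dateMin', y='value', color='variable',
                     color_discrete_sequence=['blue', 'red', 'green'])
line_chart['layout'].update(margin=dict(l=0, r=0, b=0, t=40),
                            title="Changes/Users/Domains per minute")

# Show the line chart underneath the indicators
app.layout = html.Div([
    html.H1("Wikipedia Recent Changes"),
    dcc.Graph(figure=fig),
    dcc.Graph(figure=line_chart),
])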

    Go back to the web browser and you should see something like this:

    Dash Time Series

    Auto Refresh

    At the moment we need to refresh our web browser to update the metrics and line chart, but it would be much better if that happened automatically. Let's now add auto refresh functionality.

    This will require some restructuring of our application so that each component is rendered from a function annotated with a callback that causes the function to be called on an interval.

    The app layout now looks like this:

    app.py
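A minimal sketch of what the restructured layout might look like (component ids match the bullets below; everything else is an assumption):

app.layout = html.Div([
    html.H1("Wikipedia Recent Changes"),
    # Fires the callbacks below every 1,000 milliseconds
    dcc.Interval(id='interval-component', interval=1000, n_intervals=0),
    # Placeholders that the callbacks fill in on every tick
    html.Div(id='latest-timestamp'),
    html.Div(id='indicators'),
    html.Div(id='time-series'),
])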

    • interval-component is configured to fire a callback every 1,000 milliseconds.

    • latest-timestamp is a container that will contain the latest timestamp.

    • indicators will contain indicators with the latest counts of users, domains, and changes.

    • time-series will contain the time series line chart.

    The timestamp is refreshed by the following callback function:

    app.py
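A sketch of the callback, assuming the ids from the layout above:

from datetime import datetime
from dash import Input, Output

@app.callback(Output('latest-timestamp', 'children'),
              Input('interval-component', 'n_intervals'))
def update_timestamp(n):
    # Re-rendered on every interval tick
    return html.Span(f"Last update: {datetime.now().strftime('%d %B %Y %H:%M:%S')}")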

    The indicators are refreshed by this function:

    app.py
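A sketch that re-runs the summary query on every tick (query_summary, add_delta_trace, and make_subplots come from the earlier sketches):

@app.callback(Output('indicators', 'children'),
              Input('interval-component', 'n_intervals'))
def update_indicators(n):
    curs.execute(query_summary)
    df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])

    fig = make_subplots(rows=1, cols=3,
                        specs=[[{"type": "indicator"}, {"type": "indicator"}, {"type": "indicator"}]])
    add_delta_trace(fig, "Changes", df['events1Min'].values[0], df['events1Min2Min'].values[0], 1, 1)
    add_delta_trace(fig, "Users", df['users1Min'].values[0], df['users1Min2Min'].values[0], 1, 2)
    add_delta_trace(fig, "Domains", df['domains1Min'].values[0], df['domains1Min2Min'].values[0], 1, 3)
    fig.update_layout(height=300)
    return dcc.Graph(figure=fig)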

    And finally, the following function refreshes the line chart:

    app.py
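And a sketch for the line chart, reusing the per-minute query from the earlier sketch (query_time_series is the name used there):

@app.callback(Output('time-series', 'children'),
              Input('interval-component', 'n_intervals'))
def update_time_series(n):
    curs.execute(query_time_series)
    df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    df_melt = pd.melt(df, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    fig = px.line(df_melt, x='dateMin', y='value', color='variable',
                  color_discrete_sequence=['blue', 'red', 'green'])
    return dcc.Graph(figure=fig)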

    If we navigate back to our web browser, we'll see the following:

    Dash Auto Refresh

    The full script used in this example is shown below:

    dashboard.py

    Summary

    In this guide we've learnt how to publish data into Kafka from Wikimedia's event stream, ingest it from there into Pinot, and finally make sense of the data using SQL queries run from Dash.

    $ bin/kafka-topics.sh \
      --create \
      --bootstrap-server localhost:19092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic pullRequestMergedEvents
    {
      "schemaName": "pullRequestMergedEvents",
      "dimensionFieldSpecs": [
        {
          "name": "title",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "labels",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "userId",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "userType",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "authorAssociation",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "mergedBy",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "assignees",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "authors",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "committers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedReviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "requestedTeams",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "reviewers",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "commenters",
          "dataType": "STRING",
          "singleValueField": false,
          "defaultNullValue": ""
        },
        {
          "name": "repo",
          "dataType": "STRING",
          "defaultNullValue": ""
        },
        {
          "name": "organization",
          "dataType": "STRING",
          "defaultNullValue": ""
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "count",
          "dataType": "LONG",
          "defaultNullValue": 1
        },
        {
          "name": "numComments",
          "dataType": "LONG"
        },
        {
          "name": "numReviewComments",
          "dataType": "LONG"
        },
        {
          "name": "numCommits",
          "dataType": "LONG"
        },
        {
          "name": "numLinesAdded",
          "dataType": "LONG"
        },
        {
          "name": "numLinesDeleted",
          "dataType": "LONG"
        },
        {
          "name": "numFilesChanged",
          "dataType": "LONG"
        },
        {
          "name": "numAuthors",
          "dataType": "LONG"
        },
        {
          "name": "numCommitters",
          "dataType": "LONG"
        },
        {
          "name": "numReviewers",
          "dataType": "LONG"
        },
        {
          "name": "numCommenters",
          "dataType": "LONG"
        },
        {
          "name": "createdTimeMillis",
          "dataType": "LONG"
        },
        {
          "name": "elapsedTimeMillis",
          "dataType": "LONG"
        }
      ],
      "timeFieldSpec": {
        "incomingGranularitySpec": {
          "timeType": "MILLISECONDS",
          "timeFormat": "EPOCH",
          "dataType": "LONG",
          "name": "mergedTimeMillis"
        }
      }
    }
    {
      "tableName": "pullRequestMergedEvents",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mergedTimeMillis",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "60",
        "schemaName": "pullRequestMergedEvents",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "invertedIndexColumns": [
          "organization",
          "repo"
        ],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "simple",
          "stream.kafka.topic.name": "pullRequestMergedEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
          "stream.kafka.broker.list": "localhost:19092",
          "realtime.segment.flush.threshold.time": "12h",
          "realtime.segment.flush.threshold.rows": "100000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }
    
    $ bin/pinot-admin.sh AddTable \
      -tableConfigFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json \
      -exec
    $ bin/pinot-admin.sh StreamGitHubEvents \
      -topic pullRequestMergedEvents \
      -personalAccessToken <your_github_personal_access_token> \
      -kafkaBrokerList localhost:19092 \
      -schemaFile $PATH_TO_CONFIGS/examples/stream/githubEvents/pullRequestMergedEvents_schema.json
    $ bin/pinot-admin.sh GitHubEventsQuickStart \
      -personalAccessToken <your_github_personal_access_token>
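To sanity-check ingestion outside the Pinot query console, you can run a query from Python with the pinotdb client. The following is a minimal sketch; it assumes the quick-start broker is reachable on localhost:8000, so adjust the port to match your deployment.

    from pinotdb import connect

    # Assumption: the quick-start broker listens on localhost:8000; adjust if needed.
    conn = connect(host='localhost', port=8000, path='/query/sql', scheme='http')
    curs = conn.cursor()

    # Count merged pull requests per organization ingested so far
    curs.execute("""
        select organization, count(*) as mergedPRs
        from pullRequestMergedEvents
        group by organization
        order by count(*) desc
        limit 10
    """)
    for row in curs:
        print(row)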

    The time-series component will contain the time series line chart.

    version: '3.7'
    services:
      zookeeper:
        image: zookeeper:3.5.6
        container_name: "zookeeper-wiki"
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
      kafka:
        image: wurstmeister/kafka:latest
        restart: unless-stopped
        container_name: "kafka-wiki"
        ports:
          - "9092:9092"
        expose:
          - "9093"
        depends_on:
          - zookeeper
        environment:
          KAFKA_ZOOKEEPER_CONNECT: zookeeper-wiki:2181/kafka
          KAFKA_BROKER_ID: 0
          KAFKA_ADVERTISED_HOST_NAME: kafka-wiki
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-wiki:9093,OUTSIDE://localhost:9092
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      pinot-controller:
        image: apachepinot/pinot:0.10.0
        command: "StartController -zkAddress zookeeper-wiki:2181 -dataDir /data"
        container_name: "pinot-controller-wiki"
        volumes:
          - ./config:/config
          - ./data:/data
        restart: unless-stopped
        ports:
          - "9000:9000"
        depends_on:
          - zookeeper
      pinot-broker:
        image: apachepinot/pinot:0.10.0
        command: "StartBroker -zkAddress zookeeper-wiki:2181"
        restart: unless-stopped
        container_name: "pinot-broker-wiki"
        volumes:
          - ./config:/config
        ports:
          - "8099:8099"
        depends_on:
          - pinot-controller
      pinot-server:
        image: apachepinot/pinot:0.10.0
        command: "StartServer -zkAddress zookeeper-wiki:2181"
        restart: unless-stopped
        container_name: "pinot-server-wiki"
        volumes:
          - ./config:/config
        depends_on:
          - pinot-broker
    docker-compose up
    pip install sseclient-py
    import json
    import pprint
    import sseclient
    import requests
    
    def with_requests(url, headers):
        """Get a streaming response for the given event feed using requests."""    
        return requests.get(url, stream=True, headers=headers)
    
    url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    headers = {'Accept': 'text/event-stream'}
    response = with_requests(url, headers)
    client = sseclient.SSEClient(response)
    
    for event in client.events():
        stream = json.loads(event.data)
        pprint.pprint(stream)
    python wiki.py
    {'$schema': '/mediawiki/recentchange/1.0.0',
     'bot': False,
     'comment': '[[:File:Storemyr-Fagerbakken landskapsvernområde HVASSER '
                'Oslofjorden Norway (Protected coastal forest Recreational area '
                'hiking trails) Rituell-kultisk steinstreng sørøst i skogen (small '
                'archeological stone string) Vår (spring) 2021-04-24.jpg]] removed '
                'from category',
     'id': 1923506287,
     'meta': {'domain': 'commons.wikimedia.org',
              'dt': '2022-05-12T09:57:00Z',
              'id': '3800228e-43d8-440d-8034-c68977742653',
              'offset': 3855767440,
              'partition': 0,
              'request_id': '930b17cc-f14a-4656-afa1-d15b79a8f666',
              'stream': 'mediawiki.recentchange',
              'topic': 'eqiad.mediawiki.recentchange',
              'uri': 'https://commons.wikimedia.org/wiki/Category:Iron_Age_in_Norway'},
     'namespace': 14,
     'parsedcomment': '<a '
                      'href="/wiki/File:Storemyr-Fagerbakken_landskapsvernomr%C3%A5de_HVASSER_Oslofjorden_Norway_(Protected_coastal_forest_Recreational_area_hiking_trails)_Rituell-kultisk_steinstreng_s%C3%B8r%C3%B8st_i_skogen_(small_archeological_stone_string)_V%C3%A5r_(spring)_2021-04-24.jpg" '
                      'title="File:Storemyr-Fagerbakken landskapsvernområde '
                      'HVASSER Oslofjorden Norway (Protected coastal forest '
                      'Recreational area hiking trails) Rituell-kultisk '
                      'steinstreng sørøst i skogen (small archeological stone '
                      'string) Vår (spring) '
                      '2021-04-24.jpg">File:Storemyr-Fagerbakken '
                      'landskapsvernområde HVASSER Oslofjorden Norway (Protected '
                      'coastal forest Recreational area hiking trails) '
                      'Rituell-kultisk steinstreng sørøst i skogen (small '
                      'archeological stone string) Vår (spring) 2021-04-24.jpg</a> '
                      'removed from category',
     'server_name': 'commons.wikimedia.org',
     'server_script_path': '/w',
     'server_url': 'https://commons.wikimedia.org',
     'timestamp': 1652349420,
     'title': 'Category:Iron Age in Norway',
     'type': 'categorize',
     'user': 'Krg',
     'wiki': 'commonswiki'}
    {'$schema': '/mediawiki/recentchange/1.0.0',
     'bot': False,
     'comment': '[[:File:Storemyr-Fagerbakken landskapsvernområde HVASSER '
                'Oslofjorden Norway (Protected coastal forest Recreational area '
                'hiking trails) Rituell-kultisk steinstreng sørøst i skogen (small '
                'archeological stone string) Vår (spring) 2021-04-24.jpg]] removed '
                'from category',
     'id': 1923506289,
     'meta': {'domain': 'commons.wikimedia.org',
              'dt': '2022-05-12T09:57:00Z',
              'id': '2b819d20-beca-46a5-8ce3-b2f3b73d2cbe',
              'offset': 3855767441,
              'partition': 0,
              'request_id': '930b17cc-f14a-4656-afa1-d15b79a8f666',
              'stream': 'mediawiki.recentchange',
              'topic': 'eqiad.mediawiki.recentchange',
              'uri': 'https://commons.wikimedia.org/wiki/Category:Cultural_heritage_monuments_in_F%C3%A6rder'},
     'namespace': 14,
     'parsedcomment': '<a '
                      'href="/wiki/File:Storemyr-Fagerbakken_landskapsvernomr%C3%A5de_HVASSER_Oslofjorden_Norway_(Protected_coastal_forest_Recreational_area_hiking_trails)_Rituell-kultisk_steinstreng_s%C3%B8r%C3%B8st_i_skogen_(small_archeological_stone_string)_V%C3%A5r_(spring)_2021-04-24.jpg" '
                      'title="File:Storemyr-Fagerbakken landskapsvernområde '
                      'HVASSER Oslofjorden Norway (Protected coastal forest '
                      'Recreational area hiking trails) Rituell-kultisk '
                      'steinstreng sørøst i skogen (small archeological stone '
                      'string) Vår (spring) '
                      '2021-04-24.jpg">File:Storemyr-Fagerbakken '
                      'landskapsvernområde HVASSER Oslofjorden Norway (Protected '
                      'coastal forest Recreational area hiking trails) '
                      'Rituell-kultisk steinstreng sørøst i skogen (small '
                      'archeological stone string) Vår (spring) 2021-04-24.jpg</a> '
                      'removed from category',
     'server_name': 'commons.wikimedia.org',
     'server_script_path': '/w',
     'server_url': 'https://commons.wikimedia.org',
     'timestamp': 1652349420,
     'title': 'Category:Cultural heritage monuments in Færder',
     'type': 'categorize',
     'user': 'Krg',
     'wiki': 'commonswiki'}
    docker exec -it kafka-wiki kafka-topics.sh \
      --bootstrap-server localhost:9092 \
      --create \
      --topic wiki_events \
      --partitions 5
    import json
    import sseclient
    import datetime
    import requests
    import time
    from confluent_kafka import Producer
    def with_requests(url, headers):
        """Get a streaming response for the given event feed using requests."""    
        return requests.get(url, stream=True, headers=headers)
    
    def acked(err, msg):
        if err is not None:
            print("Failed to deliver message: {0}: {1}"
                  .format(msg.value(), err.str()))
    
    def json_serializer(obj):
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()
        raise TypeError("Type %s not serializable" % type(obj))
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    
    url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    headers = {'Accept': 'text/event-stream'}
    response = with_requests(url, headers) 
    client = sseclient.SSEClient(response)
    
    events_processed = 0
    while True:
        try: 
            for event in client.events():
                stream = json.loads(event.data)
                payload = json.dumps(stream, default=json_serializer, ensure_ascii=False).encode('utf-8')
                producer.produce(topic='wiki_events', 
                  key=str(stream['meta']['id']), value=payload, callback=acked)
    
                events_processed += 1
                if events_processed % 100 == 0:
                    print(f"{str(datetime.datetime.now())} Flushing after {events_processed} events")
                    producer.flush()
        except Exception as ex:
            print(f"{str(datetime.datetime.now())} Got error:" + str(ex))
            response = with_requests(url, headers) 
            client = sseclient.SSEClient(response)
            time.sleep(2)
    python wiki_to_kafka.py
    2022-05-12 10:58:34.449326 Flushing after 100 events
    2022-05-12 10:58:39.151599 Flushing after 200 events
    2022-05-12 10:58:43.399528 Flushing after 300 events
    2022-05-12 10:58:47.350277 Flushing after 400 events
    2022-05-12 10:58:50.847959 Flushing after 500 events
    2022-05-12 10:58:54.768228 Flushing after 600 events
    docker exec -it kafka-wiki kafka-run-class.sh kafka.tools.GetOffsetShell \
      --broker-list localhost:9092 \
      --topic wiki_events
    wiki_events:0:42
    wiki_events:1:61
    wiki_events:2:52
    wiki_events:3:56
    wiki_events:4:58
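The same watermark check can also be done from Python with the confluent_kafka client used earlier. This is a small sketch that assumes the broker address and topic name from the steps above.

    from confluent_kafka import Consumer, TopicPartition

    # Assumption: broker and topic match the docker-compose setup above
    consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'offset-check'})

    metadata = consumer.list_topics('wiki_events', timeout=10)
    for partition_id in sorted(metadata.topics['wiki_events'].partitions):
        # get_watermark_offsets returns the (low, high) offsets for a partition
        low, high = consumer.get_watermark_offsets(TopicPartition('wiki_events', partition_id), timeout=10)
        print(f"wiki_events:{partition_id}:{high}")

    consumer.close()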
    docker exec -it kafka-wiki kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --topic wiki_events \
      --from-beginning
    ...
    {"$schema": "/mediawiki/recentchange/1.0.0", "meta": {"uri": "https://en.wikipedia.org/wiki/Super_Wings", "request_id": "6f82e64d-220f-41f4-88c3-2e15f03ae504", "id": "c30cd735-1ead-405e-94d1-49fbe7c40411", "dt": "2022-05-12T10:05:36Z", "domain": "en.wikipedia.org", "stream": "mediawiki.recentchange", "topic": "eqiad.mediawiki.recentchange", "partition": 0, "offset": 3855779703}, "type": "log", "namespace": 0, "title": "Super Wings", "comment": "", "timestamp": 1652349936, "user": "2001:448A:50E0:885B:FD1D:2D04:233E:7647", "bot": false, "log_id": 0, "log_type": "abusefilter", "log_action": "hit", "log_params": {"action": "edit", "filter": "550", "actions": "tag", "log": 32575794}, "log_action_comment": "2001:448A:50E0:885B:FD1D:2D04:233E:7647 triggered [[Special:AbuseFilter/550|filter 550]], performing the action \"edit\" on [[Super Wings]]. Actions taken: Tag ([[Special:AbuseLog/32575794|details]])", "server_url": "https://en.wikipedia.org", "server_name": "en.wikipedia.org", "server_script_path": "/w", "wiki": "enwiki", "parsedcomment": ""}
    {"$schema": "/mediawiki/recentchange/1.0.0", "meta": {"uri": "https://no.wikipedia.org/wiki/Brukerdiskusjon:Haros", "request_id": "a20c9692-f301-4faf-9373-669bebbffff4", "id": "566ee63e-8e86-4a7e-a1f3-562704306509", "dt": "2022-05-12T10:05:36Z", "domain": "no.wikipedia.org", "stream": "mediawiki.recentchange", "topic": "eqiad.mediawiki.recentchange", "partition": 0, "offset": 3855779714}, "id": 84572581, "type": "edit", "namespace": 3, "title": "Brukerdiskusjon:Haros", "comment": "/* Stor forbokstav / ucfirst */", "timestamp": 1652349936, "user": "Asav", "bot": false, "minor": false, "patrolled": true, "length": {"old": 110378, "new": 110380}, "revision": {"old": 22579494, "new": 22579495}, "server_url": "https://no.wikipedia.org", "server_name": "no.wikipedia.org", "server_script_path": "/w", "wiki": "nowiki", "parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\"><a href=\"/wiki/Brukerdiskusjon:Haros#Stor_forbokstav_/_ucfirst\" title=\"Brukerdiskusjon:Haros\">→‎Stor forbokstav / ucfirst</a></span></span>"}
    {"$schema": "/mediawiki/recentchange/1.0.0", "meta": {"uri": "https://es.wikipedia.org/wiki/Campo_de_la_calle_Industria", "request_id": "d45bd9af-3e2c-4aac-ae8f-e16d3340da76", "id": "7fb3956e-9bd2-4fa5-8659-72b266cdb45b", "dt": "2022-05-12T10:05:35Z", "domain": "es.wikipedia.org", "stream": "mediawiki.recentchange", "topic": "eqiad.mediawiki.recentchange", "partition": 0, "offset": 3855779718}, "id": 266270269, "type": "edit", "namespace": 0, "title": "Campo de la calle Industria", "comment": "/* Historia */", "timestamp": 1652349935, "user": "Raimon will", "bot": false, "minor": false, "length": {"old": 7566, "new": 7566}, "revision": {"old": 143485393, "new": 143485422}, "server_url": "https://es.wikipedia.org", "server_name": "es.wikipedia.org", "server_script_path": "/w", "wiki": "eswiki", "parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\"><a href=\"/wiki/Campo_de_la_calle_Industria#Historia\" title=\"Campo de la calle Industria\">→‎Historia</a></span></span>"}
    ^CProcessed a total of 269 messages
    {
        "schemaName": "wikipedia",
        "dimensionFieldSpecs": [
          {
            "name": "id",
            "dataType": "STRING"
          },
          {
            "name": "wiki",
            "dataType": "STRING"
          },
          {
            "name": "user",
            "dataType": "STRING"
          },
          {
            "name": "title",
            "dataType": "STRING"
          },
          {
            "name": "comment",
            "dataType": "STRING"
          },
          {
            "name": "stream",
            "dataType": "STRING"
          },
          {
            "name": "domain",
            "dataType": "STRING"
          },
          {
            "name": "topic",
            "dataType": "STRING"
          },
          {
            "name": "type",
            "dataType": "STRING"
          },
          {
            "name": "uri",
            "dataType": "STRING"
          },
          {
            "name": "bot",
            "dataType": "BOOLEAN"
          },
          {
            "name": "metaJson",
            "dataType": "STRING"
          }
        ],
        "dateTimeFieldSpecs": [
          {
            "name": "ts",
            "dataType": "TIMESTAMP",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS"
          }
        ]
      }
    {
        "tableName": "wikievents",
        "tableType": "REALTIME",
        "segmentsConfig": {
          "timeColumnName": "ts",
          "schemaName": "wikipedia",
          "replication": "1",
          "replicasPerPartition": "1"
        },
    
        "tableIndexConfig": {
          "invertedIndexColumns": [],
          "rangeIndexColumns": [],
          "autoGeneratedInvertedIndex": false,
          "createInvertedIndexDuringSegmentGeneration": false,
          "sortedColumn": [],
          "bloomFilterColumns": [],
          "loadMode": "MMAP",
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "wiki_events",
            "stream.kafka.broker.list": "kafka-wiki:9093",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "realtime.segment.flush.threshold.rows": "1000",
            "realtime.segment.flush.threshold.time": "24h",
            "realtime.segment.flush.segment.size": "100M"
          },
          "noDictionaryColumns": [],
          "onHeapDictionaryColumns": [],
          "varLengthDictionaryColumns": [],
          "enableDefaultStarTree": false,
          "enableDynamicStarTreeCreation": false,
          "aggregateMetrics": false,
          "nullHandlingEnabled": false
        },
        "tenants": {
          "broker": "DefaultTenant",
          "server": "DefaultTenant",
          "tagOverrideConfig": {}
        },
        "metadata": {},
        "quota": {},
        "routing": {},
        "query": {},
        "ingestionConfig": {
          "transformConfigs": [
            {
              "columnName": "metaJson",
              "transformFunction": "JSONFORMAT(meta)"
            },
            {
              "columnName": "id",
              "transformFunction": "JSONPATH(metaJson, '$.id')"
            },
            {
              "columnName": "stream",
              "transformFunction": "JSONPATH(metaJson, '$.stream')"
            },
            {
              "columnName": "domain",
              "transformFunction": "JSONPATH(metaJson, '$.domain')"
            },
            {
              "columnName": "topic",
              "transformFunction": "JSONPATH(metaJson, '$.topic')"
            },
            {
              "columnName": "uri",
              "transformFunction": "JSONPATH(metaJson, '$.uri')"
            },
            {
              "columnName": "ts",
              "transformFunction": "\"timestamp\" * 1000"
            }
          ]
        },
        "isDimTable": false
      }
    docker exec -it pinot-controller-wiki bin/pinot-admin.sh AddTable \
      -tableConfigFile /config/table.json \
      -schemaFile /config/schema.json \
      -exec
    select domain, count(*) 
    from wikievents 
    group by domain
    order by count(*) DESC
    limit 10
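The same aggregation can also be run from Python with the pinotdb and pandas packages installed in the next step. The snippet below is a minimal sketch that connects to the broker port (8099) exposed by the docker-compose file above.

    import pandas as pd
    from pinotdb import connect

    # Connect to the Pinot broker started by docker-compose
    conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
    curs = conn.cursor()

    curs.execute("""
        select domain, count(*) as changes
        from wikievents
        group by domain
        order by count(*) desc
        limit 10
    """)

    # Build a DataFrame, taking column names from the cursor metadata
    df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    print(df)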
    pip install dash pinotdb plotly pandas
    import pandas as pd
    from dash import Dash, html, dcc
    import plotly.graph_objects as go
    from pinotdb import connect
    import plotly.express as px
    
    external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
    app = Dash(__name__, external_stylesheets=external_stylesheets)
    app.title = "Wiki Recent Changes Dashboard"
    conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
    
    query = """select
      count(*) FILTER(WHERE  ts > ago('PT1M')) AS events1Min,
      count(*) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,  
      distinctcount(user) FILTER(WHERE  ts > ago('PT1M')) AS users1Min,
      distinctcount(user) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
      distinctcount(domain) FILTER(WHERE  ts > ago('PT1M')) AS domains1Min,
      distinctcount(domain) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
    from wikievents 
    where ts > ago('PT2M')
    limit 1
    """
    
    curs = conn.cursor()
    
    curs.execute(query)
    df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    from dash import html, dash_table
    import plotly.graph_objects as go
    
    def add_delta_trace(fig, title, value, last_value, row, column):
        fig.add_trace(go.Indicator(
            mode = "number+delta",
            title= {'text': title},
            value = value,
            delta = {'reference': last_value, 'relative': True},
            domain = {'row': row, 'column': column})
        )
    
    def add_trace(fig, title, value, row, column):
        fig.add_trace(go.Indicator(
            mode = "number",
            title= {'text': title},
            value = value,
            domain = {'row': row, 'column': column})
        )
    from dash_utils import add_delta_trace, add_trace
    fig = go.Figure(layout=go.Layout(height=300))
    if df_summary["events1Min"][0] > 0:
        if df_summary["events1Min2Min"][0] > 0:
            add_delta_trace(fig, "Changes", df_summary["events1Min"][0], df_summary["events1Min2Min"][0], 0, 0)
            add_delta_trace(fig, "Users", df_summary["users1Min"][0], df_summary["users1Min2Min"][0], 0, 1)
            add_delta_trace(fig, "Domains", df_summary["domains1Min"][0], df_summary["domains1Min2Min"][0], 0, 2)
        else:
            add_trace(fig, "Changes", df_summary["events1Min"][0], 0, 0)
            add_trace(fig, "Users", df_summary["users1Min"][0], 0, 1)
            add_trace(fig, "Domains", df_summary["domains1Min"][0], 0, 2)
        fig.update_layout(grid = {"rows": 1, "columns": 3,  'pattern': "independent"},) 
    else:
        fig.update_layout(annotations = [{"text": "No events found", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 28}}])
    
    app.layout = html.Div([
        html.H1("Wiki Recent Changes Dashboard", style={'text-align': 'center'}),
        html.Div(id='content', children=[
            dcc.Graph(figure=fig)
        ])
    ])
    
    if __name__ == '__main__':
        app.run_server(debug=True)    
    python dashboard.py
    query = """
    select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin, count(*) AS changes, 
        distinctcount(user) AS users,
        distinctcount(domain) AS domains
    from wikievents 
    where ts > ago('PT2M')
    group by dateMin
    order by dateMin desc
    LIMIT 30
    """
    
    curs.execute(query)
    df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    
    line_chart = px.line(df_ts_melt, x='dateMin', y="value", color='variable', color_discrete_sequence =['blue', 'red', 'green'])
    line_chart['layout'].update(margin=dict(l=0,r=0,b=0,t=40), title="Changes/Users/Domains per minute")
    line_chart.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
    
    app.layout = html.Div([
        html.H1("Wiki Recent Changes Dashboard", style={'text-align': 'center'}),
        html.Div(id='content', children=[
            dcc.Graph(figure=fig),
            dcc.Graph(figure=line_chart),
        ])
    ])
    app.layout = html.Div([
        html.H1("Wiki Recent Changes Dashboard", style={'text-align': 'center'}),
        html.Div(id='latest-timestamp', style={"padding": "5px 0", "text-align": "center"}),
        dcc.Interval(
                id='interval-component',
                interval=1 * 1000,
                n_intervals=0
            ),
        html.Div(id='content', children=[
            dcc.Graph(id="indicators"),
            dcc.Graph(id="time-series"),
        ])
    ])
    @app.callback(
        Output(component_id='latest-timestamp', component_property='children'),
        Input('interval-component', 'n_intervals'))
    def timestamp(n):
        return html.Span(f"Last updated: {datetime.datetime.now()}")
    @app.callback(Output(component_id='indicators', component_property='figure'),
                  Input('interval-component', 'n_intervals'))
    def indicators(n):
        query = """
        select count(*) FILTER(WHERE  ts > ago('PT1M')) AS events1Min,
            count(*) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
            distinctcount(user) FILTER(WHERE  ts > ago('PT1M')) AS users1Min,
            distinctcount(user) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
            distinctcount(domain) FILTER(WHERE  ts > ago('PT1M')) AS domains1Min,
            distinctcount(domain) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
        from wikievents 
        where ts > ago('PT2M')
        limit 1
        """
    
        curs = connection.cursor()
        curs.execute(query)
        df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
        curs.close()
    
        fig = go.Figure(layout=go.Layout(height=300))
        if df_summary["events1Min"][0] > 0:
            if df_summary["events1Min2Min"][0] > 0:
                add_delta_trace(fig, "Changes", df_summary["events1Min"][0], df_summary["events1Min2Min"][0], 0, 0)
                add_delta_trace(fig, "Users", df_summary["users1Min"][0], df_summary["users1Min2Min"][0], 0, 1)
                add_delta_trace(fig, "Domains", df_summary["domains1Min"][0], df_summary["domains1Min2Min"][0], 0, 2)
            else:
                add_trace(fig, "Changes", df_summary["events1Min"][0], 0, 0)
                add_trace(fig, "Users", df_summary["users1Min"][0], 0, 1)
                add_trace(fig, "Domains", df_summary["domains1Min"][0], 0, 2)
            fig.update_layout(grid = {"rows": 1, "columns": 3,  'pattern': "independent"},) 
        else:
            fig.update_layout(annotations = [{"text": "No events found", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 28}}])
        return fig
    @app.callback(Output(component_id='time-series', component_property='figure'),
        Input('interval-component', 'n_intervals'))
    def time_series(n):
        query = """
        select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin, count(*) AS changes, 
            distinctcount(user) AS users,
            distinctcount(domain) AS domains
        from wikievents 
        where ts > ago('PT1H')
        group by dateMin
        order by dateMin desc
        LIMIT 30
        """
    
        curs = connection.cursor()
        curs.execute(query)
        df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
        curs.close()
    
        df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    
        line_chart = px.line(df_ts_melt, x='dateMin', y="value", color='variable', color_discrete_sequence =['blue', 'red', 'green'])
        line_chart['layout'].update(margin=dict(l=0,r=0,b=0,t=40), title="Changes/Users/Domains per minute")
        line_chart.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
        return line_chart
    import pandas as pd
    from dash import Dash, html, dash_table, dcc, Input, Output
    import plotly.graph_objects as go
    from pinotdb import connect
    from dash_utils import add_delta_trace, add_trace
    import plotly.express as px
    import datetime
    
    external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
    app = Dash(__name__, external_stylesheets=external_stylesheets)
    app.title = "Wiki Recent Changes Dashboard"
    
    connection = connect(host="localhost", port=8099, path="/query/sql", scheme="http")
    
    
    @app.callback(Output(component_id='indicators', component_property='figure'),
                  Input('interval-component', 'n_intervals'))
    def indicators(n):
        query = """
        select count(*) FILTER(WHERE  ts > ago('PT1M')) AS events1Min,
            count(*) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
            distinctcount(user) FILTER(WHERE  ts > ago('PT1M')) AS users1Min,
            distinctcount(user) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
            distinctcount(domain) FILTER(WHERE  ts > ago('PT1M')) AS domains1Min,
            distinctcount(domain) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
        from wikievents 
        where ts > ago('PT2M')
        limit 1
        """
    
        curs = connection.cursor()
        curs.execute(query)
        df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
        curs.close()
    
        fig = go.Figure(layout=go.Layout(height=300))
        if df_summary["events1Min"][0] > 0:
            if df_summary["events1Min2Min"][0] > 0:
                add_delta_trace(fig, "Changes", df_summary["events1Min"][0], df_summary["events1Min2Min"][0], 0, 0)
                add_delta_trace(fig, "Users", df_summary["users1Min"][0], df_summary["users1Min2Min"][0], 0, 1)
                add_delta_trace(fig, "Domains", df_summary["domains1Min"][0], df_summary["domains1Min2Min"][0], 0, 2)
            else:
                add_trace(fig, "Changes", df_summary["events1Min"][0], 0, 0)
                add_trace(fig, "Users", df_summary["users1Min"][0], 0, 1)
                add_trace(fig, "Domains", df_summary["domains1Min"][0], 0, 2)
            fig.update_layout(grid = {"rows": 1, "columns": 3,  'pattern': "independent"},) 
        else:
            fig.update_layout(annotations = [{"text": "No events found", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 28}}])
        return fig
    
    @app.callback(Output(component_id='time-series', component_property='figure'),
        Input('interval-component', 'n_intervals'))
    def time_series(n):
        query = """
        select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin, count(*) AS changes, 
            distinctcount(user) AS users,
            distinctcount(domain) AS domains
        from wikievents 
        where ts > ago('PT1H')
        group by dateMin
        order by dateMin desc
        LIMIT 30
        """
    
        curs = connection.cursor()
        curs.execute(query)
        df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
        curs.close()
    
        df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    
        line_chart = px.line(df_ts_melt, x='dateMin', y="value", color='variable', color_discrete_sequence =['blue', 'red', 'green'])
        line_chart['layout'].update(margin=dict(l=0,r=0,b=0,t=40), title="Changes/Users/Domains per minute")
        line_chart.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
        return line_chart
    
    @app.callback(
        Output(component_id='latest-timestamp', component_property='children'),
        Input('interval-component', 'n_intervals'))
    def timestamp(n):
        return html.Span(f"Last updated: {datetime.datetime.now()}")
    
    app.layout = html.Div([
        html.H1("Wiki Recent Changes Dashboard", style={'text-align': 'center'}),
        html.Div(id='latest-timestamp', style={"padding": "5px 0", "text-align": "center"}),
        dcc.Interval(
                id='interval-component',
                interval=1 * 1000,
                n_intervals=0
            ),
        html.Div(id='content', children=[
            dcc.Graph(id="indicators"),
            dcc.Graph(id="time-series"),
        ])
    ])
    
    if __name__ == '__main__':
        app.run_server(debug=True)