In this guide you'll learn how to visualize data from Apache Pinot using Streamlit. Streamlit is a Python library that makes it easy to build interactive, data-driven web applications.
We're going to use Streamlit to build a real-time dashboard to visualize the changes being made to Wikimedia properties.
Real-Time Dashboard Architecture
Startup components
We're going to use the following Docker Compose file, which spins up instances of Zookeeper and Kafka, along with a Pinot controller, broker, and server:
Run the following command to launch all the components:
docker-compose up
Wikimedia recent changes stream
Wikimedia provides a continuous stream of structured event data describing changes made to various Wikimedia properties. The events are published over HTTP using the Server-Sent Events (SSE) protocol.
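Server-Sent Events is a plain-text format: the server keeps the HTTP response open (content type text/event-stream) and writes each event as a block of "field: value" lines, ended by a blank line; lines starting with a colon are comments. The script below relies on the sseclient library to do this parsing for you. Purely to illustrate the format, here is a simplified sketch of a parser; parse_sse is a hypothetical name, and it only handles the event, id, and data fields, ignoring the rest of the specification (such as retry).

```python
def parse_sse(lines):
    """Yield (event_type, event_id, data) tuples from an iterable of SSE text lines.

    Simplified sketch: handles only the event, id, and data fields.
    """
    event_type, event_id, data_lines = "message", None, []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event accumulated so far.
            if data_lines:
                yield event_type, event_id, "\n".join(data_lines)
            # The event type resets after each dispatch; the last id is kept.
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "event":
            event_type = value
        elif field == "id":
            event_id = value
        elif field == "data":
            # Multiple data lines in one event are joined with newlines.
            data_lines.append(value)


sample = [
    ":ok",
    "event: message",
    "id: 1",
    'data: {"user": "a"}',
    "",
    "data: line1",
    "data: line2",
    "",
]
events = list(parse_sse(sample))
```

Each data payload in the Wikimedia stream is a JSON document, so in practice you would pass it to json.loads.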
Create a new file called wiki_to_kafka.py and import the following libraries:
import json
import sseclient
import datetime
import requests
import time
from confluent_kafka import Producer
wiki_to_kafka.py
Add these functions:
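def with_requests(url, headers):
    """Get a streaming response for the given event feed using requests."""
    return requests.get(url, stream=True, headers=headers)

def acked(err, msg):
    """Kafka delivery callback: report any message that failed to be delivered."""
    if err is not None:
        print("Failed to deliver message: {0}: {1}"
              .format(msg.value(), err.str()))

def json_serializer(obj):
    """Serialize dates and datetimes as ISO 8601 strings when used with json.dumps."""
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    # Raising a plain string is an error in Python 3; raise a TypeError instead.
    raise TypeError("Type %s not serializable" % type(obj))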
wiki_to_kafka.py
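json_serializer is meant to be passed to json.dumps as its default hook, which json.dumps calls only for objects it cannot encode by itself. A quick, self-contained check, using a made-up event:

```python
import datetime
import json


def json_serializer(obj):
    """Fallback for json.dumps: render dates and datetimes as ISO 8601 strings."""
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    # json.dumps expects the hook to raise TypeError for values it can't handle.
    raise TypeError("Type %s not serializable" % type(obj))


# The datetime value is handed to json_serializer; the string is encoded natively.
event = {"user": "ExampleUser", "ts": datetime.datetime(2023, 5, 1, 12, 30, 0)}
payload = json.dumps(event, default=json_serializer)
```

Here payload is the string {"user": "ExampleUser", "ts": "2023-05-01T12:30:00"}, while an unsupported value such as a set still fails with a TypeError.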
And now let's add the code that calls the recent changes API and publishes the events to the wiki_events topic:
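A sketch of this step, under several assumptions: events come from sseclient's SSEClient(response).events(), whose objects expose .event and .data; messages are sent with something shaped like confluent_kafka's Producer.produce; and the hypothetical to_pinot_record function maps each change to the ts, user, and domain fields that the Pinot queries later in this guide use. Those field names, and ts being epoch milliseconds, are assumptions about the Pinot schema rather than its actual definition.

```python
import json


def to_pinot_record(change):
    """Hypothetical mapping from one recentchange event to the fields the
    dashboard queries use. Field names on both sides are assumptions."""
    return {
        "id": change["meta"]["id"],
        # recentchange 'timestamp' is in seconds; assume Pinot's ts is epoch millis.
        "ts": change["timestamp"] * 1000,
        "user": change.get("user"),
        "domain": change["meta"]["domain"],
    }


def forward_events(events, produce, topic="wiki_events"):
    """Publish each SSE 'message' event to Kafka and return how many were sent.

    events:  iterable of objects with .event and .data attributes, such as
             those yielded by sseclient.SSEClient(response).events().
    produce: callable shaped like confluent_kafka Producer.produce,
             called as produce(topic, key=..., value=...).
    """
    sent = 0
    for event in events:
        if event.event != "message" or not event.data:
            continue  # ignore other event types and empty payloads
        try:
            change = json.loads(event.data)
        except json.JSONDecodeError:
            continue  # skip a malformed payload instead of stopping the stream
        record = to_pinot_record(change)
        produce(topic, key=record["id"], value=json.dumps(record))
        sent += 1
    return sent


# Assumed wiring inside wiki_to_kafka.py (not executed here; the broker
# address depends on your Docker Compose file):
#
# producer = Producer({"bootstrap.servers": "localhost:9092"})
# response = with_requests("https://stream.wikimedia.org/v2/stream/recentchange",
#                          {"Accept": "text/event-stream"})
# client = sseclient.SSEClient(response)
# forward_events(client.events(),
#                lambda topic, **kw: producer.produce(topic, callback=acked, **kw))
```

Taking produce as a parameter keeps the loop testable without a broker. In a long-running producer you would also call producer.poll(0) periodically so delivery callbacks such as acked actually fire, and producer.flush() before exiting.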
The highlighted lines are how we connect Pinot to the Kafka topic that contains the events. Create the schema and table by running the following command:
Once you've done that, navigate to the Pinot UI and run the following query to check that the data has made its way into Pinot:
select domain, count(*)
from wikievents
group by domain
order by count(*) DESC
limit 10
As long as you see some records, everything is working as expected.
Building a Streamlit Dashboard
Now let's write some more queries against Pinot and display the results in Streamlit.
First, install the following libraries:
pip install streamlit pinotdb plotly pandas
Create a file called app.py, import the libraries, and write a header for the page:
import pandas as pd
import streamlit as st
from pinotdb import connect
import plotly.express as px
st.set_page_config(layout="wide")
st.header("Wikipedia Recent Changes")
app.py
Connect to Pinot and write a query that counts the changes made in the last minute and in the minute before that, along with the number of distinct users who made them and the number of distinct domains they were made on:
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
query = """select
count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
distinctcount(user) FILTER(WHERE ts > ago('PT1M')) AS users1Min,
distinctcount(user) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
distinctcount(domain) FILTER(WHERE ts > ago('PT1M')) AS domains1Min,
distinctcount(domain) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
from wikievents
where ts > ago('PT2M')
limit 1
"""
curs = conn.cursor()
curs.execute(query)
df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
app.py
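The FILTER(WHERE ...) clauses in the query count the events from the last minute and from the minute before that, using ago('PT1M') and ago('PT2M') as the cut-off points. The same pattern is used to count the unique users and domains. To display these values, the full script at the end of this guide passes each one to st.metric, using the difference between the two minutes as the delta.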
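To make the two windows concrete, here is the same logic in plain Python on a few made-up events. It is an illustration only: summarize is a hypothetical helper, and it takes now as a parameter where Pinot's ago() uses the current time. Note the boundary: an event exactly one minute old falls in the earlier window, because the first window uses > and the second uses <=.

```python
from datetime import datetime, timedelta


def summarize(events, now):
    """Compute the summary query's six values from (ts, user, domain) tuples."""
    one_min_ago = now - timedelta(minutes=1)  # plays the role of ago('PT1M')
    two_min_ago = now - timedelta(minutes=2)  # plays the role of ago('PT2M')
    current = [e for e in events if e[0] > one_min_ago]
    previous = [e for e in events if two_min_ago < e[0] <= one_min_ago]
    return {
        "events1Min": len(current),
        "events1Min2Min": len(previous),
        "users1Min": len({user for _, user, _ in current}),
        "users1Min2Min": len({user for _, user, _ in previous}),
        "domains1Min": len({domain for _, _, domain in current}),
        "domains1Min2Min": len({domain for _, _, domain in previous}),
    }


now = datetime(2023, 1, 1, 12, 0, 0)
events = [
    (now - timedelta(seconds=10), "alice", "en.wikipedia.org"),
    (now - timedelta(seconds=30), "bob", "en.wikipedia.org"),
    (now - timedelta(seconds=90), "alice", "de.wikipedia.org"),
    # Older than two minutes: outside both windows.
    (now - timedelta(seconds=150), "carol", "fr.wikipedia.org"),
]
summary = summarize(events, now)

# Change versus the previous minute, the value shown as the metric's delta.
changes_delta = summary["events1Min"] - summary["events1Min2Min"]
```

With these events, the last minute has 2 changes by 2 users on 1 domain, the minute before has 1 change, and changes_delta is 1.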
Go back to the terminal and run the following command:
streamlit run app.py
Navigate to localhost:8501 to see the Streamlit app. You should see something like the following:
Changes per minute
Next, let's add a line chart that shows the number of changes made to Wikimedia properties per minute, along with the number of distinct users and domains. Add the following code to app.py:
query = """
select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin, count(*) AS changes,
distinctcount(user) AS users,
distinctcount(domain) AS domains
from wikievents
where ts > ago('PT1H')
group by dateMin
order by dateMin desc
LIMIT 30
"""
curs.execute(query)
df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
fig = px.line(df_ts_melt, x='dateMin', y="value", color='variable', color_discrete_sequence=['blue', 'red', 'green'])
fig['layout'].update(margin=dict(l=0,r=0,b=0,t=40), title="Changes/Users/Domains per minute")
fig.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
st.plotly_chart(fig, use_container_width=True)
app.py
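The pd.melt call is what lets a single px.line call draw three series: it reshapes the wide query result (one column per metric) into long form, with one row per minute and metric. A toy example with made-up numbers shaped like df_ts:

```python
import pandas as pd

# Hypothetical per-minute results with the same columns as df_ts.
df_ts = pd.DataFrame({
    "dateMin": ["2023-01-01 12:00:00", "2023-01-01 12:01:00"],
    "changes": [120, 95],
    "users": [40, 38],
    "domains": [12, 11],
})

# melt stacks the three metric columns into (variable, value) pairs, so
# px.line(..., color='variable') draws one line per metric.
df_ts_melt = pd.melt(df_ts, id_vars=["dateMin"], value_vars=["changes", "users", "domains"])
```

The result has the columns dateMin, variable, and value, and six rows: both minutes for changes, then for users, then for domains.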
Go back to the web browser and you should see something like this:
Auto Refresh
At the moment we need to refresh our web browser to update the metrics and line chart, but it would be much better if that happened automatically. Let's now add auto refresh functionality.
Add the following code just under the header at the top of app.py:
if "sleep_time" not in st.session_state:
    st.session_state.sleep_time = 2
if "auto_refresh" not in st.session_state:
    st.session_state.auto_refresh = True
auto_refresh = st.checkbox('Auto Refresh?', st.session_state.auto_refresh)
if auto_refresh:
    number = st.number_input('Refresh rate in seconds', value=st.session_state.sleep_time)
    st.session_state.sleep_time = number
app.py
And add the following code at the very end of app.py (it calls time.sleep, so also add import time to the imports at the top):
if auto_refresh:
    time.sleep(number)
    st.rerun()  # older Streamlit releases call this st.experimental_rerun()
app.py
If we navigate back to our web browser, we'll see the following:
The full script used in this example is shown below:
import pandas as pd
import streamlit as st
from pinotdb import connect
from datetime import datetime
import plotly.express as px
import time
st.set_page_config(layout="wide")
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
st.header("Wikipedia Recent Changes")
now = datetime.now()
dt_string = now.strftime("%d %B %Y %H:%M:%S")
st.write(f"Last update: {dt_string}")
# Use session state to keep track of whether we need to auto refresh the page and the refresh frequency
if "sleep_time" not in st.session_state:
    st.session_state.sleep_time = 2
if "auto_refresh" not in st.session_state:
    st.session_state.auto_refresh = True
auto_refresh = st.checkbox('Auto Refresh?', st.session_state.auto_refresh)
if auto_refresh:
    number = st.number_input('Refresh rate in seconds', value=st.session_state.sleep_time)
    st.session_state.sleep_time = number
# Find changes that happened in the last 1 minute
# Find changes that happened between 1 and 2 minutes ago
query = """
select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
distinctcount(user) FILTER(WHERE ts > ago('PT1M')) AS users1Min,
distinctcount(user) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
distinctcount(domain) FILTER(WHERE ts > ago('PT1M')) AS domains1Min,
distinctcount(domain) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
from wikievents
where ts > ago('PT2M')
limit 1
"""
curs = conn.cursor()
curs.execute(query)
df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
metric1, metric2, metric3 = st.columns(3)
metric1.metric(
    label="Changes",
    value=df_summary['events1Min'].values[0],
    delta=float(df_summary['events1Min'].values[0] - df_summary['events1Min2Min'].values[0])
)
metric2.metric(
    label="Users",
    value=df_summary['users1Min'].values[0],
    delta=float(df_summary['users1Min'].values[0] - df_summary['users1Min2Min'].values[0])
)
metric3.metric(
    label="Domains",
    value=df_summary['domains1Min'].values[0],
    delta=float(df_summary['domains1Min'].values[0] - df_summary['domains1Min2Min'].values[0])
)
# Find all the changes by minute in the last hour
query = """
select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin, count(*) AS changes,
distinctcount(user) AS users,
distinctcount(domain) AS domains
from wikievents
where ts > ago('PT1H')
group by dateMin
order by dateMin desc
LIMIT 30
"""
curs.execute(query)
df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
fig = px.line(df_ts_melt, x='dateMin', y="value", color='variable', color_discrete_sequence=['blue', 'red', 'green'])
fig['layout'].update(margin=dict(l=0,r=0,b=0,t=40), title="Changes/Users/Domains per minute")
fig.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
st.plotly_chart(fig, use_container_width=True)
# Refresh the page
if auto_refresh:
    time.sleep(number)
    st.rerun()  # older Streamlit releases call this st.experimental_rerun()
app.py
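The script repeats the same three steps (open a cursor, execute the query, build a DataFrame from cursor.description) for both queries. If you want to tidy it up, a small helper can factor them out. This is an optional refactoring sketch: run_query is a name made up for this example, not part of pinotdb, and it assumes the cursor implements the standard DB-API fetchall() method.

```python
import pandas as pd


def run_query(conn, sql):
    """Run sql on a DB-API connection (such as the one returned by
    pinotdb.connect) and return the result set as a DataFrame.

    Each entry in cursor.description is a sequence whose first item is the
    column name (PEP 249), which is what the original script relies on too.
    """
    curs = conn.cursor()
    curs.execute(sql)
    columns = [item[0] for item in curs.description]
    return pd.DataFrame(curs.fetchall(), columns=columns)
```

In app.py, df_summary = run_query(conn, query) would then replace the three cursor lines, and likewise for df_ts.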
Summary
In this guide we've learnt how to publish data into Kafka from Wikimedia's event stream, ingest it from there into Pinot, and finally make sense of the data using SQL queries run from Streamlit.