In this guide you'll learn how to visualize data from Apache Pinot using Plotly's Dash web framework. Dash is a popular Python framework for building machine learning and data science web apps.
We're going to use Dash to build a real-time dashboard to visualize the changes being made to Wikimedia properties.
Real-Time Dashboard Architecture
Startup components
We're going to use the following Docker Compose file, which spins up instances of Zookeeper and Kafka, along with a Pinot controller, broker, and server:
Run the following command to launch all the components:
```bash
docker-compose up
```
Wikimedia recent changes stream
Wikimedia provides a continuous stream of structured event data describing changes made to various Wikimedia properties. The events are published over HTTP using the Server-Sent Events (SSE) protocol.
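To make the SSE framing concrete, here is a small sketch of how a client turns the raw stream into events. Each event is a block of "field: value" lines that ends with a blank line, and the JSON payload is carried in the data field. parse_sse is a hypothetical helper written for illustration, not part of the guide's script. It only handles the data field and ignores event, id, and retry.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON `data` payload of each Server-Sent Event.

    SSE frames an event as a block of `field: value` lines terminated by a
    blank line; several `data:` lines in one event are joined with newlines.
    """
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line ends the current event; empty events are dropped.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith(":"):
            # Lines starting with a colon are comments (often keep-alives).
            continue
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The format strips a single leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.
    # An event left unterminated when the input ends is discarded, as SSE requires.
```

Against the live feed, you could roughly feed it decoded lines from a streaming requests response, for example (raw.decode("utf-8") for raw in response.iter_lines()).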
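We'll write the ingestion script, wiki_to_kafka.py, starting with a few helper functions:

```python
import datetime

import requests


def with_requests(url, headers):
    """Get a streaming response for the given event feed using requests."""
    return requests.get(url, stream=True, headers=headers)


def acked(err, msg):
    """Kafka delivery callback: report messages that could not be delivered."""
    if err is not None:
        print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))


def json_serializer(obj):
    """`default=` hook for json.dumps that writes dates as ISO 8601 strings."""
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    # Raising a plain string is an error in Python 3; raise a TypeError instead.
    raise TypeError("Type %s not serializable" % type(obj))
```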
wiki_to_kafka.py
And now let's add the code that calls the recent changes API and imports events into the wiki_events topic:
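Here is a minimal sketch of that step. It assumes the confluent_kafka client, a broker reachable at localhost:9092, and the public endpoint https://stream.wikimedia.org/v2/stream/recentchange. The to_row mapping is a hypothetical choice that keeps only the columns the queries later in this guide use: ts in epoch milliseconds, user, and domain taken from meta.domain. The sketch also assumes each event's JSON arrives on a single data: line. The stream-reading imports live inside main(), so the helpers can be tried without Kafka installed.

```python
import json


def to_row(event: dict) -> dict:
    """Hypothetical mapping from a recentchange event to the dashboard's columns.

    Assumes `timestamp` is in epoch seconds and the domain lives in `meta.domain`;
    `ts` is stored in epoch milliseconds so it can be compared with Pinot's ago().
    """
    return {
        "ts": int(event["timestamp"]) * 1000,
        "user": event.get("user"),
        "domain": event.get("meta", {}).get("domain"),
    }


def publish(events, producer, topic="wiki_events", on_delivery=None) -> int:
    """Produce each event to Kafka as JSON and return how many were sent."""
    sent = 0
    for event in events:
        if "timestamp" not in event:
            continue  # skip events that can't be placed on the time axis
        kwargs = {"callback": on_delivery} if on_delivery else {}
        producer.produce(topic, value=json.dumps(to_row(event)), **kwargs)
        producer.poll(0)  # serve delivery callbacks without blocking
        sent += 1
    producer.flush()  # wait for outstanding messages before returning
    return sent


def main() -> None:
    """Stream recent changes into Kafka until interrupted (assumed endpoints)."""
    import requests
    from confluent_kafka import Producer

    url = "https://stream.wikimedia.org/v2/stream/recentchange"
    headers = {
        "Accept": "text/event-stream",
        "User-Agent": "wiki-dashboard-example/0.1",  # placeholder client identifier
    }
    producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker
    with requests.get(url, stream=True, headers=headers) as response:
        response.raise_for_status()
        lines = (raw.decode("utf-8") for raw in response.iter_lines())
        # Assumes each event's JSON arrives on a single `data:` line.
        events = (json.loads(line[len("data:"):]) for line in lines if line.startswith("data:"))
        publish(events, producer)
```

Calling main() runs until the process is interrupted. In wiki_to_kafka.py you could pass the acked function defined above as on_delivery to get delivery error reports.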
In the table config, the stream settings are what connect Pinot to the Kafka topic that contains the events. Create the schema and table by running the following command:
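If you'd rather do this from Python, the sketch below registers a schema and a real-time table through the Pinot controller's REST API (POST /schemas, then POST /tables). It is a hedged example, not the guide's own config. The following are all assumptions:

- the controller address (http://localhost:9000);
- the Kafka broker address as seen from inside Docker (kafka:9093);
- the column types.

The Kafka consumer factory class name also varies between Pinot versions. The config uses the classic streamConfigs layout under tableIndexConfig.

```python
import json
import urllib.request

CONTROLLER_URL = "http://localhost:9000"  # assumed Pinot controller address


def build_schema(name="wikievents"):
    """Schema with the three columns the dashboard queries use."""
    return {
        "schemaName": name,
        "dimensionFieldSpecs": [
            {"name": "user", "dataType": "STRING"},
            {"name": "domain", "dataType": "STRING"},
        ],
        "dateTimeFieldSpecs": [
            # Epoch milliseconds, so ts can be compared with ago('PT1M').
            {"name": "ts", "dataType": "LONG",
             "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"},
        ],
    }


def build_table_config(name="wikievents", topic="wiki_events", broker="kafka:9093"):
    """Real-time table config; streamConfigs is what ties Pinot to the Kafka topic."""
    return {
        "tableName": name,
        "tableType": "REALTIME",
        "segmentsConfig": {
            "timeColumnName": "ts",
            "schemaName": name,
            "replicasPerPartition": "1",
        },
        "tenants": {},
        "tableIndexConfig": {
            "loadMode": "MMAP",
            "streamConfigs": {
                "streamType": "kafka",
                "stream.kafka.topic.name": topic,
                # Broker address as seen from the Pinot containers (assumption).
                "stream.kafka.broker.list": broker,
                "stream.kafka.consumer.type": "lowlevel",
                # Factory class depends on the Pinot version (assumption).
                "stream.kafka.consumer.factory.class.name":
                    "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
                "stream.kafka.decoder.class.name":
                    "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
                "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            },
        },
        "metadata": {},
    }


def post_json(path, payload):
    """POST a JSON document to the controller and return the response body."""
    request = urllib.request.Request(
        CONTROLLER_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


def create_schema_and_table():
    """Register the schema first, since the table config refers to it by name."""
    print(post_json("/schemas", build_schema()))
    print(post_json("/tables", build_table_config()))
```

Calling create_schema_and_table() once the containers are up would register both.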
Once you've done that, navigate to the Pinot UI and run the following query to check that the data has made its way into Pinot:
```sql
select domain, count(*)
from wikievents
group by domain
order by count(*) DESC
limit 10
```
As long as you see some records, everything is working as expected.
Building a Dash Dashboard
Now let's write some more queries against Pinot and display the results in Dash.
First, install the following libraries:
```bash
pip install dash pinotdb plotly pandas
```
Create a file called dashboard.py, import the libraries, and set a title for the page:
```python
import pandas as pd
from dash import Dash, html, dcc
import plotly.graph_objects as go
from pinotdb import connect
import plotly.express as px

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = Dash(__name__, external_stylesheets=external_stylesheets)
app.title = "Wiki Recent Changes Dashboard"
```
dashboard.py
Connect to Pinot and write a query that counts the changes made in the last minute and in the minute before it, along with the number of distinct users who made them and the number of distinct domains where they were made:
```python
# Connect to the Pinot broker's SQL endpoint.
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')

query = """
select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
       count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
       distinctcount(user) FILTER(WHERE ts > ago('PT1M')) AS users1Min,
       distinctcount(user) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
       distinctcount(domain) FILTER(WHERE ts > ago('PT1M')) AS domains1Min,
       distinctcount(domain) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
from wikievents
where ts > ago('PT2M')
limit 1
"""

curs = conn.cursor()
curs.execute(query)
df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
```
dashboard.py
The FILTER clauses in the query count the events from the last minute and from the minute before that. We use the same approach with distinctcount to count the unique users and domains in each of those windows.
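To see exactly which rows each FILTER clause picks up, here is a plain-Python sketch of the same windowing over an in-memory list of (ts, user, domain) tuples. window_summary is a hypothetical helper for illustration. The now argument stands in for the query time that ago() is relative to. The boundaries mirror the SQL:

- the last minute is ts > now - 1m;
- the previous minute is now - 2m < ts <= now - 1m.

Pinot's distinctcount is exact, so a Python set is a fair stand-in.

```python
from datetime import datetime, timedelta


def window_summary(events, now):
    """Mirror the FILTER clauses of the summary query over (ts, user, domain) tuples."""
    events = list(events)
    one_min_ago = now - timedelta(minutes=1)
    two_min_ago = now - timedelta(minutes=2)
    # FILTER(WHERE ts > ago('PT1M'))
    last = [e for e in events if e[0] > one_min_ago]
    # FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M'))
    prev = [e for e in events if two_min_ago < e[0] <= one_min_ago]
    return {
        "events1Min": len(last),
        "events1Min2Min": len(prev),
        "users1Min": len({user for _, user, _ in last}),
        "users1Min2Min": len({user for _, user, _ in prev}),
        "domains1Min": len({domain for _, _, domain in last}),
        "domains1Min2Min": len({domain for _, _, domain in prev}),
    }
```

An event exactly one minute old falls into the previous window, not the current one, because the current window uses a strict > comparison.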
Metrics
Now let's create some metrics based on that data.
First, let's create a couple of helper functions for creating these metrics:
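A hedged sketch of what these two helpers could look like follows. The signatures are inferred from how add_delta_trace and add_trace are called in the indicators callback later on, and the full script imports them from a dash_utils module. Both build Plotly indicator traces:

- the delta variant shows the current value alongside the relative change from the previous minute;
- the plain variant shows just the number.

The traces are passed to fig.add_trace as dicts with a "type" key, which a Plotly Figure accepts. That keeps the helpers free of a direct plotly import.

```python
def add_delta_trace(fig, title, value, last_value, row, column):
    """Add a number-with-delta indicator comparing `value` to `last_value`."""
    fig.add_trace({
        "type": "indicator",
        "mode": "number+delta",
        "title": {"text": title},
        "value": value,
        # relative=True shows the change as a fraction of the reference value.
        "delta": {"reference": last_value, "relative": True},
        # Cell in the layout grid that this indicator occupies.
        "domain": {"row": row, "column": column},
    })


def add_trace(fig, title, value, row, column):
    """Add a plain number indicator (no previous value to compare against)."""
    fig.add_trace({
        "type": "indicator",
        "mode": "number",
        "title": {"text": title},
        "value": value,
        "domain": {"row": row, "column": column},
    })
```

Combined with fig.update_layout(grid={"rows": 1, "columns": 3, "pattern": "independent"}), the row and column arguments put each metric in its own cell.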
Go back to the terminal and run the following command:
```bash
python dashboard.py
```
Navigate to localhost:8050 (Dash's default port) to see the Dash app. You should see something like the following:
Changes per minute
Next, let's add a line chart that shows the number of changes made to Wikimedia properties per minute, along with the number of distinct users and domains. Update dashboard.py as follows:
```python
# HH is the 24-hour clock; hh would be 12-hour and merge e.g. 01:05 with 13:05.
# Look back an hour so the chart has up to 30 per-minute points.
query = """
select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin,
       count(*) AS changes,
       distinctcount(user) AS users,
       distinctcount(domain) AS domains
from wikievents
where ts > ago('PT1H')
group by dateMin
order by dateMin desc
LIMIT 30
"""

curs.execute(query)
df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])

# Reshape to long format so plotly draws one line per metric.
df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])

line_chart = px.line(df_ts_melt, x='dateMin', y="value", color='variable',
                     color_discrete_sequence=['blue', 'red', 'green'])
line_chart.update_layout(margin=dict(l=0, r=0, b=0, t=40),
                         title="Changes/Users/Domains per minute")
line_chart.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])

app.layout = html.Div([
    html.H1("Wiki Recent Changes Dashboard", style={'textAlign': 'center'}),
    html.Div(id='content', children=[
        dcc.Graph(figure=fig),  # the metrics figure from the Metrics section
        dcc.Graph(figure=line_chart),
    ])
])
```
dashboard.py
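px.line draws one line per value of its color column, so the wide dataframe (one column per metric) is reshaped into long format first. As an illustration only, here is a pure-Python sketch of what pd.melt does with these arguments. The melt function below is hypothetical, and the dashboard itself keeps using pandas.

```python
def melt(rows, id_var, value_vars):
    """Reshape wide rows into long (id, variable, value) rows, like pd.melt.

    Output is grouped by variable first, then by input row, matching pandas.
    """
    return [
        {id_var: row[id_var], "variable": var, "value": row[var]}
        for var in value_vars
        for row in rows
    ]
```

Each wide row with three metrics becomes three long rows, one per metric, and the variable column is what color='variable' splits into separate lines.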
Go back to the web browser and you should see something like this:
Auto Refresh
At the moment we need to refresh our web browser to update the metrics and line chart, but it would be much better if that happened automatically. Let's now add auto refresh functionality.
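This requires some restructuring of the application. Each component is rendered by a function decorated with @app.callback, whose Input is the n_intervals property of a dcc.Interval component. Every time the interval fires, n_intervals increments and Dash calls the function again. The following function refreshes the metrics: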
```python
# Input and Output come from dash, and `connection` is a module-level pinotdb
# connection (see the imports and setup in the full script).
@app.callback(Output(component_id='indicators', component_property='figure'),
              Input('interval-component', 'n_intervals'))
def indicators(n):
    query = """
    select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
           count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
           distinctcount(user) FILTER(WHERE ts > ago('PT1M')) AS users1Min,
           distinctcount(user) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
           distinctcount(domain) FILTER(WHERE ts > ago('PT1M')) AS domains1Min,
           distinctcount(domain) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
    from wikievents
    where ts > ago('PT2M')
    limit 1
    """
    curs = connection.cursor()
    curs.execute(query)
    df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    curs.close()

    fig = go.Figure(layout=go.Layout(height=300))
    if df_summary["events1Min"][0] > 0:
        if df_summary["events1Min2Min"][0] > 0:
            # Compare the last minute against the minute before it.
            add_delta_trace(fig, "Changes", df_summary["events1Min"][0], df_summary["events1Min2Min"][0], 0, 0)
            add_delta_trace(fig, "Users", df_summary["users1Min"][0], df_summary["users1Min2Min"][0], 0, 1)
            add_delta_trace(fig, "Domains", df_summary["domains1Min"][0], df_summary["domains1Min2Min"][0], 0, 2)
        else:
            # No events in the previous minute, so show the current values only.
            add_trace(fig, "Changes", df_summary["events1Min"][0], 0, 0)
            add_trace(fig, "Users", df_summary["users1Min"][0], 0, 1)
            add_trace(fig, "Domains", df_summary["domains1Min"][0], 0, 2)
        fig.update_layout(grid={"rows": 1, "columns": 3, 'pattern': "independent"})
    else:
        fig.update_layout(annotations=[{"text": "No events found", "xref": "paper", "yref": "paper",
                                        "showarrow": False, "font": {"size": 28}}])
    return fig
```
dashboard.py
And finally, the following function refreshes the line chart:
```python
@app.callback(Output(component_id='time-series', component_property='figure'),
              Input('interval-component', 'n_intervals'))
def time_series(n):
    # HH is the 24-hour clock; hh would merge e.g. 01:05 with 13:05.
    query = """
    select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin,
           count(*) AS changes,
           distinctcount(user) AS users,
           distinctcount(domain) AS domains
    from wikievents
    where ts > ago('PT1H')
    group by dateMin
    order by dateMin desc
    LIMIT 30
    """
    curs = connection.cursor()
    curs.execute(query)
    df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    curs.close()

    # Reshape to long format so plotly draws one line per metric.
    df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    line_chart = px.line(df_ts_melt, x='dateMin', y="value", color='variable',
                         color_discrete_sequence=['blue', 'red', 'green'])
    line_chart.update_layout(margin=dict(l=0, r=0, b=0, t=40),
                             title="Changes/Users/Domains per minute")
    line_chart.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
    return line_chart
```
dashboard.py
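The time_series query buckets events by minute with DATETRUNC and then aggregates each bucket. Here is a pure-Python sketch of the same grouping over (ts, user, domain) tuples, for illustration only. per_minute is a hypothetical helper. It formats bucket labels with a 24-hour clock (%H), so 01:05 and 13:05 land in different buckets, and it returns rows newest first, like order by dateMin desc.

```python
from collections import defaultdict
from datetime import datetime


def per_minute(events):
    """Group (ts, user, domain) tuples by minute and count changes, users, domains."""
    buckets = defaultdict(list)
    for ts, user, domain in events:
        # Equivalent of DATETRUNC('minute', ts): drop seconds and microseconds.
        buckets[ts.replace(second=0, microsecond=0)].append((user, domain))
    rows = []
    for minute in sorted(buckets, reverse=True):  # newest minute first
        items = buckets[minute]
        rows.append({
            "dateMin": minute.strftime("%Y-%m-%d %H:%M:%S"),
            "changes": len(items),
            "users": len({user for user, _ in items}),
            "domains": len({domain for _, domain in items}),
        })
    return rows
```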
If we navigate back to our web browser, we'll see the following:
The full script used in this example is shown below:
```python
import datetime

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from dash import Dash, Input, Output, dcc, html
from pinotdb import connect

# add_delta_trace / add_trace: the metric helpers, saved in dash_utils.py
from dash_utils import add_delta_trace, add_trace

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = Dash(__name__, external_stylesheets=external_stylesheets)
app.title = "Wiki Recent Changes Dashboard"

# One connection to the Pinot broker, shared by all callbacks.
connection = connect(host="localhost", port=8099, path="/query/sql", scheme="http")


@app.callback(Output(component_id='indicators', component_property='figure'),
              Input('interval-component', 'n_intervals'))
def indicators(n):
    query = """
    select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
           count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
           distinctcount(user) FILTER(WHERE ts > ago('PT1M')) AS users1Min,
           distinctcount(user) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS users1Min2Min,
           distinctcount(domain) FILTER(WHERE ts > ago('PT1M')) AS domains1Min,
           distinctcount(domain) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS domains1Min2Min
    from wikievents
    where ts > ago('PT2M')
    limit 1
    """
    curs = connection.cursor()
    curs.execute(query)
    df_summary = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    curs.close()

    fig = go.Figure(layout=go.Layout(height=300))
    if df_summary["events1Min"][0] > 0:
        if df_summary["events1Min2Min"][0] > 0:
            # Compare the last minute against the minute before it.
            add_delta_trace(fig, "Changes", df_summary["events1Min"][0], df_summary["events1Min2Min"][0], 0, 0)
            add_delta_trace(fig, "Users", df_summary["users1Min"][0], df_summary["users1Min2Min"][0], 0, 1)
            add_delta_trace(fig, "Domains", df_summary["domains1Min"][0], df_summary["domains1Min2Min"][0], 0, 2)
        else:
            # No events in the previous minute, so show the current values only.
            add_trace(fig, "Changes", df_summary["events1Min"][0], 0, 0)
            add_trace(fig, "Users", df_summary["users1Min"][0], 0, 1)
            add_trace(fig, "Domains", df_summary["domains1Min"][0], 0, 2)
        fig.update_layout(grid={"rows": 1, "columns": 3, 'pattern': "independent"})
    else:
        fig.update_layout(annotations=[{"text": "No events found", "xref": "paper", "yref": "paper",
                                        "showarrow": False, "font": {"size": 28}}])
    return fig


@app.callback(Output(component_id='time-series', component_property='figure'),
              Input('interval-component', 'n_intervals'))
def time_series(n):
    # HH is the 24-hour clock; hh would merge e.g. 01:05 with 13:05.
    query = """
    select ToDateTime(DATETRUNC('minute', ts), 'yyyy-MM-dd HH:mm:ss') AS dateMin,
           count(*) AS changes,
           distinctcount(user) AS users,
           distinctcount(domain) AS domains
    from wikievents
    where ts > ago('PT1H')
    group by dateMin
    order by dateMin desc
    LIMIT 30
    """
    curs = connection.cursor()
    curs.execute(query)
    df_ts = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    curs.close()

    df_ts_melt = pd.melt(df_ts, id_vars=['dateMin'], value_vars=['changes', 'users', 'domains'])
    line_chart = px.line(df_ts_melt, x='dateMin', y="value", color='variable',
                         color_discrete_sequence=['blue', 'red', 'green'])
    line_chart.update_layout(margin=dict(l=0, r=0, b=0, t=40),
                             title="Changes/Users/Domains per minute")
    line_chart.update_yaxes(range=[0, df_ts["changes"].max() * 1.1])
    return line_chart


@app.callback(Output(component_id='latest-timestamp', component_property='children'),
              Input('interval-component', 'n_intervals'))
def timestamp(n):
    return html.Span(f"Last updated: {datetime.datetime.now()}")


app.layout = html.Div([
    html.H1("Wiki Recent Changes Dashboard", style={'textAlign': 'center'}),
    html.Div(id='latest-timestamp', style={"padding": "5px 0", "textAlign": "center"}),
    # interval is in milliseconds: every tick re-runs all three callbacks.
    dcc.Interval(id='interval-component', interval=1 * 1000, n_intervals=0),
    html.Div(id='content', children=[
        dcc.Graph(id="indicators"),
        dcc.Graph(id="time-series"),
    ])
])

if __name__ == '__main__':
    # app.run_server() is deprecated and was removed in Dash 3; app.run() replaces it.
    app.run(debug=True)
```
dashboard.py
Summary
In this guide we've learnt how to publish data into Kafka from Wikimedia's event stream, ingest it from there into Pinot, and finally make sense of the data using SQL queries run from Dash.