LogoLogo
release-0.10.0
release-0.10.0
  • Introduction
  • Basics
    • Concepts
    • Architecture
    • Components
      • Cluster
      • Controller
      • Broker
      • Server
      • Minion
      • Tenant
      • Schema
      • Table
      • Segment
      • Deep Store
      • Pinot Data Explorer
    • Getting Started
      • Running Pinot locally
      • Running Pinot in Docker
      • Quick Start Examples
      • Running in Kubernetes
      • Running on public clouds
        • Running on Azure
        • Running on GCP
        • Running on AWS
      • Batch import example
      • Stream ingestion example
      • HDFS as Deep Storage
      • Troubleshooting Pinot
      • Frequently Asked Questions (FAQs)
        • General
        • Pinot On Kubernetes FAQ
        • Ingestion FAQ
        • Query FAQ
        • Operations FAQ
    • Import Data
      • Batch Ingestion
        • Spark
        • Hadoop
        • Backfill Data
        • Dimension Table
      • Stream ingestion
        • Apache Kafka
        • Amazon Kinesis
        • Apache Pulsar
      • Stream Ingestion with Upsert
      • File Systems
        • Amazon S3
        • Azure Data Lake Storage
        • HDFS
        • Google Cloud Storage
      • Input formats
      • Complex Type (Array, Map) Handling
    • Indexing
      • Forward Index
      • Inverted Index
      • Star-Tree Index
      • Bloom Filter
      • Range Index
      • Text search support
      • JSON Index
      • Geospatial
      • Timestamp Index
    • Releases
      • 0.10.0
      • 0.9.3
      • 0.9.2
      • 0.9.1
      • 0.9.0
      • 0.8.0
      • 0.7.1
      • 0.6.0
      • 0.5.0
      • 0.4.0
      • 0.3.0
      • 0.2.0
      • 0.1.0
    • Recipes
      • GitHub Events Stream
  • For Users
    • Query
      • Querying Pinot
      • Filtering with IdSet
      • Transformation Functions
      • Aggregation Functions
      • User-Defined Functions (UDFs)
      • Cardinality Estimation
      • Lookup UDF Join
      • Querying JSON data
      • Explain Plan
      • Grouping Algorithm
    • APIs
      • Broker Query API
        • Query Response Format
      • Controller Admin API
    • External Clients
      • JDBC
      • Java
      • Python
      • Golang
    • Tutorials
      • Use OSS as Deep Storage for Pinot
      • Ingest Parquet Files from S3 Using Spark
      • Creating Pinot Segments
      • Use S3 as Deep Storage for Pinot
      • Use S3 and Pinot in Docker
      • Batch Data Ingestion In Practice
      • Schema Evolution
  • For Developers
    • Basics
      • Extending Pinot
        • Writing Custom Aggregation Function
        • Segment Fetchers
      • Contribution Guidelines
      • Code Setup
      • Code Modules and Organization
      • Update Documentation
    • Advanced
      • Data Ingestion Overview
      • Ingestion Transformations
      • Null Value Support
      • Advanced Pinot Setup
    • Plugins
      • Write Custom Plugins
        • Input Format Plugin
        • Filesystem Plugin
        • Batch Segment Fetcher Plugin
        • Stream Ingestion Plugin
    • Design Documents
      • Segment Writer API
  • For Operators
    • Deployment and Monitoring
      • Setup cluster
      • Setup table
      • Setup ingestion
      • Decoupling Controller from the Data Path
      • Segment Assignment
      • Instance Assignment
      • Rebalance
        • Rebalance Servers
        • Rebalance Brokers
      • Tiered Storage
      • Pinot managed Offline flows
      • Minion merge rollup task
      • Access Control
      • Monitoring
      • Tuning
        • Realtime
        • Routing
      • Upgrading Pinot with confidence
    • Command-Line Interface (CLI)
    • Configuration Recommendation Engine
    • Tutorials
      • Authentication, Authorization, and ACLs
      • Configuring TLS/SSL
      • Build Docker Images
      • Running Pinot in Production
      • Kubernetes Deployment
      • Amazon EKS (Kafka)
      • Amazon MSK (Kafka)
      • Monitor Pinot using Prometheus and Grafana
  • Configuration Reference
    • Cluster
    • Controller
    • Broker
    • Server
    • Table
    • Schema
    • Ingestion Job Spec
    • Functions
      • ABS
      • ADD
      • arrayConcatInt
      • arrayConcatString
      • arrayContainsInt
      • arrayContainsString
      • arrayDistinctString
      • arrayDistinctInt
      • arrayIndexOfInt
      • arrayIndexOfString
      • ARRAYLENGTH
      • arrayRemoveInt
      • arrayRemoveString
      • arrayReverseInt
      • arrayReverseString
      • arraySliceInt
      • arraySliceString
      • arraySortInt
      • arraySortString
      • arrayUnionInt
      • arrayUnionString
      • AVGMV
      • ceil
      • CHR
      • codepoint
      • concat
      • count
      • COUNTMV
      • day
      • dayOfWeek
      • dayOfYear
      • DISTINCT
      • DISTINCTCOUNT
      • DISTINCTCOUNTBITMAP
      • DISTINCTCOUNTBITMAPMV
      • DISTINCTCOUNTHLL
      • DISTINCTCOUNTHLLMV
      • DISTINCTCOUNTMV
      • DISTINCTCOUNTRAWHLL
      • DISTINCTCOUNTRAWHLLMV
      • DISTINCTCOUNTRAWTHETASKETCH
      • DISTINCTCOUNTTHETASKETCH
      • DIV
      • DATETIMECONVERT
      • DATETRUNC
      • exp
      • FLOOR
      • FromDateTime
      • FromEpoch
      • FromEpochBucket
      • hour
      • JSONFORMAT
      • JSONPATH
      • JSONPATHARRAY
      • JSONPATHARRAYDEFAULTEMPTY
      • JSONPATHDOUBLE
      • JSONPATHLONG
      • JSONPATHSTRING
      • jsonextractkey
      • jsonextractscalar
      • length
      • ln
      • lower
      • lpad
      • ltrim
      • max
      • MAXMV
      • MD5
      • millisecond
      • min
      • minmaxrange
      • MINMAXRANGEMV
      • MINMV
      • minute
      • MOD
      • mode
      • month
      • mult
      • now
      • percentile
      • percentileest
      • percentileestmv
      • percentilemv
      • percentiletdigest
      • percentiletdigestmv
      • quarter
      • regexpExtract
      • remove
      • replace
      • reverse
      • round
      • rpad
      • rtrim
      • second
      • SEGMENTPARTITIONEDDISTINCTCOUNT
      • sha
      • sha256
      • sha512
      • sqrt
      • startswith
      • ST_AsBinary
      • ST_AsText
      • ST_Contains
      • ST_Distance
      • ST_GeogFromText
      • ST_GeogFromWKB
      • ST_GeometryType
      • ST_GeomFromText
      • ST_GeomFromWKB
      • STPOINT
      • ST_Polygon
      • strpos
      • ST_Union
      • SUB
      • substr
      • sum
      • summv
      • TIMECONVERT
      • timezoneHour
      • timezoneMinute
      • ToDateTime
      • ToEpoch
      • ToEpochBucket
      • ToEpochRounded
      • TOJSONMAPSTR
      • toGeometry
      • toSphericalGeography
      • trim
      • upper
      • VALUEIN
      • week
      • year
      • yearOfWeek
  • RESOURCES
    • Community
    • Team
    • Blogs
    • Presentations
    • Videos
  • Integrations
    • Tableau
    • Trino
    • ThirdEye
    • Superset
    • Presto
Powered by GitBook
On this page
  • Prerequisite
  • Create an Amazon MSK Cluster
  • Connect to MSK
  • Config SecurityGroup
  • Create Kafka topic
  • Write sample data into Kafka
  • Create a Pinot table

Was this helpful?

Export as PDF
  1. For Operators
  2. Tutorials

Amazon MSK (Kafka)

How to Connect Pinot with Amazon Managed Streaming for Apache Kafka (Amazon MSK)

PreviousAmazon EKS (Kafka)NextMonitor Pinot using Prometheus and Grafana

Last updated 3 years ago

Was this helpful?

This wiki documents how to connect Pinot deployed in to .

Prerequisite

Please follow this to run Pinot on Amazon EKS.

Create an Amazon MSK Cluster

Please go to to create a Kafka Cluster.

Note:

  1. For demo simplicity, this MSK cluster reuses same VPC created by EKS cluster in the previous step. Otherwise a is required to ensure two VPCs could talk to each other.

  2. Under Encryption section, chooseBoth TLS encrypted and plaintext traffic allowed

Below is a sample screenshot to create an Amazon MSK cluster.

                                                       ![](../../.gitbook/assets/snapshot-msk.png)

After click on Create button, you can take a coffee break and come back.

Once the cluster is created, you can view it and click View client information to see the Zookeeper and Kafka Broker list.

Sample Client Information

Connect to MSK

Config SecurityGroup

In order to connect MSK to EKS, we need to allow the traffic could go through each other.

This is configured through Amazon VPC Page.

  1. Record the Amazon MSK SecurityGroup from the Cluster page, in the above demo, it's sg-01e7ab1320a77f1a9.

Please ensure you are picking ClusterShardNodeSecurityGroup

  1. In SecurityGroup, click on MSK SecurityGroup (sg-01e7ab1320a77f1a9), then Click on Edit Rules , then add above ClusterSharedNodeSecurityGroup (sg-0402b59d7e440f8d1) to it.

  1. Click EKS Security Group ClusterSharedNodeSecurityGroup (sg-0402b59d7e440f8d1), add In bound Rule for MSK Security Group (sg-01e7ab1320a77f1a9).

Now, EKS cluster should be able to talk to Amazon MSK.

Create Kafka topic

To run below commands, please ensure you set two environment variable with ZOOKEEPER_CONNECT_STRING and BROKER_LIST_STRING (Use plaintext) from Amazon MSK client information, and replace the Variables accordingly.

E.g.

ZOOKEEPER_CONNECT_STRING="z-3.pinot-quickstart-msk-d.ky727f.c3.kafka.us-west-2.amazonaws.com:2181,z-1.pinot-quickstart-msk-d.ky727f.c3.kafka.us-west-2.amazonaws.com:2181,z-2.pinot-quickstart-msk-d.ky727f.c3.kafka.us-west-2.amazonaws.com:2181"
BROKER_LIST_STRING="b-1.pinot-quickstart-msk-d.ky727f.c3.kafka.us-west-2.amazonaws.com:9092,b-2.pinot-quickstart-msk-d.ky727f.c3.kafka.us-west-2.amazonaws.com:9092"

You can log into one EKS node or container and run below command to create a topic.

E.g. Enter into Pinot controller container:

kubectl exec -it pod/pinot-controller-0  -n pinot-quickstart bash

Then install wget then download Kafka binary.

apt-get update
apt-get install wget -y
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzf kafka_2.12-2.2.1.tgz
cd kafka_2.12-2.2.1

Create a Kafka topic:

bin/kafka-topics.sh \
  --zookeeper ${ZOOKEEPER_CONNECT_STRING} \
  --create \
  --topic pullRequestMergedEventsAwsMskDemo \
  --replication-factor 1 \
  --partitions 1

Topic creation succeeds with below message:

Created topic "pullRequestMergedEventsAwsMskDemo".

Write sample data into Kafka

Once topic is created, we can start a simple application to produce to it.

You can download below yaml file, then replace:

  • ${ZOOKEEPER_CONNECT_STRING} -> MSK Zookeeper String

  • ${BROKER_LIST_STRING} -> MSK Plaintext Broker String in the deployment

And apply the YAML file by.

kubectl apply -f github-events-aws-msk-demo.yaml

Once the pod is up, you can verify by running a console consumer to read from it.

Try to run from the Pinot Controller container entered in above step.

bin/kafka-console-consumer.sh \
  --bootstrap-server ${BROKER_LIST_STRING} \
  --topic pullRequestMergedEventsAwsMskDemo

Create a Pinot table

This step is relatively easy.

Since we already put table creation request into the ConfigMap, we can just enter into pinot-github-events-data-into-msk-kafka pod to execute the command.

  • Check if the pod is running:

kubectl get pod -n pinot-quickstart  |grep pinot-github-events-data-into-msk-kafka

Sample output:

pinot-github-events-data-into-msk-kafka-68948fb4cd-rrzlf   1/1     Running     0          14m
  • Enter into the pod

podname=`kubectl get pod -n pinot-quickstart  |grep pinot-github-events-data-into-msk-kafka|awk '{print $1}'`
kubectl exec -it ${podname} -n pinot-quickstart bash
  • Create Table

bin/pinot-admin.sh AddTable \
  -controllerHost pinot-controller \
  -tableConfigFile /var/pinot/examples/pullRequestMergedEventsAwsMskDemo_realtime_table_config.json \
  -schemaFile /var/pinot/examples/pullRequestMergedEventsAwsMskDemo_schema.json \
  -exec

Sample output:

Executing command: AddTable -tableConfigFile /var/pinot/examples/pullRequestMergedEventsAwsMskDemo_realtime_table_config.json -schemaFile /var/pinot/examples/pullRequestMergedEventsAwsMskDemo_schema.json -controllerHost pinot-controller -controllerPort 9000 -exec
Sending request: http://pinot-controller:9000/schemas to controller: pinot-controller-0.pinot-controller-headless.pinot-quickstart.svc.cluster.local, version: Unknown
{"status":"Table pullRequestMergedEventsAwsMskDemo_REALTIME succesfully added"}
  • Then you can open Pinot Query Console to browse the data

Until now, the MSK cluster is still not accessible, you can follow this to create an EC2 instance to connect to it for topic creation, run console producer and consumer.

Open , click on SecurityGroups on left bar. Find the EKS Security group: eksctl-${PINOT_EKS_CLUSTER}-cluster/ClusterSharedNodeSecurityGroup.

${GITHUB_PERSONAL_ACCESS_TOKEN} -> A personal Github Personal Access Token generated from , please grant all read permissions to it. Here is the to generate Github Events.

Wiki
Amazon VPC Page
here
source code
Amazon EKS
Amazon Managed Kafka
AWS Quickstart Wiki
MSK Landing Page
VPC Peering
7KB
github-events-aws-msk-demo.yaml
github-events-aws-msk-demo.yaml
Amazon MSK Clusters View
MSK Cluster View
Amazon EKS ClusterSharedNodeSecurityGroup
Add SecurityGroup to Amazon MSK
Add SecurityGroup to Amazon EKS