This guide shows you how to ingest a stream of records from an Amazon Kinesis stream into a Pinot table.
To ingest events from an Amazon Kinesis stream into Pinot, set the Kinesis-specific stream properties in your table config. These properties are described in the table at the end of this page.
Kinesis supports authentication using the `DefaultCredentialsProviderChain`. The credential provider looks for the credentials in the following order:

1. Environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` (recommended, since they are recognized by all the AWS SDKs and CLI except for .NET), or `AWS_ACCESS_KEY` and `AWS_SECRET_KEY` (only recognized by the Java SDK).
2. Java system properties: `aws.accessKeyId` and `aws.secretKey`.
3. Web Identity Token credentials from the environment or container.
4. Credential profiles file at the default location (`~/.aws/credentials`), shared by all AWS SDKs and the AWS CLI.
5. Credentials delivered through the Amazon EC2 container service, if the `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` environment variable is set and the security manager has permission to access the variable.
6. Instance profile credentials delivered through the Amazon EC2 metadata service.
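
Before starting Pinot, it can help to check which of the sources listed above appear to be configured on the host. The following is a minimal diagnostic sketch, not part of Pinot or the AWS SDK. The function name `detect_credential_sources` is hypothetical, and the use of `AWS_WEB_IDENTITY_TOKEN_FILE` to detect web identity credentials is an assumption based on the usual AWS SDK convention. Java system properties and the EC2 metadata service cannot be inspected this way, so they are left out.

```python
import os
from pathlib import Path


def detect_credential_sources(env, home):
    """Return the credential sources that look configured, in lookup order.

    Only inspects the given environment mapping and home directory. It does
    not validate the credentials and never calls AWS.
    """
    found = []
    # Step 1: environment variables, preferred pair first.
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        found.append("environment variables (AWS_ACCESS_KEY_ID)")
    elif env.get("AWS_ACCESS_KEY") and env.get("AWS_SECRET_KEY"):
        found.append("environment variables (AWS_ACCESS_KEY, Java SDK only)")
    # Step 3: web identity token (assumed to be signalled by this variable).
    if env.get("AWS_WEB_IDENTITY_TOKEN_FILE"):
        found.append("web identity token")
    # Step 4: shared credential profiles file.
    if (Path(home) / ".aws" / "credentials").is_file():
        found.append("credential profiles file")
    # Step 5: container credentials.
    if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"):
        found.append("container credentials")
    return found


if __name__ == "__main__":
    for source in detect_credential_sources(os.environ, Path.home()):
        print(source)
```

An empty result does not necessarily mean authentication will fail, since instance profile credentials or Java system properties may still apply.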
You must provide all `read` access-level permissions for Pinot to work with an AWS Kinesis data stream. See the AWS documentation for details.
Although you can also specify the `accessKey` and `secretKey` in the stream properties, this method is insecure and we recommend it only for non-production proof-of-concept (POC) setups. Other AWS fields such as `AWS_SESSION_TOKEN` can also be supplied, either as environment variables or in the config.
`ShardID` is of the format "shardId-000000000001". We use the numeric part as `partitionId`. Our `partitionId` variable is an integer, so if shard IDs grow beyond `Integer.MAX_VALUE`, the `partitionId` will overflow.
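
The mapping from shard ID to partition ID can be illustrated as follows. This is only a sketch of the rule described above, written in Python for brevity. Pinot's actual implementation is in Java and may differ in detail, and the function name `shard_id_to_partition_id` is hypothetical. Unlike a plain integer conversion, the sketch rejects values above `Integer.MAX_VALUE` rather than letting them overflow.

```python
INT_MAX = 2**31 - 1  # value of Java's Integer.MAX_VALUE


def shard_id_to_partition_id(shard_id):
    """Extract the numeric suffix of a Kinesis shard ID as an integer."""
    prefix, _, digits = shard_id.partition("-")
    if prefix != "shardId" or not digits.isdigit():
        raise ValueError(f"unexpected shard ID format: {shard_id!r}")
    value = int(digits)  # leading zeros are dropped by int()
    if value > INT_MAX:
        raise ValueError(f"shard number {value} does not fit in a 32-bit int")
    return value


if __name__ == "__main__":
    print(shard_id_to_partition_id("shardId-000000000001"))
```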
Segment-size-based thresholds for segment completion will not work, because they assume that partition "0" always exists. Once shard 0 is split or merged, partition 0 no longer exists.
| Property | Description |
|---|---|
| `streamType` | This should be set to "kinesis" |
| `stream.kinesis.topic.name` | Kinesis stream name |
| `region` | Kinesis region, e.g. us-west-1 |
| `accessKey` | Kinesis access key |
| `secretKey` | Kinesis secret key |
| `shardIteratorType` | Set to `LATEST` to consume only new records, `TRIM_HORIZON` to start from the earliest sequence number, or `AT_SEQUENCE_NUMBER` or `AFTER_SEQUENCE_NUMBER` to start consumption from a particular sequence number |
| `maxRecordsToFetch` | ... Default is 20. |
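
As an illustration, the properties in the table above could be assembled into a stream config map like this. It is a hedged sketch, not a complete table config: it covers only the properties listed on this page, the helper name `kinesis_stream_configs` and the stream name `my-stream` are hypothetical, and writing every value as a string is an assumption. `accessKey` and `secretKey` are deliberately left out so that credentials come from the provider chain instead.

```python
import json

# The four iterator types described in the table above.
VALID_ITERATOR_TYPES = {
    "LATEST",
    "TRIM_HORIZON",
    "AT_SEQUENCE_NUMBER",
    "AFTER_SEQUENCE_NUMBER",
}


def kinesis_stream_configs(stream_name, region, shard_iterator_type,
                           max_records_to_fetch=20):
    """Build the Kinesis-specific stream properties as a flat dict."""
    if shard_iterator_type not in VALID_ITERATOR_TYPES:
        raise ValueError(f"unknown shardIteratorType: {shard_iterator_type!r}")
    return {
        "streamType": "kinesis",
        "stream.kinesis.topic.name": stream_name,
        "region": region,
        "shardIteratorType": shard_iterator_type,
        # 20 is the default stated in the table; values are kept as strings.
        "maxRecordsToFetch": str(max_records_to_fetch),
    }


if __name__ == "__main__":
    configs = kinesis_stream_configs("my-stream", "us-west-1", "LATEST")
    print(json.dumps(configs, indent=2))
```

The printed JSON is only the Kinesis-specific fragment. A working table config needs further settings that this page does not list.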