# this is the default, 10 minutes.
pinot.server.startup.timeoutMs=600000
# this is the default. you do not have to specify this.
pinot.server.startup.enableServiceStatusCheck=true

# the default is 0, and the server will not wait
pinot.server.starter.realtimeConsumptionCatchupWaitMs=60000

# this is disabled by default.
pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker=true

# this is disabled by default.
pinot.server.starter.enableRealtimeFreshnessBasedConsumptionStatusChecker=true
# this is the default. The server wants events to be no more than 10
# seconds old.
pinot.server.starter.realtimeMinFreshnessMs=10000
# this is the default. the server will keep waiting for segments to catch up
# even if they are not making progress.
pinot.server.starter.realtimeFreshnessIdleTimeoutMs=0
# the server will still start and serve queries even if it is not caught up
pinot.server.starter.exitServerOnStartupStatusFailure=false

# Recommended: let the server start serving even if it has not caught up
# within the timeout.
pinot.server.startup.enableServiceStatusCheck=true
pinot.server.starter.enableRealtimeFreshnessBasedConsumptionStatusChecker=true
# these should be set to your environment based on how long
# catching up typically takes.
pinot.server.startup.timeoutMs=<your_timeout_ms>
pinot.server.starter.realtimeConsumptionCatchupWaitMs=<your_timeout_ms>
pinot.server.starter.realtimeMinFreshnessMs=<your_desired_freshness>
pinot.server.starter.realtimeFreshnessIdleTimeoutMs=1000
pinot.server.startup.exitOnServiceStatusCheckFailure=false

# Alternative: exit the server if it fails to catch up within the timeout,
# instead of serving potentially stale data.
pinot.server.startup.enableServiceStatusCheck=true
pinot.server.starter.enableRealtimeFreshnessBasedConsumptionStatusChecker=true
# these should be set to your environment based on how long
# catching up typically takes.
pinot.server.startup.timeoutMs=<your_timeout_ms>
pinot.server.starter.realtimeConsumptionCatchupWaitMs=<your_timeout_ms>
pinot.server.starter.realtimeMinFreshnessMs=<your_desired_freshness>
pinot.server.starter.realtimeFreshnessIdleTimeoutMs=0
pinot.server.startup.exitOnServiceStatusCheckFailure=true
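Either way, once a server is up you can probe its readiness by hand. A minimal sketch, assuming the default server admin API port 8097 (adjust host and port to your deployment):

$ curl localhost:8097/health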
Decouple the controller from the data path for real-time Pinot tables.

realtime.segment.serverUploadToDeepStore = true

controller.allow.hlc.tables=false
controller.enable.split.commit=true

pinot.server.instance.segment.store.uri=<URI of segment store>
pinot.server.instance.enable.split.commit=true
pinot.server.storage.factory.class.(scheme)=<the corresponding Pinot FS impl>

"segmentsConfig": {
  ...
  "peerSegmentDownloadScheme": "http"
}

controller.realtime.segment.deepStoreUploadRetryEnabled=true

pinot.server.instance.segment.directory.loader=tierBased
pinot.server.instance.tierConfigs.tierNames=hotTier,coldTier
pinot.server.instance.tierConfigs.hotTier.dataDir=/tmp/multidir_test/hotTier
pinot.server.instance.tierConfigs.coldTier.dataDir=/tmp/multidir_test/coldTier

controller.segmentRelocator.enableLocalTierMigration=true
// by the way,
// controller.segment.relocator.frequencyPeriod=3600s, by default
// controller.segmentRelocator.initialDelayInSeconds=random [120, 300), by default

ALL_JAVA_OPTS="-javaagent:jmx_prometheus_javaagent-0.12.0.jar=8080:pinot.yml -Xms4G -Xmx4G -XX:MaxDirectMemorySize=30g -Dlog4j2.configurationFile=conf/pinot-admin-log4j2.xml -Dplugins.dir=$BASEDIR/plugins"
bin/pinot-admin.sh ....
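With the agent attached, the JMX exporter serves Prometheus-format metrics over HTTP. A quick sanity check, assuming the agent port 8080 from the example above:

$ curl -s localhost:8080/metrics | head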
"tableName": "myTable",
"tableType": ...,
"tenants": {
"server": "base_OFFLINE",
"broker": "base_BROKER"
},
"tierConfigs": [{
"name": "hotTier",
"segmentSelectorType": "time",
"segmentAge": "7d",
"storageType": "pinot_server",
"serverTag": "base_OFFLINE"
}, {
"name": "coldTier",
"segmentSelectorType": "time",
"segmentAge": "15d",
"storageType": "pinot_server",
"serverTag": "base_OFFLINE",
"tierBackendProperties": { // overwriting is not recommended, but can be done as below
"dataDir": "/tmp/multidir_test/my_custom_colddir" // assume path exists on servers.
}
}]
}

{
"tableName": "myTable_OFFLINE",
"tenants" : {
"broker":"DefaultTenant",
"server":"DefaultTenant"
}
}

Failed to drop instance Broker_10.1.10.51_8000 -
Instance Broker_10.1.10.51_8000 is still live

Failed to drop instance Broker_172.17.0.2_8099 -
Instance Broker_172.17.0.2_8099 exists in ideal state for brokerResource
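These messages come from the controller's instance-drop endpoint; the drop call itself looks like this (a sketch, assuming the controller listens on localhost:9000), and it only succeeds once the broker has been untagged, removed from the broker resource, and shut down:

$ curl -X DELETE "localhost:9000/instances/Broker_172.17.0.2_8099" -H "accept: application/json"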
["root","org.reflections","org.apache.pinot.tools.admin"]GET /loggers/{loggerName}> curl -X GET -H "accept: application/json" localhost:8000/loggers/root
{"filter":null,"level":"INFO","name":"root"}PUT /loggers/{loggerName}?level={level}$ curl -X PUT -H "accept: application/json" localhost:8000/loggers/root?level=ERROR
{"filter":null,"level":"ERROR","name":"root"}GET /loggers/filesGET /loggers/download?filePath={filePath}GET /loggers/instancesGET /loggers/instances/{instanceName}GET /loggers/instances/{instanceName}/download?filePath={filePath}{
"tableName": "myTable",
"tableType": ...,
"tenants": {
"server": "base_OFFLINE",
"broker": "base_BROKER"
},
"tierConfigs": [{
"name": "ssdGroup",
"segmentSelectorType": "time",
"segmentAge": "7d",
"storageType": "pinot_server",
"serverTag": "ssd_OFFLINE"
}, {
"name": "hddGroup",
"segmentSelectorType": "time",
"segmentAge": "15d",
"storageType": "pinot_server",
"serverTag": "hdd_OFFLINE"
}]
}
// Table config
{
...
"routing": {
"segmentPrunerTypes": ["time"]
},
...
}

// Table config
{
...
"tableIndexConfig": {
...
"segmentPartitionConfig": {
"columnPartitionMap": {
"memberId": {
"functionName": "Modulo",
"numPartitions": 3
}
}
},
...
},
...
"routing": {
"segmentPrunerTypes": ["partition"]
},
...
}

private val NUM_PARTITIONS = 8
def getPartitionUdf: UserDefinedFunction = {
udf((valueIn: Any) => {
(murmur2(valueIn.toString.getBytes(UTF_8)) & Integer.MAX_VALUE) % NUM_PARTITIONS
})
}

column.memberId.partitionFunction = Modulo
column.memberId.numPartitions = 3
column.memberId.partitionValues = 1
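With partitioning in place, a filter on the partition column lets the broker prune segments. A sketch of verifying this through the broker's query endpoint (broker host/port, table name, and memberId value are illustrative; check the pruning-related counters in the response metadata):

$ curl -X POST -H "Content-Type: application/json" localhost:8099/query/sql \
  -d '{"sql": "SELECT COUNT(*) FROM myTable WHERE memberId = 101"}'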
// Table config
{
...
"instanceAssignmentConfigMap": {
"OFFLINE": {
...
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 3,
"numInstancesPerReplicaGroup": 4
}
}
},
...
"routing": {
"instanceSelectorType": "replicaGroup"
},
...
}SET "useFixedReplica"=true;// Table config
// Table config
{
...
"routing": {
"useFixedReplica": true
},
...
}

pinot.broker.use.fixed.replica=true

{
"id": "<table_name>",
"simpleFields": {},
"mapFields": {},
"listFields": {
"<segment_lineage_entry_id>": [
"<segmentsFrom_list>",
"<segmentsTo_list>",
"<state>",
"<timestamp>"
]
}
}"tableName": "myTable_OFFLINE",
"tableType": "OFFLINE",
...
...
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "REFRESH",
"segmentIngestionFrequency": "DAILY", // or HOURLY
"consistentDataPush": true
}
}


"tableName": "myTable_REALTIME",
"tableType": "REALTIME",
...
...
"task": {
"taskTypeConfigsMap": {
"RealtimeToOfflineSegmentsTask": {
}
}
}"task": {
"taskTypeConfigsMap": {
"RealtimeToOfflineSegmentsTask": {
"bucketTimePeriod": "6h",
"bufferTimePeriod": "5d",
"roundBucketTimePeriod": "1h",
"mergeType": "rollup",
"score.aggregationType": "max",
"maxNumRecordsPerSegment": "100000"
}
}
}

"tableName": "myTable_OFFLINE",
"tableType": "OFFLINE",
...
...
"task": {
"taskTypeConfigsMap": {
"MergeRollupTask": {
"1day.mergeType": "concat",
"1day.bucketTimePeriod": "1d",
"1day.bufferTimePeriod": "1d"
}
}
}

controller.task.scheduler.enabled=true
# Specify the frequency (more frequent is better, as extra tasks aren't scheduled unless required).
controller.task.frequencyPeriod=1h
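Besides the periodic scheduler, tasks can be scheduled on demand through the controller's task API. A sketch, assuming the controller at localhost:9000 and the task type and table from the examples here:

$ curl -X POST "localhost:9000/tasks/schedule?taskType=MergeRollupTask&tableName=myTable_OFFLINE" -H "accept: application/json"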
"taskTypeConfigsMap": {
"MergeRollupTask": {
"1hour.mergeType": "rollup",
"1hour.bucketTimePeriod": "1h",
"1hour.bufferTimePeriod": "3h",
"1hour.maxNumRecordsPerSegment": "1000000",
"1hour.maxNumRecordsPerTask": "5000000",
"1hour.maxNumParallelBuckets": "5",
"1day.mergeType": "rollup",
"1day.bucketTimePeriod": "1d",
"1day.bufferTimePeriod": "1d",
"1day.roundBucketTimePeriod": "1d",
"1day.maxNumRecordsPerSegment": "1000000",
"1day.maxNumRecordsPerTask": "5000000",
"metricColA.aggregationType": "sum",
"metricColB.aggregationType": "max"
}
}
}

This page describes the Pinot cross-release compatibility test suite.

$ # This is the tool to check out and build the versions to test
$ checkoutAndBuild.sh -h
Usage: checkoutAndBuild.sh [-o olderCommit] [-n newerCommit] -w workingDir
-w, --working-dir Working directory where olderCommit and newCommit target files reside
-o, --old-commit-hash git hash (or tag) for old commit
-n, --new-commit-hash git hash (or tag) for new commit
If -n is not specified, then current commit is assumed
If -o is not specified, then previous commit is assumed (expected -n is also empty)
Examples:
To compare this checkout with previous commit: 'checkoutAndBuild.sh -w /tmp/wd'
To compare this checkout with some older tag or hash: 'checkoutAndBuild.sh -o release-0.7.1 -w /tmp/wd'
To compare any two previous tags or hashes: 'checkoutAndBuild.sh -o release-0.7.1 -n 637cc3494 -w /tmp/wd'

$ # Create the following file
$ cat /tmp/compat-settings.xml
<settings>
<mirrors>
<mirror>
<id>maven-default-http-blocker</id>
<mirrorOf>dummy</mirrorOf>
<name>Dummy mirror to override default blocking mirror that blocks http</name>
<url>http://0.0.0.0/</url>
<blocked>false</blocked>
</mirror>
</mirrors>
</settings>
$ export PINOT_MAVEN_OPTS="/tmp/compat-settings.xml"
$ # And now, run the checkoutAndBuild.sh
$ checkoutAndBuild.sh -o <oldVersion> -n <newVersion> -w <workingDir>

$ # This is the tool to run the compatibility test suite
$ ./compCheck.sh -h
Usage: -w <workingDir> -t <testSuiteDir> [-k]
MANDATORY:
-w, --working-dir Working directory where olderCommit and newCommit target files reside.
-t, --test-suite-dir Test suite directory
OPTIONAL:
-k, --keep-cluster-on-failure Keep cluster on test failure
  -h, --help                     Prints this help

git clone https://github.com/apache/pinot.git
cd compatibility-verifier

./checkoutAndBuild.sh -o $OLD_COMMIT -n $NEW_COMMIT -w /tmp/wd

./compCheck.sh -w /tmp/wd -t $TEST_SUITE_DIR

123456,__GENERATION_NUMBER__,"s1-0",s2-0,1,2,m1-0-0;m1-0-1,m2-0-0;m2-0-1,3;4,6;7,Java C++ Python,01a0bc,k1;k2;k3;k4;k5,1;1;2;2;2,"{""k1"":1,""k2"":1,""k3"":2,""k4"":2,""k5"":2}",10,11,12.1,13.1

SELECT longDimSV1, intDimMV1, count(*) FROM FeatureTest2 WHERE generationNumber = __GENERATION_NUMBER__ AND (stringDimSV1 != 's1-6' AND longDimSV1 BETWEEN 10 AND 1000 OR (intDimMV1 < 42 AND stringDimMV2 IN ('m2-0-0', 'm2-2-0') AND intDimMV2 NOT IN (6,72))) GROUP BY longDimSV1, intDimMV1 ORDER BY longDimSV1, intDimMV1 LIMIT 20

SELECT foo FROM T1 WHERE x = 7 GROUP BY bar LIMIT 5

{"isSuperset":true, "resultTable":{"dataSchema":{"columnNames":["foo"],"columnDataTypes":["LONG"]},"rows":[[11],[41],[-9223372036854775808],[32],[42],[48]]},"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numSegmentsQueried":2,"numSegmentsProcessed":2,"numSegmentsMatched":2,"numConsumingSegmentsQueried":1,"numDocsScanned":13,"numEntriesScannedInFilter":120,"numEntriesScannedPostFilter":26,"numGroupsLimitReached":false,"totalDocs":66,"timeUsedMs":3,"offlineThreadCpuTimeNs":0,"realtimeThreadCpuTimeNs":352435,"segmentStatistics":[],"traceInfo":{},"minConsumingFreshnessTimeMs":1621918872017}

This page introduces all the instance assignment strategies, when to use them, and how to configure them.
{
"listFields": {
"TAG_LIST": [
"Tag1_OFFLINE"
]
},
...
}

{
"instanceAssignmentConfigMap": {
"OFFLINE": {
"tagPoolConfig": {
"tag": "Tag1_OFFLINE"
},
"replicaGroupPartitionConfig": {
}
}
},
...
}

{
"instanceAssignmentConfigMap": {
"OFFLINE": {
"tagPoolConfig": {
"tag": "Tag1_OFFLINE"
},
"replicaGroupPartitionConfig": {
"numInstances": 2
}
}
},
...
}

{
"instanceAssignmentConfigMap": {
"OFFLINE": {
"tagPoolConfig": {
"tag": "Tag1_OFFLINE"
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 2,
"numInstancesPerReplicaGroup": 3
}
}
},
...
}

{
"instanceAssignmentConfigMap": {
"OFFLINE": {
"tagPoolConfig": {
"tag": "Tag1_OFFLINE"
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 2,
"numPartitions": 2,
"numInstancesPerPartition": 2,
"partitionColumn": "memberId"
}
}
},
...
}

{
"listFields": {
"TAG_LIST": [
"Tag1_OFFLINE"
]
},
"mapFields": {
"pool": {
"Tag1_OFFLINE": 1
}
},
...
}

{
"instanceAssignmentConfigMap": {
"OFFLINE": {
"tagPoolConfig": {
"tag": "Tag1_OFFLINE",
"poolBased": true
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 2,
"numPartitions": 2,
"numInstancesPerPartition": 2,
"partitionColumn": "memberId"
}
}
},
...
}
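After changing the instance assignment config, the computed instance partitions can be inspected through the controller. A sketch, assuming the controller at localhost:9000 and an illustrative table name (recent versions expose this endpoint; see the controller's Swagger UI):

$ curl -X GET "localhost:9000/tables/myTable/instancePartitions?type=OFFLINE" -H "accept: application/json"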
"instanceAssignmentConfigMap": {
"OFFLINE": {
"partitionSelector": "FD_AWARE_INSTANCE_PARTITION_SELECTOR",
"tagPoolConfig": {
"tag": "Tag1_OFFLINE",
"poolBased": true
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 2,
"numPartitions": 2,
"numInstancesPerPartition": 2
}
}
},
...
}
{
"tableName": "myTable_OFFLINE",
"tenants" : {
"broker":"DefaultTenant",
"server":"DefaultTenant"
}
}

curl -X PUT "http://localhost:9000/instances/Server_10.1.10.51_7000"
-H "accept: application/json"
-H "Content-Type: application/json"
-d "{ \"host\": \"10.1.10.51\", \"port\": \"7000\", \"type\": \"SERVER\", \"tags\": [ \"newName_OFFLINE\", \"DefaultTenant_REALTIME\" ]}"pinot.set.instance.id.to.hostname=true"instanceAssignmentConfigMap": {
...
"OFFLINE": {
...
"replicaGroupPartitionConfig": {
...
"minimizeDataMovement": true,
...
},
...
},
...
}"segmentsConfig": {
...
"minimizeDataMovement": true,
...
}curl -X GET "https://localhost:9000/rebalanceStatus/ffb38717-81cf-40a3-8f29-9f35892b01f9" -H "accept: application/json"{"tableRebalanceProgressStats": {
"startTimeMs": 1679073157779,
"status": "DONE", // IN_PROGRESS/DONE/FAILED
"timeToFinishInSeconds": 0, // Time it took for the rebalance job after it completes/fails
"completionStatusMsg": "Finished rebalancing table: airlineStats_OFFLINE with minAvailableReplicas: 1, enableStrictReplicaGroup: false, bestEfforts: false in 44 ms."
// The total amount of work required for rebalance
"initialToTargetStateConvergence": {
"_segmentsMissing": 0, // Number of segments missing in the current state but present in the target state
"_segmentsToRebalance": 31, // Number of segments that needs to be assigned to hosts so that the current state can get to the target state.
"_percentSegmentsToRebalance": 100, // Total number of replicas that needs to be assigned to hosts so that the current state can get to the target state.
"_replicasToRebalance": 279 // Remaining work to be done in %
},
// The pending work for rebalance
"externalViewToIdealStateConvergence": {
"_segmentsMissing": 0,
"_segmentsToRebalance": 0,
"_percentSegmentsToRebalance": 0,
"_replicasToRebalance": 0
},
// Additional work to catch up with the new ideal state, when the ideal
// state shifts since rebalance started.
"currentToTargetConvergence": {
"_segmentsMissing": 0,
"_segmentsToRebalance": 0,
"_percentSegmentsToRebalance": 0,
"_replicasToRebalance": 0
},
},
"timeElapsedSinceStartInSeconds": 28 // If rebalance is IN_PROGRESS, this gives the time elapsed since it started
}curl -X GET "https://localhost:9000/table/airlineStats_OFFLINE/jobstype=OFFLINE&jobTypes=TABLE_REBALANCE" -H "accept: application/json" "ffb38717-81cf-40a3-8f29-9f35892b01f9": {
"jobId": "ffb38717-81cf-40a3-8f29-9f35892b01f9",
"submissionTimeMs": "1679073157804",
"jobType": "TABLE_REBALANCE",
"REBALANCE_PROGRESS_STATS": "{\"initialToTargetStateConvergence\":{\"_segmentsMissing\":0,\"_segmentsToRebalance\":31,\"_percentSegmentsToRebalance\":100.0,\"_replicasToRebalance\":279},\"externalViewToIdealStateConvergence\":{\"_segmentsMissing\":0,\"_segmentsToRebalance\":0,\"_percentSegmentsToRebalance\":0.0,\"_replicasToRebalance\":0},\"currentToTargetConvergence\":{\"_segmentsMissing\":0,\"_segmentsToRebalance\":0,\"_percentSegmentsToRebalance\":0.0,\"_replicasToRebalance\":0},\"startTimeMs\":1679073157779,\"status\":\"DONE\",\"timeToFinishInSeconds\":0,\"completionStatusMsg\":\"Finished rebalancing table: airlineStats_OFFLINE with minAvailableReplicas: 1, enableStrictReplicaGroup: false, bestEfforts: false in 44 ms.\"}",
"tableName": "airlineStats_OFFLINE"








"instanceAssignmentConfigMap": {
"CONSUMING": {
"tagPoolConfig": {
"tag": "DefaultTenant_REALTIME"
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 2,
"numInstancesPerReplicaGroup": 2
}
},
"COMPLETED": {
"tagPoolConfig": {
"tag": "DefaultTenant_OFFLINE"
},
"replicaGroupPartitionConfig": {
"replicaGroupBased": true,
"numReplicaGroups": 2,
"numInstancesPerReplicaGroup": 4
}
}
}
...

Learn about tuning real-time tables.

============================================================
RealtimeProvisioningHelperCommand -tableConfigFile /Users/ssubrama/tmp/samza/realtimeTableConfig.json -numPartitions 16 -pushFrequency null -numHosts 8,6,10 -numHours 6,12,18,24 -sampleCompletedSegmentDir /Users/ssubrama/tmp/samza/TestSamzaAnalyticsFeatures_1593411480000_1593500340000_0/ -ingestionRate 100 -maxUsableHostMemory 10G -retentionHours 72
Note:
* Table retention and push frequency ignored for determining retentionHours
* See https://docs.pinot.apache.org/operators/operating-pinot/tuning/realtime
Memory used per host (Active/Mapped)
numHosts --> 6 |8 |10 |
numHours
6 --------> 5.05G/19.49G |3.37G/12.99G |3.37G/12.99G |
12 --------> 5.89G/20.33G |3.93G/13.55G |3.93G/13.55G |
18 --------> 6.73G/21.49G |4.48G/14.33G |4.48G/14.33G |
24 --------> 7.56G/22G |5.04G/14.66G |5.04G/14.66G |
Optimal segment size
numHosts --> 6 |8 |10 |
numHours
6 --------> 111.98M |111.98M |111.98M |
12 --------> 223.96M |223.96M |223.96M |
18 --------> 335.94M |335.94M |335.94M |
24 --------> 447.92M |447.92M |447.92M |
Consuming memory
numHosts --> 6 |8 |10 |
numHours
6 --------> 1.45G |987.17M |987.17M |
12 --------> 2.61G |1.74G |1.74G |
18 --------> 3.77G |2.52G |2.52G |
24 --------> 4.94G |3.29G |3.29G |
Number of segments queried per host
numHosts --> 6 |8 |10 |
numHours
6 --------> 12 |12 |12 |
12 --------> 6 |6 |6 |
18 --------> 4 |4 |4 |
24 --------> 3 |3 |3 |

"realtime.segment.flush.threshold.rows": "0"
"realtime.segment.flush.threshold.time": "6h"
"realtime.segment.flush.threshold.segment.size": "112M"

"realtime.segment.flush.threshold.rows": "0"
"realtime.segment.flush.threshold.time": "24h"
"realtime.segment.flush.threshold.segment.size": "450M"
