Ingest records with dynamic schemas

Storing records with dynamic schemas in a table with a fixed schema.

Some domains (e.g., logging) generate records where each record can have a different set of keys, whereas Pinot tables have a relatively static schema. For records with varying keys, it's impractical to store each field in its own table column. However, most (if not all) fields may be important, so fields should not be dropped unnecessarily.

Additionally, search patterns on such a table can be complex and change frequently: exact matches, range queries, prefix/suffix matches, wildcard searches, and aggregation functions may be applied to any existing or newly created keys or values.

SchemaConformingTransformer

The SchemaConformingTransformer is a RecordTransformer that can transform records with dynamic schemas so that they can be ingested into a table with a static schema. The transformer takes record fields that don't exist in the schema and stores them in a catchall field. In addition, it builds a __mergedTextIndex field and leverages Lucene to support text search.

For example, consider this record:

{
  "arrayField":[0, 1, 2, 3],
  "stringField":"a",
  "intField_noIndex":9,
  "string_noIndex":"z",
  "message": "a",
  "mapField":{
    "arrayField":[0, 1, 2, 3],
    "stringField":"a",
    "intField_noIndex":9,
    "string_noIndex":"z"
  },
  "mapField_noIndex":{
    "arrayField":[0, 1, 2, 3],
    "stringField":"a",
  },
  "nestedFields":{
    "arrayField":[0, 1, 2, 3],
    "stringField":"a",
    "intField_noIndex":9,
    "string_noIndex":"z",
    "mapField":{
      "arrayField":[0, 1, 2, 3],
      "stringField":"a",
      "intField_noIndex":9,
      "string_noIndex":"z"
    }
  }
}

Let's say the table's schema contains the following fields:

  • arrayField

  • mapField

  • nestedFields

  • nestedFields.stringField

  • json_data

  • json_data_no_idx

  • __mergedTextIndex

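For reference, a minimal sketch of what such a schema could look like; the data types here are assumptions for illustration rather than a prescription:

{
  "schemaName": "myTable",
  "dimensionFieldSpecs": [
    { "name": "arrayField", "dataType": "INT", "singleValueField": false },
    { "name": "mapField", "dataType": "JSON" },
    { "name": "nestedFields", "dataType": "JSON" },
    { "name": "nestedFields.stringField", "dataType": "STRING" },
    { "name": "json_data", "dataType": "JSON" },
    { "name": "json_data_no_idx", "dataType": "JSON" },
    { "name": "__mergedTextIndex", "dataType": "STRING", "singleValueField": false }
  ]
}
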
Without this transformer, the stringField field and the fields ending with _noIndex would be dropped. Storage of the mapField and nestedFields fields would have to rely on the global setup in complexTransformers, without granular customization. However, with this transformer, the record is transformed into the following:

{
  "arrayField":[0, 1, 2, 3],
  "nestedFields.stringField":"a",
  "json_data":{
    "stringField":"a",
    "mapField":{
      "arrayField":[0, 1, 2, 3],
      "stringField":"a",
      "stringField":"aA_123"
    },
    "nestedFields":{
      "arrayField":[0, 1, 2, 3],
      "mapField":{
        "arrayField":[0, 1, 2, 3],
        "stringField":"a"
      }
    }
  },
  "json_data_no_idx":{
    "intField_noIndex":9,
    "string_noIndex":"z",
    "mapField":{
      "intField_noIndex":9,
      "string_noIndex":"z"
    },
    "mapField_noIndex":{
      "arrayField":[0, 1, 2, 3],
      "stringField":"a",
    },
    "nestedFields":{
      "intField_noIndex":9,
      "string_noIndex":"z",
      "mapField":{
        "intField_noIndex":9,
        "string_noIndex":"z"
      }
    }
  },
  "__mergedTextIndex": [
    // To be explained in following sections
  ]
}

Notice that there are three reserved (and configurable) fields: json_data, json_data_no_idx, and __mergedTextIndex. The transformer does the following:

  • Flattens nested fields all the way down to the leaf nodes and:

    • Applies special handling where necessary, according to the configuration

    • If the key path matches the schema, puts the data into the dedicated field

    • Otherwise, puts it into json_data or json_data_no_idx, depending on the key suffix

  • For keys in dedicated columns or json_data, puts each key/value pair into __mergedTextIndex in the form "Begin Anchor + value + Separator + key + End Anchor" to power text matches (see the sketch after this list)

  • Provides additional functionality via configuration:

    • Drop fields (fieldPathsToDrop)

    • Preserve a subtree without flattening (fieldPathsToPreserveInput and fieldPathsToPreserveInputWithIndex)

    • Skip storing a field but still index it, like message in the example (fieldPathsToSkipStorage)

    • Skip indexing fields (unindexableFieldSuffix)

    • Optimize case-insensitive search (optimizeCaseInsensitiveSearch)

    • Map an input key path to a schema column name (columnNameToJsonKeyPathMap)

    • Support the anonymous dot, i.e. {'a.b': 'c'} vs {'a': {'b': 'c'}} (useAnonymousDotInFieldNames)

    • Truncate values by length (mergedTextIndexDocumentMaxLength)

    • Ingest fields twice to support schema evolution (fieldsToDoubleIngest)

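For illustration only, and assuming the default separator and anchors shown in the next section, the __mergedTextIndex array for the record above could contain entries such as the following (the exact entries depend on the configuration):

"__mergedTextIndex": [
  "\u0002a\u001estringField\u0003",
  "\u0002a\u001emessage\u0003",
  "\u0002a\u001emapField.stringField\u0003",
  "\u0002a\u001enestedFields.stringField\u0003"
]

Here \u0002 and \u0003 are the begin/end document anchors and \u001e is the jsonKeyValueSeparator. Note that message is indexed here even though its storage is skipped via fieldPathsToSkipStorage.
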
Table Configurations

SchemaConformingTransformer Configuration

To use the transformer, add the schemaConformingTransformerConfig option in the ingestionConfig section of your table configuration, as shown in the following example:

"schemaConformingTransformerConfig": {
  "enableIndexableExtras": true,
  "indexableExtrasField": "json_data",
  "enableUnindexableExtras": true,
  "unindexableExtrasField": "json_data_no_idx",
  "unindexableFieldSuffix": "_noindex",
  "fieldPathsToDrop": [],
  "fieldPathsToSkipStorage": [
    "message"
  ],
  "columnNameToJsonKeyPathMap": {},
  "mergedTextIndexField": "__mergedTextIndex",
  "useAnonymousDotInFieldNames": true,
  "optimizeCaseInsensitiveSearch": false,
  "reverseTextIndexKeyValueOrder": true,
  "mergedTextIndexDocumentMaxLength": 32766,
  "mergedTextIndexBinaryDocumentDetectionMinLength": 512,
  "mergedTextIndexPathToExclude": [
    "_timestampMillisNegative",
    "__mergedTextIndex",
    "_timestampMillis"
  ],
  "fieldsToDoubleIngest": [],
  "jsonKeyValueSeparator": "\u001e",
  "mergedTextIndexBeginOfDocAnchor": "\u0002",
  "mergedTextIndexEndOfDocAnchor": "\u0003",
  "fieldPathsToPreserveInput": [],
  "fieldPathsToPreserveInputWithIndex": []
}

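For orientation, here is a minimal, hedged sketch of where this block sits in a table configuration; the table name and type are hypothetical, and only a few of the options above are repeated:

{
  "tableName": "myTable_REALTIME",
  "tableType": "REALTIME",
  "ingestionConfig": {
    "schemaConformingTransformerConfig": {
      "indexableExtrasField": "json_data",
      "unindexableExtrasField": "json_data_no_idx",
      "mergedTextIndexField": "__mergedTextIndex"
    }
  }
}
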
Configuration of reserved fields

Other index configurations for the three reserved columns can be set as follows:

"fieldConfigList": [
  {
    "name": "json_data",
    "encodingType": "RAW",
    "indexTypes": [],
    "compressionCodec": "LZ4",
    "indexes": null,
    "properties": {
      "rawIndexWriterVersion": "4"
    },
    "tierOverwrites": null
  },
  {
    "name": "json_data_no_idx",
    "encodingType": "RAW",
    "indexTypes": [],
    "compressionCodec": "ZSTANDARD",
    "indexes": null,
    "properties": {
      "rawIndexWriterVersion": "4"
    },
    "tierOverwrites": null
  },
  {
    "name": "__mergedTextIndex",
    "encodingType": "RAW",
    "indexType": "TEXT",
    "indexTypes": [
      "TEXT"
    ],
    "compressionCodec": "LZ4",
    "indexes": null,
    "properties": {
      "enableQueryCacheForTextIndex": "false",
      "luceneAnalyzerClass": <analyzerClass>,
      "luceneAnalyzerClassArgTypes": <>,
      "luceneAnalyzerClassArgs": <>,
      "luceneMaxBufferSizeMB": "50",
      "luceneQueryParserClass": <parserClass>,
      "luceneUseCompoundFile": "true",
      "noRawDataForTextIndex": "true",
      "rawIndexWriterVersion": "4"
    },
    "tierOverwrites": null
  }
]

"jsonIndexConfigs": {
  "json_data": {
    "disabled": false,
    "maxLevels": 3,
    "excludeArray": true,
    "disableCrossArrayUnnest": true,
    "maxValueLength": 1000,
    "skipInvalidJson": true
  }
}

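These blocks also live in the table configuration: fieldConfigList at the top level, and jsonIndexConfigs under tableIndexConfig. A minimal, hedged placement sketch, with a hypothetical table name and the nested contents trimmed down from the blocks above:

{
  "tableName": "myTable_REALTIME",
  "tableType": "REALTIME",
  "fieldConfigList": [
    { "name": "__mergedTextIndex", "encodingType": "RAW", "indexTypes": ["TEXT"] }
  ],
  "tableIndexConfig": {
    "jsonIndexConfigs": {
      "json_data": { "maxLevels": 3 }
    }
  }
}
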
Powering text search

Schema Design

With the help of the SchemaConformingTransformer, all data can be kept even without defining dedicated columns in the table schema. However, to optimize storage and serve various query patterns, dedicated columns should be created based on usage:

  • Fields with frequent exact-match queries, e.g. region, log_level, runtime_env

  • Fields with range queries, e.g. timestamp

  • High-frequency fields from messages, to:

    • Reduce the JSON index size

    • Optimize GROUP BY queries

Text Search

After each key/value pair is put into the __mergedTextIndex field, a luceneAnalyzerClass is needed to tokenize the documents and a luceneQueryParserClass to query by tokens. Some common search patterns and their queries are:

  • Exact key/value match TEXT_MATCH(__mergedTextIndex, '"value:key"')

  • Wildcard value search in a key TEXT_MATCH(__mergedTextIndex, '/.* value .*:key/')

  • Key exists check TEXT_MATCH(__mergedTextIndex, '/.*:key/')

  • Global value exact match TEXT_MATCH(__mergedTextIndex, '/"value"/')

  • Global value wildcard match TEXT_MATCH(__mergedTextIndex, '/.* value .*/')

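For instance, here is a minimal query sketch applying the exact key/value match pattern above. The table name myTable and the key/value pair log_level/ERROR are illustrative, and ':' stands for the configured jsonKeyValueSeparator as interpreted by the configured luceneQueryParserClass:

SELECT COUNT(*)
FROM myTable
WHERE TEXT_MATCH(__mergedTextIndex, '"ERROR:log_level"')
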
The luceneAnalyzerClass and luceneQueryParserClass usually need to use a consistent set of delimiters. They also need to account for the values below:

"jsonKeyValueSeparator": "\u001e",
"mergedTextIndexBeginOfDocAnchor": "\u0002",
"mergedTextIndexEndOfDocAnchor": "\u0003",

With the example configuration above, each key/value pair is stored as "\u0002value\u001ekey\u0003". Prefix and suffix matches on keys or values need to be adjusted accordingly in the luceneQueryParserClass.

Available configuration options are listed in SchemaConformingTransformerConfig.

Specifically, a customizable JSON index can be configured according to the json index indexPaths documentation.
