Spark

Pinot supports Apache Spark as a processor to create and push segment files to the database. The Pinot distribution is bundled with the Spark code needed to process your files, convert them to segments, and upload them to Pinot.

We support both Spark 2.x and 3.x.

You can follow the wiki to build the Pinot distribution from source. The resulting JAR file can be found at pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar.
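
For reference, a typical source build might look like the following sketch. The Maven flags follow the general Pinot build instructions and may differ for your branch; -DuseProvidedHadoop is only needed for the YARN setup described later on this page.

# Sketch: build the Pinot distribution from source (verify flags against the wiki)
git clone https://github.com/apache/pinot.git
cd pinot
mvn clean install -DskipTests -Pbin-dist
# The bundled jar is then available at the path mentioned above:
#   pinot/target/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar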

Next, you need to change the execution config in the job spec to the following:

# executionFrameworkSpec: Defines the framework used to run the ingestion job.
executionFrameworkSpec:

  # name: execution framework name
  name: 'spark'

  # segmentGenerationJobRunnerClassName: class name that implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name that implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name that implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'

  # segmentMetadataPushJobRunnerClassName: class name that implements the org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner interface.
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'

  # extraConfigs: extra configs for execution framework.
  extraConfigs:

    # stagingDir: staging directory on a distributed filesystem where segments are built; the entire directory is then moved to the output directory.
    stagingDir: your/local/dir/staging

You can check out the sample job spec here.
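
As a rough orientation, a complete Spark job spec built around that execution config might look like the sketch below. The paths, table name, and controller address are placeholders, and only a subset of the available fields is shown; see the Ingestion Job Spec reference for the full list.

executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  extraConfigs:
    stagingDir: 'file:///path/to/staging'

# jobType: what the job does, e.g. create segments and push them as tar files
jobType: SegmentCreationAndTarPush

# input/output locations (placeholders)
inputDirURI: 'file:///path/to/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: 'file:///path/to/output'
overwriteOutput: true

# how to read the input files
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'

# target table and cluster (placeholders)
tableSpec:
  tableName: 'myTable'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000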

To run Spark ingestion, you need the following jars in your classpath:

  • pinot-batch-ingestion-spark plugin jar - available in the plugins-external directory of the package

  • pinot-all jar - available in the lib directory of the package

These jars can be specified using spark.driver.extraClassPath (and spark.executor.extraClassPath when running in a distributed environment) or any equivalent Spark option.

spark.driver.extraClassPath =>
pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar

To load any other plugins that you want to use, add:

spark.driver.extraJavaOptions =>
-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins
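
If you only need a subset of plugins, the plugins.dir property is commonly combined with a plugins.include list (the plugin names below are illustrative and follow the pattern used in other Pinot ingestion tutorials):

spark.driver.extraJavaOptions =>
-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dplugins.include=pinot-s3,pinot-parquet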

The complete spark-submit command should look as follows:

export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=/path/to/apache-pinot-${PINOT_VERSION}-bin

spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--master local --deploy-mode client \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
--conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile /path/to/spark_job_spec.yaml

Please ensure the environment variables PINOT_VERSION and PINOT_DISTRIBUTION_DIR are set properly before running the command.

Note: You should change the master to yarn and deploy-mode to cluster for production environments.

Running in Cluster Mode on YARN

To run the Spark job in cluster mode on a YARN/EMR cluster, do the following:

  • Build Pinot from source with the option -DuseProvidedHadoop.

  • Copy the Pinot binaries to S3, HDFS, or any other distributed storage that is accessible from all nodes.

  • Copy the ingestion spec YAML file to S3, HDFS, or any other distributed storage, and pass its path via the --files argument in the command.

  • Add a --jars option containing the S3/HDFS paths to all the required plugin jars and the pinot-all jar.

  • Point the classpath to the Spark working directory. Generally, just specifying the jar names without any paths works. Do the same for the main jar as well as the spec YAML file, as shown in the example below.

Example

spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--master yarn --deploy-mode cluster \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
--conf "spark.driver.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
--conf "spark.executor.extraClassPath=pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar:pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
--jars "${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar,${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
--files s3://path/to/spark_job_spec.yaml \
local://pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile spark_job_spec.yaml

For Spark 3.x, replace pinot-batch-ingestion-spark-2.4 with pinot-batch-ingestion-spark-3.2 everywhere in the commands. Also, ensure the runner class names in the ingestion spec are changed from the org.apache.pinot.plugin.ingestion.batch.spark package to org.apache.pinot.plugin.ingestion.batch.spark3.
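
For example, the execution framework section for Spark 3.x would look roughly like this (only the package changes from spark to spark3):

executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'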

FAQ

Q - I am getting the following exception - Class has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

Since the 0.8.0 release, Pinot binaries are compiled with JDK 11. If you are using Spark along with Hadoop 2.7+, you need to use the Java 8 version of Pinot. Currently, you need to build the JDK 8 version from source.

Q - I am not able to find pinot-batch-ingestion-spark jar.

For Pinot versions prior to 0.10.0, the Spark plugin is located in the plugins directory of the binary distribution. For 0.10.0 and later, it is located in the plugins-external directory.
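
For example, in a 0.10.0+ binary distribution the jar can be located like this (directory layout as used in the spark-submit examples on this page; the Spark version suffix may differ):

ls ${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/
# pinot-batch-ingestion-spark-2.4-${PINOT_VERSION}-shaded.jar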

Q - Spark is not able to find the jars leading to java.nio.file.NoSuchFileException

This means the classpath for the Spark job has not been configured properly. If you are running Spark in a distributed environment such as YARN or Kubernetes, make sure both spark.driver.extraClassPath and spark.executor.extraClassPath are set. Also, the jars in the driver classpath should be added to the --jars argument in spark-submit so that Spark can distribute them to all the nodes in your cluster. You also need to provide the appropriate scheme with the file path when running the jar; in this doc we have used local://, but it can be different depending on your cluster setup.

Q - Spark job failing while pushing the segments.

This is usually caused by a misconfigured controllerURI in the job spec YAML file. If the controllerURI is correct, make sure it is accessible from all the nodes of your YARN or Kubernetes cluster.
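
As a quick sanity check, you can call the controller's health endpoint from one of the worker nodes and confirm that the URI matches the one declared under pinotClusterSpecs in the job spec (host and port below are placeholders):

# run from a YARN or Kubernetes worker node
curl http://pinot-controller.example.com:9000/health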

Q - My data gets overwritten during ingestion.

Set segmentPushType to APPEND in the tableConfig. If it is already set to APPEND, this is most likely due to a missing timeColumnName in your table config. If you cannot provide a time column, use the segment name generation configs in the ingestion spec; generally, the inputFile segment name generator should fix the issue.
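
For reference, a minimal sketch of the relevant table config section (values are illustrative; adjust to your table):

"segmentsConfig": {
  "timeColumnName": "timestamp",
  "segmentPushType": "APPEND",
  "replication": "1"
}
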
Q - I am getting java.lang.RuntimeException: java.io.IOException: Failed to create directory: pinot-plugins-dir-0/plugins/*

Removing -Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins from spark.driver.extraJavaOptions should fix this. As long as the plugins are included in the classpath and the --jars argument, it should not be an issue.

Q - Getting Class not found exception

Please check that the extraClassPath arguments contain all the plugin jars for both the driver and the executors, and that all the plugin jars are also listed in the --jars argument. If both of these are correct, check that the extraClassPath entries are local filesystem paths and not S3, HDFS, or other distributed filesystem paths.

We have stopped including the spark-core dependency in our jars since the 0.10.0 release. If you hit any runtime issues, try 0.11.0-SNAPSHOT or later versions of pinot-batch-ingestion-spark. You can either build from source or download the latest master build jars.
