spark-connect
Introduction
spark-connect decouples the Spark client from the Spark server, giving clients (JVM and non-JVM) the freedom to build and execute Spark jobs remotely.
The client talks to the Spark server over gRPC.
To accept gRPC connections, the Spark driver has to be started as a spark-connect server.
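As a quick illustration of the client side (a hedged sketch: it assumes a spark-connect server is already reachable on localhost at the default port 15002), any client simply points at the driver's gRPC endpoint using an sc:// connection string:
# hypothetical example: a PySpark client pointed at a Spark Connect gRPC endpoint
# (15002 is the default value of spark.connect.grpc.binding.port)
pyspark --remote "sc://localhost:15002"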
Setup spark-connect server
The following section walks through setting up spark-connect on Kubernetes.
For illustration purposes, this article starts from an existing spark-submit
application hosted in Kubernetes and transforms it, step by step, into a spark-connect
service.
/opt/spark/bin/spark-submit --deploy-mode client --master k8s://https://kubernetes.default.svc.cluster.local:443 --name my-app \
--class org.home.service.ServiceMain --jars ...,... --files /path/to/res1,/app/config/application.conf \
--driver-java-options -Dconfig.file=/app/config/application.conf --conf spark.kubernetes.executor.request.cores=5 \
--conf spark.driver.host=headless-service.ns1.svc.cluster.local --conf spark.driver.maxResultSize=20g \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.crossJoin.enabled=true \
--conf spark.ui.showConsoleProgress=true --conf spark.kubernetes.authenticate.serviceAccountName=spark-sql \
--conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.kubernetes.namespace=ns1 \
--conf spark.eventLog.enabled=false --conf spark.hadoop.fs.gs.io.buffersize=4096 --conf spark.kubernetes.node.selector.node-type=node-8-cores \
--conf spark.driver.port=7071 --conf spark.driver.memory=16g --conf spark.executor.instances=4 \
--conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.container.image=docker-repo/to/spark:3.5.1 \
--conf spark.executor.memory=22g --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.executor.cores=5 --conf spark.kubernetes.driver.pod.name=my-driver-5545c948c7-nt9wz \
--conf spark.sql.shuffle.partitions=2000 'main.jar'
Note:
(1) `headless-service.ns1.svc.cluster.local` is the fully-qualified name of a headless service that exposes port 7071
and routes to the driver pod (the pod that launches the above spark-submit
process).
(2) --jars
points to the list of jars required to run the above spark application. It is best to provide the jars in the logical order of their classpath dependencies.
(3) `spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt` points to the CA certificate of the default service account
that Kubernetes mounts into the pod. This works when the above pod is deployed with `--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark`
(a sketch of the required RBAC setup follows these notes).
For more info: https://spark.apache.org/docs/latest/running-on-kubernetes.html#rbac
(4) `docker-repo/to/spark:3.5.1` is a Spark image built as a runnable distribution from the Spark git repository.
For more info: https://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution
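The RBAC setup referenced in note (3) can be sketched as follows. Assumptions: the namespace ns1 and service account spark-sql from the command above, and the edit cluster role suggested in the Spark RBAC documentation; adjust to your cluster's policies.
# create the service account the driver authenticates with (names taken from this article)
kubectl create serviceaccount spark-sql --namespace=ns1
# grant it permission to create and manage executor pods
kubectl create clusterrolebinding spark-sql-role --clusterrole=edit --serviceaccount=ns1:spark-sql --namespace=ns1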
The spark-submit process is then replaced by start-connect-server.sh,
while most of the other Kubernetes resources are kept intact.
/opt/spark/sbin/start-connect-server.sh --deploy-mode client --master k8s://https://kubernetes.default.svc.cluster.local:443 --name spark-connect \
--jars ...,... --conf spark.kubernetes.executor.request.cores=5 \
--conf spark.driver.host=headless-service.ns1.svc.cluster.local --conf spark.driver.maxResultSize=20g \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.crossJoin.enabled=true \
--conf spark.ui.showConsoleProgress=true --conf spark.kubernetes.authenticate.serviceAccountName=spark-sql \
--conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.kubernetes.namespace=ns1 \
--conf spark.eventLog.enabled=false --conf spark.hadoop.fs.gs.io.buffersize=4096 --conf spark.kubernetes.node.selector.node-type=node-8-cores \
--conf spark.driver.port=7071 --conf spark.driver.memory=16g --conf spark.executor.instances=4 \
--conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.container.image=gcr.io/myorg/spark:3.5.1 \
--conf spark.executor.memory=22g --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.executor.cores=5 --conf spark.kubernetes.driver.pod.name=connect-server-675cf4bd64-hvtds \
--conf spark.connect.grpc.binding.port=8081 --conf spark.sql.shuffle.partitions=2000
Please leave a comment on this article if you would like a detailed step-by-step Kubernetes deployment for a Spark cluster.
Note: the above server binds the spark-connect gRPC endpoint to port 8081 via spark.connect.grpc.binding.port.
The spark-connect server shows up as a spark-submit
process. To confirm that it is running and listening on the expected port, run:
netstat -nlp
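To check just the gRPC port chosen above (8081 in this setup), the output can be filtered, for example:
# look for a listener on the spark-connect gRPC port
netstat -nlp | grep 8081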
The lesser-known steps for a client to establish a connection with spark-connect
(especially the Scala ones) are illustrated below.
Connect to spark-connect using pyspark
Step 1:
Create a venv
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
Step 2:
Install pyspark
pip install setuptools pandas pyarrow grpcio pyspark
pip install --upgrade google-api-python-client
pip install google grpc_status grpcio-status
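It is generally advisable to keep the pyspark client version in line with the server (3.5.1 in this setup); pinning pyspark==3.5.1 is an option if the versions drift. A quick check:
# confirm the installed pyspark client version
pip show pyspark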
Step 3:
Start pyspark
Enable port-forwarding from Kubernetes so the local client can reach the server.
kubectl port-forward deployment/connect-server 8081:8081
pyspark --remote "sc://localhost:8081"
Connecting to spark using ‘spark-connect-repl’
‘spark-connect-repl’ is the official CLI shell provided by Apache Spark as a JVM client for spark-connect.
It can be installed using coursier
(https://get-coursier.io/docs/cli-installation).
Once coursier
is installed, install spark-connect-repl
with
cs install --contrib spark-connect-repl
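To point the REPL at the server exposed above, a sketch (under the assumption that the REPL honors the SPARK_REMOTE environment variable read by Spark Connect clients; the port-forward mirrors the pyspark section):
# forward the gRPC port from the cluster, as in the pyspark section
kubectl port-forward deployment/connect-server 8081:8081
# assumption: the REPL picks up the connection string from SPARK_REMOTE
export SPARK_REMOTE="sc://localhost:8081"
spark-connect-repl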
Connect to spark using scala / java code
In this approach, we use the following scala-cli snippet.
//> using scala 2.13.14
//> using dep org.apache.spark::spark-connect-client-jvm:3.5.1
//> using dep com.google.guava:guava:23.6-jre
//> using dep org.apache.httpcomponents:httpcore:4.4.16

import org.apache.spark.sql._

object SparkJob {
  def main(args: Array[String]): Unit = {
    // Build a SparkSession backed by the remote spark-connect server
    val spark = SparkSession
      .builder()
      .remote("sc://localhost:8081")
      .create()

    val df = spark.range(1, 2000)
    df.createOrReplaceTempView("data")

    spark.sql("""SELECT * FROM data""").show(truncate = false)
  }
}
scala-cli run SparkJob.scala
Note:
(1) It is essential that the client uses spark-connect-client-jvm
and NOT spark-sql
(the module shipped with the Spark server). A quick way to check that the client dependency resolves is sketched below.
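As a sketch, the client dependency can be resolved ahead of time with coursier (already installed above), using the Scala 2.13 artifact coordinate from the snippet:
# resolve and cache the Spark Connect JVM client and its transitive dependencies
cs fetch org.apache.spark:spark-connect-client-jvm_2.13:3.5.1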
Thoughts and feedback welcome! Thank you for reading.