spark-connect

Introduction

spark-connect

spark-connect is a client-server architecture that decouples the client from the Spark driver, giving clients (JVM and non-JVM) the freedom to create and execute Spark jobs remotely.

The client talks to the Spark server over gRPC, sending requests and receiving responses.

To accept gRPC connections, the Spark driver has to be started as a spark-connect server.
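
For reference, outside Kubernetes the same thing can be done with the start script shipped in the Spark distribution; a minimal local sketch for Spark 3.5.x (the --packages coordinates and Scala suffix are assumptions and must match your build):

# Minimal local example (not the Kubernetes setup described below):
# start the driver as a spark-connect server, pulling in the connect plugin.
/opt/spark/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1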

Set up the spark-connect server

The following section walks through setting up spark-connect on Kubernetes.

For illustration, this article starts from an existing spark-submit application hosted in Kubernetes and transforms it into a spark-connect service.

 /opt/spark/bin/spark-submit --deploy-mode client --master k8s://https://kubernetes.default.svc.cluster.local:443 --name my-app \
--class org.home.service.ServiceMain --jars ...,... --files /path/to/res1,/app/config/application.conf \
--driver-java-options -Dconfig.file=/app/config/application.conf --conf spark.kubernetes.executor.request.cores=5 \
--conf spark.driver.host=headless-service.ns1.svc.cluster.local --conf spark.driver.maxResultSize=20g \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.crossJoin.enabled=true \
--conf spark.ui.showConsoleProgress=true --conf spark.kubernetes.authenticate.serviceAccountName=spark-sql \
--conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.kubernetes.namespace=ns1 \
--conf spark.eventLog.enabled=false --conf spark.hadoop.fs.gs.io.buffersize=4096 --conf spark.kubernetes.node.selector.node-type=node-8-cores \
--conf spark.driver.port=7071 --conf spark.driver.memory=16g --conf spark.executor.instances=4 \
--conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.container.image=docker-repo/to/spark:3.5.1 \
--conf spark.executor.memory=22g --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.executor.cores=5 --conf spark.kubernetes.driver.pod.name=my-driver-5545c948c7-nt9wz \
--conf spark.sql.shuffle.partitions=2000 'main.jar'

Note:
(1) `headless-service.ns1.svc.cluster.local` is the fully-qualified path to a headless service which exposes port 7071 that is wired up with the driver (that kicks off the above spark-submit process).

(2) --jars points to the list of jars used to run the above spark application. It's best to provide the jars in the logical order of their classpath dependencies.

(3) `spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt` points to the default service-account provided by kubernetes. This is enabled when this above pod is deployed using `--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark`.
For more info: https://spark.apache.org/docs/latest/running-on-kubernetes.html#rbac

(4) `docker-repo/to/spark:3.5.1` is a Spark image built from the Spark git repository as a runnable distribution (a rough sketch of the build follows below).
For more info: https://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution
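
As a rough sketch (the profile names and flags are assumptions; check the linked docs for your Spark version), a runnable distribution and container image can be produced from a Spark source checkout along these lines:

# Build a runnable distribution with Kubernetes support (profiles are assumptions)
./dev/make-distribution.sh --name my-spark --tgz -Pkubernetes -Phive -Phive-thriftserver
# Build and push the container image from the resulting distribution
./bin/docker-image-tool.sh -r docker-repo/to -t 3.5.1 build
./bin/docker-image-tool.sh -r docker-repo/to -t 3.5.1 push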

The spark-submit process is then replaced by start-connect-server.sh, with most other Kubernetes resources kept intact.

/opt/spark/sbin/start-connect-server.sh --deploy-mode client --master k8s://https://kubernetes.default.svc.cluster.local:443 --name spark-connect \
--jars ...,... --conf spark.kubernetes.executor.request.cores=5 \
--conf spark.driver.host=headless-service.ns1.svc.cluster.local --conf spark.driver.maxResultSize=20g \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.crossJoin.enabled=true \
--conf spark.ui.showConsoleProgress=true --conf spark.kubernetes.authenticate.serviceAccountName=spark-sql \
--conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.kubernetes.namespace=ns1 \
--conf spark.eventLog.enabled=false --conf spark.hadoop.fs.gs.io.buffersize=4096 --conf spark.kubernetes.node.selector.node-type=node-8-cores \
--conf spark.driver.port=7071 --conf spark.driver.memory=16g --conf spark.executor.instances=4 \
--conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.container.image=gcr.io/myorg/spark:3.5.1 \
--conf spark.executor.memory=22g --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.executor.cores=5 --conf spark.kubernetes.driver.pod.name=connect-server-675cf4bd64-hvtds \
--conf spark.connect.grpc.binding.port=8081 \
--conf spark.sql.shuffle.partitions=2000

Please leave a comment on this article if you need a detailed step-by-step Kubernetes deployment for a Spark cluster.

Note: the above server binds the spark-connect gRPC port (spark.connect.grpc.binding.port) at 8081.
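
If other workloads inside the cluster need to reach the gRPC port directly (rather than via the local port-forward used later), one option is to expose it through a ClusterIP service; a minimal sketch, assuming the deployment is named connect-server in namespace ns1:

# Expose the spark-connect gRPC port inside the cluster (names are assumptions)
kubectl -n ns1 expose deployment connect-server --name connect-server-grpc --port 8081 --target-port 8081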

The spark-connect server shows up as a spark-submit process. To confirm that it is running and listening, run the following:

netstat -nlp
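
For example, filtering for the driver and gRPC ports configured above:

# Expect LISTEN entries for the driver port (7071) and the spark-connect gRPC port (8081)
netstat -nlp | grep -E ':(7071|8081)'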

The lesser-known steps for a client to establish a connection with spark-connect (especially from Scala) are illustrated below.

Connect to spark-connect using pyspark

Step 1:

Create a venv

python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip

Step 2:

Install pyspark

pip install setuptools pandas pyarrow grpcio pyspark
pip install --upgrade google-api-python-client
pip install google grpc_status grpcio-status
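
It is worth confirming that the client-side pyspark version matches the server (3.5.1 in this setup):

# Quick sanity check of the client-side versions
python -c "import pyspark; print(pyspark.__version__)"
pip show grpcio grpcio-status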

Step 3:

Start pyspark

Enable port-forwarding into the Kubernetes deployment so the client can connect, then start pyspark against the forwarded port.

kubectl port-forward deployment/connect-server 8081:8081
pyspark --remote "sc://localhost:8081"
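
As a quick smoke test without the interactive shell, the same connection can be exercised from plain Python (assuming the port-forward above is still running):

# Runs a tiny job through the spark-connect endpoint and prints five rows
python -c 'from pyspark.sql import SparkSession; spark = SparkSession.builder.remote("sc://localhost:8081").getOrCreate(); spark.range(5).show()'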

Connecting to spark using ‘spark-connect-repl’

‘spark-connect-repl’ is the official CLI shell provided by Apache Spark as a JVM client.

spark-connect-repl can be installed using coursier (https://get-coursier.io/docs/cli-installation)

Once coursier is installed, spark-connect-repl can be installed with:

cs install --contrib spark-connect-repl
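
To point the REPL at the port-forwarded server, the connection string can be supplied via the SPARK_REMOTE environment variable (an assumption here: the JVM client honours SPARK_REMOTE the same way the Python client does):

# Assumption: spark-connect-repl reads SPARK_REMOTE for its target endpoint
export SPARK_REMOTE="sc://localhost:8081"
spark-connect-repl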

Connect to spark using scala / java code

In this approach, we use the following scala-cli snippet, saved as SparkJob.scala.

//> using scala 2.13.14
//> using dep org.apache.spark::spark-connect-client-jvm:3.5.1
//> using dep com.google.guava:guava:23.6-jre
//> using dep org.apache.httpcomponents:httpcore:4.4.16

import org.apache.spark.sql._

object SparkJob {
  def main(args: Array[String]): Unit = {
    // Connect to the spark-connect server exposed via the port-forward
    val spark = SparkSession
      .builder()
      .remote("sc://localhost:8081")
      .create()

    val df = spark.range(1, 2000)
    df.createOrReplaceTempView("data")
    spark.sql("""SELECT * FROM data""").show(truncate = false)
  }
}

scala-cli run SparkJob.scala

Note:

(1) It's essential that the client uses spark-connect-client-jvm and NOT spark-sql (which is shipped with the Spark server).

Thoughts and feedback welcome! Thank you for reading.
