Spark-ly OpenShift….

For the last couple of weeks I’ve been experimenting with a customer using Spark on OpenShift and it’s a lot of fun. For those who have never heard of Apache Spark it’s a ‘unified analytics engine for large-scale data processing’. A next-generation Hadoop, so to speak.

I’d never really looked at Spark as I thought it was a: complicated and b: complicated. I never really got what Spark was all about. But working with this customer it started to become clear; the advantages of distributed and parallelised algorithms are massive for certain ‘massive data’ workloads.

So I taught myself how to use Spark, and more importantly, how to use it with Kubernetes/OpenShift. And to my surprise it was pretty simple once you got the basics.

This blog post walks through the manual approach to using Spark on Kubernetes/OpenShift, using the Spark command line to push jobs and configuration to an OpenShift cluster. It then explains how to use the Google Spark Operator, which runs on OpenShift and provides a YAML-based way to do the same thing. I’ll also show an example where I execute a Spark job across some shared storage. Being able to execute a job, to define that job in YAML, and to attach storage that persists beyond the lifespan of the job gives you all the key pieces you need to start having fun with Spark.

I’ve chosen Java as my weapon of choice for the examples – the Spark images I have used come with some fantastic little examples and I’ve cribbed from them. You can also write your applications in Python, R or Scala; in fact when I demo Open Data Hub (the ML toolkit for OpenShift) I run a Python-based Spark workload for calculating Pi (the classic example that is used), with ODH orchestrating the Spark Cluster under the covers.

I also wanted to be able to use my own code for the examples, just to demonstrate to the customer how to distribute their apps accordingly. So the first thing I did was to create a composite Docker Image containing the Spark runtime and framework plus my Application.

All of the example code is available at https://github.com/utherp0/sparkdemo

I used the Spark images provided by datamechanics from Docker Hub at https://hub.docker.com/r/datamechanics/spark

The Dockerfile to create my composite Spark image was very simple and consisted of:

FROM datamechanics/spark:jvm-only-3.0.0-hadoop-3.2.0-java-11-scala-2.12-latest
COPY target/sparktests-1.0.jar /opt/

I ran this in the root of my repo; having built the application using Maven, the jar file was in the target/ directory, and the Dockerfile simply added the application into the /opt/ directory of the composite image. I then pushed that to my quay.io account.
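Those build and push steps can be sketched as a small shell function. This is a minimal sketch rather than the exact commands used for this post: the function name build_and_push is made up for illustration, docker could equally be podman, and it assumes you are already logged in to your registry.

```shell
# Hypothetical helper: build the jar, bake it into the composite image, push it.
# Must be run from the repo root, where the pom.xml and Dockerfile live.
build_and_push() {
  image="$1"                       # e.g. quay.io/ilawson/sparktest1:latest
  mvn package &&                   # produces target/sparktests-1.0.jar
    docker build -t "$image" . &&  # Dockerfile copies the jar into /opt/
    docker push "$image"
}
```

Called as build_and_push quay.io/ilawson/sparktest1:latest, each step only runs if the previous one succeeded.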

I then installed the Spark framework on my Mac (simply using the brew command).

Pictured – Hello Spark….

I logged onto my OpenShift cluster and created a project called sparkexample. Then, to run the Spark workload, it was as simple as issuing the following command:

spark-submit \
  --master k8s://https://api.cluster-d2ed.d2ed.sandbox1722.opentlc.com:6443 \
  --deploy-mode cluster \
  --name spark-pi-uth \
  --class org.uth.sparkdemo.PiSparkTest1 \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.namespace=sparkexample \
  --conf spark.kubernetes.container.image=quay.io/ilawson/sparktest1:latest \
  local:///opt/sparktests-1.0.jar

Taking the components of interest in turn: the master is where the Spark job will be scheduled; in this case I’m targeting Kubernetes (k8s://) and providing the API address for my OpenShift cluster.

The name is the name that will be applied to all the objects created within the namespace.

The class is the actual workload I built in my JAR file.

The executor instances setting is how many ‘workers’ I want to create to execute the job – Spark works by creating a ‘driver’ which then orchestrates and controls/aggregates the ‘executors’. This is all done for you by the Spark interaction with the Cluster API. In this case I have indicated I need two executors.

I then target the namespace I created using a conf entry for spark.kubernetes.namespace.

I then provide the composite image location (which is my prebuilt image with the Spark framework and my JAR file in it).

I then provide the location of the workload as the last parameter; in this case it is a local file within the image (local:///opt/sparktests-1.0.jar) which I created using the Dockerfile.

The fun thing is that this doesn’t work, because of OpenShift’s clever security model that stops naughtiness. What happens is the driver is created, but then doesn’t have the access, through the serviceaccount you get by default in OpenShift, to do the things the driver needs to do (create a Pod, create a configmap, create a service).

The easy way (but not the right way) to fix this is to simply give the default service account admin rights to the namespace. The correct way, which is much better, is to create a serviceaccount in the project specifically for Spark jobs. So I did that.

I then created a ‘Role’ which had only the operations the Spark driver needs thus:

And finally a ‘RoleBinding’ to assign that Spark role needed to my new Service Account:
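A minimal sketch of what the serviceaccount, Role and RoleBinding could look like, written out as one manifest file. The namespace (sparkexample), the serviceaccount name (sparkuser) and the three resources (pods, services, configmaps) come from this post; the names sparkrole and sparkrole-binding, the file name and the list of verbs are my assumptions, so tighten or widen the verbs to match what your driver actually needs.

```shell
# Write the three RBAC objects into a single manifest file.
# sparkrole, sparkrole-binding and the verb list are illustrative assumptions.
cat > spark-rbac.yaml <<'EOF'
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sparkuser
  namespace: sparkexample
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: sparkrole
  namespace: sparkexample
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    verbs: ["create", "get", "list", "watch", "update", "patch", "delete", "deletecollection"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: sparkrole-binding
  namespace: sparkexample
subjects:
  - kind: ServiceAccount
    name: sparkuser
    namespace: sparkexample
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: sparkrole
EOF
```

Once logged in to the cluster, the file would be applied with oc apply -f spark-rbac.yaml. Keeping the Role scoped to the namespace means the Spark driver gets nothing beyond that project.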

Now I just have to add the new serviceaccount to my spark-submit, using a conf entry for spark.kubernetes.authenticate.driver.serviceAccountName:

spark-submit \
  --master k8s://https://api.cluster-d2ed.d2ed.sandbox1722.opentlc.com:6443 \
  --deploy-mode cluster \
  --name spark-pi-uth \
  --class org.uth.sparkdemo.PiSparkTest1 \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.namespace=sparkexample \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=sparkuser \
  --conf spark.kubernetes.container.image=quay.io/ilawson/sparktest1:latest \
  local:///opt/sparktests-1.0.jar

I then watched the Pods within my namespace (the Error one was the attempt we tried without the serviceaccount). The driver started, created the executor Pods, executed the workload in those Pods, terminated those Pods and aggregated the results. Et voila….

What’s nice is that using the serviceaccount allows the Cluster ops to control exactly what the Spark jobs can do; this is part of the OpenShift system and provides a superb security model.

You can also use the spark-submit approach to run workloads that have shared storage as well – the spark-submit command provides configuration options for attaching PVCs to both the driver and executor Pods; the only gotcha is that to orchestrate jobs using a piece of shared storage you must express the PVC to both the driver and the executors thus:

  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.options.claimName=(claimname) \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.mount.path=(mount dir for driver) \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.options.claimName=(claimname) \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.mount.path=(mount dir for executor) \

Interestingly this also works with PVCs that are created as ReadWriteOnce; the rwxpvc in the conf keys is just the name I gave the volume, not an access mode.

There’s a nice little example provided with the Spark framework image that does a distributed wordcount (total of individual words) – as a test I ran a spark-submit for that job, having created a quick PV in OpenShift, mounted it to a Pod, created a file (/mnt/playground/words.txt) and then provided that PVC as conf parameters into the spark-submit.
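For reference, a claim like the one used for that test could be sketched as below. The claim name wordclaim and the sparkexample namespace match the wordcount job in this post; the 1Gi size, the ReadWriteMany access mode and the file name are assumptions, and the cluster needs a storage class that can satisfy the request.

```shell
# Write a PVC manifest for the shared wordcount storage.
# Size and access mode are illustrative assumptions, not values from the post.
cat > wordclaim-pvc.yaml <<'EOF'
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: wordclaim
  namespace: sparkexample
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
EOF
```

Applied with oc apply -f wordclaim-pvc.yaml, the claim can then be mounted into a Pod to drop the words.txt file onto it before the Spark job runs.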

The spark-submit command looks like:

spark-submit \
  --master k8s://https://api.cluster-d2ed.d2ed.sandbox1722.opentlc.com:6443 \
  --deploy-mode cluster \
  --name spark-pi-uth-wordcount \
  --class org.apache.spark.examples.JavaWordCount \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.namespace=sparkexample \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=sparkuser \
  --conf spark.kubernetes.container.image=datamechanics/spark:3.1.1-latest \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.options.claimName=wordclaim \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.mount.path=/mnt/playground \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.options.claimName=wordclaim \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.mount.path=/mnt/playground \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar \
  /mnt/playground/words.txt

And when the driver completes the output looks like:

Which works a treat – that example effectively mapped persistent storage into the driver and all of the executors.

Whilst that is nice (I think so) the commands are starting to get a little unwieldy, and if you add in the configuration you have to do (setting up the roles/role-bindings and the like per namespace) it feels a little clunky.

To make it much easier to use there’s an Operator (as there always is, nowadays) that wraps it all up nicely for you. Currently there are two you can choose from on the Red Hat Operator Hub, but going forward Red Hat will be contributing to the Google Spark Operator.

I installed the Google Spark operator into another namespace using the OpenShift operator hub – one of the nice features is that the Operator also installs a serviceaccount (“spark”) which is pre-configured with the appropriate roles for running spark workload kubernetes components, negating the need to create a role yourself.

You will also note that, unlike a lot of ‘community’ Operators, the Spark Operator capability level is very nearly complete. It’s a mature Operator, which is why Red Hat are contributing to it rather than re-inventing the wheel.

And this is where it gets fun – instead of constructing a verbose ‘spark-submit’ command you simply create an appropriately formatted piece of YAML and submit it in the namespace where the Operator is installed. For instance, the first example we did earlier (my version of the PiSpark example using a composite image) now looks like:

apiVersion: sparkoperator.k8s.io/v1beta1
kind: SparkApplication
metadata:
  name: uthsparkpi
spec:
  sparkVersion: 3.1.1
  type: Java
  mode: cluster
  image: quay.io/ilawson/sparktest1:latest
  mainClass: org.uth.sparkdemo.PiSparkTest1
  mainApplicationFile: local:///opt/sparktests-1.0.jar
  sparkConf:
    "spark.kubernetes.authenticate.driver.serviceAccountName": "spark"
  driver:
    serviceAccount: 'spark'
    labels:
      type: spark-application
    cores: 1
    coreLimit: "1"
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"

What’s also nice is you can push direct spark-submit conf settings via the YAML as well. I can then execute the job in the namespace using the oc command by simply ‘oc create -f’-ing the file.

Here’s a screengrab of the job in action – the operator runs as a Pod in the namespace, it receives the custom-resource for a ‘SparkApplication’ and creates the driver Pod, which then creates the Executors it needs and runs the workload.

Once the job has finished the driver pod completes and I can view the logs of the driver pod to get the aggregated response:
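The submit, check and read-the-logs cycle can be sketched as three tiny helpers around the oc command. The function names are made up for illustration, and the driver Pod name is an assumption: the operator normally names it after the SparkApplication with a -driver suffix, so check the Pod list if yours differs.

```shell
# Hypothetical helpers; all of them act on the current oc project.

# Create the SparkApplication from a YAML file.
submit_spark_app() {
  oc create -f "$1"
}

# Show the custom resource the Operator is tracking.
spark_app_status() {
  oc get sparkapplication "$1"
}

# Print the driver logs, which contain the aggregated result.
# Assumes the driver Pod is called <application name>-driver.
spark_driver_logs() {
  oc logs "$1-driver"
}
```

For the first example that would be submit_spark_app followed by the file name, then spark_app_status uthsparkpi while it runs and spark_driver_logs uthsparkpi once the driver has completed.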

In order to execute the workload that requires persistent volumes (and you can add as many volumes as you like through the same methodology) I have the following SparkApplication defined as YAML:

apiVersion: sparkoperator.k8s.io/v1beta1
kind: SparkApplication
metadata:
  name: uthwithpvc
spec:
  sparkVersion: 3.1.1
  type: Java
  mode: cluster
  image: datamechanics/spark:3.1.1-latest
  mainClass: org.apache.spark.examples.JavaWordCount
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
  sparkConf:
    "spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.options.claimName": "playground"
    "spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.mount.path": "/mnt/playground"
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.options.claimName": "playground"
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.mount.path": "/mnt/playground"
  arguments:
    - /mnt/playground/data/words.txt
  driver:
    serviceAccount: 'spark'
    labels:
      type: spark-application
    cores: 1
    coreLimit: "1"
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"

Notice this time rather than set the serviceaccount as a ‘conf’ entry I have used the fully qualified YAML fields (the driver: serviceAccount:).

So that was a very brief introduction but I hope there are enough core components there to allow you to get playing – once I understood the mechanics of the Spark orchestration it all clicked into place and was a lot of fun to play with…..

Just what on earth *is* a Cloud Event?

One of the best things about working for a company like Red Hat is that it gives you a chance, if you want it, to get involved with any aspect of the company. I volunteered, this year, to help out with coding the demo for Summit and ended up writing a set of functions, in Quarkus, driven by Cloud Events for processing the game state (for those who missed it we did a retro-styled version of Battleships completely event driven, node.js front end and game server, state update engine in functions and state held in a three cluster geo-replicated instance of Red Hat Data Grid – it was fun).

This entailed learning just what a Cloud Event actually was; not the theory behind it, which I’ll explain in a second, but what it was under the covers. That is what I want to share and explain in this blog post, because I personally feel that this kind of event-driven, instantiated-when-needed model for writing micro-service components of a bigger system lends itself wonderfully to the on-demand and highly efficient way containerised applications can be orchestrated with Kubernetes and OpenShift.

When I started to look at it all I was seriously confuddled; the nice thing about the Cloud Event stuff is that it is abstracted to the point of ‘easy-to-use’, but I come from a background of needing to know exactly what is going on under the covers before I trust a new technology enough to use it. Yeah, I know, it’s a bad approach especially with the level of complexity of things like, say, Kubernetes, but it’s also nice to know where to look when something breaks under you.

So, the theory first – the idea behind Cloud Events is to simplify the mechanisms by which event driven applications can be triggered and routed within a Kubernetes and OpenShift cluster. In the old days (i.e. last week) a developer had to set up the queues and the connections, write their software around a specific technology, and so on. With Cloud Events it becomes superbly simple; you write your app to be driven by the arrival of a named event, and the event itself is just a name and a payload, which can be anything. It’s almost the ultimate genericisation of the approach; again, in the old days, you used to choose to go down one of two routes: you *specialised* your approach (strictly defined interfaces, beans and the like) or you ‘genericised’, where your software would receive and then identify the payload, and act accordingly. Approach one leads to more, but more stable, code. Approach number two is much more agile for change.

So, long story short, the Cloud Event approach takes away all the complexity and required knowledge of the developers for the process of receiving the event and lets them just get on with the functionality. It also ties in very nicely with the knative approach, where an application is created on demand and exists for the duration of the required interaction, then goes away.

And I understood that. I just didn’t understand what a Cloud Event actually was. So I did a little digging and, with the help of the guys writing the implementation, it became clear.

Firstly, and this was the sticky bit for me that I didn’t understand, Cloud Events are simply HTTP POSTs. Nothing more complicated than that – you want to create an event, you connect to the target broker and push a POST request with the appropriate (and this is the key) headers set. There are plenty of very useful APIs and the like for processing events, for example the Quarkus Funqy library, which abstracts all the handling at the client side, but it was the fact that to create and send a Cloud Event you simply post to a target URL that opened my eyes on how easy it was.

A very important link I was given, which explained the ins and outs of the Cloud Event itself, was https://github.com/cloudevents/spec/blob/v1.0.1/spec.md – this is the working draft (as of May 2021) of the Cloud Event standard.

It’s very interesting to look at the mandatory header fields for the Cloud Event as they, in themselves, describe what makes Cloud Events so good and the thought process behind them – you have the id which is the unique identifier for this event; you have the source which is a context label; the combination of the id and source must be unique (within the broker) and acts as both an identifier and a context description which is neat. And you have the type which is the identifier used for routing the events (in actuality the type is related to the triggers; a trigger listens for an event of that type on a broker and forwards it to a processor as a Cloud Event).
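To make the ‘it is just a POST with headers’ point concrete, here is a minimal sketch using curl. The function name, the broker URL and the event values are all hypothetical; the header names follow the HTTP binding of the Cloud Events spec, which also requires a specversion header alongside the id, source and type described above.

```shell
# Hypothetical helper: send one Cloud Event to a broker as a plain HTTP POST.
# Arguments: broker URL, event type, event source, JSON payload.
send_cloud_event() {
  broker_url="$1"
  event_type="$2"
  event_source="$3"
  payload="$4"
  # The id has to be unique per source; fall back to timestamp plus PID
  # if uuidgen is not installed.
  event_id="$(uuidgen 2>/dev/null || echo "$(date +%s)-$$")"
  curl -sS -X POST "$broker_url" \
    -H "Ce-Id: $event_id" \
    -H "Ce-Specversion: 1.0" \
    -H "Ce-Type: $event_type" \
    -H "Ce-Source: $event_source" \
    -H "Content-Type: application/json" \
    -d "$payload"
}
```

A call might look like send_cloud_event followed by your broker URL, a type such as demo.event, a source such as /demo/source and a small JSON document; a trigger listening for that type on the broker would then forward the event on to its processor.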

And back to the theory – why is this such an attractive technology, to me at least? Well, it bridges two of the major problems I’ve always seemed to have when designing and implementing these systems. The idea of microservices has always appealed to me, even back when the idea didn’t exist; the concept of being able to hive off functionality into a separate unit that could be changed without having to rebuild or redesign the core of an application is very attractive, especially when the technology changes a lot. But micro-services were always a bit of a hassle to me because I didn’t want to spend my time writing 70% of boilerplate code that had nothing to do with the actual functionality (the wrappers, the beans, the config, all the bits to stand up a ten line piece of Java code that actually did something).

Cloud Events solves those problems and does it in spades; the abstraction to a simple format (name and payload), the simplicity of creating the events themselves (if you look at the source code in the image at the top of the blog you’ll see how easy it is to create and send a Cloud Event), it’s got the balance of configuration and code just right.

I’m intrigued to see where this technology goes and how the industry adopts it; I think they will. Not only is it very powerful, it’s also, and this is the most important bit, very easy to write software around. And that’s the kicker; make it easy and useful and it will get adopted big time.