Kubernetes is the trending container orchestration system, used to host various applications from web services to data processing jobs. Applications are packaged in self-contained yet lightweight containers, and we declare how they should be deployed, how they scale, and how they are exposed as services. Flink is likewise a trending distributed computing framework that can run on a variety of platforms, including Kubernetes. Combining the two gives us robust and scalable deployments of data processing jobs, and lets Flink safely share a Kubernetes cluster with other services.
When deploying Flink on Kubernetes, there are two options: a session cluster and a job cluster. A session cluster is like a standalone Flink cluster running on k8s: it accepts multiple jobs and is suitable for short-running tasks or ad-hoc queries. A job cluster, on the other hand, deploys a full set of Flink services for each individual job. We build a container image for each job and provide it with dedicated resources, so jobs have less chance of interfering with each other and can scale out independently. This article illustrates how to run a Flink job cluster on Kubernetes, in the following steps:
- Compile and package the Flink job jar.
- Build a Docker image containing the Flink runtime and the job jar.
- Create a Kubernetes Job for Flink JobManager.
- Create a Kubernetes Service for this Job.
- Create a Kubernetes Deployment for Flink TaskManagers.
- Enable Flink JobManager HA with ZooKeeper.
- Correctly stop and resume Flink job with SavePoint facility.
Kubernetes Playground
In case you do not already have a Kubernetes environment, you can easily set up a local playground with minikube. Take macOS as an example:
- Install VirtualBox, since minikube will set up a k8s cluster inside a virtual machine.
- Download the minikube binary, make it executable, and make it accessible from `PATH`.
- Execute `minikube start`. It will download the virtual machine image and the kubelet and kubeadm facilities, then install and verify the k8s cluster. If you have trouble accessing the internet, set up a proxy and tell minikube to use it.
- Download and install the kubectl binary. Minikube has already configured kubectl to point to the installed k8s cluster, so you can execute `kubectl get pods -A` to see the running system pods:
```
NAMESPACE     NAME                      READY   STATUS    RESTARTS   AGE
kube-system   coredns-...               1/1     Running   0          1d
kube-system   kube-apiserver-minikube   1/1     Running   0          1d
...
```
Flink Streaming Job
Let us create a simple streaming job that reads data from a socket and prints the count of words every 5 seconds. The following code is taken from the Flink documentation, and a full Maven project can be found on GitHub.
```java
DataStream<Tuple2<String, Integer>> dataStream = env
    .socketTextStream("192.168.99.1", 9999)
    .flatMap(new Splitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

dataStream.print();
```
The IP `192.168.99.1` allows containers to access services running on the minikube host. For this example to work, you need to run `nc -lk 9999` on your host before creating the JobManager pod.
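The `Splitter` referenced above is the flat-map function from the Flink documentation's window word count example; a minimal version (it needs the `FlatMapFunction` and `Collector` imports) looks like this:

```java
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
        // Emit a (word, 1) pair for every whitespace-separated token.
        for (String word : sentence.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}
```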
Run `mvn clean package`, and the compiled job jar can be found in `target/flink-on-kubernetes-0.0.1-SNAPSHOT-jar-with-dependencies.jar`.
Build Docker Image
Flink provides an official Docker image on DockerHub. We can use it as the base image and add the job jar into it. Besides, recent Flink distributions no longer include the Hadoop binaries, so we need to add a Hadoop jar as well. Take a quick look at the base image's Dockerfile; it does the following tasks:

- Start from the OpenJDK 1.8 base image.
- Install Flink into `/opt/flink`.
- Add the `flink` user and group.
- Configure the entry point, which we will override in our k8s deployments.
```dockerfile
FROM openjdk:8-jre
# ... (abridged) it then downloads Flink into /opt/flink, creates the
# flink user and group, and sets docker-entrypoint.sh as the entry point
```
Based on it, we create a new Dockerfile that copies the Hadoop jar and the job jar into Flink's `lib` directory (a minimal version, with the jar paths passed in as build arguments):

```dockerfile
FROM flink:1.8.1-scala_2.12
ARG hadoop_jar
ARG job_jar
# Put both jars on Flink's class path, so the job's Main-Class can be found.
COPY --chown=flink:flink $hadoop_jar $job_jar /opt/flink/lib/
USER flink
```
Before building the image, you need to install the Docker CLI and point it to the docker daemon inside minikube:

```bash
$ brew install docker
$ eval $(minikube docker-env)
```
Then, download the Hadoop uber jar, and execute the following commands:

```bash
$ cd /path/to/Dockerfile
# Copy the two jars into the build context first.
$ cp /path/to/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar .
$ cp /path/to/target/flink-on-kubernetes-0.0.1-SNAPSHOT-jar-with-dependencies.jar .
$ docker build \
    --build-arg hadoop_jar=flink-shaded-hadoop-2-uber-2.8.3-7.0.jar \
    --build-arg job_jar=flink-on-kubernetes-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    --tag flink-on-kubernetes:0.0.1 .
```
Now we have a local docker image that is ready to be deployed.
```bash
$ docker image ls
REPOSITORY            TAG     IMAGE ID   CREATED   SIZE
flink-on-kubernetes   0.0.1   ...        ...       ...
```
Deploy JobManager
First, we create a k8s Job for the Flink JobManager. Job and Deployment both create and manage Pods that carry out some work. The difference is that a Job quits when its Pod finishes successfully, based on the exit code, while a Deployment only quits when asked to. This behavior allows us to cancel the Flink job manually, without worrying that a Deployment will restart the JobManager by mistake.
Here’s the jobmanager.yml
:
1 | apiVersion: batch/v1 |
- `${JOB}` can be replaced by `envsubst`, so that config files can be reused by different jobs.
- The container's entry point is changed to `standalone-job.sh`. It starts the JobManager in the foreground and scans the class path for a `Main-Class` as the job entry point; alternatively, you can specify the full class name via the `-j` option. The job is then automatically submitted to the cluster.
- The JobManager's RPC address is set to the k8s Service's name, which we will create later. Other containers can access the JobManager via this hostname.
- The blob server's and queryable state server's ports are random by default. We change them to fixed ports for easy exposure.
```bash
$ export JOB=flink-on-kubernetes
$ envsubst <jobmanager.yml | kubectl create -f -
```
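You can check that the JobManager pod is up and running (the `instance` label comes from the manifest above):

```bash
$ kubectl get pods -l instance=$JOB-jobmanager
```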
Next, we expose the JobManager as a k8s Service, so that TaskManagers can register with it.
`service.yml`:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: ${JOB}-jobmanager
spec:
  type: NodePort
  selector:
    instance: ${JOB}-jobmanager
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
```
`type: NodePort` is necessary because we also want to interact with the JobManager from outside the k8s cluster.
```bash
$ envsubst <service.yml | kubectl create -f -
$ kubectl get service $JOB-jobmanager
```
From the `PORT(S)` column we can see the Flink dashboard (port 8081) is exposed on port 31254 of the virtual machine. Minikube also provides a command to retrieve the full URL of a service:
```bash
$ minikube service $JOB-jobmanager --url
...
http://192.168.99.108:31254
```
Deploy TaskManager
`taskmanager.yml`:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ${JOB}-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      instance: ${JOB}-taskmanager
  template:
    metadata:
      labels:
        app: flink
        instance: ${JOB}-taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink-on-kubernetes:0.0.1
        command: ["/opt/flink/bin/taskmanager.sh"]
        args: ["start-foreground",
               "-Djobmanager.rpc.address=${JOB}-jobmanager"]
```
Change the number of `replicas` to add more TaskManagers. `taskmanager.numberOfTaskSlots` is set to `1` in this image, which is the recommended setting, because we should let k8s handle the scaling.
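Create the Deployment with the same `envsubst` pattern as before:

```bash
$ envsubst <taskmanager.yml | kubectl create -f -
```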
Now that the job cluster is running, try typing something into the `nc` console:
```bash
$ nc -lk 9999
hello world
hello flink
```
Open another terminal and tail the TaskManager’s standard output:
```bash
$ kubectl logs -f -l instance=$JOB-taskmanager
(hello,2)
(world,1)
(flink,1)
```
Configure JobManager HA
While TaskManagers can achieve high availability by increasing the replicas of the Deployment, the JobManager is still a single point of failure. Flink provides an HA solution based on ZooKeeper and a distributed file system like HDFS. In a standalone cluster, multiple JobManagers are started and one of them is elected as the leader. In a YARN or Kubernetes deployment, only one JobManager instance is required: the cluster's metadata is stored in ZooKeeper, and checkpoint data is stored in HDFS. When the JobManager goes down, Kubernetes restarts the container, and the new JobManager restores the latest checkpoint and resumes the job.
To enable JobManager HA, change the start command of both JobManager and TaskManager:
`jobmanager-ha.yml`:

```yaml
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
       "-Djobmanager.rpc.address=${JOB}-jobmanager",
       "-Dhigh-availability=zookeeper",
       "-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181",
       "-Dhigh-availability.zookeeper.path.root=/flink",
       "-Dhigh-availability.cluster-id=/${JOB}",
       "-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery",
       "-Dhigh-availability.jobmanager.port=6123"]
```
`taskmanager-ha.yml`:

```yaml
command: ["/opt/flink/bin/taskmanager.sh"]
args: ["start-foreground",
       "-Dhigh-availability=zookeeper",
       "-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181",
       "-Dhigh-availability.zookeeper.path.root=/flink",
       "-Dhigh-availability.cluster-id=/${JOB}",
       "-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery"]
```
- Prepare a ZooKeeper and HDFS environment on the minikube host, so that Flink containers can access them via `192.168.99.1:2181` and `192.168.99.1:9000`.
- Cluster metadata will be stored under `/flink/${JOB}` in ZooKeeper.
- Checkpoint data is stored under `/flink/recovery` in HDFS. Make sure you create the `/flink` directory with proper permissions (see the commands after this list).
- The `jobmanager.rpc.address` property is removed from the TaskManager's arguments, because the JobManager's RPC host and port will now be fetched from ZooKeeper. The RPC port is random by default, so we change it to a fixed port via `high-availability.jobmanager.port`, which is exposed in the k8s Service.
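For the HDFS side, a minimal setup might look like this, assuming the Flink containers access HDFS as the `flink` user:

```bash
# Create the root directory for Flink and hand it over to the flink user.
$ hdfs dfs -mkdir /flink
$ hdfs dfs -chown flink:flink /flink
```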
Manage Flink Job
We can interact with the Flink cluster via its RESTful API, which listens on the same port as the Flink dashboard. Install the Flink binaries on your host machine, and pass the `-m` argument to point to the JobManager in k8s:
```bash
$ bin/flink list -m 192.168.99.108:30206
```
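Alternatively, you can call the monitoring REST API directly. For instance, the `/jobs` endpoint lists the jobs and their statuses; the output below is illustrative:

```bash
$ curl http://192.168.99.108:30206/jobs
{"jobs":[{"id":"00000000000000000000000000000000","status":"RUNNING"}]}
```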
In HA mode, the Flink job ID is `00000000000000000000000000000000` by default. We can use this ID to cancel the Flink job with a SavePoint:
```bash
$ bin/flink cancel -m 192.168.99.108:30206 -s hdfs://192.168.99.1:9000/flink/savepoints/ 00000000000000000000000000000000
Cancelled job 00000000000000000000000000000000. Savepoint stored in hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c.
```
And the k8s Job is now in completed status:
```bash
$ kubectl get job
NAME                             COMPLETIONS   DURATION   AGE
flink-on-kubernetes-jobmanager   1/1           ...        ...
```
To re-submit the job, we need to delete the completed Job first:

```bash
$ kubectl delete job $JOB-jobmanager
```
Then add a command argument to `jobmanager-savepoint.yml`:

```yaml
command: ["/opt/flink/bin/standalone-job.sh"]
# Same as jobmanager-ha.yml, with the SavePoint path added:
args: ["start-foreground",
       "--fromSavepoint", "${SAVEPOINT}",
       "-Djobmanager.rpc.address=${JOB}-jobmanager",
       "-Dhigh-availability=zookeeper",
       "-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181",
       "-Dhigh-availability.zookeeper.path.root=/flink",
       "-Dhigh-availability.cluster-id=/${JOB}",
       "-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery",
       "-Dhigh-availability.jobmanager.port=6123"]
```
Start this job from the SavePoint:

```bash
$ export SAVEPOINT=hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c
$ envsubst <jobmanager-savepoint.yml | kubectl create -f -
```
One note on SavePoint: it has to be used together with HA mode, because the `--fromSavepoint` argument will be passed to `standalone-job.sh` every time Kubernetes restarts a failed JobManager; without HA, the job would always be reset to that old SavePoint. With HA mode enabled, the new JobManager first restores from the latest CheckPoint recorded in ZooKeeper, ignoring the SavePoint.
Scale Flink Job
There are two ways to scale a Flink job. One is to manually restart it with a different `parallelism.default` config, which can be found in `jobmanager.yml`. The other is to use the `bin/flink modify` command. Under the hood, this command cancels the job with a SavePoint and restarts it with the new parallelism. For this to work, you need to first set the default SavePoint directory, like:
```yaml
command: ["/opt/flink/bin/standalone-job.sh"]
# In addition to the options shown earlier:
args: ["start-foreground",
       "-Dstate.savepoints.dir=hdfs://192.168.99.1:9000/flink/savepoints/"]
```
Then, add more TaskManagers with `kubectl scale`:

```bash
$ kubectl scale --replicas=2 deployment/$JOB-taskmanager
```
And modify the parallelism of the running job:
```bash
$ bin/flink modify 755877434b676ce9dae5cfb533ed7f33 -m 192.168.99.108:30206 -p 2
```
However, due to an unresolved issue, we cannot use `flink modify` to scale an HA job cluster on Kubernetes. Use the manual method instead, which is sketched below.
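Pulling the earlier steps together, the manual rescale looks roughly like this (the savepoint path follows the examples above; remember to also raise `parallelism.default` in the manifest):

```bash
# 1. Cancel the job with a SavePoint and remove the completed k8s Job.
$ bin/flink cancel -m 192.168.99.108:30206 -s hdfs://192.168.99.1:9000/flink/savepoints/ 00000000000000000000000000000000
$ kubectl delete job $JOB-jobmanager

# 2. Add TaskManagers so that enough task slots are available.
$ kubectl scale --replicas=2 deployment/$JOB-taskmanager

# 3. Re-create the JobManager, resuming from the new SavePoint.
$ export SAVEPOINT=hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c
$ envsubst <jobmanager-savepoint.yml | kubectl create -f -
```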
Flink Native Support on Kubernetes
Flink enjoys a very active community that constantly improves its own design (FLIP-6) to adapt to the current cloud-native environment. The community has also noticed the rapid development of Kubernetes, and native support for Flink on k8s is under development. It is known that Flink can run natively on resource management systems like YARN and Mesos. Take YARN for instance: Flink first starts an ApplicationMaster as the JobManager, analyzes how much resource the job needs, and requests containers from the YARN ResourceManager to run the TaskManagers. When the parallelism changes, the JobManager acquires or releases containers accordingly. This kind of active resource management for Kubernetes is under development (FLINK-9953). In the future, we will be able to deploy a Flink cluster on k8s with a single command.
Besides, another kind of resource management is also on its way: the reactive container mode (FLINK-10407). In short, when the JobManager notices there are idle TaskManagers, it automatically scales the job to its maximum parallelism. With that, we only need to use `kubectl scale` to change the `replicas`, without executing `flink modify` afterwards. I believe such convenient features will be available soon.
References
- https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
- https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
- https://jobs.zalando.com/tech/blog/running-apache-flink-on-kubernetes/
- https://www.slideshare.net/tillrohrmann/redesigning-apache-flinks-distributed-architecture-flink-forward-2017
- https://www.slideshare.net/tillrohrmann/future-of-apache-flink-deployments-containers-kubernetes-and-more-flink-forward-2019-sf