This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Kubernetes Setup

This page describes how to deploy a Flink job and session cluster on Kubernetes.

Info This page describes deploying a standalone Flink session on top of Kubernetes. For information on native Kubernetes deployments read here.

Setup Kubernetes

Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.

Note: If using MiniKube please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise Flink components are not able to self reference themselves through a Kubernetes service.

A Flink session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

A basic Flink session cluster deployment in Kubernetes has three components:

  • a Deployment/Job which runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Using the resource definitions for a session cluster, launch the cluster with the kubectl command:

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml

Note that you could define your own customized options of flink-conf.yaml within flink-configuration-configmap.yaml.

You can then access the Flink UI via different ways:

./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
  • Create a NodePort service on the rest service of jobmanager:
    1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on jobmanager. The example of jobmanager-rest-service.yaml can be found in appendix.
    2. Run kubectl get svc flink-jobmanager-rest to know the node-port of this service and navigate to http://<public-node-ip>:<node-port> in your browser.
    3. Similarly to port-forward solution, you could also use the following command below to submit jobs to the cluster:
./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar

In order to terminate the Flink session cluster, use kubectl:

kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml

A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.

Creating the job-specific image

The Flink job cluster image needs to contain the user code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. Please follow these instructions to build the Docker image.

Using plugins

As described in the plugins documentation page: in order to use plugins they must be copied to the correct location in the flink installation for them to work.

The simplest way to enable plugins for use on Kubernetes is to modify the provided Flink docker image by adding an additional layer. This does however assume you have a docker registry available where you can push images to and that is accessible by your Kubernetes cluster.

How this can be done is described on the Docker Setup page.

With such an image created you can now start your Kubernetes based Flink cluster which can use the enabled plugins.

In order to deploy the a job cluster on Kubernetes please follow these instructions.

Advanced Cluster Deployment

An early version of a Flink Helm chart is available on GitHub.

Appendix

Session cluster resource definitions

The Deployment definitions use the pre-built image flink:latest which can be found on Docker Hub. The image is built from this Github repository.

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

jobmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

taskmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml. Optional service, that exposes the jobmanager rest port as public Kubernetes node’s port.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager

Back to top