Рубрики
Без рубрики

Мое путешествие со искрой на Кубернане … В Python (3/3)

Нам нужно управлять Kubernetes как часть клиентского приложения Python. Итак, нам нужно взаимодействовать с … Теги с искрой, Куберанес, Python.

Нам нужно управлять Kubernetes как часть клиентского приложения Python. Итак, нам нужно взаимодействовать с api api kubernetes. К счастью, нам не нужно реализовывать вызовы API и управлять HTTP-запросами/ответами самими собой: мы можем положиться на Kubernetes Python Client , Среди других официально поддерживаемых клиентских библиотек Kubernetes для других языков, таких как GO, Java, .NET, JavaScript и haskell (есть также много общественных библиотек клиентских библиотек для многих языков).

Клиент Kubernetes Python совместим с Python 2.7 и 3.4+. Смотрите Матрица совместимости для поддерживаемых версий Куберов.

При использовании клиентской библиотеки мы должны сначала загрузить информацию о аутентификации и кластере.

Загрузить аутентификацию и информацию о кластере

Во-первых, вам необходимо настроить необходимую учетную запись услуг и роли.

K8S/Python-Client-SA-RBAC.YAML

apiVersion: v1
kind: ServiceAccount
metadata:
  name: python-client-sa
  namespace: spark-jobs
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: spark-jobs
  name: python-client-role
rules:
- apiGroups: [""]
  resources: ["configmaps", "pods", "pods/log", "pods/status", "services"]
  verbs: ["*"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingresses", "ingresses/status"]
  verbs: ["*"]
- apiGroups: ["sparkoperator.k8s.io"]
  resources: [sparkapplications]
  verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: python-client-role-binding
  namespace: spark-jobs
subjects:
- kind: ServiceAccount
  name: python-client-sa
  namespace: spark-jobs
roleRef:
  kind: Role
  name: python-client-role
  apiGroup: rbac.authorization.k8s.io
--------
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: node-reader
rules:
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["get", "list"]
--------
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: python-client-cluster-role-binding
subjects:
- kind: ServiceAccount
  name: python-client-sa
  namespace: spark-jobs
roleRef:
  kind: ClusterRole
  name: node-reader
  apiGroup: rbac.authorization.k8s.io
kubectl create -f k8s/python-client-sa-rbac.yaml

Эта команда создает новую учетную запись службы с именем Python-Client-SA новая роль с необходимыми разрешениями в Зажигание Пространство имен, а затем связывает новую роль на вновь созданную учетную запись услуг.

Предупреждение : Python-Client-SA Это учетная запись услуг, которая предоставит идентификацию для клиента Kubernetes Python в нашем приложении. Не путайте эту учетную запись услуг с Driver-Sa Учетная запись услуг для Pods драйверов.

Легкий путь

В этом методе мы можем использовать утилиту помощника для загрузки аутентификации и информации кластера из A KubeConfig Файл и хранить их в Kubernetes.client.configuration Отказ

from kubernetes import config, client

config.load_kube_config("path/to/kubeconfig_file")

v1 = client.CoreV1Api()

print("Listing pods with their IPs:")

ret = v1.list_namespaced_pod(namespace="spark-jobs")
for i in ret.items:
    print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))

Но мы Не Хотите полагаться на по умолчанию KubeConfig файл, обозначенный переменной среды KubeConfig Или, не в состоянии, в ~/.kube/config . Это KubeConfig Файл – это ваш, как пользователь kubectl команда. Конкретно, с этим KubeConfig Файл, вы имеете право делать практически все в кластере Kubernetes, а во всех пространствах имен. Вместо этого мы собираемся генерировать один, особенно для учетной записи службы, созданные выше, с помощью скрипта kubeconfig-gen.sh :

#!/usr/bin/env bash

# set -eux

# Reads the API server name from the default `kubeconfig` file.
# Here we suppose that the kubectl command-line tool is already configured to communicate with our cluster.
APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')

SERVICE_ACCOUNT_NAME=${1:-python-client-sa}
NAMESPACE=${2:-spark-jobs}
SECRET_NAME=$(kubectl get serviceaccount ${SERVICE_ACCOUNT_NAME} -n ${NAMESPACE} -o jsonpath='{.secrets[0].name}')
TOKEN=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath='{.data.token}' | base64 --decode)
CACERT=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath="{['data']['ca\.crt']}")


cat > kubeconfig-sa << EOF
apiVersion: v1
kind: Config
clusters:
- cluster:
    certificate-authority-data: ${CACERT}
    server: ${APISERVER}
  name: default-cluster
contexts:
- context:
    cluster: default-cluster
    namespace: ${NAMESPACE}
    user: ${SERVICE_ACCOUNT_NAME}
  name: default-context
current-context: default-context
users:
- user:
    token: ${TOKEN}
  name: ${SERVICE_ACCOUNT_NAME}
EOF

KubeConfig При этом создан файл настраивает доступ к кластеру для Python-Client-SA Сервисная учетная запись, имеющие только права, необходимые для нашего клиентского приложения и в одном пространстве имен Зажигание ( «Принцип наименее привилегии» ).

Трудный путь

Привлечь учетные данные

Здесь мы собираемся настроить клиент Python в наиболее программном порядке. Во-первых, нам нужно получить учетные данные для доступа к кластеру Kubernetes. Мы будем хранить их в переменных среды Python.

export APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')
SECRET_NAME=$(kubectl get serviceaccount python-client-sa -o jsonpath='{.secrets[0].name}')
export TOKEN=$(kubectl get secret ${SECRET_NAME} -o jsonpath='{.data.token}' | base64 --decode)
export CACERT=$(kubectl get secret ${SECRET_NAME} -o jsonpath="{['data']['ca\.crt']}")

Обратите внимание, что переменные среды захватываются в первый раз ОС Модуль импортируется, как правило, во время начала IDE/Python. Изменения в среде, сделанные после этого времени, не отражаются в Os.environ (За исключением изменений, сделанных изменением OS.environ напрямую).

Использование образца Python

import base64
import os
from tempfile import NamedTemporaryFile

from kubernetes import client

api_server = os.environ["APISERVER"]
cacert = os.environ["CACERT"]
token = os.environ["TOKEN"]

# Set the configuration
configuration = client.Configuration()
with NamedTemporaryFile(delete=False) as cert:
    cert.write(base64.b64decode(cacert))
    configuration.ssl_ca_cert = cert.name
configuration.host = api_server
configuration.verify_ssl = True
configuration.debug = False
configuration.api_key = {"authorization": "Bearer " + token}
client.Configuration.set_default(configuration)

v1 = client.CoreV1Api()

print("Listing pods with their IPs:")

ret = v1.list_namespaced_pod(namespace="spark-jobs")
for i in ret.items:
    print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))

Начиная

Управление объектом Kubernetes

С помощью клиента Kubernetes Python вы можете создавать и управлять объектами Kubernetes Processed.

В следующем примере (предусмотрено в репозитории GitHub ), мы создаем, обновите, затем удалите A Развертывание Использование Appsv1api :

"""
Creates, updates, and deletes a deployment using AppsV1Api.
"""

from kubernetes import client, config

DEPLOYMENT_NAME = "nginx-deployment"


def create_deployment_object():
    # Configureate Pod template container
    container = client.V1Container(
        name="nginx",
        image="nginx:1.15.4",
        ports=[client.V1ContainerPort(container_port=80)],
        resources=client.V1ResourceRequirements(
            requests={"cpu": "100m", "memory": "200Mi"},
            limits={"cpu": "500m", "memory": "500Mi"}
        )
    )
    # Create and configurate a spec section
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
        spec=client.V1PodSpec(containers=[container]))
    # Create the specification of deployment
    spec = client.V1DeploymentSpec(
        replicas=3,
        template=template,
        selector={'matchLabels': {'app': 'nginx'}})
    # Instantiate the deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=DEPLOYMENT_NAME),
        spec=spec)

    return deployment


def create_deployment(api_instance, deployment):
    # Create deployement
    api_response = api_instance.create_namespaced_deployment(
        body=deployment,
        namespace="default")
    print("Deployment created. status='%s'" % str(api_response.status))


def update_deployment(api_instance, deployment):
    # Update container image
    deployment.spec.template.spec.containers[0].image = "nginx:1.16.0"
    # Update the deployment
    api_response = api_instance.patch_namespaced_deployment(
        name=DEPLOYMENT_NAME,
        namespace="default",
        body=deployment)
    print("Deployment updated. status='%s'" % str(api_response.status))


def delete_deployment(api_instance):
    # Delete deployment
    api_response = api_instance.delete_namespaced_deployment(
        name=DEPLOYMENT_NAME,
        namespace="default",
        body=client.V1DeleteOptions(
            propagation_policy='Foreground',
            grace_period_seconds=5))
    print("Deployment deleted. status='%s'" % str(api_response.status))


def main():
    # Configs can be set in Configuration class directly or using helper
    # utility. If no argument provided, the config will be loaded from
    # default location.
    config.load_kube_config("path/to/kubeconfig_file")
    apps_v1 = client.AppsV1Api()

    deployment = create_deployment_object()

    create_deployment(apps_v1, deployment)

    update_deployment(apps_v1, deployment)

    delete_deployment(apps_v1)


if __name__ == '__main__':
    main()

Это здорово, но это включает в себя освоение API клиента, и прежде всего, мы должны настроить наши объекты неизначно : Мы указываем желаемую работу (создать, заменить и т. Д.) на объектах Python, которые представляют объекты Кубератеса. Здесь мы предпочитаем управлять нашими объектами в декларативный Путь и работайте на файлах конфигурации объекта (хранится локально вдоль источника кода Python), как мы обычно делаем с kubectl команда. Действительно, код Python должен быть просто простой исполнительной вершиной для триггера операций Kubernetes, а Бизнес Логика, так сказать, должна быть сосредоточена в манифесте файлах.

Развертывание, которое мы создали выше, так же, как в nginx-deployment.yaml :

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.15.4
        ports:
        - containerPort: 80

Мы можем напрямую загрузить манифест следующим образом:

from os import path
import yaml
from kubernetes import client, config


config.load_kube_config("path/to/kubeconfig_file")

with open(path.join(path.dirname(__file__), "nginx-deployment.yaml")) as f:
    dep = yaml.safe_load(f)
    k8s_apps_v1 = client.AppsV1Api()
    resp = k8s_apps_v1.create_namespaced_deployment(
        body=dep, namespace="default")
    print("Deployment created. status='%s'" % resp.metadata.name)

Это эквивалент в Python kubectl Создать -F nginx-deployment.yaml Отказ

Как видите, вы должны позвонить create_namespace_deployment создать развертывание. Точно так же, вы бы назвали create_namepple_pod создать стручок и так далее. Это потому, что клиент Python автоматически генерируется после Openapi Технические характеристики API Kubernetes.

Приятно призвать определенный способ создания определенного типа объекта, даже если сам тип объекта уже указан в манифесте, который мы загружаем через этот метод. К счастью, клиент Kubernetes Python предоставляет метод утилиты, который действует как входной концентратор для любого вида объекта.

import os
import yaml
from kubernetes import client, config, utils


config.load_kube_config("path/to/kubeconfig_file")

with open(os.path.join(os.path.dirname(__file__), "nginx-deployment.yaml")) as f:
    dep = yaml.safe_load(f)
    k8s_client = client.ApiClient()
    resp = utils.create_from_dict(k8s_client, dep)
    print("Deployment created. status='%s'" % resp[0].metadata.name)

Utils.create_from_dict это магический метод здесь. Требуется только Дикт Держать действительные объекты Кубератеса. Это благословение, чтобы найти его, потому что он хорошо спрятан в клиенте и вообще не задокументирован.

Итак, чтобы запустить зажиму с помощью Spark-Doad, вы можете просто вызвать фрагмент кода выше с одним файлом YAML, который группируют все необходимые ресурсы (разделенные — в YAML).

Но как насчет оператора искры? Utils.create_from_dict. не поддерживает Пользовательские ресурсы , это означает, что типы объектов, которые не являются частью API Core Kubernetes, а именно Свечающее применение от оператора искры. Чтобы запустить искровую работу с помощью оператора Spark, у вас нет другого выбора, чем звонить create_namespace_custom_object Функция Customobjectsapi :

import os
import yaml
from kubernetes import client, config, utils


config.load_kube_config("path/to/kubeconfig_file")

with open(os.path.join(os.path.dirname(__file__), "k8s/spark-operator/pyspark-pi.yaml")) as f:
    dep = yaml.safe_load(f)
    custom_object_api = client.CustomObjectsApi()

    custom_object_api.create_namespaced_custom_object(
        group="sparkoperator.k8s.io",
        version="v1beta2",
        namespace="spark-jobs",
        plural="sparkapplications",
        body=dep,
    )
    print("SparkApplication created")

Относительно Spark-Poed, есть еще более прямой метод Utils.create_from_yaml , который читает объекты Kubernetes из файла yaml. Но мы не можем использовать его, так как нам нужно «параметризировать» наши файлы yaml, прежде чем отправлять их в клиент Kubernetes Python.

Как вы знаете, когда вы применяете файл манифеста в Kubernetes – описания ресурсов, отформатированные YAML, которые Cubernetes могут понять – вы должны указать имя ресурса, которое должно быть уникальным для этого типа ресурса (и в том же пространстве имен), в противном случае Kubernetes жалуется, что ресурс уже существует.

Например, у вас может быть только один POD по имени MyApp-1234 в том же пространстве имен, но вы можете иметь один POD и одно развертывание, которые каждый из них называется MyApp-1234 Отказ

Как мы хотим запустить несколько искровных заданий одновременно, и, как эти искры, в основном идентичны, за исключением нескольких параметров времени выполнения, нам нужно для параметризации, или шаблон наши файлы kubernetes yaml.

Обычно вы этого не делаете, по крайней мере, это не в философии Kubernetes: файлы Kubernetes должны быть бесплатными шаблонами и должны быть только исправлены, с помощью Kustomize например. Хелм Имеет также свою собственную систему шаблонов.

Вы не можете использовать такой инструмент с клиентом Kubernetes Python. Вместо этого мы собираемся заменить ссылки на переменные формы $ Var. или $ {Var} С соответствующими значениями, точно такими как envsubst. , Но программно.

Давайте вернемся к искру (родной). Ранее мы видели файл yaml, который определяет POD драйвера для запуска программы пример PI. Как видите, у нас есть заполнители, чтобы указать пространство имен , Приоритетноеклассное значение , serviceаccountname. , ideaffinity и а Name_suffix сделать имя POD уникальным.

Теперь в коде Python мы заменяем во время выполнения этих переменных с нужными значениями перед созданием POD:

import binascii
import os
from os import listdir
from pprint import pprint

import yaml
from kubernetes import client, config, utils


def create_k8s_object(yaml_file=None, env_subst=None):
    with open(yaml_file) as f:
        str = f.read()
        if env_subst:
            for env, value in env_subst.items():
                str = str.replace(env, value)
        return yaml.safe_load(str)


def main():
    # Configs can be set in Configuration class directly or using helper utility
    config.load_kube_config("path/to/kubeconfig_file")

    name_suffix = "-" + binascii.b2a_hex(os.urandom(8))
    priority_class_name = "routine"
    env_subst = {"${NAMESPACE}": "spark-jobs",
                 "${SERVICE_ACCOUNT_NAME}": "driver-sa",
                 "${DRIVER_NODE_AFFINITIES}": "driver",
                 "${EXECUTOR_NODE_AFFINITIES}": "compute",
                 "${NAME_SUFFIX}": name_suffix,
                 "${PRIORITY_CLASS_NAME}": priority_class_name}

    k8s_client = client.ApiClient()
    verbose = True

    # Create driver pod
    k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native")
    k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst)
    pprint(k8s_object_dict)
    k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    # TODO: create the other resources

    print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))

if __name__ == "__main__":
    main()

Теперь у нас есть общая механика, и мы можем создать все необходимые ресурсы. Помните, что драйвер POD потребляет Configmap Чтобы определить переменные среды и для установки файлов конфигурации в контейнере Spark (включая шаблон для Pods Исполнителя). У нас также есть Сервис Это позволяет исполнителям взаимодействовать с драйвером. И, наконец, у нас есть еще один Сервис , поддерживаемый Вход , разоблачить зажиму Ui.

Мы просто повторяем для файлов YAML, которые определяют эти ресурсы и просто называют тот же метод Utils.create_from_dict :

    # List all YAML files in k8s/spark-native directory, except the driver pod definition file
    other_resources = listdir(k8s_dir)
    other_resources.remove("pyspark-pi-driver-pod.yaml")
    for f in other_resources:
        k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst)
        pprint(k8s_object_dict)
        utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))

Теперь, когда мы запустили полный Зажигание Давайте посмотрим, что произойдет, когда мы убьем его 😈 или когда приложение завершится нормально.

Убитые приложения

Роль сборщика мусора Kubernetes – удалить определенные объекты, которые когда-то имели владельца, но больше нет. Цель состоит в том, чтобы убедиться, что сборщик мусора правильно удаляет ресурсы, которые больше не нужны при убийстве приложения искры. Важно освободить ресурсы кластера Kubernetes, когда вы собираетесь запустить десятки/сотни искровых приложений параллельно.

Для этого некоторые объекты Kubernetes могут быть объявлены владельцами других объектов. ” Собственные «объекты называются зависимые на объекте владельца. Каждый зависимый объект имеет Metadata.ownerReference поле, которое указывает на объект владения. При удалении объекта владельца все зависимые объекты также автоматически удаляются ( Cascading Slealition ) по умолчанию.

Пример POD Исполнителя, принадлежащий его драйверу POD:

apiVersion: v1
kind: Pod
metadata:
  labels:
    spark-role: executor
  ownerReferences:
    - apiVersion: v1
      controller: true
      kind: Pod
      name: pyspark-pi-routine-0245dc3d340cd533-driver
      uid: 3b10fa97-c847-4fce-b3e1-71f779cffbef
...

Оператор Spark автоматически устанавливает значение OlludherReference На разных уровнях: пользовательский Свечающее применение Ресурс владеет драйвером POD, который владеет его исполнителями.

Для приложений, которые подаются родом (Без оператора Spark) объект владельца высшего уровня – это драйвер POD: Pods Executor автоматически устанавливает OlludherReference Поле, указывая на драйвер POD. Но мы должны управлять отношениями владения себя для других Configmap , Сервис и Вход Ресурсы. Для этого мы должны получить автоматическую UID Из недавно созданного драйвера POD и ввести его в зависимые объекты: невозможно вручную установить UID В файлах определения YAML это можно сделать только во время выполнения через код (и именно поэтому мы не можем поставить все ресурсы в один файл yaml).

import binascii
import os
from os import listdir
from pprint import pprint

import yaml
from kubernetes import config, utils
from kubernetes.client import ApiClient


def create_k8s_object(yaml_file=None, env_subst=None):
    with open(yaml_file) as f:
        str = f.read()
        if env_subst:
            for env in env_subst:
                str = str.replace(env, env_subst[env])
        return yaml.safe_load(str)


def main():
    # Configs can be set in Configuration class directly or using helper utility
    config.load_kube_config("path/to/kubeconfig_file")

    name_suffix = "-" + binascii.b2a_hex(os.urandom(8))
    priority_class_name = "routine"
    env_subst = {"${NAMESPACE}": "spark-jobs",
                 "${SERVICE_ACCOUNT_NAME}": "driver-sa",
                 "${DRIVER_NODE_AFFINITIES}": "driver",
                 "${EXECUTOR_NODE_AFFINITIES}": "compute",
                 "${NAME_SUFFIX}": name_suffix,
                 "${PRIORITY_CLASS_NAME}": priority_class_name}

    k8s_client = ApiClient()
    verbose = True

    # Create driver pod
    k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native")
    k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst)
    pprint(k8s_object_dict)
    k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    # Prepare ownership on dependent objects
    owner_refs = [{"apiVersion": "v1",
                   "controller": True,
                   "kind": "Pod",
                   "name": k8s_objects[0].metadata.name,
                   "uid": k8s_objects[0].metadata.uid}]

    # List all YAML files in k8s/spark-native directory, except the driver pod definition file
    other_resources = listdir(k8s_dir)
    other_resources.remove("pyspark-pi-driver-pod.yaml")
    for f in other_resources:
        k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst)
        # Set ownership
        k8s_object_dict["metadata"]["ownerReferences"] = owner_refs
        pprint(k8s_object_dict)
        utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))

if __name__ == "__main__":
    main()

Приложения обычно завершены

Когда приложение завершается нормально, PODS исполнителя завершается и очищается, но POD драйвера сохраняется журналы и остается в «завершенном» состоянии в API Kubernetes «до тех пор, пока он в конечном итоге не будет собран или убран вручную».

Обратите внимание, что в завершенном состоянии драйвер POD не использует никаких вычислений или ресурсов памяти.

Оператор Spark имеет поддержку TTL для Свечания Через необязательное поле с именем .spec.timetoliveseconds , который, если установлен, определяет продолжительность Time-To-Live (TTL) в считанные секунды для Свеча после его прекращения. Свечающее применение Объект будет мусор, собранный, если текущее время больше, чем .spec.timetolivesecons. С момента его прекращения. Пример ниже иллюстрирует, как использовать поле:

spec:
  timeToLiveSeconds: 86400

На родной искренной стороне, в докторе нет ничего, что указывает, как срабатывает драйверы в конечном итоге удалены. Мы могли бы создать простые кубераты Cronjob Это будет периодически запускать, чтобы удалить их автоматически.

На момент написания этой статьи есть ожидающие запросы в Kubernetes для поддержки TTL в Стручки как в Рабочие места : «Контроллер TTL только обрабатывает задания в настоящее время и может быть расширен для обработки других ресурсов, которые завершит выполнение, такие как PODS и пользовательские ресурсы».

Предпосылка каскадного удаления

При убийстве приложения из кода Python мы удаляем объект владельца с помощью Фон Каскадная политика удаления. В фоновом фоне каскадного удаления Kubernetes немедленно удаляет объект владельца, а сборщик мусора затем удаляет иждивенцев на заднем плане. Это полезно, чтобы не отложить основную тему выполнения.

Чтобы удалить искровую работу, запущенную Spark-Отправить :

from kubernetes import client


core_v1_api = client.CoreV1Api()
core_v1_api.delete_namespaced_pod("driver-pod-name", "spark-jobs", propagation_policy="Background")

Чтобы удалить искровую работу, запущенную с оператором Spark, мы должны удалить ограждение Свечающее применение ресурс:

from kubernetes import client, config

custom_object_api = client.CustomObjectsApi()
custom_object_api.delete_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="spark-jobs",
    plural="sparkapplications",
    name="app_name",
    propagation_policy="Background")

Будь то вулкан или по умолчанию Kube-планировщик Удаление работы полагается на приоритеты работы. Для двух заданий планировщик решает, чья приоритет выше, сравнивая .spec.priorityClassname (Тогда CreateTime ).

Приоритет распространяется на Pods водителя и исполнителя, будь то с собственной искрой или с помощью оператора SPARE, и независимо от близости узла.

Как узнать, какие стручки были выгружены

Вы можете получить информацию высокого уровня по происходящему в кластере. Чтобы перечислить все события в пространстве имен Зажигание ты можешь использовать:

# List Events sorted by timestamp
kubectl get events --sort-by=.metadata.creationTimestamp --namespace=spark-jobs
LAST SEEN   TYPE      REASON             OBJECT                                                   MESSAGE
69s         Normal    Scheduled          podgroup/podgroup-6083b4f9-f240-4a0e-95f2-06882aec2942   pod group is ready
93s         Normal    Scheduled          pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Successfully assigned spark-jobs/pyspark-pi-driver-routine-bf20cae50b6a8253 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp
92s         Normal    Started            pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Started container pyspark-pi
92s         Normal    Pulled             pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
92s         Normal    Created            pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Created container pyspark-pi
82s         Normal    Scheduled          pod/pythonpi-34b3597593d246a1-exec-2                     Successfully assigned spark-jobs/pythonpi-34b3597593d246a1-exec-2 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl
82s         Normal    Scheduled          pod/pythonpi-34b3597593d246a1-exec-1                     Successfully assigned spark-jobs/pythonpi-34b3597593d246a1-exec-1 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck
82s         Normal    Created            pod/pythonpi-34b3597593d246a1-exec-1                     Created container spark-kubernetes-executor
82s         Normal    Started            pod/pythonpi-34b3597593d246a1-exec-1                     Started container spark-kubernetes-executor
82s         Normal    Pulled             pod/pythonpi-34b3597593d246a1-exec-1                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
81s         Normal    Created            pod/pythonpi-34b3597593d246a1-exec-2                     Created container spark-kubernetes-executor
81s         Normal    Pulled             pod/pythonpi-34b3597593d246a1-exec-2                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
80s         Normal    Started            pod/pythonpi-34b3597593d246a1-exec-2                     Started container spark-kubernetes-executor
42s         Normal    Killing            pod/pythonpi-34b3597593d246a1-exec-1                     Stopping container spark-kubernetes-executor
42s         Normal    Killing            pod/pythonpi-34b3597593d246a1-exec-2                     Stopping container spark-kubernetes-executor
42s         Warning   FailedScheduling   pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              all nodes are unavailable: 3 node(s) resource fit failed.
42s         Normal    Killing            pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Stopping container pyspark-pi
42s         Warning   Evict              pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Pod is evicted, because of preempt
34s         Warning   Unschedulable      podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816   0/1 tasks in gang unschedulable: pod group is not ready, 1 Pipelined, 1 minAvailable.
41s         Warning   FailedScheduling   pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              1/1 tasks in gang unschedulable: pod group is not ready, 1 Pipelined, 1 minAvailable.
18s         Normal    Scheduled          podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816   pod group is ready
33s         Normal    Scheduled          pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Successfully assigned spark-jobs/pyspark-pi-driver-rush-bb25fc0b8efe7c4d to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp
32s         Normal    Started            pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Started container pyspark-pi
32s         Normal    Created            pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Created container pyspark-pi
32s         Normal    Pulled             pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
22s         Normal    Scheduled          pod/pythonpi-f36cce7593d332f1-exec-1                     Successfully assigned spark-jobs/pythonpi-f36cce7593d332f1-exec-1 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck
22s         Normal    Scheduled          pod/pythonpi-f36cce7593d332f1-exec-2                     Successfully assigned spark-jobs/pythonpi-f36cce7593d332f1-exec-2 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl
22s         Normal    Pulled             pod/pythonpi-f36cce7593d332f1-exec-1                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
21s         Normal    Started            pod/pythonpi-f36cce7593d332f1-exec-1                     Started container spark-kubernetes-executor
21s         Normal    Created            pod/pythonpi-f36cce7593d332f1-exec-1                     Created container spark-kubernetes-executor
21s         Normal    Pulled             pod/pythonpi-f36cce7593d332f1-exec-2                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
21s         Normal    Created            pod/pythonpi-f36cce7593d332f1-exec-2                     Created container spark-kubernetes-executor
21s         Normal    Started            pod/pythonpi-f36cce7593d332f1-exec-2                     Started container spark-kubernetes-executor

На выходе выше, мы видим, что POD Pyspark-Pi-Driver-Rigines-BF20CAE50B6A8253 был «выселен из-за упреждений» другой работой с приоритетом «спешной».

Будущая работа

Kubernetes обеспечивает контейнеры с крючками жизненного цикла. Крючки позволяют контейнерам осознавать события в их жизненном цикле управления и запущенным кодом, реализованным в обработчике, когда выполняется соответствующий крючок жизненного цикла.

В частности, Престоп Крючок можно назвать непосредственно перед прекращением контейнера из-за упрека (среди других событий). Таким образом, мы можем рассмотреть действие, что бы она ни было, чтобы быть вызванным в случае упреждения. Все, что вам нужно сделать, это реализовать и зарегистрировать обработчик для этого крючка.

Посмотреть Крючки жизненного цикла контейнера Отказ

Мы скоро дойдем до конца нашего путешествия. Прежде чем желать вам спокойной ночи, давайте посмотрим на то, как мы можем отслеживать искровое приложение из кода Python.

Получение статуса искровой заявки

Многие операции в клиенте Kubernetes Python могут быть Смотрел Отказ Это позволяет нашу программу Python наблюдать за изменениями конкретного ресурса, пока не получите желаемый результат или истечения срока действия часов.

Здесь мы хотим отслеживать жизненный цикл драйвера POD, начиная с В ожидании фаза, движущаяся через Бег Если зажигающий контейнер начинается нормально, а затем через либо Преуспел или Не удалось Фазы:

from kubernetes import client, watch

app_name = 'pyspark-pi-routine-bf20cae50b6a8253'
v1 = client.CoreV1Api()

count = 2
w = watch.Watch()
label_selector = 'app-name=%s,spark-role=driver' % app_name
for event in w.stream(v1.list_namespaced_pod, namespace='spark-jobs', label_selector=label_selector, timeout_seconds=60):
    print('Event: %s' % event['object'].status.phase)
    count -= 1
    if not count:
        w.stop()

Здесь, ожидается, что водитель POD (отсюда, зажигание), как ожидается, будет выполнен, успешно или нет, в течение 60 секунд или меньше. Его статус должен меняться только дважды в течение этого периода: в идеале В ожидании > Бег > Преуспел Отказ

Получение журналов

Можно извлечь журналы драйвера POD и смешать их в те из хост-приложения. Получение журналов так же просто:

from kubernetes import client, watch
from threading import Thread

v1 = client.CoreV1Api()
pod_name = 'pyspark-pi-routine-bf20cae50b6a8253-driver'

def logs(pod_name):
    w = watch.Watch()
    for event in w.stream(v1.read_namespaced_pod_log, pod_name=pod_name, namespace='spark-jobs', _request_timeout=300):
        yield event

# We surely don't want to block the main thread while reading the logs
def consumer():
    for log in logs(pod_name):
        print(log)

t = Thread(target=consumer)
t.start()

Получение вхождения URI

Как только исходное приложение началось, проникновение (по меньшей мере, его публичный IP-адрес), который обнажает UI SPORK, может занять некоторое время, прежде чем она станет доступным. Здесь также мы можем следить за ресурсом входа следующим образом:

from kubernetes import client, watch

app_name = 'pyspark-pi-routine-bf20cae50b6a8253'

networking_v1_beta1_api = client.NetworkingV1beta1Api()
w = watch.Watch()
label_selector = 'app-name=%s' % app_name
for event in w.stream(networking_v1_beta1_api.list_namespaced_ingress, namespace=namespace,
                      label_selector=label_selector,
                      timeout_seconds=30):
    ingress = event['object'].status.load_balancer.ingress
    if ingress:
        external_ip = ingress[0].ip
        print('Event: The Spark Web UI is available at http://%s/%s' % (external_ip, app_name))
        w.stop()
    else:
        print('Event: Ingress not yet available')

Что, черт возьми!

Мы видели, что код Python для запуска или удаления зажигательных приложений немного отличается в зависимости от того, используете ли мы оператор Spark или Spark-sead. Но так как мы называем и отметим объекты Kubernetes последовательно между ними, и, поскольку мы устанавливаем правильные отношения владения, мы можем контролировать наши зажимать приложения и управлять своим жизненным циклом одинаково.

Хочешь узнать больше? Сценарии Python, объясненные в этой последней статье, доступны в этом Репозиторий GitHub . Служить себе.

Оригинал: “https://dev.to/stack-labs/my-journey-with-spark-on-kubernetes-in-python-3-3-536e”