Нам нужно управлять Kubernetes как часть клиентского приложения Python. Итак, нам нужно взаимодействовать с api api kubernetes. К счастью, нам не нужно реализовывать вызовы API и управлять HTTP-запросами/ответами самими собой: мы можем положиться на Kubernetes Python Client , Среди других официально поддерживаемых клиентских библиотек Kubernetes для других языков, таких как GO, Java, .NET, JavaScript и haskell (есть также много общественных библиотек клиентских библиотек для многих языков).
Клиент Kubernetes Python совместим с Python 2.7 и 3.4+. Смотрите Матрица совместимости для поддерживаемых версий Куберов.
При использовании клиентской библиотеки мы должны сначала загрузить информацию о аутентификации и кластере.
Загрузить аутентификацию и информацию о кластере
Во-первых, вам необходимо настроить необходимую учетную запись услуг и роли.
K8S/Python-Client-SA-RBAC.YAML
apiVersion: v1 kind: ServiceAccount metadata: name: python-client-sa namespace: spark-jobs --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: namespace: spark-jobs name: python-client-role rules: - apiGroups: [""] resources: ["configmaps", "pods", "pods/log", "pods/status", "services"] verbs: ["*"] - apiGroups: ["networking.k8s.io"] resources: ["ingresses", "ingresses/status"] verbs: ["*"] - apiGroups: ["sparkoperator.k8s.io"] resources: [sparkapplications] verbs: ["*"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: python-client-role-binding namespace: spark-jobs subjects: - kind: ServiceAccount name: python-client-sa namespace: spark-jobs roleRef: kind: Role name: python-client-role apiGroup: rbac.authorization.k8s.io -------- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: node-reader rules: - apiGroups: [""] resources: ["nodes"] verbs: ["get", "list"] -------- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: python-client-cluster-role-binding subjects: - kind: ServiceAccount name: python-client-sa namespace: spark-jobs roleRef: kind: ClusterRole name: node-reader apiGroup: rbac.authorization.k8s.io
kubectl create -f k8s/python-client-sa-rbac.yaml
Эта команда создает новую учетную запись службы с именем Python-Client-SA
новая роль с необходимыми разрешениями в Зажигание
Пространство имен, а затем связывает новую роль на вновь созданную учетную запись услуг.
Предупреждение : Python-Client-SA
Это учетная запись услуг, которая предоставит идентификацию для клиента Kubernetes Python в нашем приложении. Не путайте эту учетную запись услуг с Driver-Sa
Учетная запись услуг для Pods драйверов.
Легкий путь
В этом методе мы можем использовать утилиту помощника для загрузки аутентификации и информации кластера из A KubeConfig
Файл и хранить их в Kubernetes.client.configuration
Отказ
from kubernetes import config, client config.load_kube_config("path/to/kubeconfig_file") v1 = client.CoreV1Api() print("Listing pods with their IPs:") ret = v1.list_namespaced_pod(namespace="spark-jobs") for i in ret.items: print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
Но мы Не Хотите полагаться на по умолчанию KubeConfig
файл, обозначенный переменной среды KubeConfig
Или, не в состоянии, в ~/.kube/config
. Это KubeConfig
Файл – это ваш, как пользователь kubectl
команда. Конкретно, с этим KubeConfig
Файл, вы имеете право делать практически все в кластере Kubernetes, а во всех пространствах имен. Вместо этого мы собираемся генерировать один, особенно для учетной записи службы, созданные выше, с помощью скрипта kubeconfig-gen.sh
:
#!/usr/bin/env bash # set -eux # Reads the API server name from the default `kubeconfig` file. # Here we suppose that the kubectl command-line tool is already configured to communicate with our cluster. APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}') SERVICE_ACCOUNT_NAME=${1:-python-client-sa} NAMESPACE=${2:-spark-jobs} SECRET_NAME=$(kubectl get serviceaccount ${SERVICE_ACCOUNT_NAME} -n ${NAMESPACE} -o jsonpath='{.secrets[0].name}') TOKEN=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath='{.data.token}' | base64 --decode) CACERT=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath="{['data']['ca\.crt']}") cat > kubeconfig-sa << EOF apiVersion: v1 kind: Config clusters: - cluster: certificate-authority-data: ${CACERT} server: ${APISERVER} name: default-cluster contexts: - context: cluster: default-cluster namespace: ${NAMESPACE} user: ${SERVICE_ACCOUNT_NAME} name: default-context current-context: default-context users: - user: token: ${TOKEN} name: ${SERVICE_ACCOUNT_NAME} EOF
KubeConfig
При этом создан файл настраивает доступ к кластеру для Python-Client-SA
Сервисная учетная запись, имеющие только права, необходимые для нашего клиентского приложения и в одном пространстве имен Зажигание
( «Принцип наименее привилегии» ).
Трудный путь
Привлечь учетные данные
Здесь мы собираемся настроить клиент Python в наиболее программном порядке. Во-первых, нам нужно получить учетные данные для доступа к кластеру Kubernetes. Мы будем хранить их в переменных среды Python.
export APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}') SECRET_NAME=$(kubectl get serviceaccount python-client-sa -o jsonpath='{.secrets[0].name}') export TOKEN=$(kubectl get secret ${SECRET_NAME} -o jsonpath='{.data.token}' | base64 --decode) export CACERT=$(kubectl get secret ${SECRET_NAME} -o jsonpath="{['data']['ca\.crt']}")
Обратите внимание, что переменные среды захватываются в первый раз ОС
Модуль импортируется, как правило, во время начала IDE/Python. Изменения в среде, сделанные после этого времени, не отражаются в Os.environ
(За исключением изменений, сделанных изменением OS.environ напрямую).
Использование образца Python
import base64 import os from tempfile import NamedTemporaryFile from kubernetes import client api_server = os.environ["APISERVER"] cacert = os.environ["CACERT"] token = os.environ["TOKEN"] # Set the configuration configuration = client.Configuration() with NamedTemporaryFile(delete=False) as cert: cert.write(base64.b64decode(cacert)) configuration.ssl_ca_cert = cert.name configuration.host = api_server configuration.verify_ssl = True configuration.debug = False configuration.api_key = {"authorization": "Bearer " + token} client.Configuration.set_default(configuration) v1 = client.CoreV1Api() print("Listing pods with their IPs:") ret = v1.list_namespaced_pod(namespace="spark-jobs") for i in ret.items: print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
Начиная
Управление объектом Kubernetes
С помощью клиента Kubernetes Python вы можете создавать и управлять объектами Kubernetes Processed.
В следующем примере (предусмотрено в репозитории GitHub ), мы создаем, обновите, затем удалите A Развертывание Использование
Appsv1api :
""" Creates, updates, and deletes a deployment using AppsV1Api. """ from kubernetes import client, config DEPLOYMENT_NAME = "nginx-deployment" def create_deployment_object(): # Configureate Pod template container container = client.V1Container( name="nginx", image="nginx:1.15.4", ports=[client.V1ContainerPort(container_port=80)], resources=client.V1ResourceRequirements( requests={"cpu": "100m", "memory": "200Mi"}, limits={"cpu": "500m", "memory": "500Mi"} ) ) # Create and configurate a spec section template = client.V1PodTemplateSpec( metadata=client.V1ObjectMeta(labels={"app": "nginx"}), spec=client.V1PodSpec(containers=[container])) # Create the specification of deployment spec = client.V1DeploymentSpec( replicas=3, template=template, selector={'matchLabels': {'app': 'nginx'}}) # Instantiate the deployment object deployment = client.V1Deployment( api_version="apps/v1", kind="Deployment", metadata=client.V1ObjectMeta(name=DEPLOYMENT_NAME), spec=spec) return deployment def create_deployment(api_instance, deployment): # Create deployement api_response = api_instance.create_namespaced_deployment( body=deployment, namespace="default") print("Deployment created. status='%s'" % str(api_response.status)) def update_deployment(api_instance, deployment): # Update container image deployment.spec.template.spec.containers[0].image = "nginx:1.16.0" # Update the deployment api_response = api_instance.patch_namespaced_deployment( name=DEPLOYMENT_NAME, namespace="default", body=deployment) print("Deployment updated. status='%s'" % str(api_response.status)) def delete_deployment(api_instance): # Delete deployment api_response = api_instance.delete_namespaced_deployment( name=DEPLOYMENT_NAME, namespace="default", body=client.V1DeleteOptions( propagation_policy='Foreground', grace_period_seconds=5)) print("Deployment deleted. status='%s'" % str(api_response.status)) def main(): # Configs can be set in Configuration class directly or using helper # utility. If no argument provided, the config will be loaded from # default location. config.load_kube_config("path/to/kubeconfig_file") apps_v1 = client.AppsV1Api() deployment = create_deployment_object() create_deployment(apps_v1, deployment) update_deployment(apps_v1, deployment) delete_deployment(apps_v1) if __name__ == '__main__': main()
Это здорово, но это включает в себя освоение API клиента, и прежде всего, мы должны настроить наши объекты неизначно : Мы указываем желаемую работу (создать, заменить и т. Д.) на объектах Python, которые представляют объекты Кубератеса. Здесь мы предпочитаем управлять нашими объектами в декларативный Путь и работайте на файлах конфигурации объекта (хранится локально вдоль источника кода Python), как мы обычно делаем с kubectl
команда. Действительно, код Python должен быть просто простой исполнительной вершиной для триггера операций Kubernetes, а Бизнес Логика, так сказать, должна быть сосредоточена в манифесте файлах.
Развертывание, которое мы создали выше, так же, как в nginx-deployment.yaml
:
apiVersion: apps/v1 kind: Deployment metadata: name: nginx-deployment labels: app: nginx spec: replicas: 3 selector: matchLabels: app: nginx template: metadata: labels: app: nginx spec: containers: - name: nginx image: nginx:1.15.4 ports: - containerPort: 80
Мы можем напрямую загрузить манифест следующим образом:
from os import path import yaml from kubernetes import client, config config.load_kube_config("path/to/kubeconfig_file") with open(path.join(path.dirname(__file__), "nginx-deployment.yaml")) as f: dep = yaml.safe_load(f) k8s_apps_v1 = client.AppsV1Api() resp = k8s_apps_v1.create_namespaced_deployment( body=dep, namespace="default") print("Deployment created. status='%s'" % resp.metadata.name)
Это эквивалент в Python kubectl Создать -F nginx-deployment.yaml
Отказ
Как видите, вы должны позвонить create_namespace_deployment
создать развертывание. Точно так же, вы бы назвали create_namepple_pod
создать стручок и так далее. Это потому, что клиент Python автоматически генерируется после Openapi
Технические характеристики API Kubernetes.
Приятно призвать определенный способ создания определенного типа объекта, даже если сам тип объекта уже указан в манифесте, который мы загружаем через этот метод. К счастью, клиент Kubernetes Python предоставляет метод утилиты, который действует как входной концентратор для любого вида объекта.
import os import yaml from kubernetes import client, config, utils config.load_kube_config("path/to/kubeconfig_file") with open(os.path.join(os.path.dirname(__file__), "nginx-deployment.yaml")) as f: dep = yaml.safe_load(f) k8s_client = client.ApiClient() resp = utils.create_from_dict(k8s_client, dep) print("Deployment created. status='%s'" % resp[0].metadata.name)
Utils.create_from_dict
это магический метод здесь. Требуется только Дикт
Держать действительные объекты Кубератеса. Это благословение, чтобы найти его, потому что он хорошо спрятан в клиенте и вообще не задокументирован.
Итак, чтобы запустить зажиму с помощью Spark-Doad, вы можете просто вызвать фрагмент кода выше с одним файлом YAML, который группируют все необходимые ресурсы (разделенные — в YAML).
Но как насчет оператора искры? Utils.create_from_dict.
не поддерживает Пользовательские ресурсы , это означает, что типы объектов, которые не являются частью API Core Kubernetes, а именно Свечающее применение
от оператора искры. Чтобы запустить искровую работу с помощью оператора Spark, у вас нет другого выбора, чем звонить create_namespace_custom_object
Функция Customobjectsapi
:
import os import yaml from kubernetes import client, config, utils config.load_kube_config("path/to/kubeconfig_file") with open(os.path.join(os.path.dirname(__file__), "k8s/spark-operator/pyspark-pi.yaml")) as f: dep = yaml.safe_load(f) custom_object_api = client.CustomObjectsApi() custom_object_api.create_namespaced_custom_object( group="sparkoperator.k8s.io", version="v1beta2", namespace="spark-jobs", plural="sparkapplications", body=dep, ) print("SparkApplication created")
Относительно Spark-Poed, есть еще более прямой метод Utils.create_from_yaml
, который читает объекты Kubernetes из файла yaml. Но мы не можем использовать его, так как нам нужно «параметризировать» наши файлы yaml, прежде чем отправлять их в клиент Kubernetes Python.
Как вы знаете, когда вы применяете файл манифеста в Kubernetes – описания ресурсов, отформатированные YAML, которые Cubernetes могут понять – вы должны указать имя ресурса, которое должно быть уникальным для этого типа ресурса (и в том же пространстве имен), в противном случае Kubernetes жалуется, что ресурс уже существует.
Например, у вас может быть только один POD по имени MyApp-1234
в том же пространстве имен, но вы можете иметь один POD и одно развертывание, которые каждый из них называется MyApp-1234
Отказ
Как мы хотим запустить несколько искровных заданий одновременно, и, как эти искры, в основном идентичны, за исключением нескольких параметров времени выполнения, нам нужно для параметризации, или шаблон наши файлы kubernetes yaml.
Обычно вы этого не делаете, по крайней мере, это не в философии Kubernetes: файлы Kubernetes должны быть бесплатными шаблонами и должны быть только исправлены, с помощью Kustomize например. Хелм Имеет также свою собственную систему шаблонов.
Вы не можете использовать такой инструмент с клиентом Kubernetes Python. Вместо этого мы собираемся заменить ссылки на переменные формы $ Var.
или $ {Var}
С соответствующими значениями, точно такими как envsubst. , Но программно.
Давайте вернемся к искру (родной). Ранее мы видели файл yaml, который определяет POD драйвера для запуска программы пример PI. Как видите, у нас есть заполнители, чтобы указать пространство имен
, Приоритетноеклассное значение
, serviceаccountname.
, ideaffinity
и а Name_suffix
сделать имя POD уникальным.
Теперь в коде Python мы заменяем во время выполнения этих переменных с нужными значениями перед созданием POD:
import binascii import os from os import listdir from pprint import pprint import yaml from kubernetes import client, config, utils def create_k8s_object(yaml_file=None, env_subst=None): with open(yaml_file) as f: str = f.read() if env_subst: for env, value in env_subst.items(): str = str.replace(env, value) return yaml.safe_load(str) def main(): # Configs can be set in Configuration class directly or using helper utility config.load_kube_config("path/to/kubeconfig_file") name_suffix = "-" + binascii.b2a_hex(os.urandom(8)) priority_class_name = "routine" env_subst = {"${NAMESPACE}": "spark-jobs", "${SERVICE_ACCOUNT_NAME}": "driver-sa", "${DRIVER_NODE_AFFINITIES}": "driver", "${EXECUTOR_NODE_AFFINITIES}": "compute", "${NAME_SUFFIX}": name_suffix, "${PRIORITY_CLASS_NAME}": priority_class_name} k8s_client = client.ApiClient() verbose = True # Create driver pod k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native") k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst) pprint(k8s_object_dict) k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose) # TODO: create the other resources print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"])) if __name__ == "__main__": main()
Теперь у нас есть общая механика, и мы можем создать все необходимые ресурсы. Помните, что драйвер POD потребляет Configmap
Чтобы определить переменные среды и для установки файлов конфигурации в контейнере Spark (включая шаблон для Pods Исполнителя). У нас также есть Сервис
Это позволяет исполнителям взаимодействовать с драйвером. И, наконец, у нас есть еще один Сервис
, поддерживаемый Вход
, разоблачить зажиму Ui.
Мы просто повторяем для файлов YAML, которые определяют эти ресурсы и просто называют тот же метод Utils.create_from_dict
:
# List all YAML files in k8s/spark-native directory, except the driver pod definition file other_resources = listdir(k8s_dir) other_resources.remove("pyspark-pi-driver-pod.yaml") for f in other_resources: k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst) pprint(k8s_object_dict) utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose) print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))
Теперь, когда мы запустили полный Зажигание Давайте посмотрим, что произойдет, когда мы убьем его 😈 или когда приложение завершится нормально.
Убитые приложения
Роль сборщика мусора Kubernetes – удалить определенные объекты, которые когда-то имели владельца, но больше нет. Цель состоит в том, чтобы убедиться, что сборщик мусора правильно удаляет ресурсы, которые больше не нужны при убийстве приложения искры. Важно освободить ресурсы кластера Kubernetes, когда вы собираетесь запустить десятки/сотни искровых приложений параллельно.
Для этого некоторые объекты Kubernetes могут быть объявлены владельцами других объектов. ” Собственные «объекты называются зависимые на объекте владельца. Каждый зависимый объект имеет Metadata.ownerReference
поле, которое указывает на объект владения. При удалении объекта владельца все зависимые объекты также автоматически удаляются ( Cascading Slealition ) по умолчанию.
Пример POD Исполнителя, принадлежащий его драйверу POD:
apiVersion: v1 kind: Pod metadata: labels: spark-role: executor ownerReferences: - apiVersion: v1 controller: true kind: Pod name: pyspark-pi-routine-0245dc3d340cd533-driver uid: 3b10fa97-c847-4fce-b3e1-71f779cffbef ...
Оператор Spark автоматически устанавливает значение OlludherReference
На разных уровнях: пользовательский Свечающее применение
Ресурс владеет драйвером POD, который владеет его исполнителями.
Для приложений, которые подаются родом (Без оператора Spark) объект владельца высшего уровня – это драйвер POD: Pods Executor автоматически устанавливает OlludherReference
Поле, указывая на драйвер POD. Но мы должны управлять отношениями владения себя для других Configmap
, Сервис
и Вход
Ресурсы. Для этого мы должны получить автоматическую UID
Из недавно созданного драйвера POD и ввести его в зависимые объекты: невозможно вручную установить UID
В файлах определения YAML это можно сделать только во время выполнения через код (и именно поэтому мы не можем поставить все ресурсы в один файл yaml).
import binascii import os from os import listdir from pprint import pprint import yaml from kubernetes import config, utils from kubernetes.client import ApiClient def create_k8s_object(yaml_file=None, env_subst=None): with open(yaml_file) as f: str = f.read() if env_subst: for env in env_subst: str = str.replace(env, env_subst[env]) return yaml.safe_load(str) def main(): # Configs can be set in Configuration class directly or using helper utility config.load_kube_config("path/to/kubeconfig_file") name_suffix = "-" + binascii.b2a_hex(os.urandom(8)) priority_class_name = "routine" env_subst = {"${NAMESPACE}": "spark-jobs", "${SERVICE_ACCOUNT_NAME}": "driver-sa", "${DRIVER_NODE_AFFINITIES}": "driver", "${EXECUTOR_NODE_AFFINITIES}": "compute", "${NAME_SUFFIX}": name_suffix, "${PRIORITY_CLASS_NAME}": priority_class_name} k8s_client = ApiClient() verbose = True # Create driver pod k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native") k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst) pprint(k8s_object_dict) k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose) # Prepare ownership on dependent objects owner_refs = [{"apiVersion": "v1", "controller": True, "kind": "Pod", "name": k8s_objects[0].metadata.name, "uid": k8s_objects[0].metadata.uid}] # List all YAML files in k8s/spark-native directory, except the driver pod definition file other_resources = listdir(k8s_dir) other_resources.remove("pyspark-pi-driver-pod.yaml") for f in other_resources: k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst) # Set ownership k8s_object_dict["metadata"]["ownerReferences"] = owner_refs pprint(k8s_object_dict) utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose) print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"])) if __name__ == "__main__": main()
Приложения обычно завершены
Когда приложение завершается нормально, PODS исполнителя завершается и очищается, но POD драйвера сохраняется журналы и остается в «завершенном» состоянии в API Kubernetes «до тех пор, пока он в конечном итоге не будет собран или убран вручную».
Обратите внимание, что в завершенном состоянии драйвер POD не использует никаких вычислений или ресурсов памяти.
Оператор Spark имеет поддержку TTL для Свечания
Через необязательное поле с именем .spec.timetoliveseconds
, который, если установлен, определяет продолжительность Time-To-Live (TTL) в считанные секунды для Свеча
после его прекращения. Свечающее применение
Объект будет мусор, собранный, если текущее время больше, чем .spec.timetolivesecons.
С момента его прекращения. Пример ниже иллюстрирует, как использовать поле:
spec: timeToLiveSeconds: 86400
На родной искренной стороне, в докторе нет ничего, что указывает, как срабатывает драйверы в конечном итоге удалены. Мы могли бы создать простые кубераты Cronjob
Это будет периодически запускать, чтобы удалить их автоматически.
На момент написания этой статьи есть ожидающие запросы в Kubernetes для поддержки TTL в Стручки
как в Рабочие места
: «Контроллер TTL только обрабатывает задания в настоящее время и может быть расширен для обработки других ресурсов, которые завершит выполнение, такие как PODS и пользовательские ресурсы».
Предпосылка каскадного удаления
При убийстве приложения из кода Python мы удаляем объект владельца с помощью Фон Каскадная политика удаления. В фоновом фоне каскадного удаления Kubernetes немедленно удаляет объект владельца, а сборщик мусора затем удаляет иждивенцев на заднем плане. Это полезно, чтобы не отложить основную тему выполнения.
Чтобы удалить искровую работу, запущенную Spark-Отправить
:
from kubernetes import client core_v1_api = client.CoreV1Api() core_v1_api.delete_namespaced_pod("driver-pod-name", "spark-jobs", propagation_policy="Background")
Чтобы удалить искровую работу, запущенную с оператором Spark, мы должны удалить ограждение Свечающее применение
ресурс:
from kubernetes import client, config custom_object_api = client.CustomObjectsApi() custom_object_api.delete_namespaced_custom_object( group="sparkoperator.k8s.io", version="v1beta2", namespace="spark-jobs", plural="sparkapplications", name="app_name", propagation_policy="Background")
Будь то вулкан или по умолчанию Kube-планировщик
Удаление работы полагается на приоритеты работы. Для двух заданий планировщик решает, чья приоритет выше, сравнивая .spec.priorityClassname
(Тогда CreateTime
).
Приоритет распространяется на Pods водителя и исполнителя, будь то с собственной искрой или с помощью оператора SPARE, и независимо от близости узла.
Как узнать, какие стручки были выгружены
Вы можете получить информацию высокого уровня по происходящему в кластере. Чтобы перечислить все события в пространстве имен Зажигание
ты можешь использовать:
# List Events sorted by timestamp kubectl get events --sort-by=.metadata.creationTimestamp --namespace=spark-jobs
LAST SEEN TYPE REASON OBJECT MESSAGE 69s Normal Scheduled podgroup/podgroup-6083b4f9-f240-4a0e-95f2-06882aec2942 pod group is ready 93s Normal Scheduled pod/pyspark-pi-driver-routine-bf20cae50b6a8253 Successfully assigned spark-jobs/pyspark-pi-driver-routine-bf20cae50b6a8253 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp 92s Normal Started pod/pyspark-pi-driver-routine-bf20cae50b6a8253 Started container pyspark-pi 92s Normal Pulled pod/pyspark-pi-driver-routine-bf20cae50b6a8253 Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine 92s Normal Created pod/pyspark-pi-driver-routine-bf20cae50b6a8253 Created container pyspark-pi 82s Normal Scheduled pod/pythonpi-34b3597593d246a1-exec-2 Successfully assigned spark-jobs/pythonpi-34b3597593d246a1-exec-2 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl 82s Normal Scheduled pod/pythonpi-34b3597593d246a1-exec-1 Successfully assigned spark-jobs/pythonpi-34b3597593d246a1-exec-1 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck 82s Normal Created pod/pythonpi-34b3597593d246a1-exec-1 Created container spark-kubernetes-executor 82s Normal Started pod/pythonpi-34b3597593d246a1-exec-1 Started container spark-kubernetes-executor 82s Normal Pulled pod/pythonpi-34b3597593d246a1-exec-1 Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine 81s Normal Created pod/pythonpi-34b3597593d246a1-exec-2 Created container spark-kubernetes-executor 81s Normal Pulled pod/pythonpi-34b3597593d246a1-exec-2 Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine 80s Normal Started pod/pythonpi-34b3597593d246a1-exec-2 Started container spark-kubernetes-executor 42s Normal Killing pod/pythonpi-34b3597593d246a1-exec-1 Stopping container spark-kubernetes-executor 42s Normal Killing pod/pythonpi-34b3597593d246a1-exec-2 Stopping container spark-kubernetes-executor 42s Warning FailedScheduling pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d all nodes are unavailable: 3 node(s) resource fit failed. 42s Normal Killing pod/pyspark-pi-driver-routine-bf20cae50b6a8253 Stopping container pyspark-pi 42s Warning Evict pod/pyspark-pi-driver-routine-bf20cae50b6a8253 Pod is evicted, because of preempt 34s Warning Unschedulable podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816 0/1 tasks in gang unschedulable: pod group is not ready, 1 Pipelined, 1 minAvailable. 41s Warning FailedScheduling pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 1/1 tasks in gang unschedulable: pod group is not ready, 1 Pipelined, 1 minAvailable. 18s Normal Scheduled podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816 pod group is ready 33s Normal Scheduled pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d Successfully assigned spark-jobs/pyspark-pi-driver-rush-bb25fc0b8efe7c4d to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp 32s Normal Started pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d Started container pyspark-pi 32s Normal Created pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d Created container pyspark-pi 32s Normal Pulled pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine 22s Normal Scheduled pod/pythonpi-f36cce7593d332f1-exec-1 Successfully assigned spark-jobs/pythonpi-f36cce7593d332f1-exec-1 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck 22s Normal Scheduled pod/pythonpi-f36cce7593d332f1-exec-2 Successfully assigned spark-jobs/pythonpi-f36cce7593d332f1-exec-2 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl 22s Normal Pulled pod/pythonpi-f36cce7593d332f1-exec-1 Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine 21s Normal Started pod/pythonpi-f36cce7593d332f1-exec-1 Started container spark-kubernetes-executor 21s Normal Created pod/pythonpi-f36cce7593d332f1-exec-1 Created container spark-kubernetes-executor 21s Normal Pulled pod/pythonpi-f36cce7593d332f1-exec-2 Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine 21s Normal Created pod/pythonpi-f36cce7593d332f1-exec-2 Created container spark-kubernetes-executor 21s Normal Started pod/pythonpi-f36cce7593d332f1-exec-2 Started container spark-kubernetes-executor
На выходе выше, мы видим, что POD Pyspark-Pi-Driver-Rigines-BF20CAE50B6A8253
был «выселен из-за упреждений» другой работой с приоритетом «спешной».
Будущая работа
Kubernetes обеспечивает контейнеры с крючками жизненного цикла. Крючки позволяют контейнерам осознавать события в их жизненном цикле управления и запущенным кодом, реализованным в обработчике, когда выполняется соответствующий крючок жизненного цикла.
В частности, Престоп
Крючок можно назвать непосредственно перед прекращением контейнера из-за упрека (среди других событий). Таким образом, мы можем рассмотреть действие, что бы она ни было, чтобы быть вызванным в случае упреждения. Все, что вам нужно сделать, это реализовать и зарегистрировать обработчик для этого крючка.
Посмотреть Крючки жизненного цикла контейнера Отказ
Мы скоро дойдем до конца нашего путешествия. Прежде чем желать вам спокойной ночи, давайте посмотрим на то, как мы можем отслеживать искровое приложение из кода Python.
Получение статуса искровой заявки
Многие операции в клиенте Kubernetes Python могут быть Смотрел Отказ Это позволяет нашу программу Python наблюдать за изменениями конкретного ресурса, пока не получите желаемый результат или истечения срока действия часов.
Здесь мы хотим отслеживать жизненный цикл драйвера POD, начиная с В ожидании
фаза, движущаяся через Бег
Если зажигающий контейнер начинается нормально, а затем через либо Преуспел
или Не удалось
Фазы:
from kubernetes import client, watch app_name = 'pyspark-pi-routine-bf20cae50b6a8253' v1 = client.CoreV1Api() count = 2 w = watch.Watch() label_selector = 'app-name=%s,spark-role=driver' % app_name for event in w.stream(v1.list_namespaced_pod, namespace='spark-jobs', label_selector=label_selector, timeout_seconds=60): print('Event: %s' % event['object'].status.phase) count -= 1 if not count: w.stop()
Здесь, ожидается, что водитель POD (отсюда, зажигание), как ожидается, будет выполнен, успешно или нет, в течение 60 секунд или меньше. Его статус должен меняться только дважды в течение этого периода: в идеале В ожидании
> Бег
> Преуспел
Отказ
Получение журналов
Можно извлечь журналы драйвера POD и смешать их в те из хост-приложения. Получение журналов так же просто:
from kubernetes import client, watch from threading import Thread v1 = client.CoreV1Api() pod_name = 'pyspark-pi-routine-bf20cae50b6a8253-driver' def logs(pod_name): w = watch.Watch() for event in w.stream(v1.read_namespaced_pod_log, pod_name=pod_name, namespace='spark-jobs', _request_timeout=300): yield event # We surely don't want to block the main thread while reading the logs def consumer(): for log in logs(pod_name): print(log) t = Thread(target=consumer) t.start()
Получение вхождения URI
Как только исходное приложение началось, проникновение (по меньшей мере, его публичный IP-адрес), который обнажает UI SPORK, может занять некоторое время, прежде чем она станет доступным. Здесь также мы можем следить за ресурсом входа следующим образом:
from kubernetes import client, watch app_name = 'pyspark-pi-routine-bf20cae50b6a8253' networking_v1_beta1_api = client.NetworkingV1beta1Api() w = watch.Watch() label_selector = 'app-name=%s' % app_name for event in w.stream(networking_v1_beta1_api.list_namespaced_ingress, namespace=namespace, label_selector=label_selector, timeout_seconds=30): ingress = event['object'].status.load_balancer.ingress if ingress: external_ip = ingress[0].ip print('Event: The Spark Web UI is available at http://%s/%s' % (external_ip, app_name)) w.stop() else: print('Event: Ingress not yet available')
Что, черт возьми!
Мы видели, что код Python для запуска или удаления зажигательных приложений немного отличается в зависимости от того, используете ли мы оператор Spark или Spark-sead. Но так как мы называем и отметим объекты Kubernetes последовательно между ними, и, поскольку мы устанавливаем правильные отношения владения, мы можем контролировать наши зажимать приложения и управлять своим жизненным циклом одинаково.
Хочешь узнать больше? Сценарии Python, объясненные в этой последней статье, доступны в этом Репозиторий GitHub . Служить себе.
Оригинал: “https://dev.to/stack-labs/my-journey-with-spark-on-kubernetes-in-python-3-3-536e”