Рубрики
Без рубрики

Реализация выключателя выключателя с нуля в Python

Мы кратко смотрим в шаблон автоматического выключателя, прежде чем прыгать в код. Что такое цепь BR … Теги с Python, микросервисами, архитектурой, производительностью.

Мы кратко смотрим в шаблон автоматического выключателя, прежде чем прыгать в код.

Что такое взлом схемы?

В приложениях реального мира сервисы могут пойти вниз и начать резервную копию все время (или они могут просто остаться вниз). Идея состоит в том, что, когда вы делаете удаленный звонок ( HTTP-запрос/RPC ) на другой сервис, есть шансы, что удаленный звонок может потерпеть неудачу. После определенного количества неудачных удаленных вызовов мы прекращаем сделать удаленные вызовы и отправлять кэшированный ответ или ошибку как ответ. После определенной задержки мы допускаем выполнение одного удаленного вызова на провал сервера, если он удается, мы допускаем последующие удаленные вызовы на сервер, если он не добился успеха, мы продолжим отправку кэшированного ответа или ошибки и будет Не делайте никаких удаленных звонков в неспособность в течение некоторого времени.

Когда все услуги работали, и удаленные звонки возвращались без каких-либо ошибок, мы называем это государство – « закрыто ».

Когда удаленные звонки продолжали провалиться, и когда мы перестали делать более удаленные звонки в неспособность, мы называем это государство – « открыть »

После определенной задержки, когда мы делаем удаленный звонок в провал службы, состояние переходов от « открыть » на « наполовину открытой ». Если удаленный звонок не потерпит неудачу, то мы переходим на состояние от « наполовину открытой » – « закрыто », а последующие удаленные вызовы могут быть сделаны. Если удаленный вызов не удался, мы переходим состояние от « наполовину открытой », обратно в « открыть » состояние, и мы ждем определенного периода времени, пока мы не сможем сделать следующий удаленный звонок (в Половина открытого государство)

Диаграмма с переходной экономикой: image src.

Чтобы узнать больше, читать Это и это

Зачем тебе это?

  • Для предотвращения отказа сети или обслуживания из каскада в другие услуги.
  • Сохраняет пропускную способность, не делая запросов по сети, когда обслуживание, которую вы запрашиваете.
  • Время для неудачной службы восстановления или запуска резервного копирования.

Кодовый марафон

Давайте теперь попробуем построить простой выключатель с помощью Python

Отказ от ответственности: Это никоим образом не готово к производству. Есть несколько отличных библиотек, которые доступны онлайн и хорошо проверены. Я упомянул два из них здесь: Автоматический выключатель и Pyreaker Отказ

Давайте сначала определимся на API для автоматического выключателя, который мы собираемся построить, а также определить ожидаемое поведение.

Я большой поклонник Повторите попытку Библиотечный синтаксис. Давайте попробуем использовать это здесь. Мы можем это к этому API к концу поста блога.

def circuit_breaker(exceptions=(Exception,), threshold=5, delay=60):
      """Returns a circuit decorator.

    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param threshold: the number of failed attempts before changing the state to Open
    :param delay: delay in seconds between Closed and Half Open state
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    :returns: a circuit_breaker decorator.
    """
@circuit_breaker(exceptions=Exception, threshold=5, delay=60)
def make_api_call(url, data):
  # function that makes an api-request to another server/application
  pass

Давайте определим все возможные состояния

# circuit_breaker.py
class StateChoices:
    OPEN = "open"
    CLOSED = "closed"
    HALF_OPEN = "half_open"

Давайте создадим класс, который обрабатывает всю логику выключателя.

# circuit_breaker.py
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

class CircuitBreaker:
    def __init__(self, func, exceptions, threshold, delay):
        """
        :param func: method that makes the remote call
        :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions)
        :param threshold: number of failed attempts before the state is changed to "Open"
        :param delay: delay in seconds between "Closed" and "Half-Open" state
        """
        self.func = func
        self.exceptions_to_catch = exceptions
        self.threshold = threshold
        self.delay = delay

        # by default set the state to closed
        self.state = StateChoices.CLOSED


        self.last_attempt_timestamp = None
        # keep track of failed attemp count
        self._failed_attempt_count = 0

    def update_last_attempt_timestamp(self):
        self.last_attempt_timestamp = datetime.utcnow().timestamp()

    def set_state(self, state):
        """To track the state changes by logging the information"""
        prev_state = self.state
        self.state = state
        logging.info(f"Changed state from {prev_state} to {self.state}")

    def handle_closed_state(self, *args, **kwargs):
        pass

    def handle_open_state(self, *args, **kwargs):
        pass

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

Конструктор принимает следующие параметры

  • ФУНК – метод/функция, которая делает удаленный вызов
  • Исключения – исключение или кортеж исключений, чтобы поймать (в идеале должны быть сетевыми исключениями)
  • порог – количество неудачных попыток до того, как состояние изменено на «открытие»
  • задержка – задержка в секундах между «закрытым» и «полуоткрытым» состоянием

make_remote_call принимает параметры, которые лежащие в основе необходимости удаленного вызова ( Func )

Если кажется запутанным, посмотрите на следующий фрагмент

def make_request(url):
  print(f"Url is {url}")

obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10)
obj.make_remote_call(url="www.google.com")

make_request передается как первая классная функция к классу отъемки. Пармы, требуемые make_request отправляются через make_remote_call.

Давайте теперь попробуем завершить Handle_Closed_state. и handle_open_state.

# circuit_breaker.py
class RemoteCallFailedException(Exception):
    pass

class CircuitBreaker:

    def handle_closed_state(self, *args, **kwargs):
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            logging.info("Success: Remote call")
            self.update_last_attempt_timestamp()
            return ret_val
        except allowed_exceptions as e:
            # remote call has failed
            logging.info("Failure: Remote call")
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # if the failed attempt count is more than the threshold
            # then change the state to OPEN
            if self._failed_attempt_count >= self.threshold:
                self.set_state(StateChoices.OPEN)
            # re-raise the exception
            raise RemoteCallFailedException from e


    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

handle_closed_state делает удаленный звонок, если это успех, то мы обновляем last_attempt_timestamp и вернуть результат удаленного вызова. Если удаленный звонок не удается, то _failed_attempt_count. увеличивается. Если _failed_attempt_count. не достиг порогового значения, затем просто поднять исключение. Если _ failed_attempt_count. больше или равно порогу, мы изменяем состояние, чтобы открыть, и, наконец, поднят исключение.

# circuit_breaker.py
class CircuitBreaker:

    def handle_open_state(self, *args, **kwargs):
        current_timestamp = datetime.utcnow().timestamp()
        # if `delay` seconds have not elapsed since the last attempt, raise an exception
        if self.last_attempt_timestamp + self.delay >= current_timestamp:
            raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs")

        # after `delay` seconds have elapsed since the last attempt, try making the remote call
        # update the state to half open state
        self.set_state(StateChoices.HALF_OPEN)
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            # the remote call was successful
            # now reset the state to Closed
            self.set_state(StateChoices.CLOSED)
            # reset the failed attempt counter
            self._failed_attempt_count = 0
            # update the last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # return the remote call's response
            return ret_val
        except allowed_exceptions as e:
            # the remote call failed again
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # set the state to "OPEN"
            self.set_state(StateChoices.OPEN)

            # raise the error
            raise RemoteCallFailedException from e

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

handle_open_state Первые проверки, если задержка Секунды прошло с момента последней попытки сделать удаленный звонок. Если нет, то он повышает исключение. Если задержка Секунды прошло с момента последней попытки, тогда мы изменим государство OT «Половина открытой». Теперь мы стараемся сделать один удаленный звонок в неспособность. Если удаленный звонок был успешным, то мы изменяем состояние на «закрытый» и сбросив _failed_attempt_count. до 0 и вернуть ответ удаленного вызова. Если удаленный вызов не удался, когда он был в состоянии «половина открытия», то состояние снова установлено на «Open», и мы поднимаем исключение.

Полный код

# circuit_breaker.py

import functools
import http
import logging
from datetime import datetime

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


class StateChoices:
    OPEN = "open"
    CLOSED = "closed"
    HALF_OPEN = "half_open"


class RemoteCallFailedException(Exception):
    pass


class CircuitBreaker:
    def __init__(self, func, exceptions, threshold, delay):
        """
        :param func: method that makes the remote call
        :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions)
        :param threshold: number of failed attempts before the state is changed to "Open"
        :param delay: delay in seconds between "Closed" and "Half-Open" state
        """
        self.func = func
        self.exceptions_to_catch = exceptions
        self.threshold = threshold
        self.delay = delay

        # by default set the state to closed
        self.state = StateChoices.CLOSED


        self.last_attempt_timestamp = None
        # keep track of failed attemp count
        self._failed_attempt_count = 0

    def update_last_attempt_timestamp(self):
        self.last_attempt_timestamp = datetime.utcnow().timestamp()

    def set_state(self, state):
        prev_state = self.state
        self.state = state
        logging.info(f"Changed state from {prev_state} to {self.state}")

    def handle_closed_state(self, *args, **kwargs):
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            logging.info("Success: Remote call")
            self.update_last_attempt_timestamp()
            return ret_val
        except allowed_exceptions as e:
            # remote call has failed
            logging.info("Failure: Remote call")
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # if the failed attempt count is more than the threshold
            # then change the state to OPEN
            if self._failed_attempt_count >= self.threshold:
                self.set_state(StateChoices.OPEN)
            # re-raise the exception
            raise RemoteCallFailedException from e

    def handle_open_state(self, *args, **kwargs):
        current_timestamp = datetime.utcnow().timestamp()
        # if `delay` seconds have not elapsed since the last attempt, raise an exception
        if self.last_attempt_timestamp + self.delay >= current_timestamp:
            raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs")

        # after `delay` seconds have elapsed since the last attempt, try making the remote call
        # update the state to half open state
        self.set_state(StateChoices.HALF_OPEN)
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            # the remote call was successful
            # now reset the state to Closed
            self.set_state(StateChoices.CLOSED)
            # reset the failed attempt counter
            self._failed_attempt_count = 0
            # update the last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # return the remote call's response
            return ret_val
        except allowed_exceptions as e:
            # the remote call failed again
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # set the state to "OPEN"
            self.set_state(StateChoices.OPEN)

            # raise the error
            raise RemoteCallFailedException from e

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

Теперь, чтобы проверить это. Давайте создадим MOCK Server.

Установите колбу и запросы. IPyton не является обязательным

pip install requests
pip install Flask
pip install ipython 

Давайте создадим некоторые конечные точки, чтобы издеваться с сервером

# main.py

import random
import time

from flask import Flask
app = Flask(__name__)


@app.route('/success')
def success_endpoint():
    return {
        "msg": "Call to this endpoint was a smashing success."
    }, 200


@app.route('/failure')
def faulty_endpoint():
    r = random.randint(0, 1)
    if r == 0:
        time.sleep(2)

    return {
        "msg": "I will fail."
    }, 500


@app.route('/random')
def fail_randomly_endpoint():
    r = random.randint(0, 1)
    if r == 0:
        return {
            "msg": "Success msg"
        }, 200

    return {
        "msg": "I will fail (sometimes)."
    }, 500

Запустите сервер разработки

export FLASK_APP=main.py; flask run

По умолчанию он работает на порту 5000

Теперь, чтобы проверить это. Вы можете использовать эти фрагменты, чтобы проверить это.

# snippets.py

faulty_endpoint = "http://localhost:5000/failure"
success_endpoint = "http://localhost:5000/success"
random_status_endpoint = "http://localhost:5000/random"

def make_request(url):
    try:
        response = requests.get(url, timeout=0.3)
        if response.status_code == http.HTTPStatus.OK:
            print(f"Call to {url} succeed with status code = {response.status_code}")
            return response
        if 500 <= response.status_code < 600:
            print(f"Call to {url} failed with status code = {response.status_code}")
            raise Exception("Server Issue")
    except Exception:
        print(f"Call to {url} failed")
        raise
(circuit-breaker) ➜  circuit-breaker git:(master) ✗ ipython

In [1]: from circuit_breaker import CircuitBreaker

In [2]: from snippets import make_request, faulty_endpoint, success_endpoint

In [3]: obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10)

In [4]: obj.make_remote_call(success_endpoint)
Call to http://localhost:5000/success succeed with status code = 200
06:07:51,255 INFO: Success: Remote call
Out[4]: 

In [5]: obj.make_remote_call(success_endpoint)
Call to http://localhost:5000/success succeed with status code = 200
06:07:53,610 INFO: Success: Remote call
Out[5]: 

In [6]: vars(obj)
Out[6]:
{'func': ,
 'exceptions_to_catch': (Exception,),
 'threshold': 5,
 'delay': 10,
 'state': 'closed',
 'last_attempt_timestamp': 1607800073.610199,
 '_failed_attempt_count': 0}

Линия 1 и строка 2 – просто импорт. В строке 3 мы создаем объект отъемки для make_request Отказ Здесь мы устанавливаем Исключения = (исключение) Это поймает все исключения. Мы должны в идеале в идеале сузить исключение с тем, который мы фактически хотим поймать, в этом случае, сетевые исключения, но мы собираемся оставить его там для этого демо.

Теперь сделайте последовательные звонки на неисправен конечная точка.

In [7]: obj.make_remote_call(faulty_endpoint)

In [8]: obj.make_remote_call(faulty_endpoint)

In [9]: obj.make_remote_call(faulty_endpoint)

In [10]: obj.make_remote_call(faulty_endpoint)

In [11]: obj.make_remote_call(faulty_endpoint)

In [12]: obj.make_remote_call(faulty_endpoint)
---------------------------------------------------------------------------
Traceback data ..........

RemoteCallFailedException: Retry after 8.688776969909668 secs  

In [13]: obj.make_remote_call(success_endpoint)
---------------------------------------------------------------------------
Traceback data......

RemoteCallFailedException: Retry after 6.096494913101196 secs

Попробуйте сделать эти звонки как можно быстрее. После первых пяти Calls на неторочную точку следующий вызов (строка 12) не сделает API-запрос на сервер Flask вместо этого будет поднять исключение, упомянутое повторение после указанного количества секунды. Даже если вы сделаете вызов API к Success_endpoint Конечная точка (линия 13), она все равно будет поднять ошибку. Это в «открытом» состоянии.

Теперь, после истечения времени задержки, если мы позвоните на неисправную конечную точку, он будет переходить от половины открытого для открытия состояния.

In [18]: obj.make_remote_call(faulty_endpoint)
06:21:24,959 INFO: Changed state from open to half_open
...
06:21:24,964 INFO: Changed state from half_open to open

Теперь после того, как задержка прошла, если мы позвоните в успех_енду, он будет переходить от половины открытого для закрытого состояния

In [19]: obj.make_remote_call(success_endpoint)
06:25:10,673 INFO: Changed state from open to half_open
...
06:25:10,678 INFO: Changed state from half_open to closed
Out[19]: 

Теперь у нас есть рабочий выключатель. Мы могли бы ввести ответную кэширование, мониторинг и сделать его Threadsafe. Ошибки могут быть обработаны лучше. Больше типов исключений может помочь. Все эти функции остаются в качестве упражнения для читателей.

Наконец, улучшение API не должно занимать много времени. Я добавил быстро грязную версию здесь

# circuit_breaker.py

class APICircuitBreaker:
    def __init__(self, exceptions=(Exception,), threshold=5, delay=60):
        self.obj = functools.partial(
            CircuitBreaker,
            exceptions=exceptions,
            threshold=threshold,
            delay=delay
        )

    def __call__(self, func):
        self.obj = self.obj(func=func)

        def decorator(*args, **kwargs):
            ret_val = self.obj.make_remote_call(*args, **kwargs)
            return ret_val

        return decorator

    def __getattr__(self, item):
        return getattr(self.obj, item)


circuit_breaker = APICircuitBreaker
# snippets.py

@circuit_breaker()
def make_request(url):
    try:
        response = requests.get(url, timeout=0.3)
        if response.status_code == http.HTTPStatus.OK:
            print(f"Call to {url} succeed with status code = {response.status_code}")
            return response
        if 500 <= response.status_code < 600:
            print(f"Call to {url} failed with status code = {response.status_code}")
            raise Exception("Server Issue")
    except Exception:
        print(f"Call to {url} failed")
        raise

Все образцы кода можно найти здесь

Теперь у нас есть рабочий выключатель. Мы могли бы ввести ответную кэширование, мониторинг и сделать его безопасным. Ошибки могут быть обработаны лучше. Больше типов исключений может помочь. Все эти функции остаются в качестве упражнения для читателей.

Связь со мной на Твиттер

Использованная литература:

  1. https://dzone.com/articles/circuit-breaker-pattern
  2. https://medium.com/@narengowda/what-is-circuitbreaking-in-microservices-2053f4f66882
  3. https://martinfowler.com/bliki/CircuitBreaker.html
  4. https://microservices.io/patterns/reliability/circuit-breaker.html

Оригинал: “https://dev.to/bhavesh_praveen/implementing-circuit-breaker-pattern-from-scratch-in-python-4c6g”