Мы кратко смотрим в шаблон автоматического выключателя, прежде чем прыгать в код.
Что такое взлом схемы?
В приложениях реального мира сервисы могут пойти вниз и начать резервную копию все время (или они могут просто остаться вниз). Идея состоит в том, что, когда вы делаете удаленный звонок ( HTTP-запрос/RPC ) на другой сервис, есть шансы, что удаленный звонок может потерпеть неудачу. После определенного количества неудачных удаленных вызовов мы прекращаем сделать удаленные вызовы и отправлять кэшированный ответ или ошибку как ответ. После определенной задержки мы допускаем выполнение одного удаленного вызова на провал сервера, если он удается, мы допускаем последующие удаленные вызовы на сервер, если он не добился успеха, мы продолжим отправку кэшированного ответа или ошибки и будет Не делайте никаких удаленных звонков в неспособность в течение некоторого времени.
Когда все услуги работали, и удаленные звонки возвращались без каких-либо ошибок, мы называем это государство – « закрыто ».
Когда удаленные звонки продолжали провалиться, и когда мы перестали делать более удаленные звонки в неспособность, мы называем это государство – « открыть »
После определенной задержки, когда мы делаем удаленный звонок в провал службы, состояние переходов от « открыть » на « наполовину открытой ». Если удаленный звонок не потерпит неудачу, то мы переходим на состояние от « наполовину открытой » – « закрыто », а последующие удаленные вызовы могут быть сделаны. Если удаленный вызов не удался, мы переходим состояние от « наполовину открытой », обратно в « открыть » состояние, и мы ждем определенного периода времени, пока мы не сможем сделать следующий удаленный звонок (в Половина открытого государство)
Диаграмма с переходной экономикой: image src.
Чтобы узнать больше, читать Это и это
Зачем тебе это?
- Для предотвращения отказа сети или обслуживания из каскада в другие услуги.
- Сохраняет пропускную способность, не делая запросов по сети, когда обслуживание, которую вы запрашиваете.
- Время для неудачной службы восстановления или запуска резервного копирования.
Кодовый марафон
Давайте теперь попробуем построить простой выключатель с помощью Python
Отказ от ответственности: Это никоим образом не готово к производству. Есть несколько отличных библиотек, которые доступны онлайн и хорошо проверены. Я упомянул два из них здесь: Автоматический выключатель и Pyreaker Отказ
Давайте сначала определимся на API для автоматического выключателя, который мы собираемся построить, а также определить ожидаемое поведение.
Я большой поклонник Повторите попытку Библиотечный синтаксис. Давайте попробуем использовать это здесь. Мы можем это к этому API к концу поста блога.
def circuit_breaker(exceptions=(Exception,), threshold=5, delay=60): """Returns a circuit decorator. :param exceptions: an exception or a tuple of exceptions to catch. default: Exception. :param threshold: the number of failed attempts before changing the state to Open :param delay: delay in seconds between Closed and Half Open state :param logger: logger.warning(fmt, error, delay) will be called on failed attempts. default: retry.logging_logger. if None, logging is disabled. :returns: a circuit_breaker decorator. """
@circuit_breaker(exceptions=Exception, threshold=5, delay=60) def make_api_call(url, data): # function that makes an api-request to another server/application pass
Давайте определим все возможные состояния
# circuit_breaker.py class StateChoices: OPEN = "open" CLOSED = "closed" HALF_OPEN = "half_open"
Давайте создадим класс, который обрабатывает всю логику выключателя.
# circuit_breaker.py logging.basicConfig( level=logging.INFO, format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s", datefmt="%H:%M:%S", ) class CircuitBreaker: def __init__(self, func, exceptions, threshold, delay): """ :param func: method that makes the remote call :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions) :param threshold: number of failed attempts before the state is changed to "Open" :param delay: delay in seconds between "Closed" and "Half-Open" state """ self.func = func self.exceptions_to_catch = exceptions self.threshold = threshold self.delay = delay # by default set the state to closed self.state = StateChoices.CLOSED self.last_attempt_timestamp = None # keep track of failed attemp count self._failed_attempt_count = 0 def update_last_attempt_timestamp(self): self.last_attempt_timestamp = datetime.utcnow().timestamp() def set_state(self, state): """To track the state changes by logging the information""" prev_state = self.state self.state = state logging.info(f"Changed state from {prev_state} to {self.state}") def handle_closed_state(self, *args, **kwargs): pass def handle_open_state(self, *args, **kwargs): pass def make_remote_call(self, *args, **kwargs): if self.state == StateChoices.CLOSED: return self.handle_closed_state(*args, **kwargs) if self.state == StateChoices.OPEN: return self.handle_open_state(*args, **kwargs)
Конструктор принимает следующие параметры
ФУНК
– метод/функция, которая делает удаленный вызовИсключения
– исключение или кортеж исключений, чтобы поймать (в идеале должны быть сетевыми исключениями)порог
– количество неудачных попыток до того, как состояние изменено на «открытие»задержка
– задержка в секундах между «закрытым» и «полуоткрытым» состоянием
make_remote_call
принимает параметры, которые лежащие в основе необходимости удаленного вызова ( Func
)
Если кажется запутанным, посмотрите на следующий фрагмент
def make_request(url): print(f"Url is {url}") obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10) obj.make_remote_call(url="www.google.com")
make_request
передается как первая классная функция к классу отъемки. Пармы, требуемые make_request
отправляются через make_remote_call.
Давайте теперь попробуем завершить Handle_Closed_state.
и handle_open_state.
# circuit_breaker.py class RemoteCallFailedException(Exception): pass class CircuitBreaker: def handle_closed_state(self, *args, **kwargs): allowed_exceptions = self.exceptions_to_catch try: ret_val = self.func(*args, **kwargs) logging.info("Success: Remote call") self.update_last_attempt_timestamp() return ret_val except allowed_exceptions as e: # remote call has failed logging.info("Failure: Remote call") # increment the failed attempt count self._failed_attempt_count += 1 # update last_attempt_timestamp self.update_last_attempt_timestamp() # if the failed attempt count is more than the threshold # then change the state to OPEN if self._failed_attempt_count >= self.threshold: self.set_state(StateChoices.OPEN) # re-raise the exception raise RemoteCallFailedException from e def make_remote_call(self, *args, **kwargs): if self.state == StateChoices.CLOSED: return self.handle_closed_state(*args, **kwargs) if self.state == StateChoices.OPEN: return self.handle_open_state(*args, **kwargs)
handle_closed_state
делает удаленный звонок, если это успех, то мы обновляем last_attempt_timestamp
и вернуть результат удаленного вызова. Если удаленный звонок не удается, то _failed_attempt_count.
увеличивается. Если _failed_attempt_count.
не достиг порогового значения, затем просто поднять исключение. Если _ failed_attempt_count.
больше или равно порогу, мы изменяем состояние, чтобы открыть, и, наконец, поднят исключение.
# circuit_breaker.py class CircuitBreaker: def handle_open_state(self, *args, **kwargs): current_timestamp = datetime.utcnow().timestamp() # if `delay` seconds have not elapsed since the last attempt, raise an exception if self.last_attempt_timestamp + self.delay >= current_timestamp: raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs") # after `delay` seconds have elapsed since the last attempt, try making the remote call # update the state to half open state self.set_state(StateChoices.HALF_OPEN) allowed_exceptions = self.exceptions_to_catch try: ret_val = self.func(*args, **kwargs) # the remote call was successful # now reset the state to Closed self.set_state(StateChoices.CLOSED) # reset the failed attempt counter self._failed_attempt_count = 0 # update the last_attempt_timestamp self.update_last_attempt_timestamp() # return the remote call's response return ret_val except allowed_exceptions as e: # the remote call failed again # increment the failed attempt count self._failed_attempt_count += 1 # update last_attempt_timestamp self.update_last_attempt_timestamp() # set the state to "OPEN" self.set_state(StateChoices.OPEN) # raise the error raise RemoteCallFailedException from e def make_remote_call(self, *args, **kwargs): if self.state == StateChoices.CLOSED: return self.handle_closed_state(*args, **kwargs) if self.state == StateChoices.OPEN: return self.handle_open_state(*args, **kwargs)
handle_open_state
Первые проверки, если задержка
Секунды прошло с момента последней попытки сделать удаленный звонок. Если нет, то он повышает исключение. Если задержка
Секунды прошло с момента последней попытки, тогда мы изменим государство OT «Половина открытой». Теперь мы стараемся сделать один удаленный звонок в неспособность. Если удаленный звонок был успешным, то мы изменяем состояние на «закрытый» и сбросив _failed_attempt_count.
до 0 и вернуть ответ удаленного вызова. Если удаленный вызов не удался, когда он был в состоянии «половина открытия», то состояние снова установлено на «Open», и мы поднимаем исключение.
Полный код
# circuit_breaker.py import functools import http import logging from datetime import datetime import requests logging.basicConfig( level=logging.INFO, format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s", datefmt="%H:%M:%S", ) class StateChoices: OPEN = "open" CLOSED = "closed" HALF_OPEN = "half_open" class RemoteCallFailedException(Exception): pass class CircuitBreaker: def __init__(self, func, exceptions, threshold, delay): """ :param func: method that makes the remote call :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions) :param threshold: number of failed attempts before the state is changed to "Open" :param delay: delay in seconds between "Closed" and "Half-Open" state """ self.func = func self.exceptions_to_catch = exceptions self.threshold = threshold self.delay = delay # by default set the state to closed self.state = StateChoices.CLOSED self.last_attempt_timestamp = None # keep track of failed attemp count self._failed_attempt_count = 0 def update_last_attempt_timestamp(self): self.last_attempt_timestamp = datetime.utcnow().timestamp() def set_state(self, state): prev_state = self.state self.state = state logging.info(f"Changed state from {prev_state} to {self.state}") def handle_closed_state(self, *args, **kwargs): allowed_exceptions = self.exceptions_to_catch try: ret_val = self.func(*args, **kwargs) logging.info("Success: Remote call") self.update_last_attempt_timestamp() return ret_val except allowed_exceptions as e: # remote call has failed logging.info("Failure: Remote call") # increment the failed attempt count self._failed_attempt_count += 1 # update last_attempt_timestamp self.update_last_attempt_timestamp() # if the failed attempt count is more than the threshold # then change the state to OPEN if self._failed_attempt_count >= self.threshold: self.set_state(StateChoices.OPEN) # re-raise the exception raise RemoteCallFailedException from e def handle_open_state(self, *args, **kwargs): current_timestamp = datetime.utcnow().timestamp() # if `delay` seconds have not elapsed since the last attempt, raise an exception if self.last_attempt_timestamp + self.delay >= current_timestamp: raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs") # after `delay` seconds have elapsed since the last attempt, try making the remote call # update the state to half open state self.set_state(StateChoices.HALF_OPEN) allowed_exceptions = self.exceptions_to_catch try: ret_val = self.func(*args, **kwargs) # the remote call was successful # now reset the state to Closed self.set_state(StateChoices.CLOSED) # reset the failed attempt counter self._failed_attempt_count = 0 # update the last_attempt_timestamp self.update_last_attempt_timestamp() # return the remote call's response return ret_val except allowed_exceptions as e: # the remote call failed again # increment the failed attempt count self._failed_attempt_count += 1 # update last_attempt_timestamp self.update_last_attempt_timestamp() # set the state to "OPEN" self.set_state(StateChoices.OPEN) # raise the error raise RemoteCallFailedException from e def make_remote_call(self, *args, **kwargs): if self.state == StateChoices.CLOSED: return self.handle_closed_state(*args, **kwargs) if self.state == StateChoices.OPEN: return self.handle_open_state(*args, **kwargs)
Теперь, чтобы проверить это. Давайте создадим MOCK Server.
Установите колбу и запросы. IPyton не является обязательным
pip install requests pip install Flask pip install ipython
Давайте создадим некоторые конечные точки, чтобы издеваться с сервером
# main.py import random import time from flask import Flask app = Flask(__name__) @app.route('/success') def success_endpoint(): return { "msg": "Call to this endpoint was a smashing success." }, 200 @app.route('/failure') def faulty_endpoint(): r = random.randint(0, 1) if r == 0: time.sleep(2) return { "msg": "I will fail." }, 500 @app.route('/random') def fail_randomly_endpoint(): r = random.randint(0, 1) if r == 0: return { "msg": "Success msg" }, 200 return { "msg": "I will fail (sometimes)." }, 500
Запустите сервер разработки
export FLASK_APP=main.py; flask run
По умолчанию он работает на порту 5000
Теперь, чтобы проверить это. Вы можете использовать эти фрагменты, чтобы проверить это.
# snippets.py faulty_endpoint = "http://localhost:5000/failure" success_endpoint = "http://localhost:5000/success" random_status_endpoint = "http://localhost:5000/random" def make_request(url): try: response = requests.get(url, timeout=0.3) if response.status_code == http.HTTPStatus.OK: print(f"Call to {url} succeed with status code = {response.status_code}") return response if 500 <= response.status_code < 600: print(f"Call to {url} failed with status code = {response.status_code}") raise Exception("Server Issue") except Exception: print(f"Call to {url} failed") raise
(circuit-breaker) ➜ circuit-breaker git:(master) ✗ ipython In [1]: from circuit_breaker import CircuitBreaker In [2]: from snippets import make_request, faulty_endpoint, success_endpoint In [3]: obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10) In [4]: obj.make_remote_call(success_endpoint) Call to http://localhost:5000/success succeed with status code = 200 06:07:51,255 INFO: Success: Remote call Out[4]:In [5]: obj.make_remote_call(success_endpoint) Call to http://localhost:5000/success succeed with status code = 200 06:07:53,610 INFO: Success: Remote call Out[5]: In [6]: vars(obj) Out[6]: {'func': , 'exceptions_to_catch': (Exception,), 'threshold': 5, 'delay': 10, 'state': 'closed', 'last_attempt_timestamp': 1607800073.610199, '_failed_attempt_count': 0}
Линия 1 и строка 2 – просто импорт. В строке 3 мы создаем объект отъемки для make_request
Отказ Здесь мы устанавливаем Исключения = (исключение)
Это поймает все исключения. Мы должны в идеале в идеале сузить исключение с тем, который мы фактически хотим поймать, в этом случае, сетевые исключения, но мы собираемся оставить его там для этого демо.
Теперь сделайте последовательные звонки на неисправен
конечная точка.
In [7]: obj.make_remote_call(faulty_endpoint) In [8]: obj.make_remote_call(faulty_endpoint) In [9]: obj.make_remote_call(faulty_endpoint) In [10]: obj.make_remote_call(faulty_endpoint) In [11]: obj.make_remote_call(faulty_endpoint) In [12]: obj.make_remote_call(faulty_endpoint) --------------------------------------------------------------------------- Traceback data .......... RemoteCallFailedException: Retry after 8.688776969909668 secs In [13]: obj.make_remote_call(success_endpoint) --------------------------------------------------------------------------- Traceback data...... RemoteCallFailedException: Retry after 6.096494913101196 secs
Попробуйте сделать эти звонки как можно быстрее. После первых пяти Calls на неторочную точку следующий вызов (строка 12) не сделает API-запрос на сервер Flask вместо этого будет поднять исключение, упомянутое повторение после указанного количества секунды. Даже если вы сделаете вызов API к Success_endpoint
Конечная точка (линия 13), она все равно будет поднять ошибку. Это в «открытом» состоянии.
Теперь, после истечения времени задержки, если мы позвоните на неисправную конечную точку, он будет переходить от половины открытого для открытия состояния.
In [18]: obj.make_remote_call(faulty_endpoint) 06:21:24,959 INFO: Changed state from open to half_open ... 06:21:24,964 INFO: Changed state from half_open to open
Теперь после того, как задержка прошла, если мы позвоните в успех_енду, он будет переходить от половины открытого для закрытого состояния
In [19]: obj.make_remote_call(success_endpoint) 06:25:10,673 INFO: Changed state from open to half_open ... 06:25:10,678 INFO: Changed state from half_open to closed Out[19]:
Теперь у нас есть рабочий выключатель. Мы могли бы ввести ответную кэширование, мониторинг и сделать его Threadsafe. Ошибки могут быть обработаны лучше. Больше типов исключений может помочь. Все эти функции остаются в качестве упражнения для читателей.
Наконец, улучшение API не должно занимать много времени. Я добавил быстро грязную версию здесь
# circuit_breaker.py class APICircuitBreaker: def __init__(self, exceptions=(Exception,), threshold=5, delay=60): self.obj = functools.partial( CircuitBreaker, exceptions=exceptions, threshold=threshold, delay=delay ) def __call__(self, func): self.obj = self.obj(func=func) def decorator(*args, **kwargs): ret_val = self.obj.make_remote_call(*args, **kwargs) return ret_val return decorator def __getattr__(self, item): return getattr(self.obj, item) circuit_breaker = APICircuitBreaker
# snippets.py @circuit_breaker() def make_request(url): try: response = requests.get(url, timeout=0.3) if response.status_code == http.HTTPStatus.OK: print(f"Call to {url} succeed with status code = {response.status_code}") return response if 500 <= response.status_code < 600: print(f"Call to {url} failed with status code = {response.status_code}") raise Exception("Server Issue") except Exception: print(f"Call to {url} failed") raise
Все образцы кода можно найти здесь
Теперь у нас есть рабочий выключатель. Мы могли бы ввести ответную кэширование, мониторинг и сделать его безопасным. Ошибки могут быть обработаны лучше. Больше типов исключений может помочь. Все эти функции остаются в качестве упражнения для читателей.
Связь со мной на Твиттер
Использованная литература:
- https://dzone.com/articles/circuit-breaker-pattern
- https://medium.com/@narengowda/what-is-circuitbreaking-in-microservices-2053f4f66882
- https://martinfowler.com/bliki/CircuitBreaker.html
- https://microservices.io/patterns/reliability/circuit-breaker.html
Оригинал: “https://dev.to/bhavesh_praveen/implementing-circuit-breaker-pattern-from-scratch-in-python-4c6g”