Рубрики
Без рубрики

Создайте свою собственную систему событий в Python

Система событий не сложно построить самостоятельно. Существует множество библиотек, готовых к использованию, но для … с меткой Python, 5days5blogposts, Eventdriven, Design.

Система событий не сложно построить самостоятельно. Есть много библиотек, готовых к использованию, но для лучшего понимания я хочу реализовать его самостоятельно.

Идея этого поста исходит от это видео. Сегодня я искал в Интернете советы, как управлять моим проектом, в котором много разных модулей, и я хотел сделать это красиво и чисто. Одна из идей, которые я нашел, – Наблюдатель Образец из «Банды четырех» книги. Это должно быть очень простой реализацией этого.

Прежде всего, у вас есть подписчики. Они подписываются на различные виды событий. Каждый раз, когда мероприятие публикуется, вы должны уведомить абонента об этом.

Сначала создайте два метода подписки на событие и для публикации.

# src/event.py

from collections import defaultdict

subscribers = defaultdict(list)

def subscribe(event_type, fn):
    subscribers[event_type].append(fn)

def post_event(event_type, data): 
    if event_type in subscribers:
        for fn in subscribers[event_type]:
            fn(data)

В этом примере я буду создавать нового пользователя, который вызовет событие new_user_created Анкет Подписчик для этого события будет модуль для отправки приветственного сообщения по электронной почте.

Давайте создадим пользователя. Это будет простой словарь, но это тоже может быть объект.

# src/user.py

from .event import post_event

def register_new_user(name, password, email ):
    user = dict( name = name, 
                password = password, 
                email = email)

    post_event("new_user_created", user)

Теперь обрабатывайте отправку электронных писем. Нам понадобится две вещи здесь. Какой -то поставщик электронной почты и какой -то обработчик для этого. Я хочу, чтобы мой класс поставщиков электронной почты занимался только «бизнес -логикой», поэтому я создал модули папка для такого.

# src/modules/email.py 

class Email:
    def sendEmail( email, subject, message ):
        print()
        print(f"From: {email}")
        print(f"Subject: {subject}")
        print(message)
        print()

И обработчик по электронной почте.

# src/email_handler.py

from .modules.email import Email 
from .event import subscribe

def handle_user_registered_event(user):
    Email.sendEmail(user['email'], 'Welcome!', 'Some welcome message')

def setup_email_event_handlers():
    subscribe('new_user_created', handle_user_registered_event)

Теперь давайте соединим все вместе. Сделайте файл приложения.

# src/app.py

from src.user import register_new_user
from src.email_handler import setup_email_event_handlers

setup_email_event_handlers()

register_new_user('Jakub', 'secret', 'name@domain.com')

Первое, что нужно сделать, это установить события обработчика электронной почты. Это означает, что подпишитесь на событие new_user_created Анкет

Если все подписки сделаны, зарегистрируйте нового пользователя. Вывод должен быть следующим образом.

==========================
From: name@domain.com
Subject: Welcome!
Some welcome message
==========================

Расширить систему

Что если мы хотим расширить это. Давайте добавим немного базы данных. В модулях добавить Database.py Файл и образец реализации базы данных в качестве списка пользователей.

# src/modules/database.py 

class DB:
    users = [ ]
    def register_new_user( user ):
        DB.users.append(user)
        print('=======================')
        print(DB.users)
        print('=======================')

И обработчик для этого.

# src/database_handler.py

from .modules.database import DB
from .event import subscribe

def handle_user_registered_event(user):
    DB.register_new_user(user)

def setup_database_event_handlers():
    subscribe('new_user_created', handle_user_registered_event)

Теперь Единственное, что нужно сделать, это установить обработчик базы данных в файле приложения.

# src/app.py

from src.user import register_new_user
from src.email_handler import setup_email_event_handlers
from src.database_handler import setup_database_event_handlers 

setup_email_event_handlers()
setup_database_event_handlers()

register_new_user('Jakub', 'secret', 'name@domain.com')

Теперь результат должен быть:

==========================
From: name@domain.com
Subject: Welcome!
Some welcome message
==========================
==========================
[{'name': 'Jakub', 'password': 'secret', 'email': 'name@domain.com'}]
==========================

Хорошая вещь в этом, так это то, что этот путь держит ваши модули независимыми друг от друга и сохраняет их сплоченность. Кроме того, это позволяет вам очень быстро манипулировать порядок слушателей. Нет больше ‘ctrl + c | Ctrl + V ‘весь код 😉.

Комментарий что вы думаете об этом! Как вы создаете рабочий процесс в своих приложениях между несколькими модулями?

Оригинал: “https://dev.to/kuba_szw/build-your-own-event-system-in-python-5hk6”