Рубрики
Без рубрики

Давайте синхронизируем потоки в Python

(почти) вводный пост о стандартных примитивах синхронизации Python

Автор оригинала: Saurabh Chaturvedi.

Потому что синхронность-это гармония

Внутри множества функций Python язык поддерживает множество примитивов синхронизации. Источник изображения: Искусство Хайку Heroku Python

Для меня это был волшебный момент “ага”, когда я впервые узнал о многопоточности. Тот факт, что я могу попросить свой компьютер выполнять действия параллельно, привел меня в восторг (хотя здесь следует отметить, что на одном базовом компьютере все происходит не совсем параллельно, и, что более важно, они не выполняются в параллельном смысле в Python из-за глобальной блокировки интерпретатора языка). Многопоточность открывает новые возможности для вычислений, но вместе с мощью приходит и ответственность.

Есть очевидные проблемы, которые можно себе представить с многопоточностью — многие потоки, пытающиеся получить доступ к одному и тому же фрагменту данных, могут привести к таким проблемам, как несогласованность данных или искажение вывода (например, наличие HWeolrldo вместо Hello World на вашей консоли). Такие проблемы могут возникнуть, когда мы не сообщаем компьютеру, как организованно управлять потоками.

Но как мы можем “сказать” компьютеру, чтобы он синхронизировал потоки нашей программы? Мы делаем это с помощью примитивов синхронизации . Это простые программные механизмы для обеспечения того, чтобы ваши потоки работали гармонично друг с другом.

В этом посте представлены некоторые из самых популярных примитивов синхронизации в Python, определенных в его стандарте threading.py модуль. Большинство методов блокировки (т. Е. Методов, Которые Блокируют выполнение определенного потока до тех пор, пока не будет выполнено какое-либо условие) этих примитивов обеспечивают необязательную функциональность timeout, но я не включил ее здесь для простоты. Кроме того, я только что включил основные функциональные возможности этих объектов, опять же для простоты. В этом посте предполагается, что у вас есть базовые знания о реализации многопоточности с использованием Python.

Мы будем изучать Блокировки , RLocks , Семафоры , События , Условия и Барьеры . Конечно, вы можете создать свои собственные примитивы синхронизации, создав подклассы этих классов. Мы начнем с Замков , поскольку они являются простейшими примитивами, и постепенно перейдем к примитивам со все большей и большей сложностью.

Замки

Блокировки – это, пожалуй, самый простой примитив синхронизации в Python. A Lock имеет только два состояния — locked и unlocked (сюрприз). Он создается в разблокированном состоянии и имеет два основных метода — acquire() и release() . Метод acquire() блокирует Lock и блокирует выполнение до тех пор, пока метод release() в какой-либо другой сопрограмме не установит его разблокированным. Затем он снова блокирует Lock и возвращает True . Метод release() должен вызываться только в заблокированном состоянии, он устанавливает состояние разблокированным и немедленно возвращается. Если release() вызывается в разблокированном состоянии, возникает ошибка времени выполнения .

Вот код, который использует примитив Lock для безопасного доступа к общей переменной:

#lock_tut.py

from threading import Lock, Thread
lock = Lock()
g = 0

def add_one():
   """
   Just used for demonstration. It's bad to use the 'global'
   statement in general.
   """
   global g
   lock.acquire()
   g += 1
   lock.release()

def add_two():
   global g
   lock.acquire()
   g += 2
   lock.release()

threads = []
for func in [add_one, add_two]:
   threads.append(Thread(target=func))
   threads[-1].start()

for thread in threads:
   """
   Waits for threads to complete before moving on with the main
   script.
   """
   thread.join()

print(g)

Это просто дает результат 3, но теперь мы уверены, что две функции не изменяют значение глобальной переменной g одновременно, хотя они работают в двух разных потоках. Таким образом, Блокировки можно использовать, чтобы избежать несогласованного вывода, позволяя только одному потоку изменять данные одновременно.

Горные породы

Стандартный Lock не знает, какой поток в данный момент удерживает lock. Если блокировка удерживается, любой поток, который пытается ее получить, будет заблокирован, даже если сам тот же поток уже удерживает блокировку. В таких случаях используется Locks (reentrantlock). Вы можете расширить код в следующем фрагменте, добавив инструкции вывода для демонстрации того, как Блокировки могут предотвратить нежелательную блокировку.

#rlock_tut.py

import threading

num = 0
lock = Threading.Lock()

lock.acquire()
num += 1
lock.acquire() # This will block.
num += 2
lock.release()

# With RLock, that problem doesn't happen.
lock = Threading.RLock()

lock.acquire()
num += 3
lock.acquire() # This won't block.
num += 4
lock.release()
lock.release() # You need to call release once for each call to acquire.

Одним из хороших вариантов использования RLock -s является рекурсия, когда родительский вызов функции в противном случае заблокировал бы ее вложенный вызов. Таким образом, основное использование Lock s-это вложенный доступ к общим ресурсам.

Семафоры

Семафоры-это просто продвинутые счетчики. Вызов семафора acquire() будет заблокирован только после того, как несколько потоков будут иметь acquire() -edit. Соответствующий счетчик уменьшается на вызов acquire() и увеличивается на вызов release () . Ошибка ValueError произойдет, если вызовы release() попытаются увеличить счетчик сверх назначенного максимального значения (которое является числом потоков, которые могут получить() семафор до того, как произойдет блокировка). Следующий код демонстрирует использование семафоров в простой задаче “производитель-потребитель”.

#semaphores_tut.py

import random, time
from threading import BoundedSemaphore, Thread
max_items = 5
"""
Consider 'container' as a container, of course, with a capacity of 5
items. Defaults to 1 item if 'max_items' is passed.
"""
container = BoundedSemaphore(max_items)
def producer(nloops):
    for i in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")
        try:
            container.release()
            print("Produced an item.")
        except ValueError:
            print("Full, skipping.")
def consumer(nloops):
    for i in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")
        """
        In the following if statement we disable the default
        blocking behaviour by passing False for the blocking flag.
        """
        if container.acquire(False):
            print("Consumed an item.")
        else:
            print("Empty, skipping.")
threads = []
nloops = random.randrange(3, 6)
print("Starting with %s items." % max_items)
threads.append(Thread(target=producer, args=(nloops,)))
threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),)))
for thread in threads:  # Starts all the threads.
    thread.start()
for thread in threads:  # Waits for threads to complete before moving on with the main script.
    thread.join()
print("All done.")

semaphore_tut.py в действии

Модуль threading также предоставляет простой класс Semaphore . Семафор предоставляет не ограниченный счетчик, который позволяет вызывать release() любое количество раз для увеличения. Однако, чтобы избежать ошибок программирования, обычно правильным выбором является использование BoundedSemaphore , который вызывает ошибку, если вызов release() пытается увеличить счетчик сверх его максимального размера.

Семафоры обычно используются для ограничения ресурса, например, для того, чтобы сервер мог обрабатывать только 10 клиентов одновременно. В таком случае несколько потоковых соединений конкурируют за ограниченный ресурс (в нашем примере это сервер).

События

Примитив синхронизации Event действует как простой коммуникатор между потоками. Они основаны на внутреннем флаге, который потоки могут установить() или очистить() . Другие потоки могут ждать () , пока внутренний флаг будет установлен() . Метод wait() блокируется до тех пор, пока флаг не станет истинным. Следующий фрагмент демонстрирует, как События можно использовать для запуска действий.

#event_tut.py

import random, time
from threading import Event, Thread

event = Event()

def waiter(event, nloops):
    for i in range(nloops):
    print("%s. Waiting for the flag to be set." % (i+1))
    event.wait() # Blocks until the flag becomes true.
    print("Wait complete at:", time.ctime())
    event.clear() # Resets the flag.
    print()

def setter(event, nloops):
    for i in range(nloops):
    time.sleep(random.randrange(2, 5)) # Sleeps for some time.
    event.set()

threads = []
nloops = random.randrange(3, 6)

threads.append(Thread(target=waiter, args=(event, nloops)))
threads[-1].start()
threads.append(Thread(target=setter, args=(event, nloops)))
threads[-1].start()

for thread in threads:
    thread.join()

print("All done.")

Исполнение event_tut.py

Условия

Объект Condition – это просто более продвинутая версия объекта Event . Он также действует как коммуникатор между потоками и может использоваться для уведомления() других потоков об изменении состояния программы. Например, он может использоваться для сигнализации о наличии ресурса для потребления. Другие потоки также должны получить() условие (и, следовательно, связанную с ним блокировку) до ожидания () выполнения условия. Кроме того, поток должен освободить() Условие после завершения соответствующих действий, чтобы другие потоки могли получить условие для своих целей. Следующий код демонстрирует реализацию еще одной простой проблемы "производитель-потребитель" с помощью объекта Condition .

#condition_tut.py

import random, time
from threading import Condition, Thread
"""
'condition' variable will be used to represent the availability of a produced
item.
"""
condition = Condition()
box = []
def producer(box, nitems):
    for i in range(nitems):
        time.sleep(random.randrange(2, 5))  # Sleeps for some time.
        condition.acquire()
        num = random.randint(1, 10)
        box.append(num)  # Puts an item into box for consumption.
        condition.notify()  # Notifies the consumer about the availability.
        print("Produced:", num)
        condition.release()
def consumer(box, nitems):
    for i in range(nitems):
        condition.acquire()
        condition.wait()  # Blocks until an item is available for consumption.
        print("%s: Acquired: %s" % (time.ctime(), box.pop()))
        condition.release()
threads = []
"""
'nloops' is the number of times an item will be produced and
consumed.
"""
nloops = random.randrange(3, 6)
for func in [producer, consumer]:
    threads.append(Thread(target=func, args=(box, nloops)))
    threads[-1].start()  # Starts the thread.
for thread in threads:
    """Waits for the threads to complete before moving on
       with the main script.
    """
    thread.join()
print("All done.")

Выход condition_tut.py

Могут быть и другие варианты использования Условий . Я думаю, что они будут полезны, когда вы разрабатываете потоковый API, который уведомляет ожидающего клиента, как только часть данных доступна.

Барьеры

A Барьер – это простой примитив синхронизации, который может использоваться различными потоками для ожидания друг друга. Каждый поток пытается пройти барьер, вызывая метод wait () , который будет блокироваться до тех пор, пока все потоки не выполнят этот вызов. Как только это происходит, потоки освобождаются одновременно. Следующий фрагмент демонстрирует использование Барьеров .

#barrier_tut.py

from random import randrange
from threading import Barrier, Thread
from time import ctime, sleep

num = 4
# 4 threads will need to pass this barrier to get released.
b = Barrier(num)
names = ["Harsh", "Lokesh", "George", "Iqbal"]

def player():
    name = names.pop()
    sleep(randrange(2, 5))
    print("%s reached the barrier at: %s" % (name, ctime()))
    b.wait()
    
threads = []
print("Race starts now…")

for i in range(num):
    threads.append(Thread(target=player))
    threads[-1].start()
"""
Following loop enables waiting for the threads to complete before moving on with the main script.
"""
for thread in threads:
    thread.join()
print()
print("Race over!")

Вот результат barrier_tut.py

Барьеры могут найти много применений; одним из них является синхронизация сервера и клиента, поскольку сервер должен ждать клиента после инициализации.

На этом мы подошли к концу нашего обсуждения примитивов синхронизации в Python. Я написал этот пост как решение упражнения в книге “Программирование основных приложений Python” Уэсли Чуна. Если вам понравился этот пост, подумайте о том, чтобы взглянуть на другие мои работы из этой книги на GitHub и в репозитории.. Суть кода, упомянутого в этой статье, также доступна в моем профиле.

Источники: effbot.org , bogotobogo.com , Документы Python

Я новичок в блогах, поэтому конструктивная критика не только приветствуется, но и очень нужна!

Первоначально я написал этот пост на Medium .