Рубрики
Без рубрики

Передача сообщений процессам

Автор оригинала: Doug Hellmann.

Как и в случае с потоками, обычным шаблоном использования нескольких процессов является разделение задания между несколькими рабочими процессами для параллельного выполнения. Эффективное использование нескольких процессов обычно требует некоторой связи между ними, чтобы можно было разделить работу и агрегировать результаты. Простой способ связи между процессами с помощью multiprocessing – использовать Queue для передачи сообщений туда и обратно. Любой объект, который может быть сериализован с помощью pickle, может проходить через Queue .

multiprocessing_queue.py

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name  name

    def do_something(self):
        proc_name  multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj  q.get()
    obj.do_something()


if __name__  '__main__':
    queue  multiprocessing.Queue()

    p  multiprocessing.Process(targetworker, args(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

Этот короткий пример передает только одно сообщение одному исполнителю, а затем основной процесс ожидает завершения рабочего.

$ python3 multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!

В более сложном примере показано, как управлять несколькими рабочими процессами, которые потребляют данные из JoinableQueue и передают результаты обратно родительскому процессу. Техника отравляющих таблеток используется, чтобы остановить рабочих. После настройки реальных задач основная программа добавляет одно значение «остановки» для каждого рабочего в очередь заданий. Когда рабочий встречает специальное значение, он выходит из цикла обработки. Главный процесс использует метод join () очереди задач, чтобы дождаться завершения всех задач перед обработкой результатов.

multiprocessing_producer_consumer.py

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue  task_queue
        self.result_queue  result_queue

    def run(self):
        proc_name  self.name
        while True:
            next_task  self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer  next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a  a
        self.b  b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            selfself, productself.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(selfself)


if __name__  '__main__':
    # Establish communication queues
    tasks  multiprocessing.JoinableQueue()
    results  multiprocessing.Queue()

    # Start consumers
    num_consumers  multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers  [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs  10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result  results.get()
        print('Result:', result)
        num_jobs  1

Хотя задания поступают в очередь по порядку, их выполнение распараллеливается, поэтому нет гарантии, в каком порядке они будут выполнены.

$ python3 -u multiprocessing_producer_consumer.py

Creating 8 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-3: 8 * 8
Consumer-7: 9 * 9
Consumer-4: Exiting
Consumer-1: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-6: Exiting
Consumer-8: Exiting
Consumer-7: Exiting
Consumer-3: Exiting
Result: 6 * 6 = 36
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 7 * 7 = 49
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 8 * 8 = 64
Result: 9 * 9 = 81