Автор оригинала: Doug Hellmann.
Как и в случае с потоками, обычным шаблоном использования нескольких процессов является разделение задания между несколькими рабочими процессами для параллельного выполнения. Эффективное использование нескольких процессов обычно требует некоторой связи между ними, чтобы можно было разделить работу и агрегировать результаты. Простой способ связи между процессами с помощью multiprocessing
– использовать Queue
для передачи сообщений туда и обратно. Любой объект, который может быть сериализован с помощью pickle, может проходить через Queue
.
multiprocessing_queue.py
import multiprocessing class MyFancyClass: def __init__(self, name): self.name name def do_something(self): proc_name multiprocessing.current_process().name print('Doing something fancy in {} for {}!'.format( proc_name, self.name)) def worker(q): obj q.get() obj.do_something() if __name__ '__main__': queue multiprocessing.Queue() p multiprocessing.Process(targetworker, args(queue,)) p.start() queue.put(MyFancyClass('Fancy Dan')) # Wait for the worker to finish queue.close() queue.join_thread() p.join()
Этот короткий пример передает только одно сообщение одному исполнителю, а затем основной процесс ожидает завершения рабочего.
$ python3 multiprocessing_queue.py Doing something fancy in Process-1 for Fancy Dan!
В более сложном примере показано, как управлять несколькими рабочими процессами, которые потребляют данные из JoinableQueue
и передают результаты обратно родительскому процессу. Техника отравляющих таблеток используется, чтобы остановить рабочих. После настройки реальных задач основная программа добавляет одно значение «остановки» для каждого рабочего в очередь заданий. Когда рабочий встречает специальное значение, он выходит из цикла обработки. Главный процесс использует метод join ()
очереди задач, чтобы дождаться завершения всех задач перед обработкой результатов.
multiprocessing_producer_consumer.py
import multiprocessing import time class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue task_queue self.result_queue result_queue def run(self): proc_name self.name while True: next_task self.task_queue.get() if next_task is None: # Poison pill means shutdown print('{}: Exiting'.format(proc_name)) self.task_queue.task_done() break print('{}: {}'.format(proc_name, next_task)) answer next_task() self.task_queue.task_done() self.result_queue.put(answer) class Task: def __init__(self, a, b): self.a a self.b b def __call__(self): time.sleep(0.1) # pretend to take time to do the work return '{self.a} * {self.b} = {product}'.format( selfself, productself.a * self.b) def __str__(self): return '{self.a} * {self.b}'.format(selfself) if __name__ '__main__': # Establish communication queues tasks multiprocessing.JoinableQueue() results multiprocessing.Queue() # Start consumers num_consumers multiprocessing.cpu_count() * 2 print('Creating {} consumers'.format(num_consumers)) consumers [ Consumer(tasks, results) for i in range(num_consumers) ] for w in consumers: w.start() # Enqueue jobs num_jobs 10 for i in range(num_jobs): tasks.put(Task(i, i)) # Add a poison pill for each consumer for i in range(num_consumers): tasks.put(None) # Wait for all of the tasks to finish tasks.join() # Start printing results while num_jobs: result results.get() print('Result:', result) num_jobs 1
Хотя задания поступают в очередь по порядку, их выполнение распараллеливается, поэтому нет гарантии, в каком порядке они будут выполнены.
$ python3 -u multiprocessing_producer_consumer.py Creating 8 consumers Consumer-1: 0 * 0 Consumer-2: 1 * 1 Consumer-3: 2 * 2 Consumer-4: 3 * 3 Consumer-5: 4 * 4 Consumer-6: 5 * 5 Consumer-7: 6 * 6 Consumer-8: 7 * 7 Consumer-3: 8 * 8 Consumer-7: 9 * 9 Consumer-4: Exiting Consumer-1: Exiting Consumer-2: Exiting Consumer-5: Exiting Consumer-6: Exiting Consumer-8: Exiting Consumer-7: Exiting Consumer-3: Exiting Result: 6 * 6 = 36 Result: 2 * 2 = 4 Result: 3 * 3 = 9 Result: 0 * 0 = 0 Result: 1 * 1 = 1 Result: 7 * 7 = 49 Result: 4 * 4 = 16 Result: 5 * 5 = 25 Result: 8 * 8 = 64 Result: 9 * 9 = 81