Автор оригинала: Doug Hellmann.
Класс Pool
можно использовать для создания простой односерверной реализации MapReduce. Хотя он не дает всех преимуществ распределенной обработки, он показывает, насколько легко разбить некоторые проблемы на распределяемые единицы работы.
В системе на основе MapReduce входные данные разбиваются на части для обработки различными рабочими экземплярами. Каждый фрагмент входных данных отображается в промежуточное состояние с помощью простого преобразования. Затем промежуточные данные собираются вместе и разделяются на основе значения ключа, так что все связанные значения находятся вместе. Наконец, разделенные данные сокращаются до набора результатов.
multiprocessing_mapreduce.py
import collections import itertools import multiprocessing class SimpleMapReduce: def __init__(self, map_func, reduce_func, num_workersNone): """ map_func Function to map inputs to intermediate data. Takes as argument one input value and returns a tuple with the key and a value to be reduced. reduce_func Function to reduce partitioned version of intermediate data to final output. Takes as argument a key as produced by map_func and a sequence of the values associated with that key. num_workers The number of workers to create in the pool. Defaults to the number of CPUs available on the current host. """ self.map_func map_func self.reduce_func reduce_func self.pool multiprocessing.Pool(num_workers) def partition(self, mapped_values): """Organize the mapped values by their key. Returns an unsorted sequence of tuples with a key and a sequence of values. """ partitioned_data collections.defaultdict(list) for key, value in mapped_values: partitioned_data[key].append(value) return partitioned_data.items() def __call__(self, inputs, chunksize1): """Process the inputs through the map and reduce functions given. inputs An iterable containing the input data to be processed. > The portion of the input data to hand to each worker. This can be used to tune performance during the mapping phase. """ map_responses self.pool.map( self.map_func, inputs, chunksizechunksize, ) partitioned_data self.partition( itertools.chain(*map_responses) ) reduced_values self.pool.map( self.reduce_func, partitioned_data, ) return reduced_values
В следующем примере сценария используется SimpleMapReduce для подсчета «слов» в источнике reStructuredText для этой статьи, игнорируя некоторую разметку.
multiprocessing_wordcount.py
import multiprocessing import string from multiprocessing_mapreduce import SimpleMapReduce def file_to_words(filename): """Read a file and return a sequence of (word, occurences) values. """ STOP_WORDS set([ 'a', 'an', 'and', 'are', 'as', 'be', 'by', 'for', 'if', 'in', 'is', 'it', 'of', 'or', 'py', 'rst', 'that', 'the', 'to', 'with', ]) TR str.maketrans({ p: ' ' for p in string.punctuation }) print('{} reading {}'.format( multiprocessing.current_process().name, filename)) output [] with open(filename, 'rt') as f: for line in f: # Skip comment lines. if line.lstrip().startswith('..'): continue line line.translate(TR) # Strip punctuation for word in line.split(): word word.lower() if word.isalpha() and word not in STOP_WORDS: output.append((word, 1)) return output def count_words(item): """Convert the partitioned data for a word to a tuple containing the word and the number of occurences. """ word, occurences item return (word, sum(occurences)) if __name__ '__main__': import operator import glob input_files glob.glob('*.rst') mapper SimpleMapReduce(file_to_words, count_words) word_counts mapper(input_files) word_counts.sort(keyoperator.itemgetter(1)) word_counts.reverse() print('\nTOP 20 WORDS BY FREQUENCY\n') top20 word_counts[:20] longest max(len(word) for word, count in top20) for word, count in top20: print('{word:<{len}}: {count:5}'.format( lenlongest + 1, wordword, countcount) )
Функция file_to_words ()
преобразует каждый входной файл в последовательность кортежей, содержащих слово и число 1
(представляющих одно вхождение). Данные разделяются с помощью partition ()
с использованием слова в качестве ключа, поэтому результирующая структура состоит из ключа и последовательности значений 1
, представляющих каждое вхождение слова. Разделенные данные преобразуются в набор кортежей, содержащих слово и счетчик этого слова с помощью count_words ()
на этапе сокращения.
$ python3 -u multiprocessing_wordcount.py ForkPoolWorker-1 reading basics.rst ForkPoolWorker-2 reading communication.rst ForkPoolWorker-3 reading index.rst ForkPoolWorker-4 reading mapreduce.rst TOP 20 WORDS BY FREQUENCY process : 83 running : 45 multiprocessing : 44 worker : 40 starting : 37 now : 35 after : 34 processes : 31 start : 29 header : 27 pymotw : 27 caption : 27 end : 27 daemon : 22 can : 22 exiting : 21 forkpoolworker : 21 consumer : 20 main : 18 event : 16