Рубрики
Без рубрики

Реализация MapReduce

Автор оригинала: Doug Hellmann.

Класс Pool можно использовать для создания простой односерверной реализации MapReduce. Хотя он не дает всех преимуществ распределенной обработки, он показывает, насколько легко разбить некоторые проблемы на распределяемые единицы работы.

В системе на основе MapReduce входные данные разбиваются на части для обработки различными рабочими экземплярами. Каждый фрагмент входных данных отображается в промежуточное состояние с помощью простого преобразования. Затем промежуточные данные собираются вместе и разделяются на основе значения ключа, так что все связанные значения находятся вместе. Наконец, разделенные данные сокращаются до набора результатов.

multiprocessing_mapreduce.py

import collections
import itertools
import multiprocessing


class SimpleMapReduce:

    def __init__(self, map_func, reduce_func, num_workersNone):
        """
        map_func

          Function to map inputs to intermediate data. Takes as
          argument one input value and returns a tuple with the
          key and a value to be reduced.

        reduce_func

          Function to reduce partitioned version of intermediate
          data to final output. Takes as argument a key as
          produced by map_func and a sequence of the values
          associated with that key.

        num_workers

          The number of workers to create in the pool. Defaults
          to the number of CPUs available on the current host.
        """
        self.map_func  map_func
        self.reduce_func  reduce_func
        self.pool  multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Organize the mapped values by their key.
        Returns an unsorted sequence of tuples with a key
        and a sequence of values.
        """
        partitioned_data  collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize1):
        """Process the inputs through the map and reduce functions
        given.

        inputs
          An iterable containing the input data to be processed.

       >
          The portion of the input data to hand to each worker.
          This can be used to tune performance during the mapping
          phase.
        """
        map_responses  self.pool.map(
            self.map_func,
            inputs,
            chunksizechunksize,
        )
        partitioned_data  self.partition(
            itertools.chain(*map_responses)
        )
        reduced_values  self.pool.map(
            self.reduce_func,
            partitioned_data,
        )
        return reduced_values

В следующем примере сценария используется SimpleMapReduce для подсчета «слов» в источнике reStructuredText для этой статьи, игнорируя некоторую разметку.

multiprocessing_wordcount.py

import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce


def file_to_words(filename):
    """Read a file and return a sequence of
    (word, occurences) values.
    """
    STOP_WORDS  set([
        'a', 'an', 'and', 'are', 'as', 'be', 'by', 'for', 'if',
        'in', 'is', 'it', 'of', 'or', 'py', 'rst', 'that', 'the',
        'to', 'with',
    ])
    TR  str.maketrans({
        p: ' '
        for p in string.punctuation
    })

    print('{} reading {}'.format(
        multiprocessing.current_process().name, filename))
    output  []

    with open(filename, 'rt') as f:
        for line in f:
            # Skip comment lines.
            if line.lstrip().startswith('..'):
                continue
            line  line.translate(TR)  # Strip punctuation
            for word in line.split():
                word  word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append((word, 1))
    return output


def count_words(item):
    """Convert the partitioned data for a word to a
    tuple containing the word and the number of occurences.
    """
    word, occurences  item
    return (word, sum(occurences))


if __name__  '__main__':
    import operator
    import glob

    input_files  glob.glob('*.rst')

    mapper  SimpleMapReduce(file_to_words, count_words)
    word_counts  mapper(input_files)
    word_counts.sort(keyoperator.itemgetter(1))
    word_counts.reverse()

    print('\nTOP 20 WORDS BY FREQUENCY\n')
    top20  word_counts[:20]
    longest  max(len(word) for word, count in top20)
    for word, count in top20:
        print('{word:<{len}}: {count:5}'.format(
            lenlongest + 1,
            wordword,
            countcount)
        )

Функция file_to_words () преобразует каждый входной файл в последовательность кортежей, содержащих слово и число 1 (представляющих одно вхождение). Данные разделяются с помощью partition () с использованием слова в качестве ключа, поэтому результирующая структура состоит из ключа и последовательности значений 1 , представляющих каждое вхождение слова. Разделенные данные преобразуются в набор кортежей, содержащих слово и счетчик этого слова с помощью count_words () на этапе сокращения.

$ python3 -u multiprocessing_wordcount.py

ForkPoolWorker-1 reading basics.rst
ForkPoolWorker-2 reading communication.rst
ForkPoolWorker-3 reading index.rst
ForkPoolWorker-4 reading mapreduce.rst

TOP 20 WORDS BY FREQUENCY

process         :    83
running         :    45
multiprocessing :    44
worker          :    40
starting        :    37
now             :    35
after           :    34
processes       :    31
start           :    29
header          :    27
pymotw          :    27
caption         :    27
end             :    27
daemon          :    22
can             :    22
exiting         :    21
forkpoolworker  :    21
consumer        :    20
main            :    18
event           :    16