Рубрики
Без рубрики

Параллелизовать обработку большого файла AWS S3

Этот пост демонстрирует подход к обработке большого файла S3 в управляемые кусочки, работающие параллельно, используя выбирать AWS S3. Это самый быстрый и самый дешевый подход к обработке файлов за считанные минуты. Помечено с AWS, Python, Showdev, производительностью.

В моем последнем посте мы обсудили достижение эффективности в обработке большого файла AWS S3 через S3 Select. Обработка была своего рода последовательной, и она может занять века для большого файла. Итак, как мы распараллелизуем обработку в нескольких единицах? 🤔 Ну, в этом посте мы будем реализовать его и увидеть его работать!

📝 Я настоятельно рекомендую проверить мой последний пост на Потоковое файло S3 через S3-Select Чтобы установить контекст для этого поста.

Я всегда люблю сломать проблему на меньшие кусочки, необходимые для ее решения (аналитический подход). Давайте попробуем решить это в 3 простых шагах:

1. Найти общие байты файла S3

Очень похоже на 1-й шаг нашего последнего сообщения, а также мы пытаемся найти размер файла сначала. Следующий фрагмент кода показывает функцию, которая выполнит запрос на голову в нашем файле S3 и определяет размер файла в байтах.

# core/utils.py

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size

2. Создать целлюлозное задание для обработки куска

Здесь мы определим задачу сельдерей для обработки куска файлов (который будет выполнен параллельно позже). Общая обработка здесь будет выглядеть так:

  • Получить Начать и Конец байты этого куска как аргумент
  • Получите эту часть файла S3 через S3-Select и храните его локально во временном файле (в качестве CSV в этом примере)
  • Прочитайте этот временный файл и выполните любую обработку
  • Удалить этот временный файл

📝 Я срок этой задачи в качестве процессора куска файлов. Это обрабатывает кусок из файла. Запуск нескольких этих задач завершает обработку всего файла.

# core/tasks.py

@celery.task(name='core.tasks.chunk_file_processor', bind=True)
def chunk_file_processor(self, **kwargs):
    """ Creates and process a single file chunk based on S3 Select ScanRange start and end bytes
    """
    bucket = kwargs.get('bucket')
    key = kwargs.get('key')
    filename = kwargs.get('filename')
    start_byte_range = kwargs.get('start_byte_range')
    end_byte_range = kwargs.get('end_byte_range')
    header_row_str = kwargs.get('header_row_str')
    local_file = filename.replace('.csv', f'.{start_byte_range}.csv')
    file_path = path.join(current_app.config.get('BASE_DIR'), 'temp', local_file)

    logger.info(f'Processing {filename} chunk range {start_byte_range} -> {end_byte_range}')
    try:
        # 1. fetch data from S3 and store it in a file
        store_scrm_file_s3_content_in_local_file(
            bucket=bucket, key=key, file_path=file_path, start_range=start_byte_range,
            end_range=end_byte_range, delimiter=S3_FILE_DELIMITER, header_row=header_row_str)

        # 2. Process the chunk file in temp folder
        id_set = set()
        with open(file_path) as csv_file:
            csv_reader = csv.DictReader(csv_file, delimiter=S3_FILE_DELIMITER)
            for row in csv_reader:
                # perform any other processing here
                id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')

        # 3. delete local file
        if path.exists(file_path):
            unlink(file_path)
    except Exception:
        logger.exception(f'Error in file processor: {filename}')

3. Выполнить несколько целлюлозных задач параллельно

Это самый интересный шаг в этом потоке. Мы создадим несколько целлюлозных задач, чтобы работать параллельно через Группа сельдерей Отказ Как только мы знаем общие байты файла в S3 (с шага 1), мы рассчитываем Начните и Конец байты Для куска и вызовите задачу, которую мы создали на шаге 2 через группу сельдерея. Начать и Конец байты Диапазон – это непрерывный диапазон размера файла. Необязательно, мы также можем вызвать задачу обратного вызова (результат) после завершения всех наших задач обработки.

# core/tasks.py

@celery.task(name='core.tasks.s3_parallel_file_processing', bind=True)
def s3_parallel_file_processing_task(self, **kwargs):
    """ Creates celery tasks to process chunks of file in parallel
    """
    bucket = kwargs.get('bucket')
    key = kwargs.get('key')
    try:
        filename = key
        # 1. Check file headers for validity -> if failed, stop processing
        desired_row_headers = (
            'id',
            'name',
            'age',
            'latitude',
            'longitude',
            'monthly_income',
            'experienced'
        )
        is_headers_valid, header_row_str = validate_scrm_file_headers_via_s3_select(
            bucket=bucket,
            key=key,
            delimiter=S3_FILE_DELIMITER,
            desired_headers=desired_row_headers)
        if not is_headers_valid:
            logger.error(f'{filename} file headers validation failed')
            return False
        logger.info(f'{filename} file headers validation successful')

        # 2. fetch file size via S3 HEAD
        file_size = get_s3_file_size(bucket=bucket, key=key)
        if not file_size:
            logger.error(f'{filename} file size invalid {file_size}')
            return False
        logger.info(f'We are processing {filename} file about {file_size} bytes :-o')

        # 2. Create celery group tasks for chunk of this file size for parallel processing
        start_range = 0
        end_range = min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size)
        tasks = []
        while start_range < file_size:
            tasks.append(
                chunk_file_processor.signature(
                    kwargs={
                        'bucket': bucket,
                        'key': key,
                        'filename': filename,
                        'start_byte_range': start_range,
                        'end_byte_range': end_range,
                        'header_row_str': header_row_str
                    }
                )
            )
            start_range = end_range
            end_range = end_range + min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size - end_range)
        job = (group(tasks) | chunk_file_processor_callback.s(data={'filename': filename}))
        _ = job.apply_async()
    except Exception:
        logger.exception(f'Error processing file: {filename}')


@celery.task(name='core.tasks.chunk_file_processor_callback', bind=True, ignore_result=False)
def chunk_file_processor_callback(self, *args, **kwargs):
    """ Callback task called post chunk_file_processor()
    """
    logger.info('Callback called')
# core/utils.py

def store_scrm_file_s3_content_in_local_file(bucket: str, key: str, file_path: str, start_range: int, end_range: int,
                                             delimiter: str, header_row: str):
    """Retrieves S3 file content via S3 Select ScanRange and store it in a local file.
       Make sure the header validation is done before calling this.

    Args:
        bucket (str): S3 bucket
        key (str): S3 key
        file_path (str): Local file path to store the contents
        start_range (int): Start range of ScanRange parameter of S3 Select
        end_range (int): End range of ScanRange parameter of S3 Select
        delimiter (str): S3 file delimiter
        header_row (str): Header row of the local file. This will be inserted as first line in local file.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    try:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': delimiter,
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'CSV': {
                    'FieldDelimiter': delimiter,
                    'RecordDelimiter': '\n',
                },
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        """
        f = open(file_path, 'wb')  # we receive data in bytes and hence opening file in bytes
        f.write(header_row.encode())
        f.write('\n'.encode())
        for event in response['Payload']:
            if records := event.get('Records'):
                f.write(records['Payload'])
        f.close()
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    except Exception:
        logger.exception(f'Error reading S3 file {bucket} : {key}')

Вот и все! 😎 Теперь, вместо того, чтобы потокотать файл S3 bytes bytes, мы распарализовываем обработку одновременно обработкой кусков. Это было не так жестко, не так ли? 😅

🔍 Сравнение времени обработки

Если мы сравним время обработки одного файла, который мы обработали в нашем последнем посте с таким подходом, обработка работает примерно 68% быстрее (с тем же оборудованием и конфигурацией). 😆

Размер файла 4.8mb. 4.8mb.
Время обработки ~ 37 секунд ~ 12 секунд

✔️ Преимущества этого подхода

  • Очень большой файл, содержащий миллионы записей, могут быть обработаны в течение нескольких минут. Я использовал этот подход в производственной среде некоторое время, и это очень блаженно
  • Вычисление и обработка распределены среди распределенных работников
  • Скорость обработки может быть настроен на наличие рабочих бассейнов
  • Нет больше проблем с памятью

📌 Вы можете проверить мою репозиторий GitHub для полного рабочего примера этого подхода 👇

Idris-Rampurawala/S3-Select-Demo

Этот проект демонстрирует функцию Beich AWS S3 Select, чтобы поток большого файла данных в странном стиле.

Этот проект демонстрирует богатые AWS S3 Выберите Функция, чтобы поток большого файла данных в Пэгированный стиль Отказ

В настоящее время S3 Выберите не поддерживает Смещение И, следовательно, мы не можем запугать результаты запроса. Следовательно, мы используем ScanRange функция, чтобы поток содержимого файла S3.

Импорт (чтение) Большой файл ведет Из памяти ошибка. Это также может привести к мероприятию системы сбоя системы. Есть библиотеки, а именно. Панды, дач и др. которые очень хороши при обработке больших файлов, но опять же файл должен присутствовать локально то есть. Нам придется импортировать его с S3 на нашу локальную машину. Но что, если мы не хотим получать и хранить весь файл S3 локально сразу? 🤔

Ну, мы можем использовать AWS S3 Выберите Чтобы поток большого файла через это ScanRange параметр. Этот подход…

📑 ресурсы

Оригинал: “https://dev.to/idrisrampurawala/parallelize-processing-a-large-aws-s3-file-8eh”