В моем последнем посте мы обсудили достижение эффективности в обработке большого файла AWS S3 через S3 Select. Обработка была своего рода последовательной, и она может занять века для большого файла. Итак, как мы распараллелизуем обработку в нескольких единицах? 🤔 Ну, в этом посте мы будем реализовать его и увидеть его работать!
📝 Я настоятельно рекомендую проверить мой последний пост на Потоковое файло S3 через S3-Select Чтобы установить контекст для этого поста.
Я всегда люблю сломать проблему на меньшие кусочки, необходимые для ее решения (аналитический подход). Давайте попробуем решить это в 3 простых шагах:
1. Найти общие байты файла S3
Очень похоже на 1-й шаг нашего последнего сообщения, а также мы пытаемся найти размер файла сначала. Следующий фрагмент кода показывает функцию, которая выполнит запрос на голову в нашем файле S3 и определяет размер файла в байтах.
# core/utils.py def get_s3_file_size(bucket: str, key: str) -> int: """Gets the file size of S3 object by a HEAD request Args: bucket (str): S3 bucket key (str): S3 object path Returns: int: File size in bytes. Defaults to 0 if any error. """ aws_profile = current_app.config.get('AWS_PROFILE_NAME') s3_client = boto3.session.Session(profile_name=aws_profile).client('s3') file_size = 0 try: response = s3_client.head_object(Bucket=bucket, Key=key) if response: file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length')) except ClientError: logger.exception(f'Client error reading S3 file {bucket} : {key}') return file_size
2. Создать целлюлозное задание для обработки куска
Здесь мы определим задачу сельдерей для обработки куска файлов (который будет выполнен параллельно позже). Общая обработка здесь будет выглядеть так:
- Получить
Начать
иКонец байты
этого куска как аргумент - Получите эту часть файла S3 через S3-Select и храните его локально во временном файле (в качестве CSV в этом примере)
- Прочитайте этот временный файл и выполните любую обработку
- Удалить этот временный файл
📝 Я срок этой задачи в качестве процессора куска файлов. Это обрабатывает кусок из файла. Запуск нескольких этих задач завершает обработку всего файла.
# core/tasks.py @celery.task(name='core.tasks.chunk_file_processor', bind=True) def chunk_file_processor(self, **kwargs): """ Creates and process a single file chunk based on S3 Select ScanRange start and end bytes """ bucket = kwargs.get('bucket') key = kwargs.get('key') filename = kwargs.get('filename') start_byte_range = kwargs.get('start_byte_range') end_byte_range = kwargs.get('end_byte_range') header_row_str = kwargs.get('header_row_str') local_file = filename.replace('.csv', f'.{start_byte_range}.csv') file_path = path.join(current_app.config.get('BASE_DIR'), 'temp', local_file) logger.info(f'Processing {filename} chunk range {start_byte_range} -> {end_byte_range}') try: # 1. fetch data from S3 and store it in a file store_scrm_file_s3_content_in_local_file( bucket=bucket, key=key, file_path=file_path, start_range=start_byte_range, end_range=end_byte_range, delimiter=S3_FILE_DELIMITER, header_row=header_row_str) # 2. Process the chunk file in temp folder id_set = set() with open(file_path) as csv_file: csv_reader = csv.DictReader(csv_file, delimiter=S3_FILE_DELIMITER) for row in csv_reader: # perform any other processing here id_set.add(int(row.get('id'))) logger.info(f'{min(id_set)} --> {max(id_set)}') # 3. delete local file if path.exists(file_path): unlink(file_path) except Exception: logger.exception(f'Error in file processor: {filename}')
3. Выполнить несколько целлюлозных задач параллельно
Это самый интересный шаг в этом потоке. Мы создадим несколько целлюлозных задач, чтобы работать параллельно через Группа сельдерей Отказ Как только мы знаем общие байты файла в S3 (с шага 1), мы рассчитываем Начните
и Конец байты
Для куска и вызовите задачу, которую мы создали на шаге 2 через группу сельдерея. Начать
и Конец байты
Диапазон – это непрерывный диапазон размера файла. Необязательно, мы также можем вызвать задачу обратного вызова (результат) после завершения всех наших задач обработки.
# core/tasks.py @celery.task(name='core.tasks.s3_parallel_file_processing', bind=True) def s3_parallel_file_processing_task(self, **kwargs): """ Creates celery tasks to process chunks of file in parallel """ bucket = kwargs.get('bucket') key = kwargs.get('key') try: filename = key # 1. Check file headers for validity -> if failed, stop processing desired_row_headers = ( 'id', 'name', 'age', 'latitude', 'longitude', 'monthly_income', 'experienced' ) is_headers_valid, header_row_str = validate_scrm_file_headers_via_s3_select( bucket=bucket, key=key, delimiter=S3_FILE_DELIMITER, desired_headers=desired_row_headers) if not is_headers_valid: logger.error(f'{filename} file headers validation failed') return False logger.info(f'{filename} file headers validation successful') # 2. fetch file size via S3 HEAD file_size = get_s3_file_size(bucket=bucket, key=key) if not file_size: logger.error(f'{filename} file size invalid {file_size}') return False logger.info(f'We are processing {filename} file about {file_size} bytes :-o') # 2. Create celery group tasks for chunk of this file size for parallel processing start_range = 0 end_range = min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size) tasks = [] while start_range < file_size: tasks.append( chunk_file_processor.signature( kwargs={ 'bucket': bucket, 'key': key, 'filename': filename, 'start_byte_range': start_range, 'end_byte_range': end_range, 'header_row_str': header_row_str } ) ) start_range = end_range end_range = end_range + min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size - end_range) job = (group(tasks) | chunk_file_processor_callback.s(data={'filename': filename})) _ = job.apply_async() except Exception: logger.exception(f'Error processing file: {filename}') @celery.task(name='core.tasks.chunk_file_processor_callback', bind=True, ignore_result=False) def chunk_file_processor_callback(self, *args, **kwargs): """ Callback task called post chunk_file_processor() """ logger.info('Callback called')
# core/utils.py def store_scrm_file_s3_content_in_local_file(bucket: str, key: str, file_path: str, start_range: int, end_range: int, delimiter: str, header_row: str): """Retrieves S3 file content via S3 Select ScanRange and store it in a local file. Make sure the header validation is done before calling this. Args: bucket (str): S3 bucket key (str): S3 key file_path (str): Local file path to store the contents start_range (int): Start range of ScanRange parameter of S3 Select end_range (int): End range of ScanRange parameter of S3 Select delimiter (str): S3 file delimiter header_row (str): Header row of the local file. This will be inserted as first line in local file. """ aws_profile = current_app.config.get('AWS_PROFILE_NAME') s3_client = boto3.session.Session(profile_name=aws_profile).client('s3') expression = 'SELECT * FROM S3Object' try: response = s3_client.select_object_content( Bucket=bucket, Key=key, ExpressionType='SQL', Expression=expression, InputSerialization={ 'CSV': { 'FileHeaderInfo': 'USE', 'FieldDelimiter': delimiter, 'RecordDelimiter': '\n' } }, OutputSerialization={ 'CSV': { 'FieldDelimiter': delimiter, 'RecordDelimiter': '\n', }, }, ScanRange={ 'Start': start_range, 'End': end_range }, ) """ select_object_content() response is an event stream that can be looped to concatenate the overall result set """ f = open(file_path, 'wb') # we receive data in bytes and hence opening file in bytes f.write(header_row.encode()) f.write('\n'.encode()) for event in response['Payload']: if records := event.get('Records'): f.write(records['Payload']) f.close() except ClientError: logger.exception(f'Client error reading S3 file {bucket} : {key}') except Exception: logger.exception(f'Error reading S3 file {bucket} : {key}')
Вот и все! 😎 Теперь, вместо того, чтобы потокотать файл S3 bytes bytes, мы распарализовываем обработку одновременно обработкой кусков. Это было не так жестко, не так ли? 😅
🔍 Сравнение времени обработки
Если мы сравним время обработки одного файла, который мы обработали в нашем последнем посте с таким подходом, обработка работает примерно 68% быстрее (с тем же оборудованием и конфигурацией). 😆
Размер файла | 4.8mb. | 4.8mb. |
Время обработки | ~ 37 секунд | ~ 12 секунд |
✔️ Преимущества этого подхода
- Очень большой файл, содержащий миллионы записей, могут быть обработаны в течение нескольких минут. Я использовал этот подход в производственной среде некоторое время, и это очень блаженно
- Вычисление и обработка распределены среди распределенных работников
- Скорость обработки может быть настроен на наличие рабочих бассейнов
- Нет больше проблем с памятью
📌 Вы можете проверить мою репозиторий GitHub для полного рабочего примера этого подхода 👇
Idris-Rampurawala/S3-Select-Demo
Этот проект демонстрирует функцию Beich AWS S3 Select, чтобы поток большого файла данных в странном стиле.
Этот проект демонстрирует богатые AWS S3 Выберите
Функция, чтобы поток большого файла данных в Пэгированный стиль
Отказ
В настоящее время S3 Выберите
не поддерживает Смещение
И, следовательно, мы не можем запугать результаты запроса. Следовательно, мы используем ScanRange
функция, чтобы поток содержимого файла S3.
Импорт (чтение) Большой файл ведет Из памяти
ошибка. Это также может привести к мероприятию системы сбоя системы. Есть библиотеки, а именно. Панды, дач и др. которые очень хороши при обработке больших файлов, но опять же файл должен присутствовать локально то есть. Нам придется импортировать его с S3 на нашу локальную машину. Но что, если мы не хотим получать и хранить весь файл S3 локально сразу?
Ну, мы можем использовать AWS S3 Выберите
Чтобы поток большого файла через это ScanRange
параметр. Этот подход…
📑 ресурсы
- Мой репозиторий Github, демонстрируя вышеуказанный подход
- AWS S3 Выберите ссылку Boto3
- AWS S3 Выберите UserGuide
Оригинал: “https://dev.to/idrisrampurawala/parallelize-processing-a-large-aws-s3-file-8eh”