Рубрики
Без рубрики

Поток данных Python в Google Cloud Storage с возобновляемыми загрузками

Потоковая произвольная длина двоичных данных в Google Cloud Storage. Tagged с GCP, Cloud, Storage, Python.

Несколько дней назад я провел большую часть своего дня, работая над реализацией потоковой передачи данных в Google Cloud Storage (GCS) со среды выполнения Python.

По пути было несколько препятствий, и я хотел бы создать документацию, которую я хотел бы найти, работая над этим вопросом.

В этой статье используется Python 3.6.4, но может быть адаптирована для других версий Python.

Поддержка GCS в модуле Google-Cloud

Google-Cloud Пакет представляет собой гигантскую коллекцию модулей, которые можно использовать для взаимодействия со всеми сервисами Google Cloud Platform Platform, так что это отличное место для начала.

Python -M PIP установка -u Google -Cloud

В пределах Google-Cloud Пакет – это модуль под названием Google.Cloud.Storage который имеет дело со всеми вещами GCS.

Я скачал и настроил свой Google_application_credentials локально и открыл консоль Python, чтобы проверить часть функциональности. Я смог быстро подключиться к GCS, создать ведро, создать каплей и Загрузите двоичные данные на Blob Анкет

from google.cloud import storage

client = storage.Client()
bucket = client.create_bucket('test-bucket')
blob = client.blob('test-blob')

blob.upload_from_string(
    data=b'x' * 1024,
    content_type='application/octet-stream',
    client=client
)

Одна вещь, которую я сразу заметил, заключалась в том, что для строительства Армонавт Мой вариант использования будет постепенно транслировать вывод в GCS без сохранения вывода в файловую систему вычислительного экземпляра. Должен был быть способ потоковой передачи данных, а не загружать все за один раз.

Восстанавливаемая загрузка на помощь!

Первоначальное исследование, которое я сделал, обнаружил Восстанавливаемые загрузки В качестве опции для Google Cloud Storage. Из их описания говорится, что у них есть следующие варианты использования:

  • Вы загружаете большой файл.
  • Шансы на сбой сети высоки.
  • Вы не знаете размер файла, когда загрузка начинается.

Причины № 1 и № 3 применялись к моему варианту использования, поэтому я начал исследовать дальше.

Я искал Google-Cloud Документация для упоминания о возобновляемых загрузках, которые дали Blob.create_resumable_upload_session () метод Этот метод начинает возобновляемую загрузку и возвращает URL.

Восстановление медиа -пакет

Набор взаимодействий, которые должны происходить для успешной загрузки, была довольно сложной, и я подозревал, что уже есть пакет, который обрабатывает этот обмен. Я нашел Google-Resumble-Media Пакет с небольшим количеством гуглей. 😉

Python -M PIP установка -u Google -reesumble -Media

Ключевой частью этого пакета меня интересовал, является Google.Resumable_Media.Requests. ResumbleUpload Класс, который принимает авторизованный транспорт, а затем позволяет загружать данные в куски и восстанавливаться, когда обнаруживаются ошибки.

Пока что это был код, с которым я работал:

import io
from google.auth.transport.requests import AuthorizedSession
from google.cloud import storage
from google.resumable_media.requests import ResumableUpload

chunk_size = 256 * 1024  # Minimum chunk-size supported by GCS
stream = io.BytesIO(b'x' * (1024 * 1024))  # Fake data stream

client = storage.Client()
bucket = client.bucket('test-bucket')
blob = client.blob('test-blob')

# Create a Resumable Upload
url = blob.create_resumable_upload_session(
    content_type='application/octet-stream',
    client=client
)

# Pass the URL off to the ResumableUpload object
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)

Проблема заключалась в том, что я получал ошибку на upload.initiate () Анкет Жаловался, что не было Место Заголовок на ответе. Я исследовал эту проблему и обнаружил, что create_resumable_upload_session () делал работу upload.initiate () ! Я удалил этот шаг и вместо этого использовал конечную точку API, представленную в документации «Восстановление загрузки».

# Create a Resumable Upload
url = (
    f'https://www.googleapis.com/upload/storage/v1/b/'
    f'{bucket.name}/o?uploadType=resumable'
)
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)

Этот фрагмент работал, чтобы начать возобновляемую загрузку! Теперь для трансляции данных.

Потоковые данные и

ResumbleUpload У объекта есть метод под названием передача_next_chunk Что говорит о загрузке, что следующий кусок может быть загружен. Читая документацию об этом методе, я нашел Stream_final который был параметром Resumableupload.initaite метод

Я обнаружил, что если Stream_final установлен на ЛОЖЬ тогда ResumbleUpload обнаружит «конец» потока, когда передается кусок, что меньше, чем chunk_size параметр установлен в его конструкторе. Это означало, что для потоковой передачи неизвестного объема данных, которые каждый кусок должен быть> 256kib, и должен был бы буферизировать выход, пока этот размер не будет достигнут, чтобы не быть передано.

Наслаждаясь этим постом? Проверьте мой блог Dev для получения дополнительной информации.

Сделать все это вместе

Получив простой пример работы, я создал класс, который обрабатывает один поток данных неизвестной длины, загружаемых на каплю постепенно, и восстанавливается из сетевых ошибок, если обнаружено.

Для этого я реализовал объект, который оба буферизовали данные и имели файловый интерфейс, чтобы он использовался с помощью ResumbleUpload как поток и передаваться в другие функции, которые требуют файловых объектов для написания данных.

Вот моя последняя реализация:

from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from google.cloud import storage

class GCSObjectStreamUpload(object):
    def __init__(
            self, 
            client: storage.Client,
            bucket_name: str,
            blob_name: str,
            chunk_size: int=256 * 1024
        ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials
        )
        self._request = None  # type: requests.ResumableUpload

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, *_):
        if exc_type is None:
            self.stop()

    def start(self):
        url = (
            f'https://www.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url, chunk_size=self._chunk_size
        )
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )

    def stop(self):
        self._request.transmit_next_chunk(self._transport)

    def write(self, data: bytes) -> int:
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len

    def read(self, chunk_size: int) -> bytes:
        # I'm not good with efficient no-copy buffering so if this is
        # wrong or there's a better way to do this let me know! :-)
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()

    def tell(self) -> int:
        return self._read

Класс можно использовать так:

client = storage.Client()

with GCSObjectStreamUpload(client=client, bucket='test-bucket', blob='test-blob') as s:
    for _ in range(1024):
        s.write(b'x' * 1024)

Спасибо за чтение!

Оригинал: “https://dev.to/sethmlarson/python-data-streaming-to-google-cloud-storage-with-resumable-uploads-458h”