Несколько дней назад я провел большую часть своего дня, работая над реализацией потоковой передачи данных в Google Cloud Storage (GCS) со среды выполнения Python.
По пути было несколько препятствий, и я хотел бы создать документацию, которую я хотел бы найти, работая над этим вопросом.
В этой статье используется Python 3.6.4, но может быть адаптирована для других версий Python.
Поддержка GCS в модуле Google-Cloud
Google-Cloud
Пакет представляет собой гигантскую коллекцию модулей, которые можно использовать для взаимодействия со всеми сервисами Google Cloud Platform Platform, так что это отличное место для начала.
Python -M PIP установка -u Google -Cloud
В пределах Google-Cloud
Пакет – это модуль под названием Google.Cloud.Storage
который имеет дело со всеми вещами GCS.
Я скачал и настроил свой Google_application_credentials
локально и открыл консоль Python, чтобы проверить часть функциональности. Я смог быстро подключиться к GCS, создать ведро, создать каплей и Загрузите двоичные данные на Blob Анкет
from google.cloud import storage client = storage.Client() bucket = client.create_bucket('test-bucket') blob = client.blob('test-blob') blob.upload_from_string( data=b'x' * 1024, content_type='application/octet-stream', client=client )
Одна вещь, которую я сразу заметил, заключалась в том, что для строительства Армонавт Мой вариант использования будет постепенно транслировать вывод в GCS без сохранения вывода в файловую систему вычислительного экземпляра. Должен был быть способ потоковой передачи данных, а не загружать все за один раз.
Восстанавливаемая загрузка на помощь!
Первоначальное исследование, которое я сделал, обнаружил Восстанавливаемые загрузки В качестве опции для Google Cloud Storage. Из их описания говорится, что у них есть следующие варианты использования:
- Вы загружаете большой файл.
- Шансы на сбой сети высоки.
- Вы не знаете размер файла, когда загрузка начинается.
Причины № 1 и № 3 применялись к моему варианту использования, поэтому я начал исследовать дальше.
Я искал Google-Cloud
Документация для упоминания о возобновляемых загрузках, которые дали Blob.create_resumable_upload_session ()
метод Этот метод начинает возобновляемую загрузку и возвращает URL.
Восстановление медиа -пакет
Набор взаимодействий, которые должны происходить для успешной загрузки, была довольно сложной, и я подозревал, что уже есть пакет, который обрабатывает этот обмен. Я нашел Google-Resumble-Media
Пакет с небольшим количеством гуглей. 😉
Python -M PIP установка -u Google -reesumble -Media
Ключевой частью этого пакета меня интересовал, является Google.Resumable_Media.Requests. ResumbleUpload
Класс, который принимает авторизованный транспорт, а затем позволяет загружать данные в куски и восстанавливаться, когда обнаруживаются ошибки.
Пока что это был код, с которым я работал:
import io from google.auth.transport.requests import AuthorizedSession from google.cloud import storage from google.resumable_media.requests import ResumableUpload chunk_size = 256 * 1024 # Minimum chunk-size supported by GCS stream = io.BytesIO(b'x' * (1024 * 1024)) # Fake data stream client = storage.Client() bucket = client.bucket('test-bucket') blob = client.blob('test-blob') # Create a Resumable Upload url = blob.create_resumable_upload_session( content_type='application/octet-stream', client=client ) # Pass the URL off to the ResumableUpload object upload = ResumableUpload( upload_url=url, chunk_size=chunk_size ) transport = AuthorizedSession(credentials=client._credentials) # Start using the Resumable Upload upload.initiate( transport=transport, content_type='application/octet-stream', stream=stream, metadata={'name': blob.name} )
Проблема заключалась в том, что я получал ошибку на upload.initiate ()
Анкет Жаловался, что не было Место
Заголовок на ответе. Я исследовал эту проблему и обнаружил, что create_resumable_upload_session ()
делал работу upload.initiate ()
! Я удалил этот шаг и вместо этого использовал конечную точку API, представленную в документации «Восстановление загрузки».
# Create a Resumable Upload url = ( f'https://www.googleapis.com/upload/storage/v1/b/' f'{bucket.name}/o?uploadType=resumable' ) upload = ResumableUpload( upload_url=url, chunk_size=chunk_size ) transport = AuthorizedSession(credentials=client._credentials) # Start using the Resumable Upload upload.initiate( transport=transport, content_type='application/octet-stream', stream=stream, metadata={'name': blob.name} )
Этот фрагмент работал, чтобы начать возобновляемую загрузку! Теперь для трансляции данных.
Потоковые данные и
ResumbleUpload
У объекта есть метод под названием передача_next_chunk
Что говорит о загрузке, что следующий кусок может быть загружен. Читая документацию об этом методе, я нашел Stream_final
который был параметром Resumableupload.initaite
метод
Я обнаружил, что если Stream_final
установлен на ЛОЖЬ
тогда ResumbleUpload
обнаружит «конец» потока, когда передается кусок, что меньше, чем chunk_size
параметр установлен в его конструкторе. Это означало, что для потоковой передачи неизвестного объема данных, которые каждый кусок должен быть> 256kib, и должен был бы буферизировать выход, пока этот размер не будет достигнут, чтобы не быть передано.
Наслаждаясь этим постом? Проверьте мой блог Dev для получения дополнительной информации.
Сделать все это вместе
Получив простой пример работы, я создал класс, который обрабатывает один поток данных неизвестной длины, загружаемых на каплю постепенно, и восстанавливается из сетевых ошибок, если обнаружено.
Для этого я реализовал объект, который оба буферизовали данные и имели файловый интерфейс, чтобы он использовался с помощью ResumbleUpload
как поток
и передаваться в другие функции, которые требуют файловых объектов для написания данных.
Вот моя последняя реализация:
from google.auth.transport.requests import AuthorizedSession from google.resumable_media import requests, common from google.cloud import storage class GCSObjectStreamUpload(object): def __init__( self, client: storage.Client, bucket_name: str, blob_name: str, chunk_size: int=256 * 1024 ): self._client = client self._bucket = self._client.bucket(bucket_name) self._blob = self._bucket.blob(blob_name) self._buffer = b'' self._buffer_size = 0 self._chunk_size = chunk_size self._read = 0 self._transport = AuthorizedSession( credentials=self._client._credentials ) self._request = None # type: requests.ResumableUpload def __enter__(self): self.start() return self def __exit__(self, exc_type, *_): if exc_type is None: self.stop() def start(self): url = ( f'https://www.googleapis.com/upload/storage/v1/b/' f'{self._bucket.name}/o?uploadType=resumable' ) self._request = requests.ResumableUpload( upload_url=url, chunk_size=self._chunk_size ) self._request.initiate( transport=self._transport, content_type='application/octet-stream', stream=self, stream_final=False, metadata={'name': self._blob.name}, ) def stop(self): self._request.transmit_next_chunk(self._transport) def write(self, data: bytes) -> int: data_len = len(data) self._buffer_size += data_len self._buffer += data del data while self._buffer_size >= self._chunk_size: try: self._request.transmit_next_chunk(self._transport) except common.InvalidResponse: self._request.recover(self._transport) return data_len def read(self, chunk_size: int) -> bytes: # I'm not good with efficient no-copy buffering so if this is # wrong or there's a better way to do this let me know! :-) to_read = min(chunk_size, self._buffer_size) memview = memoryview(self._buffer) self._buffer = memview[to_read:].tobytes() self._read += to_read self._buffer_size -= to_read return memview[:to_read].tobytes() def tell(self) -> int: return self._read
Класс можно использовать так:
client = storage.Client() with GCSObjectStreamUpload(client=client, bucket='test-bucket', blob='test-blob') as s: for _ in range(1024): s.write(b'x' * 1024)
Спасибо за чтение!
Оригинал: “https://dev.to/sethmlarson/python-data-streaming-to-google-cloud-storage-with-resumable-uploads-458h”