streaming de dados com python

Esses dias estava testando a nova linguagem suportada pela google cloud function (python) e resolvi fazer um streaming de dados.
Para realizar o streaming vamos utilizar as bibliotecas google.resumable, StringIO, io.
Primeiro vamos fazer os imports necessários:
from google.resumable_media.requests import ChunkedDownload
from google.resumable_media.requests import ResumableUpload
import google.auth
import google.auth.transport.requests as tr_requests
import io
from io import StringIO
  • ChunkedDownload => responsável por realizar o download em partes.
  • ResumableUpload => responsável por realizar o upload em partes.
  • google.auth => responsável pela autenticação no google cloud platform
  • io, StringIO => responsável pelo streaming
Após realizar os imports vamos começar a configurar as variáveis necessárias para o funcionamento do código:
ro_scope = u'https://www.googleapis.com/auth/devstorage.read_only'
credentials, _ = google.auth.default(scopes=(ro_scope,))
transport = tr_requests.AuthorizedSession(credentials)
bucket = 'source_bucket_name'
bucket_upload = 'destiny_bucket_name'
blob_name = 'source_filename'
blob_name_upload = 'destiny_filename'
content_type = u'text/plain'
  • ro_scope => autenticação para api do google
  • credentials => credencial para a autenticação
  • bucket => nome do bucket de origem, onde vai ser realizado o download do arquivo
  • bucket_upload => nome do bucket de destino, onde vai ser realizado o upload do arquivo
  • blob_name => nome do arquivo de origem
  • blob_name_upload => nome do arquivo de destino
  • content_type => tipo do arquivo
Agora vamos configurar as url de download e upload
url_template = (
    u'https://www.googleapis.com/download/storage/v1/b/'
    u'{bucket}/o/{blob_name}?alt=media')
url_template_upload = (
     u'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
     u'uploadType=resumable')
upload_url = url_template_upload.format(bucket=bucket_upload)
media_url = url_template.format(
    bucket=bucket, blob_name=blob_name)
  • url_template =>url onde será realizado o download do arquivo, como as variáveis bucket e blob_name foram configuradas no step acima, não precisa se preocupar com essa variável.
  • url_template_upload =>url onde será realizado o upload do arquivo.
  • upload_url => estamos formatando a variável url_template_upload, substituindo o bucket pelo bucket_upload
  • media_url =>formatando a variável url_template
Agora vamos especificar o tamanho de cada parte que vai ser realizado o download e configurar as variáveis para realizar o streaming
chunk_size = 1 * 1024 * 1024
stream = io.BytesIO()
download = ChunkedDownload(media_url, chunk_size, stream)
upload = ResumableUpload(upload_url, chunk_size)
  • chunk_size => tamanho de cada parte, neste exemplo estamos pegando a cada 1MB
  • stream => chamando a biblioteca para começar a realizar o streaming
  • download => chamando a biblioteca para realizar o download do arquivo
  • upload => chamando a biblioteca para realizar o upload do arquivo
Agora vamos começar o streaming, primeiro começamos com um loop ate o download terminar, após realizar o looping de download vamos armazenar o download em uma lista e começar a fazer o upload

data = []
while download.finished != True:
 response = download.consume_next_chunk(transport)
 data.append(response.content.decode("utf-8").replace(',','|'))
new_data = ''.join(data) stream_upload = io.BytesIO(bytes(new_data, 'UTF-8')) metadata = {u'name': blob_name_upload} response_upload = upload.initiate(transport, stream_upload, metadata, content_type)
while upload.finished != True: upload.transmit_next_chunk(transport)
  • response => realizado o download de uma parte do dados com o tamanho do chunk_size, no caso 1MB
  • data => transformando os dados retornados no download em utf-8 e substituindo a ',' por '|'
  • new_data => uma lista contendo todo o download realizado acima
  • stream_upload => configurando o upload para UTF-8
  • metadata => configurando os metadados para realizar o upload
  • response_upload => realizando o upload dentro do bucket de destino com o nome do arquivo de destino
Codigo final
from google.resumable_media.requests import ChunkedDownload
from google.resumable_media.requests import ResumableUpload
import google.auth
import google.auth.transport.requests as tr_requests
import io
from io import StringIO

ro_scope = u'https://www.googleapis.com/auth/devstorage.read_only'
credentials, _ = google.auth.default(scopes=(ro_scope,))
transport = tr_requests.AuthorizedSession(credentials)
bucket = 'source_bucket_name'
bucket_upload = 'destiny_bucket_name'
blob_name = 'source_filename'
blob_name_upload = 'destiny_filename'
content_type = u'text/plain'

url_template = (
    u'https://www.googleapis.com/download/storage/v1/b/'
    u'{bucket}/o/{blob_name}?alt=media')

url_template_upload = (
     u'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
     u'uploadType=resumable')

upload_url = url_template_upload.format(bucket=bucket_upload)
media_url = url_template.format(

    bucket=bucket, blob_name=blob_name)

chunk_size = 1 * 1024 * 1024
stream = io.BytesIO()

download = ChunkedDownload(media_url, chunk_size, stream)
upload = ResumableUpload(upload_url, chunk_size)

data = []
while download.finished != True:
 response = download.consume_next_chunk(transport)
 data.append(response.content.decode("utf-8").replace(',','|'))

new_data = ''.join(data)
stream_upload = io.BytesIO(bytes(new_data, 'UTF-8'))
metadata = {u'name': blob_name_upload}
reponse_upload = upload.initiate(transport, stream_upload, metadata, content_type)

while upload.finished != True:
 upload.transmit_next_chunk(transport)
Share: