Esses dias estava testando a nova linguagem suportada pela google cloud function (python) e resolvi fazer um streaming de dados.
Para realizar o streaming vamos utilizar as bibliotecas google.resumable, StringIO, io.
Primeiro vamos fazer os imports necessários:
from google.resumable_media.requests import ChunkedDownload from google.resumable_media.requests import ResumableUpload import google.auth import google.auth.transport.requests as tr_requests import io from io import StringIO
- ChunkedDownload => responsável por realizar o download em partes.
- ResumableUpload => responsável por realizar o upload em partes.
- google.auth => responsável pela autenticação no google cloud platform
- io, StringIO => responsável pelo streaming
Após realizar os imports vamos começar a configurar as variáveis necessárias para o funcionamento do código:
ro_scope = u'https://www.googleapis.com/auth/devstorage.read_only'
credentials, _ = google.auth.default(scopes=(ro_scope,))
transport = tr_requests.AuthorizedSession(credentials)
bucket = 'source_bucket_name'
bucket_upload = 'destiny_bucket_name'
blob_name = 'source_filename'
blob_name_upload = 'destiny_filename'
content_type = u'text/plain'
- ro_scope => autenticação para api do google
- credentials => credencial para a autenticação
- bucket => nome do bucket de origem, onde vai ser realizado o download do arquivo
- bucket_upload => nome do bucket de destino, onde vai ser realizado o upload do arquivo
- blob_name => nome do arquivo de origem
- blob_name_upload => nome do arquivo de destino
- content_type => tipo do arquivo
Agora vamos configurar as url de download e upload
url_template = ( u'https://www.googleapis.com/download/storage/v1/b/' u'{bucket}/o/{blob_name}?alt=media') url_template_upload = ( u'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?' u'uploadType=resumable') upload_url = url_template_upload.format(bucket=bucket_upload) media_url = url_template.format( bucket=bucket, blob_name=blob_name)
- url_template =>url onde será realizado o download do arquivo, como as variáveis bucket e blob_name foram configuradas no step acima, não precisa se preocupar com essa variável.
- url_template_upload =>url onde será realizado o upload do arquivo.
- upload_url => estamos formatando a variável url_template_upload, substituindo o bucket pelo bucket_upload
- media_url =>formatando a variável url_template
Agora vamos especificar o tamanho de cada parte que vai ser realizado o download e configurar as variáveis para realizar o streaming
chunk_size = 1 * 1024 * 1024 stream = io.BytesIO() download = ChunkedDownload(media_url, chunk_size, stream) upload = ResumableUpload(upload_url, chunk_size)
- chunk_size => tamanho de cada parte, neste exemplo estamos pegando a cada 1MB
- stream => chamando a biblioteca para começar a realizar o streaming
- download => chamando a biblioteca para realizar o download do arquivo
- upload => chamando a biblioteca para realizar o upload do arquivo
Agora vamos começar o streaming, primeiro começamos com um loop ate o download terminar, após realizar o looping de download vamos armazenar o download em uma lista e começar a fazer o upload
data = [] while download.finished != True: response = download.consume_next_chunk(transport) data.append(response.content.decode("utf-8").replace(',','|')) new_data = ''.join(data) stream_upload = io.BytesIO(bytes(new_data, 'UTF-8')) metadata = {u'name': blob_name_upload} response_upload = upload.initiate(transport, stream_upload, metadata, content_type) while upload.finished != True: upload.transmit_next_chunk(transport)
- response => realizado o download de uma parte do dados com o tamanho do chunk_size, no caso 1MB
- data => transformando os dados retornados no download em utf-8 e substituindo a ',' por '|'
- new_data => uma lista contendo todo o download realizado acima
- stream_upload => configurando o upload para UTF-8
- metadata => configurando os metadados para realizar o upload
- response_upload => realizando o upload dentro do bucket de destino com o nome do arquivo de destino
Codigo final
from google.resumable_media.requests import ChunkedDownload
from google.resumable_media.requests import ResumableUpload
import google.auth
import google.auth.transport.requests as tr_requests
import io
from io import StringIO
ro_scope = u'https://www.googleapis.com/auth/devstorage.read_only'
credentials, _ = google.auth.default(scopes=(ro_scope,))
transport = tr_requests.AuthorizedSession(credentials)
bucket = 'source_bucket_name'
bucket_upload = 'destiny_bucket_name'
blob_name = 'source_filename'
blob_name_upload = 'destiny_filename'
content_type = u'text/plain'
url_template = (
u'https://www.googleapis.com/download/storage/v1/b/'
u'{bucket}/o/{blob_name}?alt=media')
url_template_upload = (
u'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
u'uploadType=resumable')
upload_url = url_template_upload.format(bucket=bucket_upload)
media_url = url_template.format(
bucket=bucket, blob_name=blob_name)
chunk_size = 1 * 1024 * 1024
stream = io.BytesIO()
download = ChunkedDownload(media_url, chunk_size, stream)
upload = ResumableUpload(upload_url, chunk_size)
data = []
while download.finished != True:
response = download.consume_next_chunk(transport)
data.append(response.content.decode("utf-8").replace(',','|'))
new_data = ''.join(data)
stream_upload = io.BytesIO(bytes(new_data, 'UTF-8'))
metadata = {u'name': blob_name_upload}
reponse_upload = upload.initiate(transport, stream_upload, metadata, content_type)
while upload.finished != True:
upload.transmit_next_chunk(transport)
0 comentários:
Postar um comentário