Resolvendo OOM (Out of Memory) em Pipelines de Dataflow
- Michel Souza Santana
- 21 de out.
- 6 min de leitura
Pare de usar o Dataflow para transportar terabytes de dados. Use-o para orquestrar milhares de tarefas e evite o temido Out of Memory.
Olá, pessoal. Sou Michel Santana. Como Engenheiro de Dados, um dos problemas mais frustrantes (e caros) que vejo em projetos é o famoso Out of Memory (OOM) em pipelines de Dataflow.
Você já passou por isso? Você configura um pipeline para processar um grande volume de dados talvez alguns terabytes de logs, ele começa a rodar, o gráfico de execução parece saudável e, de repente, tudo para. Workers falham, o pipeline entra em stuck, e a fatura do Google Cloud começa a dar sinais de vida indesejados.
Na maioria das vezes, o culpado não é o Dataflow. É a forma como estamos pedindo a ele para trabalhar. O culpado quase sempre tem nome e sobrenome: operações de shuffle, como GroupByKey, em dados com alta cardinalidade ou "hot keys".
Hoje, vamos desmistificar por que isso acontece e apresentar uma mudança de mentalidade fundamental: o Padrão Orquestrador. Vamos transformar o Dataflow de um "caminhão de dados" sobrecarregado em um "general" que comanda um exército de workers eficientes.
O Problema: O Pipeline "Ingênuo" e a "Hot Key"
O problema de memória (OOM) em pipelines de Dataflow quase sempre acontece por causa de uma operação de shuffle (embaralhamento de dados), como o GroupByKey, Combine, ou CoGroupByKey.
Imagine que você precisa processar 1TB de dados de logs. Uma abordagem ingênua, que muitos de nós tentamos no início, seria:
ReadFromText("gs://bucket/logs/*"): Ler todos os arquivos de log.
Map(lambda log: (log.user_id, log.data)): Mapear por user_id.
GroupByKey(): Agrupar todos os logs por user_id.
ParDo(ProcessUserData): Processar os dados agrupados.
Onde isso falha: Se um único user_id (uma "hot key") tiver 300GB de logs, o GroupByKey tentará enviar todos esses 300GB para um único worker processar. Esse worker, inevitavelmente, ficará sem memória e o pipeline falhará. Mesmo que não falhe, ele ficará extremamente lento e caro, pois um único worker se torna o gargalo de todo o processamento.

A Solução: Dataflow como Orquestrador de Tarefas
Nesse padrão, mudamos nossa mentalidade. O pipeline principal do Dataflow não transporta os dados brutos, mas sim as instruções de trabalho (metadados).
O Dataflow atua como um "exército" de workers. O pipeline principal é o "general" que distribui tarefas pequenas e independentes para cada "soldado" (worker).
O fluxo se torna:
Entrada (Fila de Trabalho): A PCollection inicial contém metadados sobre o trabalho a ser feito.
Exemplos: Uma lista de nomes de arquivos em GCS, uma lista de partições de tabela, mensagens do Pub/Sub com IDs de cliente.
Processamento (ParDo): Um ParDo (executando um DoFn) recebe uma única instrução de trabalho (ex: um nome de arquivo).
Execução Pesada (Dentro do Worker): O worker, dentro do DoFn, executa a carga pesada de forma independente. Ele pode:
Usar a biblioteca de cliente (ex: google.cloud.storage, google.cloud.bigquery) para se conectar ao serviço.
Ler o arquivo (ex: gs://bucket/arquivo_A.csv).
Processar o arquivo linha por linha (em streaming, sem carregar tudo na memória).
Gravar o resultado (ex: em outro arquivo ou no BigQuery).
Saída (Opcional): O DoFn pode emitir um resultado (ex: "Sucesso" ou "Falha").
A PCollection que passa pelo pipeline não contém terabytes de dados, mas sim alguns kilobytes ou megabytes de strings (as instruções). O shuffle desaparece.
Exemplo Prático: Processando Milhares de Arquivos do GCS
Vamos supor que temos 10.000 arquivos CSV no GCS para processar e carregar no BigQuery.
Abordagem 1: Padrão "Orquestrador" (Recomendado)
Nesta abordagem, o Dataflow distribui a tarefa de processar cada arquivo.
O pipeline não vai ler os dados dos 10.000 arquivos de uma vez. Em vez disso, vamos criar uma PCollection que contém apenas a lista dos 10.000 caminhos de arquivo (ex: gs://my-bucket/file1.csv, gs://my-bucket/file2.csv, ...).
Usaremos um ParDo para que cada worker receba um caminho de arquivo. Dentro desse DoFn, o worker usará a biblioteca cliente do GCS para ler aquele arquivo específico, processá-lo linha a linha e emitir os resultados para a próxima etapa, que pode ser um WriteToBigQuery nativo.
Exemplo (Python)
Python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage, bigquery
import csv
import io
# 1. O DoFn que faz o trabalho pesado
# Cada worker que executar isso o fará de forma independente.
class ProcessFileDoFn(beam.DoFn):
def setup(self):
# Instancia os clientes UMA VEZ por worker, não por elemento
# Isso é crucial para a performance.
self.storage_client = storage.Client()
# self.bigquery_client = bigquery.Client() # Desnecessário se usarmos WriteToBigQuery
def process(self, file_path):
"""
Recebe UM file_path (ex: "gs://my-bucket/data/file1.csv")
"""
print(f"WORKER: Processando arquivo: {file_path}")
try:
# Extrai bucket e nome do arquivo
bucket_name, blob_name = file_path.replace("gs://", "").split("/", 1)
bucket = self.storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
# Lê o arquivo do GCS (em modo streaming, sem baixar tudo)
content = blob.download_as_text()
# Processa o CSV em memória (para arquivos pequenos/médios)
# Para arquivos GIGANTES, preferiríamos blob.open() e ler linha a linha
csv_reader = csv.reader(io.StringIO(content))
# Pula o cabeçalho
next(csv_reader, None)
processed_rows = 0
for row in csv_reader:
# Lógica de transformação pesada aqui
# Ex: transformar colunas, enriquecer dados, etc.
# Emitimos a linha processada para o próximo passo do pipeline
yield {
'col1': row[0],
'col2': int(row[1])
}
processed_rows += 1
# Usamos "Tagged Outputs" para logar sucesso/falha
yield beam.pvalue.TaggedOutput('status', f"Sucesso: {file_path}, Linhas: {processed_rows}")
except Exception as e:
yield beam.pvalue.TaggedOutput('status', f"Erro: {file_path}, Erro: {str(e)}")
def run():
options = PipelineOptions(streaming=False) # Exemplo Batch
# 2. Lista de "Instruções de Trabalho"
# Em um caso real, você poderia ler isso de um arquivo de manifesto
# ou usar o MatchFiles do Beam (beam.io.fileio.MatchFiles).
file_paths = [
"gs://my-bucket/data/file1.csv",
"gs://my-bucket/data/file2.csv",
"gs://my-bucket/data/file3.csv",
# ... 10.000 outros arquivos
]
with beam.Pipeline(options=options) as p:
# 3. O pipeline "Orquestrador"
trabalho = p | "Criar Fila de Trabalho" >> beam.Create(file_paths)
# 4. Distribuição do Trabalho
# O Dataflow distribuirá os 10.000 caminhos de arquivo
# para N workers em paralelo.
resultados = trabalho | "Processar Arquivos" >> beam.ParDo(
ProcessFileDoFn()
).with_outputs('status', main='dados')
dados_processados = resultados.dados
status_logs = resultados.status
# 5. Carregamento (usando o conector otimizado)
# O WriteToBigQuery é altamente otimizado para escritas em paralelo.
dados_processados | "Carregar no BigQuery" >> beam.io.WriteToBigQuery(
table='meu-projeto:meu_dataset.minha_tabela',
schema='col1:STRING, col2:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
status_logs | "Logar Status" >> beam.Map(print)
if __name__ == '__main__':
run()
Análise do Resultado
Por que isso não sobrecarrega a memória?
Sem Shuffle Massivo: Não há GroupByKey nos dados brutos. O trabalho já está "agrupado" por natureza (um arquivo = uma tarefa).
Paralelismo Horizontal: O Dataflow pode escalar para 1.000 workers, e cada um processa 10 arquivos simultaneamente. O pipeline termina muito mais rápido.
Processamento em Streaming (no Worker): Dentro do DoFn, usamos blob.download_as_text() e csv.reader (ou blob.open() para arquivos maiores) que processam o arquivo de forma eficiente, sem tentar carregar 500MB de um CSV na memória do worker de uma só vez.
O Próximo Nível: Splittable DoFn
Você pode estar pensando: "Certo, Michel, mas e se um único arquivo tiver 1TB? O padrão acima falharia, pois um worker tentaria processar 1TB."
Correto. É aqui que entra o Splittable DoFn (SDF).
O Splittable DoFn é um padrão do Apache Beam onde você ensina ao seu DoFn como "dividir" seu próprio trabalho.
O que é: Um DoFn que pode ser "fatiado" dinamicamente pelo Dataflow.
Como funciona (Conceitualmente):
Um worker pega o DoFn para processar arquivo_1TB.csv.
Ele começa a processar e lê os primeiros 100GB.
Ele faz um "checkpoint" e diz ao Dataflow: "Eu processei os bytes 0-100GB. Ainda faltam os bytes 100GB-1TB."
O Dataflow vê isso e pode criar novas tarefas (novos splits) para o trabalho restante (ex: 100GB-200GB, 200GB-300GB, etc.) e distribuí-las para outros workers.
É exatamente assim que os conectores nativos do Beam (ReadFromText, ReadFromBigQuery) funcionam! Eles são todos implementados como Splittable DoFns. Quando você faz ReadFromText("gs://bucket/*"), o Beam não lê tudo. Ele usa o SDF para dividir os arquivos em offsets de bytes e distribuir esses offsets entre os workers.
Quando usar um Splittable DoFn customizado? Raramente. Você só precisa disso se estiver criando um conector para uma fonte de dados que o Beam não suporta nativamente (ex: um banco de dados obscuro ou um formato de arquivo proprietário) e você precisa que o Dataflow paralelize a leitura de uma única fonte/arquivo grande.
Resumo: Qual Padrão Usar?
Para 99% dos casos de "grande volumetria" que causam OOM, o Padrão Orquestrador é o caminho.
O Problema: A carga de trabalho é muito grande e indivisível (ex: 10.000 arquivos processados após um GroupByKey problemático).
A Solução: Divida o trabalho na origem (ex: gere os 10.000 arquivos).
O Pipeline: Use o Dataflow para ler a lista de tarefas (os 10.000 caminhos de arquivo) e use um ParDo simples para executar cada tarefa de forma independente.
Pense no seu pipeline Dataflow como um orquestrador massivamente paralelo, não como um caminhão de dados centralizado. Essa mudança de mentalidade é o que separa pipelines frágeis, que quebram com OOM, de soluções de dados robustas, escaláveis e eficientes.
Espero que isso ajude a tornar seus pipelines mais resilientes. Até a próxima!




Comentários