top of page

JSONs Aninhados com PySpark no Databricks: Da API ao DataFrame

ree

Olá, comunidade de dados!


Quem trabalha com engenharia e análise de dados sabe que uma das tarefas mais comuns é a integração com fontes externas. Muitas vezes, precisamos enriquecer nossos datasets com informações de APIs públicas ou privadas, como as fornecidas pelo IBGE, dados de mercado ou sistemas internos. O desafio? Esses dados frequentemente chegam em formato JSON, e quase sempre, com estruturas aninhadas (ou nested, para os mais íntimos do termo).


Navegar por múltiplos níveis de um JSON pode parecer complexo à primeira vista, mas o Databricks, com o poder do Apache Spark, nos oferece ferramentas elegantes e eficientes para resolver essa questão. Neste artigo, vamos explorar de forma prática como consumir e "achatar" (flatten) esses dados para torná-los úteis em nossas análises.


O Cenário: Consumindo uma API e o Desafio do JSON Aninhado


Imagine que fizemos uma requisição a uma API de localidades do IBGE e recebemos o seguinte JSON como resposta, já carregado em um DataFrame inicial chamado df_raw:

{
  "id": 1,
  "municipio": "São Paulo",
  "microrregiao": {
    "id": 3515,
    "nome": "São Paulo",
    "mesorregiao": {
      "id": 3503,
      "nome": "Metropolitana de São Paulo",
      "UF": {
        "id": 35,
        "sigla": "SP",
        "nome": "São Paulo"
      }
    }
  },
  "populacao_estimada": [
    {
      "ano": 2023,
      "quantidade": 12400234
    },
    {
      "ano": 2024,
      "quantidade": 12510345
    }
  ]
}

Ao carregar esse dado no Databricks, o schema do nosso DataFrame df_raw ficaria parecido com isto:

root
 |-- id: long (nullable = true)
 |-- municipio: string (nullable = true)
 |-- microrregiao: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- mesorregiao: struct (nullable = true)
 |    |    |-- UF: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- nome: string (nullable = true)
 |    |    |    |-- sigla: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |-- nome: string (nullable = true)
 |-- populacao_estimada: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ano: long (nullable = true)
 |    |    |-- quantidade: long (nullable = true)

Como podemos ver, temos campos do tipo struct (que representam objetos aninhados) e array (listas). Nosso objetivo é transformar essa estrutura em um formato tabular, plano, onde cada informação ocupe sua própria coluna.


Abordagem 1: Acesso Direto com a Notação de Ponto (Dot Notation)


A maneira mais direta e intuitiva de acessar campos dentro de uma estrutura aninhada é utilizando a "notação de ponto". É uma ótima solução quando você já conhece o schema e precisa de poucos campos específicos.


Explicação:

A sintaxe é simples: df.select("coluna_pai.coluna_filha.campo"). Para cada nível de aninhamento, adicionamos um ponto para navegar mais fundo na estrutura.


Exemplo Prático:

Vamos extrair o nome do município, a sigla da UF e o nome da mesorregião.

Python

from pyspark.sql.functions import col

df_campos_especificos = df_raw.select(
    col("id"),
    col("municipio"),
    col("microrregiao.mesorregiao.UF.sigla").alias("uf_sigla"),
    col("microrregiao.mesorregiao.nome").alias("mesorregiao_nome")
)

display(df_campos_especificos)

Análise do Resultado:

Esta abordagem é extremamente eficiente para seleções rápidas. O resultado é um DataFrame limpo e com as colunas renomeadas (alias), pronto para uso.

id

municipio

uf_sigla

mesorregiao_nome

1

São Paulo

SP

Metropolitana de São Paulo

No entanto, e se a estrutura for muito complexa, com dezenas de campos aninhados? Selecionar um por um seria improdutivo e propenso a erros.


Abordagem 2: Descoberta Dinâmica e Flattening Programático


Aqui entra uma técnica mais avançada e escalável. E se pudéssemos inspecionar o DataFrame, descobrir todos os "caminhos" (paths) para os campos aninhados e, em seguida, usar essa informação para criar as colunas de forma programática?


O comando DESCRIBE EXTENDED é nosso aliado aqui.


Passo 1: Inspecionando o Schema Detalhado

Quando lidamos com schemas complexos, o printSchema() é bom, mas o DESCRIBE EXTENDED em SQL nos dá uma visão tabular e detalhada, perfeita para extrair os nomes das colunas.

ree

Explicação:

Este comando retorna metadados sobre cada coluna, incluindo seu nome (col_name) e tipo de dado (data_type). Para estruturas aninhadas, o nome da coluna já vem no formato pai.filho, exatamente o que precisamos.


Exemplo Prático:

Primeiro, vamos registrar nosso DataFrame como uma tabela temporária para podermos usar SQL.

# Registrando o DataFrame como uma view temporária
df_raw.createOrReplaceTempView("vw_dados_brutos")

# Executando o describe extended
df_schema_info = spark.sql("DESCRIBE EXTENDED vw_dados_brutos")

display(df_schema_info)

Análise do Resultado:

O display(df_schema_info) nos mostrará uma tabela com todos os campos, incluindo os aninhados. O que nos interessa é a coluna col_name. Note como os campos dentro de microrregiao já aparecem com a notação de ponto:

col_name

data_type

id

bigint

municipio

string

microrregiao

struct<id:bigint,mesorregiao:struct...

bigint

microrregiao.mesorregiao

struct<UF:struct...

microrregiao.mesorregiao.UF

bigint

...

...

populacao_estimada

array<struct...

Passo 2: Construindo a Lista de Colunas e Achatando o DataFrame

Agora, podemos coletar esses nomes de coluna (que não são struct ou array) para construir nossa seleção dinâmica.


Explicação:

Vamos filtrar o DataFrame df_schema_info para pegar apenas os campos atômicos (que não são estruturas complexas), coletar seus nomes em uma lista Python e, em seguida, usar essa lista para construir a seleção de colunas no DataFrame original.


Exemplo Prático:

from pyspark.sql.functions import col

# Filtrar para obter apenas os campos que não são struct ou array
# e que contêm um ponto (indicando que são aninhados) ou são de primeiro nível.
schema_info_filtrado = df_schema_info.filter(
    ~col("data_type").startswith("struct") & ~col("data_type").startswith("array")
)

# Criar a lista com os nomes completos das colunas
colunas_para_selecionar = [row['col_name'] for row in schema_info_filtrado.collect()]

# Vamos criar um nome de alias mais limpo para cada coluna
expressoes_select = [
    col(c).alias(c.replace(".", "_")) for c in colunas_para_selecionar
]

# Aplicar a seleção ao DataFrame original
df_achatado = df_raw.select(expressoes_select)

display(df_achatado)

Análise do Resultado:

Este método produziu um DataFrame totalmente plano de forma automática. O laço for percorreu a lista de nomes que extraímos e aplicou a seleção, renomeando as colunas para um formato mais amigável (ex: microrregiao.mesorregiao.nome virou microrregiao_mesorregiao_nome).

id

municipio

microrregiao_id

microrregiao_mesorregiao_UF_id

microrregiao_mesorregiao_UF_nome

microrregiao_mesorregiao_UF_sigla

...

1

São Paulo

3515

35

São Paulo

SP

...


Nota sobre Arrays: A coluna populacao_estimada é um array. Para achatá-la, usaríamos a função explode(), que cria uma nova linha para cada elemento do array. Isso poderia ser um passo adicional após o flattening inicial das structs.

from pyspark.sql.functions import explode

# Primeiro, explodimos o array
df_com_populacao_explodida = df_raw.withColumn("populacao", explode("populacao_estimada"))

# Agora, podemos aplicar a notação de ponto
df_final = df_com_populacao_explodida.select(
    "id",
    "municipio",
    "microrregiao.mesorregiao.UF.sigla as uf",
    "populacao.ano as ano_estimativa",
    "populacao.quantidade as populacao_estimada"
)

display(df_final)

Conclusão e Boas Práticas


Lidar com JSONs aninhados é uma realidade no mundo dos dados. Felizmente, o ecossistema Databricks e Spark nos dá o poder de escolha.


  1. Notação de Ponto: Use para seleções rápidas, específicas e quando o schema é bem conhecido. É a sua ferramenta de precisão.

  2. DESCRIBE EXTENDED e Abordagem Dinâmica: A melhor escolha para schemas complexos, desconhecidos ou que podem mudar. Ela torna seu código mais robusto, escalável e menos suscetível a erros manuais. É a sua ferramenta de automação.

  3. Use alias(): Sempre renomeie suas colunas para nomes claros e sem caracteres especiais. Isso facilita não só a leitura do código, mas também a integração com outras ferramentas e sistemas de BI.

  4. Função explode() para Arrays: Lembre-se que para achatar arrays (listas) de estruturas, a função explode() é a ferramenta correta, transformando elementos da lista em novas linhas.


Adotar essas técnicas não apenas otimiza seu trabalho, mas também demonstra maturidade e profundidade técnica no desenvolvimento de pipelines de dados. A capacidade de inspecionar e adaptar seu código dinamicamente ao schema dos dados é uma habilidade valiosa para qualquer profissional da área.


Espero que este guia prático ajude você a navegar pelas complexidades dos dados JSON no seu dia a dia.


Até a próxima!

Comentários


bottom of page