top of page

3 _ [LAB - Hotelaria] Ingestão Source → Landing com Databricks CLI, Asset Bundles e Unity Catalog Volumes


Nesta etapa do laboratório, o objetivo é estruturar a primeira parte do pipeline de engenharia de dados: a ingestão incremental dos dados do Neon PostgreSQL para a camada Landing no Databricks.

O fluxo implementado até aqui é:

Neon PostgreSQL
        ↓
Foreign Catalog no Databricks
        ↓
Job com Databricks Asset Bundles
        ↓
Landing em Unity Catalog Volume
        ↓
Arquivos Parquet particionados por tabela, data e batch

Nesta fase ainda não estamos carregando a Bronze. O foco é apenas garantir que a camada Landing receba snapshots incrementais do banco de origem com controle por marca d’água.


1. Por que usar Databricks CLI e Asset Bundles?

O Databricks CLI permite interagir com o workspace Databricks a partir do terminal local, usando comandos para autenticar, validar, implantar e executar recursos. A Databricks recomenda o uso da CLI moderna, versão 0.205.0 ou superior, para trabalhar com Bundles e automações. (Documentação Databricks)

Os Databricks Asset Bundles, atualmente documentados também como Declarative Automation Bundles, permitem declarar recursos como jobs, pipelines, notebooks, variáveis e targets em arquivos YAML versionáveis. Isso aproxima o desenvolvimento local de uma prática real de CI/CD para engenharia de dados. (Documentação Databricks)

Neste laboratório, usaremos o Bundle para controlar a implantação e execução do job de ingestão da Landing.


2. Por que usar Volumes na Landing?

A camada Landing precisa armazenar arquivos físicos extraídos da origem. Esses arquivos representam snapshots ou fatias incrementais do banco, preservando o histórico de ingestão.

Por isso, usamos Unity Catalog Volumes. Volumes são objetos governados pelo Unity Catalog usados para armazenar, organizar e controlar acesso a arquivos e dados não tabulares. Enquanto tabelas governam dados tabulares, volumes governam arquivos em formatos como Parquet, JSON, CSV e outros. (Documentação Databricks)

A Landing ficará em Volume porque queremos manter arquivos físicos como:

/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/ingestion_date=2026-05-09/batch_timestamp=20260509143000/

Dentro desse diretório, o Spark grava arquivos part-*.parquet.

Essa escolha é melhor do que tentar gravar a Landing diretamente como tabela Delta, porque a Landing deve preservar a extração original como arquivo de reprocessamento.


3. Estratégia incremental adotada

A ingestão da Landing usa o conceito de marca d’água, ou seja, uma coluna de controle temporal que indica até onde a origem já foi processada.

No nosso caso, cada tabela possui a coluna:

update_date

A tabela de controle armazena o último update_date processado com sucesso por tabela.

A regra usada é:

where update_date >= last_watermark_value - lookback

Ou seja, não buscamos apenas registros maiores que o último valor. Aplicamos também um lookback de segurança.

Exemplo:

last_watermark_value = 2026-05-09 10:30:00
lookback_minutes     = 1440

filtro aplicado:
update_date >= 2026-05-08 10:30:00

Esse lookback reduz risco de perda de dados em cenários como:

atraso de atualização na origem
diferença de timezone
registros com timestamps iguais
falha parcial de carga
necessidade de reprocessamento

A Landing recebe os dados novamente se caírem dentro da janela de lookback. Isso é aceitável porque a Bronze será append-only e, depois, a Silver tratará deduplicação e regra de negócio.


4. Estratégia de retenção da Landing

A Landing será mantida como histórico de extração por até 15 dias.

A estrutura do arquivo será baseada em:

source_system
source_table
ingestion_date
batch_timestamp

Exemplo:

/Volumes/hotelaria_dev/landing/neon_postgresql/reservas/ingestion_date=2026-05-09/batch_timestamp=20260509143000/

Essa abordagem permite:

reprocessar a Bronze a partir dos arquivos da Landing
auditar o que foi extraído em cada execução
comparar diferentes batches
evitar dependência direta da origem em caso de reprocessamento

5. Limitação do Databricks Free Edition

Como este laboratório está sendo executado no Databricks Free Edition, precisamos adaptar algumas decisões. A edição gratuita possui limitações como uso de compute serverless, restrição de customização de compute e limite de recursos. (Documentação Databricks)

Além disso, a Free Edition possui apenas um workspace e um metastore por conta, então a separação de ambientes será simulada por catálogos, e não por workspaces separados. (Documentação Databricks)

Por isso usamos:

hotelaria_dev
hotelaria_prod

Neste passo, trabalhamos somente com:

hotelaria_dev

Parte 1 — Instalação da Databricks CLI no Linux Mint/Ubuntu


1.1 Atualizar pacotes básicos

No terminal:

sudo apt update

Opcionalmente:

sudo apt upgrade -y

1.2 Instalar dependências úteis

sudo apt install curl git nano tree python3 python3-pip -y

Esses pacotes serão usados para:

curl    → baixar o instalador da CLI
git     → versionamento do projeto
nano    → editar arquivos pelo terminal
tree    → visualizar estrutura de pastas
python3 → validações locais
pip     → instalar bibliotecas auxiliares, se necessário

1.3 Instalar a Databricks CLI

No nosso ambiente Linux, o comando correto usado foi:

curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/v0.299.1/install.sh | sudo sh

O repositório oficial databricks/setup-cli permite instalar a versão mais recente usando main ou uma versão específica trocando main pela tag da versão, como v0.299.1. (GitHub)


1.4 Validar instalação

Execute:

databricks -v

ou:

databricks version

O esperado é retornar uma versão da CLI, por exemplo:

Databricks CLI v0.299.1

Parte 2 — Autenticação da CLI com o Databricks


2.1 Criar profile de autenticação

Usamos o profile:

hotelaria-free

Esse profile é apenas um apelido local para a autenticação da CLI. Ele não é ambiente, não é catálogo e não é workspace.

Execute:

databricks auth login \
  --host https://dbc-26a3d512-ff21.cloud.databricks.com \
  --profile hotelaria-free

A CLI abrirá o navegador para autenticação.


2.2 Validar profile

databricks auth describe --profile hotelaria-free

Depois:

databricks workspace list / --profile hotelaria-free

Se listar o workspace, a autenticação está funcionando.


2.3 Conferir arquivo local de configuração

cat ~/.databrickscfg

O resultado esperado deve ter algo parecido com:

[hotelaria-free]
host         = https://dbc-26a3d512-ff21.cloud.databricks.com
account_id   = ...
workspace_id = ...
auth_type    = databricks-cli

[__settings__]
default_profile = hotelaria-free

Se o profile estiver com outro nome, os comandos precisam usar exatamente o nome existente.


Parte 3 — Estrutura local do projeto


3.1 Acessar a pasta do projeto

cd ~/Documentos/Laboratorio-ETL-end-to-end-with-assets-budles-and-DLT

3.2 Criar estrutura de diretórios

mkdir -p resources/jobs
mkdir -p resources/pipelines
mkdir -p src/common
mkdir -p src/landing
mkdir -p src/bronze
mkdir -p sql/setup
mkdir -p sql/control
mkdir -p contracts
mkdir -p tests

3.3 Criar arquivos iniciais

touch databricks.yml
touch README.md

3.4 Validar estrutura

tree -L 3

Estrutura esperada:

Laboratorio-ETL-end-to-end-with-assets-budles-and-DLT/
├── databricks.yml
├── resources
│   ├── jobs
│   └── pipelines
├── src
│   ├── bronze
│   ├── common
│   └── landing
├── sql
│   ├── control
│   └── setup
├── contracts
├── tests
└── README.md

Parte 4 — Configuração inicial do Bundle


4.1 Criar o databricks.yml

Abra o arquivo:

nano databricks.yml

Cole:

bundle:
  name: laboratorio-etl-hotelaria

workspace:
  host: https://dbc-26a3d512-ff21.cloud.databricks.com

include:
  - resources/jobs/*.yml
  - resources/pipelines/*.yml

variables:
  source_catalog:
    default: neon_hotelaria

  source_schema:
    default: public

  source_system:
    default: neon_postgresql

  target_catalog:
    default: hotelaria_dev

targets:
  dev:
    mode: development
    default: true
    workspace:
      host: https://dbc-26a3d512-ff21.cloud.databricks.com
    variables:
      target_catalog:
        default: hotelaria_dev
      source_catalog:
        default: neon_hotelaria
      source_schema:
        default: public
      source_system:
        default: neon_postgresql

  prod:
    mode: production
    workspace:
      host: https://dbc-26a3d512-ff21.cloud.databricks.com
    variables:
      target_catalog:
        default: hotelaria_prod
      source_catalog:
        default: neon_hotelaria
      source_schema:
        default: public
      source_system:
        default: neon_postgresql

Salve:

CTRL + O
ENTER
CTRL + X

4.2 Validação importante sobre YAML

O formato correto dentro de targets.dev.variables é:

variables:
  target_catalog:
    default: hotelaria_dev

E não:

variables:
  target_catalog: hotelaria_dev

Essa foi uma das correções aplicadas durante o laboratório.


4.3 Validar o Bundle

databricks bundle validate --target dev --profile hotelaria-free

Se tudo estiver correto, a validação passa.


Parte 5 — Criar objetos base no Databricks

Agora criaremos os objetos base no Unity Catalog.


5.1 Criar script de setup

No terminal:

nano sql/setup/00_setup_catalog_schemas_volumes.sql

Cole:

-- ============================================================
-- Setup inicial do laboratório Hotelaria
-- Catálogos, schemas, volume e tabelas de controle
-- ============================================================

create catalog if not exists hotelaria_dev;

create schema if not exists hotelaria_dev.control;
create schema if not exists hotelaria_dev.landing;
create schema if not exists hotelaria_dev.bronze;
create schema if not exists hotelaria_dev.silver;
create schema if not exists hotelaria_dev.gold;

-- Volume para arquivos físicos da Landing
create volume if not exists hotelaria_dev.landing.neon_postgresql;

-- ============================================================
-- Tabela de controle de watermark
-- ============================================================

create table if not exists hotelaria_dev.control.ingestion_watermark (
    source_system string,
    source_catalog string,
    source_schema string,
    source_table string,
    target_catalog string,
    target_schema string,
    target_table string,
    watermark_column string,
    last_watermark_value timestamp,
    lookback_minutes int,
    last_ingestion_id string,
    last_status string,
    last_success_at timestamp,
    created_at timestamp,
    updated_at timestamp
)
using delta;

-- ============================================================
-- Tabela de controle de arquivos processados na Bronze
-- ============================================================

create table if not exists hotelaria_dev.control.processed_files (
    source_system string,
    source_table string,
    source_file_path string,
    source_file_name string,
    file_modification_time timestamp,
    file_size bigint,
    ingestion_id string,
    processed_at timestamp,
    target_table string,
    status string,
    created_at timestamp
)
using delta;

5.2 Criar seed da watermark

nano sql/control/01_seed_ingestion_watermark.sql

Cole:

-- ============================================================
-- Seed inicial da tabela de watermark
-- Uma linha por tabela origem
-- ============================================================

insert into hotelaria_dev.control.ingestion_watermark
select
    source_system,
    source_catalog,
    source_schema,
    source_table,
    target_catalog,
    target_schema,
    target_table,
    watermark_column,
    last_watermark_value,
    lookback_minutes,
    last_ingestion_id,
    last_status,
    last_success_at,
    current_timestamp() as created_at,
    current_timestamp() as updated_at
from values
    (
        'neon_postgresql',
        'neon_hotelaria',
        'public',
        'hoteis',
        'hotelaria_dev',
        'landing',
        'hoteis',
        'update_date',
        timestamp('1900-01-01 00:00:00'),
        1440,
        null,
        'pending',
        null
    ),
    (
        'neon_postgresql',
        'neon_hotelaria',
        'public',
        'hospedes',
        'hotelaria_dev',
        'landing',
        'hospedes',
        'update_date',
        timestamp('1900-01-01 00:00:00'),
        1440,
        null,
        'pending',
        null
    ),
    (
        'neon_postgresql',
        'neon_hotelaria',
        'public',
        'quartos',
        'hotelaria_dev',
        'landing',
        'quartos',
        'update_date',
        timestamp('1900-01-01 00:00:00'),
        1440,
        null,
        'pending',
        null
    ),
    (
        'neon_postgresql',
        'neon_hotelaria',
        'public',
        'reservas',
        'hotelaria_dev',
        'landing',
        'reservas',
        'update_date',
        timestamp('1900-01-01 00:00:00'),
        1440,
        null,
        'pending',
        null
    ),
    (
        'neon_postgresql',
        'neon_hotelaria',
        'public',
        'consumos',
        'hotelaria_dev',
        'landing',
        'consumos',
        'update_date',
        timestamp('1900-01-01 00:00:00'),
        1440,
        null,
        'pending',
        null
    ),
    (
        'neon_postgresql',
        'neon_hotelaria',
        'public',
        'faturas',
        'hotelaria_dev',
        'landing',
        'faturas',
        'update_date',
        timestamp('1900-01-01 00:00:00'),
        1440,
        null,
        'pending',
        null
    ),
    (
        'neon_postgresql',
        'neon_hotelaria',
        'public',
        'reservas_ota',
        'hotelaria_dev',
        'landing',
        'reservas_ota',
        'update_date',
        timestamp('1900-01-01 00:00:00'),
        1440,
        null,
        'pending',
        null
    )
as t (
    source_system,
    source_catalog,
    source_schema,
    source_table,
    target_catalog,
    target_schema,
    target_table,
    watermark_column,
    last_watermark_value,
    lookback_minutes,
    last_ingestion_id,
    last_status,
    last_success_at
);

5.3 Executar os scripts no Databricks

Neste laboratório, executamos essa parte manualmente pelo Databricks SQL Editor.

Passo a passo:

1. Abrir o workspace Databricks no navegador
2. Clicar em SQL Editor
3. Criar uma New Query
4. Colar o conteúdo de sql/setup/00_setup_catalog_schemas_volumes.sql
5. Clicar em Run
6. Apagar o conteúdo anterior
7. Colar o conteúdo de sql/control/01_seed_ingestion_watermark.sql
8. Clicar em Run

5.4 Validar objetos criados

No SQL Editor, execute:

show schemas in hotelaria_dev;

Resultado esperado:

bronze
control
gold
landing
silver

Validar volume:

show volumes in hotelaria_dev.landing;

Resultado esperado:

neon_postgresql

Validar watermark:

select *
from hotelaria_dev.control.ingestion_watermark
order by source_table;

Resultado esperado: 7 linhas.

Validar controle de arquivos:

select *
from hotelaria_dev.control.processed_files;

Resultado esperado: 0 linhas.


Parte 6 — Criar script de ingestão Source → Landing


6.1 Criar arquivo Python

No terminal:

nano src/landing/ingest_neon_to_landing.py

Cole:

# Databricks notebook source

from datetime import datetime, timezone
from pyspark.sql import functions as F
import uuid

# ============================================================
# Widgets / parâmetros
# ============================================================

dbutils.widgets.text("target_catalog", "hotelaria_dev")
dbutils.widgets.text("source_catalog", "neon_hotelaria")
dbutils.widgets.text("source_schema", "public")
dbutils.widgets.text("source_system", "neon_postgresql")
dbutils.widgets.text("table_name", "all")

target_catalog = dbutils.widgets.get("target_catalog")
source_catalog = dbutils.widgets.get("source_catalog")
source_schema = dbutils.widgets.get("source_schema")
source_system = dbutils.widgets.get("source_system")
table_name_param = dbutils.widgets.get("table_name")

control_schema = "control"
landing_schema = "landing"

watermark_table = f"{target_catalog}.{control_schema}.ingestion_watermark"

landing_volume_base_path = f"/Volumes/{target_catalog}/{landing_schema}/{source_system}"

batch_timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
ingestion_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
ingestion_id = str(uuid.uuid4())

print("=" * 100)
print("INÍCIO DA INGESTÃO SOURCE → LANDING")
print(f"target_catalog..........: {target_catalog}")
print(f"source_catalog..........: {source_catalog}")
print(f"source_schema...........: {source_schema}")
print(f"source_system...........: {source_system}")
print(f"table_name_param........: {table_name_param}")
print(f"batch_timestamp.........: {batch_timestamp}")
print(f"ingestion_date..........: {ingestion_date}")
print(f"ingestion_id............: {ingestion_id}")
print(f"landing_volume_base_path: {landing_volume_base_path}")
print("=" * 100)


# ============================================================
# Funções auxiliares
# ============================================================

def get_tables_to_process():
    df = (
        spark.table(watermark_table)
        .filter(F.col("source_system") == source_system)
        .filter(F.col("source_catalog") == source_catalog)
        .filter(F.col("source_schema") == source_schema)
    )

    if table_name_param.lower() != "all":
        df = df.filter(F.col("source_table") == table_name_param)

    rows = df.collect()

    if not rows:
        raise Exception(
            f"Nenhuma tabela encontrada na watermark para table_name={table_name_param}"
        )

    return rows


def read_source_table(row):
    source_table_full_name = (
        f"`{row.source_catalog}`.`{row.source_schema}`.`{row.source_table}`"
    )

    last_watermark_value = row.last_watermark_value
    lookback_minutes = row.lookback_minutes
    watermark_column = row.watermark_column

    print("-" * 100)
    print(f"Lendo origem.............: {source_table_full_name}")
    print(f"watermark_column.........: {watermark_column}")
    print(f"last_watermark_value.....: {last_watermark_value}")
    print(f"lookback_minutes.........: {lookback_minutes}")

    df_source = spark.table(source_table_full_name)

    if watermark_column not in df_source.columns:
        raise Exception(
            f"Coluna de watermark '{watermark_column}' não existe na tabela {source_table_full_name}"
        )

    watermark_start_expr = F.expr(
        f"timestampadd(MINUTE, -{lookback_minutes}, timestamp('{last_watermark_value}'))"
    )

    df_filtered = (
        df_source
        .filter(F.col(watermark_column) >= watermark_start_expr)
    )

    return df_filtered


def add_landing_metadata(df, row):
    return (
        df
        .withColumn("_metadata_ingestion_id", F.lit(ingestion_id))
        .withColumn("_metadata_ingestion_at", F.current_timestamp())
        .withColumn("_metadata_batch_timestamp", F.lit(batch_timestamp))
        .withColumn("_metadata_source_system", F.lit(source_system))
        .withColumn("_metadata_source_catalog", F.lit(row.source_catalog))
        .withColumn("_metadata_source_schema", F.lit(row.source_schema))
        .withColumn("_metadata_source_table", F.lit(row.source_table))
        .withColumn("_metadata_watermark_column", F.lit(row.watermark_column))
        .withColumn("_metadata_landing_ingestion_date", F.lit(ingestion_date))
    )


def write_landing_file(df, row):
    target_path = (
        f"{landing_volume_base_path}/"
        f"{row.source_table}/"
        f"ingestion_date={ingestion_date}/"
        f"batch_timestamp={batch_timestamp}/"
    )

    print(f"Gravando Landing em......: {target_path}")

    (
        df.write
        .format("parquet")
        .mode("overwrite")
        .save(target_path)
    )

    return target_path


def update_watermark(row, df_landing):
    watermark_column = row.watermark_column

    max_watermark = (
        df_landing
        .select(F.max(F.col(watermark_column)).alias("max_watermark"))
        .collect()[0]["max_watermark"]
    )

    if max_watermark is None:
        print(f"Nenhum max_watermark encontrado para {row.source_table}. Watermark não será atualizada.")
        return

    spark.sql(f"""
        update {watermark_table}
        set
            last_watermark_value = timestamp('{max_watermark}'),
            last_ingestion_id = '{ingestion_id}',
            last_status = 'success',
            last_success_at = current_timestamp(),
            updated_at = current_timestamp()
        where source_system = '{source_system}'
          and source_catalog = '{row.source_catalog}'
          and source_schema = '{row.source_schema}'
          and source_table = '{row.source_table}'
    """)

    print(f"Watermark atualizada para: {max_watermark}")


# ============================================================
# Execução principal
# ============================================================

tables_to_process = get_tables_to_process()

for row in tables_to_process:
    print("=" * 100)
    print(f"Processando tabela: {row.source_table}")

    try:
        df_source = read_source_table(row)

        total_rows = df_source.count()
        print(f"Registros encontrados....: {total_rows}")

        if total_rows == 0:
            print(f"Nenhum registro novo para {row.source_table}. Pulando escrita.")
            continue

        df_landing = add_landing_metadata(df_source, row)

        target_path = write_landing_file(df_landing, row)

        update_watermark(row, df_landing)

        print(f"SUCESSO tabela............: {row.source_table}")
        print(f"Arquivo/diretório landing.: {target_path}")

    except Exception as e:
        print(f"ERRO tabela...............: {row.source_table}")
        print(f"Mensagem..................: {str(e)}")

        spark.sql(f"""
            update {watermark_table}
            set
                last_ingestion_id = '{ingestion_id}',
                last_status = 'failed',
                updated_at = current_timestamp()
            where source_system = '{source_system}'
              and source_catalog = '{row.source_catalog}'
              and source_schema = '{row.source_schema}'
              and source_table = '{row.source_table}'
        """)

        raise e

print("=" * 100)
print("FIM DA INGESTÃO SOURCE → LANDING")
print("=" * 100)

6.2 Observação importante sobre arquivos Parquet

No Spark, o comando:

df.write.parquet(path)

não gera um único arquivo chamado, por exemplo:

hoteis_20260509143000.parquet

Ele cria uma pasta com arquivos internos do tipo:

part-00000-....parquet

Por isso, adotamos a estrutura:

/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/ingestion_date=yyyy-MM-dd/batch_timestamp=yyyyMMddHHmmss/

Esse é o padrão mais adequado para processamento distribuído em Spark.


Parte 7 — Criar o Job do Asset Bundle


7.1 Criar arquivo do Job

nano resources/jobs/job_ingest_neon_to_landing.yml

Cole:

resources:
  jobs:
    ingest_neon_to_landing:
      name: "ingest_neon_to_landing [${bundle.target}]"
      description: "Ingestão incremental Source Neon PostgreSQL para Landing usando watermark"

      tasks:
        - task_key: ingest_neon_to_landing
          notebook_task:
            notebook_path: ../../src/landing/ingest_neon_to_landing.py
            base_parameters:
              target_catalog: ${var.target_catalog}
              source_catalog: ${var.source_catalog}
              source_schema: ${var.source_schema}
              source_system: ${var.source_system}
              table_name: all

7.2 Correção importante do caminho do notebook

Como o arquivo do job está em:

resources/jobs/job_ingest_neon_to_landing.yml

e o notebook está em:

src/landing/ingest_neon_to_landing.py

o caminho correto é:

notebook_path: ../../src/landing/ingest_neon_to_landing.py

O caminho abaixo estava incorreto:

notebook_path: ../src/landing/ingest_neon_to_landing.py

Porque faria o Bundle procurar em:

resources/src/landing/ingest_neon_to_landing.py

E esse caminho não existe.


Parte 8 — Validar, implantar e executar o Job


8.1 Validar o Bundle

databricks bundle validate --target dev --profile hotelaria-free

Se der certo, a autenticação e os arquivos YAML estão corretos.


8.2 Fazer deploy

databricks bundle deploy --target dev --profile hotelaria-free

Esse comando publica os arquivos do projeto no workspace Databricks.


8.3 Ver resumo do Bundle

databricks bundle summary --target dev --profile hotelaria-free

Procure pelo job:

ingest_neon_to_landing

Também observe o caminho do workspace publicado, algo como:

/Workspace/Users/<usuario>/.bundle/laboratorio-etl-hotelaria/dev

8.4 Executar o Job

databricks bundle run ingest_neon_to_landing --target dev --profile hotelaria-free

Esse comando dispara o job no Databricks.


Parte 9 — Validações após execução do Job


9.1 Validar watermark

No Databricks SQL Editor:

select
    source_table,
    watermark_column,
    last_watermark_value,
    last_status,
    last_success_at,
    updated_at
from hotelaria_dev.control.ingestion_watermark
order by source_table;

Resultado esperado:

last_status = success

para as tabelas processadas.


9.2 Validar diretórios da Landing

list '/Volumes/hotelaria_dev/landing/neon_postgresql/';

Depois validar uma tabela:

list '/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/';

Depois validar a partição por data:

list '/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/ingestion_date=YYYY-MM-DD/';

Substitua YYYY-MM-DD pela data da execução.


9.3 Ler arquivos da Landing

Em notebook PySpark:

df = spark.read.parquet("/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/")
display(df)

Ou via SQL:

select *
from parquet.`/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/`
limit 10;

Parte 10 — Resultado alcançado

Ao final deste passo, temos:

Databricks CLI instalada no Linux
Profile hotelaria-free configurado
Projeto local estruturado
databricks.yml criado e validado
Schemas e volume criados no Unity Catalog
Tabela de watermark criada
Tabela de arquivos processados criada
Script de ingestão Source → Landing criado
Job do Asset Bundle criado
Bundle validado
Bundle implantado
Job executado com sucesso
Arquivos Parquet gerados na Landing
Watermark atualizada com sucesso

Fluxo final desta etapa:

neon_hotelaria.public.hoteis
neon_hotelaria.public.hospedes
neon_hotelaria.public.quartos
neon_hotelaria.public.reservas
neon_hotelaria.public.consumos
neon_hotelaria.public.faturas
neon_hotelaria.public.reservas_ota
        ↓
/Volumes/hotelaria_dev/landing/neon_postgresql/<tabela>/ingestion_date=<data>/batch_timestamp=<timestamp>/

Parte 11 — Decisão arquitetural desta etapa

A decisão adotada foi:

Source:
  Neon PostgreSQL refletido no Databricks via Foreign Catalog

Landing:
  Arquivos Parquet em Unity Catalog Volume

Controle incremental:
  Tabela Delta hotelaria_dev.control.ingestion_watermark

Incremento:
  update_date com lookback de 1440 minutos

Retenção:
  Histórico físico da Landing por até 15 dias

Orquestração:
  Databricks Job gerenciado por Asset Bundle

Deploy:
  Databricks CLI via target dev

Essa abordagem mantém a Landing simples, auditável, reprocessável e aderente a um cenário real de engenharia de dados.

O próximo passo será construir a carga Landing → Bronze, mantendo a Bronze como append-only, sem merge, sem update e sem delete.

Comentários


bottom of page