3 _ [LAB - Hotelaria] Ingestão Source → Landing com Databricks CLI, Asset Bundles e Unity Catalog Volumes
- Michel Souza Santana
- há 6 dias
- 10 min de leitura

Nesta etapa do laboratório, o objetivo é estruturar a primeira parte do pipeline de engenharia de dados: a ingestão incremental dos dados do Neon PostgreSQL para a camada Landing no Databricks.
O fluxo implementado até aqui é:
Neon PostgreSQL
↓
Foreign Catalog no Databricks
↓
Job com Databricks Asset Bundles
↓
Landing em Unity Catalog Volume
↓
Arquivos Parquet particionados por tabela, data e batch
Nesta fase ainda não estamos carregando a Bronze. O foco é apenas garantir que a camada Landing receba snapshots incrementais do banco de origem com controle por marca d’água.
1. Por que usar Databricks CLI e Asset Bundles?
O Databricks CLI permite interagir com o workspace Databricks a partir do terminal local, usando comandos para autenticar, validar, implantar e executar recursos. A Databricks recomenda o uso da CLI moderna, versão 0.205.0 ou superior, para trabalhar com Bundles e automações. (Documentação Databricks)
Os Databricks Asset Bundles, atualmente documentados também como Declarative Automation Bundles, permitem declarar recursos como jobs, pipelines, notebooks, variáveis e targets em arquivos YAML versionáveis. Isso aproxima o desenvolvimento local de uma prática real de CI/CD para engenharia de dados. (Documentação Databricks)
Neste laboratório, usaremos o Bundle para controlar a implantação e execução do job de ingestão da Landing.
2. Por que usar Volumes na Landing?
A camada Landing precisa armazenar arquivos físicos extraídos da origem. Esses arquivos representam snapshots ou fatias incrementais do banco, preservando o histórico de ingestão.
Por isso, usamos Unity Catalog Volumes. Volumes são objetos governados pelo Unity Catalog usados para armazenar, organizar e controlar acesso a arquivos e dados não tabulares. Enquanto tabelas governam dados tabulares, volumes governam arquivos em formatos como Parquet, JSON, CSV e outros. (Documentação Databricks)
A Landing ficará em Volume porque queremos manter arquivos físicos como:
/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/ingestion_date=2026-05-09/batch_timestamp=20260509143000/
Dentro desse diretório, o Spark grava arquivos part-*.parquet.
Essa escolha é melhor do que tentar gravar a Landing diretamente como tabela Delta, porque a Landing deve preservar a extração original como arquivo de reprocessamento.
3. Estratégia incremental adotada
A ingestão da Landing usa o conceito de marca d’água, ou seja, uma coluna de controle temporal que indica até onde a origem já foi processada.
No nosso caso, cada tabela possui a coluna:
update_date
A tabela de controle armazena o último update_date processado com sucesso por tabela.
A regra usada é:
where update_date >= last_watermark_value - lookback
Ou seja, não buscamos apenas registros maiores que o último valor. Aplicamos também um lookback de segurança.
Exemplo:
last_watermark_value = 2026-05-09 10:30:00
lookback_minutes = 1440
filtro aplicado:
update_date >= 2026-05-08 10:30:00
Esse lookback reduz risco de perda de dados em cenários como:
atraso de atualização na origem
diferença de timezone
registros com timestamps iguais
falha parcial de carga
necessidade de reprocessamento
A Landing recebe os dados novamente se caírem dentro da janela de lookback. Isso é aceitável porque a Bronze será append-only e, depois, a Silver tratará deduplicação e regra de negócio.
4. Estratégia de retenção da Landing
A Landing será mantida como histórico de extração por até 15 dias.
A estrutura do arquivo será baseada em:
source_system
source_table
ingestion_date
batch_timestamp
Exemplo:
/Volumes/hotelaria_dev/landing/neon_postgresql/reservas/ingestion_date=2026-05-09/batch_timestamp=20260509143000/
Essa abordagem permite:
reprocessar a Bronze a partir dos arquivos da Landing
auditar o que foi extraído em cada execução
comparar diferentes batches
evitar dependência direta da origem em caso de reprocessamento
5. Limitação do Databricks Free Edition
Como este laboratório está sendo executado no Databricks Free Edition, precisamos adaptar algumas decisões. A edição gratuita possui limitações como uso de compute serverless, restrição de customização de compute e limite de recursos. (Documentação Databricks)
Além disso, a Free Edition possui apenas um workspace e um metastore por conta, então a separação de ambientes será simulada por catálogos, e não por workspaces separados. (Documentação Databricks)
Por isso usamos:
hotelaria_dev
hotelaria_prod
Neste passo, trabalhamos somente com:
hotelaria_dev
Parte 1 — Instalação da Databricks CLI no Linux Mint/Ubuntu
1.1 Atualizar pacotes básicos
No terminal:
sudo apt update
Opcionalmente:
sudo apt upgrade -y
1.2 Instalar dependências úteis
sudo apt install curl git nano tree python3 python3-pip -y
Esses pacotes serão usados para:
curl → baixar o instalador da CLI
git → versionamento do projeto
nano → editar arquivos pelo terminal
tree → visualizar estrutura de pastas
python3 → validações locais
pip → instalar bibliotecas auxiliares, se necessário
1.3 Instalar a Databricks CLI
No nosso ambiente Linux, o comando correto usado foi:
curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/v0.299.1/install.sh | sudo sh
O repositório oficial databricks/setup-cli permite instalar a versão mais recente usando main ou uma versão específica trocando main pela tag da versão, como v0.299.1. (GitHub)
1.4 Validar instalação
Execute:
databricks -v
ou:
databricks version
O esperado é retornar uma versão da CLI, por exemplo:
Databricks CLI v0.299.1
Parte 2 — Autenticação da CLI com o Databricks
2.1 Criar profile de autenticação
Usamos o profile:
hotelaria-free
Esse profile é apenas um apelido local para a autenticação da CLI. Ele não é ambiente, não é catálogo e não é workspace.
Execute:
databricks auth login \
--host https://dbc-26a3d512-ff21.cloud.databricks.com \
--profile hotelaria-free
A CLI abrirá o navegador para autenticação.
2.2 Validar profile
databricks auth describe --profile hotelaria-free
Depois:
databricks workspace list / --profile hotelaria-free
Se listar o workspace, a autenticação está funcionando.
2.3 Conferir arquivo local de configuração
cat ~/.databrickscfg
O resultado esperado deve ter algo parecido com:
[hotelaria-free]
host = https://dbc-26a3d512-ff21.cloud.databricks.com
account_id = ...
workspace_id = ...
auth_type = databricks-cli
[__settings__]
default_profile = hotelaria-free
Se o profile estiver com outro nome, os comandos precisam usar exatamente o nome existente.
Parte 3 — Estrutura local do projeto
3.1 Acessar a pasta do projeto
cd ~/Documentos/Laboratorio-ETL-end-to-end-with-assets-budles-and-DLT
3.2 Criar estrutura de diretórios
mkdir -p resources/jobs
mkdir -p resources/pipelines
mkdir -p src/common
mkdir -p src/landing
mkdir -p src/bronze
mkdir -p sql/setup
mkdir -p sql/control
mkdir -p contracts
mkdir -p tests
3.3 Criar arquivos iniciais
touch databricks.yml
touch README.md
3.4 Validar estrutura
tree -L 3
Estrutura esperada:
Laboratorio-ETL-end-to-end-with-assets-budles-and-DLT/
├── databricks.yml
├── resources
│ ├── jobs
│ └── pipelines
├── src
│ ├── bronze
│ ├── common
│ └── landing
├── sql
│ ├── control
│ └── setup
├── contracts
├── tests
└── README.md
Parte 4 — Configuração inicial do Bundle
4.1 Criar o databricks.yml
Abra o arquivo:
nano databricks.yml
Cole:
bundle:
name: laboratorio-etl-hotelaria
workspace:
host: https://dbc-26a3d512-ff21.cloud.databricks.com
include:
- resources/jobs/*.yml
- resources/pipelines/*.yml
variables:
source_catalog:
default: neon_hotelaria
source_schema:
default: public
source_system:
default: neon_postgresql
target_catalog:
default: hotelaria_dev
targets:
dev:
mode: development
default: true
workspace:
host: https://dbc-26a3d512-ff21.cloud.databricks.com
variables:
target_catalog:
default: hotelaria_dev
source_catalog:
default: neon_hotelaria
source_schema:
default: public
source_system:
default: neon_postgresql
prod:
mode: production
workspace:
host: https://dbc-26a3d512-ff21.cloud.databricks.com
variables:
target_catalog:
default: hotelaria_prod
source_catalog:
default: neon_hotelaria
source_schema:
default: public
source_system:
default: neon_postgresql
Salve:
CTRL + O
ENTER
CTRL + X
4.2 Validação importante sobre YAML
O formato correto dentro de targets.dev.variables é:
variables:
target_catalog:
default: hotelaria_dev
E não:
variables:
target_catalog: hotelaria_dev
Essa foi uma das correções aplicadas durante o laboratório.
4.3 Validar o Bundle
databricks bundle validate --target dev --profile hotelaria-free
Se tudo estiver correto, a validação passa.
Parte 5 — Criar objetos base no Databricks
Agora criaremos os objetos base no Unity Catalog.
5.1 Criar script de setup
No terminal:
nano sql/setup/00_setup_catalog_schemas_volumes.sql
Cole:
-- ============================================================
-- Setup inicial do laboratório Hotelaria
-- Catálogos, schemas, volume e tabelas de controle
-- ============================================================
create catalog if not exists hotelaria_dev;
create schema if not exists hotelaria_dev.control;
create schema if not exists hotelaria_dev.landing;
create schema if not exists hotelaria_dev.bronze;
create schema if not exists hotelaria_dev.silver;
create schema if not exists hotelaria_dev.gold;
-- Volume para arquivos físicos da Landing
create volume if not exists hotelaria_dev.landing.neon_postgresql;
-- ============================================================
-- Tabela de controle de watermark
-- ============================================================
create table if not exists hotelaria_dev.control.ingestion_watermark (
source_system string,
source_catalog string,
source_schema string,
source_table string,
target_catalog string,
target_schema string,
target_table string,
watermark_column string,
last_watermark_value timestamp,
lookback_minutes int,
last_ingestion_id string,
last_status string,
last_success_at timestamp,
created_at timestamp,
updated_at timestamp
)
using delta;
-- ============================================================
-- Tabela de controle de arquivos processados na Bronze
-- ============================================================
create table if not exists hotelaria_dev.control.processed_files (
source_system string,
source_table string,
source_file_path string,
source_file_name string,
file_modification_time timestamp,
file_size bigint,
ingestion_id string,
processed_at timestamp,
target_table string,
status string,
created_at timestamp
)
using delta;
5.2 Criar seed da watermark
nano sql/control/01_seed_ingestion_watermark.sql
Cole:
-- ============================================================
-- Seed inicial da tabela de watermark
-- Uma linha por tabela origem
-- ============================================================
insert into hotelaria_dev.control.ingestion_watermark
select
source_system,
source_catalog,
source_schema,
source_table,
target_catalog,
target_schema,
target_table,
watermark_column,
last_watermark_value,
lookback_minutes,
last_ingestion_id,
last_status,
last_success_at,
current_timestamp() as created_at,
current_timestamp() as updated_at
from values
(
'neon_postgresql',
'neon_hotelaria',
'public',
'hoteis',
'hotelaria_dev',
'landing',
'hoteis',
'update_date',
timestamp('1900-01-01 00:00:00'),
1440,
null,
'pending',
null
),
(
'neon_postgresql',
'neon_hotelaria',
'public',
'hospedes',
'hotelaria_dev',
'landing',
'hospedes',
'update_date',
timestamp('1900-01-01 00:00:00'),
1440,
null,
'pending',
null
),
(
'neon_postgresql',
'neon_hotelaria',
'public',
'quartos',
'hotelaria_dev',
'landing',
'quartos',
'update_date',
timestamp('1900-01-01 00:00:00'),
1440,
null,
'pending',
null
),
(
'neon_postgresql',
'neon_hotelaria',
'public',
'reservas',
'hotelaria_dev',
'landing',
'reservas',
'update_date',
timestamp('1900-01-01 00:00:00'),
1440,
null,
'pending',
null
),
(
'neon_postgresql',
'neon_hotelaria',
'public',
'consumos',
'hotelaria_dev',
'landing',
'consumos',
'update_date',
timestamp('1900-01-01 00:00:00'),
1440,
null,
'pending',
null
),
(
'neon_postgresql',
'neon_hotelaria',
'public',
'faturas',
'hotelaria_dev',
'landing',
'faturas',
'update_date',
timestamp('1900-01-01 00:00:00'),
1440,
null,
'pending',
null
),
(
'neon_postgresql',
'neon_hotelaria',
'public',
'reservas_ota',
'hotelaria_dev',
'landing',
'reservas_ota',
'update_date',
timestamp('1900-01-01 00:00:00'),
1440,
null,
'pending',
null
)
as t (
source_system,
source_catalog,
source_schema,
source_table,
target_catalog,
target_schema,
target_table,
watermark_column,
last_watermark_value,
lookback_minutes,
last_ingestion_id,
last_status,
last_success_at
);
5.3 Executar os scripts no Databricks
Neste laboratório, executamos essa parte manualmente pelo Databricks SQL Editor.
Passo a passo:
1. Abrir o workspace Databricks no navegador
2. Clicar em SQL Editor
3. Criar uma New Query
4. Colar o conteúdo de sql/setup/00_setup_catalog_schemas_volumes.sql
5. Clicar em Run
6. Apagar o conteúdo anterior
7. Colar o conteúdo de sql/control/01_seed_ingestion_watermark.sql
8. Clicar em Run
5.4 Validar objetos criados
No SQL Editor, execute:
show schemas in hotelaria_dev;
Resultado esperado:
bronze
control
gold
landing
silver
Validar volume:
show volumes in hotelaria_dev.landing;
Resultado esperado:
neon_postgresql
Validar watermark:
select *
from hotelaria_dev.control.ingestion_watermark
order by source_table;
Resultado esperado: 7 linhas.
Validar controle de arquivos:
select *
from hotelaria_dev.control.processed_files;
Resultado esperado: 0 linhas.
Parte 6 — Criar script de ingestão Source → Landing
6.1 Criar arquivo Python
No terminal:
nano src/landing/ingest_neon_to_landing.py
Cole:
# Databricks notebook source
from datetime import datetime, timezone
from pyspark.sql import functions as F
import uuid
# ============================================================
# Widgets / parâmetros
# ============================================================
dbutils.widgets.text("target_catalog", "hotelaria_dev")
dbutils.widgets.text("source_catalog", "neon_hotelaria")
dbutils.widgets.text("source_schema", "public")
dbutils.widgets.text("source_system", "neon_postgresql")
dbutils.widgets.text("table_name", "all")
target_catalog = dbutils.widgets.get("target_catalog")
source_catalog = dbutils.widgets.get("source_catalog")
source_schema = dbutils.widgets.get("source_schema")
source_system = dbutils.widgets.get("source_system")
table_name_param = dbutils.widgets.get("table_name")
control_schema = "control"
landing_schema = "landing"
watermark_table = f"{target_catalog}.{control_schema}.ingestion_watermark"
landing_volume_base_path = f"/Volumes/{target_catalog}/{landing_schema}/{source_system}"
batch_timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
ingestion_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
ingestion_id = str(uuid.uuid4())
print("=" * 100)
print("INÍCIO DA INGESTÃO SOURCE → LANDING")
print(f"target_catalog..........: {target_catalog}")
print(f"source_catalog..........: {source_catalog}")
print(f"source_schema...........: {source_schema}")
print(f"source_system...........: {source_system}")
print(f"table_name_param........: {table_name_param}")
print(f"batch_timestamp.........: {batch_timestamp}")
print(f"ingestion_date..........: {ingestion_date}")
print(f"ingestion_id............: {ingestion_id}")
print(f"landing_volume_base_path: {landing_volume_base_path}")
print("=" * 100)
# ============================================================
# Funções auxiliares
# ============================================================
def get_tables_to_process():
df = (
spark.table(watermark_table)
.filter(F.col("source_system") == source_system)
.filter(F.col("source_catalog") == source_catalog)
.filter(F.col("source_schema") == source_schema)
)
if table_name_param.lower() != "all":
df = df.filter(F.col("source_table") == table_name_param)
rows = df.collect()
if not rows:
raise Exception(
f"Nenhuma tabela encontrada na watermark para table_name={table_name_param}"
)
return rows
def read_source_table(row):
source_table_full_name = (
f"`{row.source_catalog}`.`{row.source_schema}`.`{row.source_table}`"
)
last_watermark_value = row.last_watermark_value
lookback_minutes = row.lookback_minutes
watermark_column = row.watermark_column
print("-" * 100)
print(f"Lendo origem.............: {source_table_full_name}")
print(f"watermark_column.........: {watermark_column}")
print(f"last_watermark_value.....: {last_watermark_value}")
print(f"lookback_minutes.........: {lookback_minutes}")
df_source = spark.table(source_table_full_name)
if watermark_column not in df_source.columns:
raise Exception(
f"Coluna de watermark '{watermark_column}' não existe na tabela {source_table_full_name}"
)
watermark_start_expr = F.expr(
f"timestampadd(MINUTE, -{lookback_minutes}, timestamp('{last_watermark_value}'))"
)
df_filtered = (
df_source
.filter(F.col(watermark_column) >= watermark_start_expr)
)
return df_filtered
def add_landing_metadata(df, row):
return (
df
.withColumn("_metadata_ingestion_id", F.lit(ingestion_id))
.withColumn("_metadata_ingestion_at", F.current_timestamp())
.withColumn("_metadata_batch_timestamp", F.lit(batch_timestamp))
.withColumn("_metadata_source_system", F.lit(source_system))
.withColumn("_metadata_source_catalog", F.lit(row.source_catalog))
.withColumn("_metadata_source_schema", F.lit(row.source_schema))
.withColumn("_metadata_source_table", F.lit(row.source_table))
.withColumn("_metadata_watermark_column", F.lit(row.watermark_column))
.withColumn("_metadata_landing_ingestion_date", F.lit(ingestion_date))
)
def write_landing_file(df, row):
target_path = (
f"{landing_volume_base_path}/"
f"{row.source_table}/"
f"ingestion_date={ingestion_date}/"
f"batch_timestamp={batch_timestamp}/"
)
print(f"Gravando Landing em......: {target_path}")
(
df.write
.format("parquet")
.mode("overwrite")
.save(target_path)
)
return target_path
def update_watermark(row, df_landing):
watermark_column = row.watermark_column
max_watermark = (
df_landing
.select(F.max(F.col(watermark_column)).alias("max_watermark"))
.collect()[0]["max_watermark"]
)
if max_watermark is None:
print(f"Nenhum max_watermark encontrado para {row.source_table}. Watermark não será atualizada.")
return
spark.sql(f"""
update {watermark_table}
set
last_watermark_value = timestamp('{max_watermark}'),
last_ingestion_id = '{ingestion_id}',
last_status = 'success',
last_success_at = current_timestamp(),
updated_at = current_timestamp()
where source_system = '{source_system}'
and source_catalog = '{row.source_catalog}'
and source_schema = '{row.source_schema}'
and source_table = '{row.source_table}'
""")
print(f"Watermark atualizada para: {max_watermark}")
# ============================================================
# Execução principal
# ============================================================
tables_to_process = get_tables_to_process()
for row in tables_to_process:
print("=" * 100)
print(f"Processando tabela: {row.source_table}")
try:
df_source = read_source_table(row)
total_rows = df_source.count()
print(f"Registros encontrados....: {total_rows}")
if total_rows == 0:
print(f"Nenhum registro novo para {row.source_table}. Pulando escrita.")
continue
df_landing = add_landing_metadata(df_source, row)
target_path = write_landing_file(df_landing, row)
update_watermark(row, df_landing)
print(f"SUCESSO tabela............: {row.source_table}")
print(f"Arquivo/diretório landing.: {target_path}")
except Exception as e:
print(f"ERRO tabela...............: {row.source_table}")
print(f"Mensagem..................: {str(e)}")
spark.sql(f"""
update {watermark_table}
set
last_ingestion_id = '{ingestion_id}',
last_status = 'failed',
updated_at = current_timestamp()
where source_system = '{source_system}'
and source_catalog = '{row.source_catalog}'
and source_schema = '{row.source_schema}'
and source_table = '{row.source_table}'
""")
raise e
print("=" * 100)
print("FIM DA INGESTÃO SOURCE → LANDING")
print("=" * 100)
6.2 Observação importante sobre arquivos Parquet
No Spark, o comando:
df.write.parquet(path)
não gera um único arquivo chamado, por exemplo:
hoteis_20260509143000.parquet
Ele cria uma pasta com arquivos internos do tipo:
part-00000-....parquet
Por isso, adotamos a estrutura:
/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/ingestion_date=yyyy-MM-dd/batch_timestamp=yyyyMMddHHmmss/
Esse é o padrão mais adequado para processamento distribuído em Spark.
Parte 7 — Criar o Job do Asset Bundle
7.1 Criar arquivo do Job
nano resources/jobs/job_ingest_neon_to_landing.yml
Cole:
resources:
jobs:
ingest_neon_to_landing:
name: "ingest_neon_to_landing [${bundle.target}]"
description: "Ingestão incremental Source Neon PostgreSQL para Landing usando watermark"
tasks:
- task_key: ingest_neon_to_landing
notebook_task:
notebook_path: ../../src/landing/ingest_neon_to_landing.py
base_parameters:
target_catalog: ${var.target_catalog}
source_catalog: ${var.source_catalog}
source_schema: ${var.source_schema}
source_system: ${var.source_system}
table_name: all
7.2 Correção importante do caminho do notebook
Como o arquivo do job está em:
resources/jobs/job_ingest_neon_to_landing.yml
e o notebook está em:
src/landing/ingest_neon_to_landing.py
o caminho correto é:
notebook_path: ../../src/landing/ingest_neon_to_landing.py
O caminho abaixo estava incorreto:
notebook_path: ../src/landing/ingest_neon_to_landing.py
Porque faria o Bundle procurar em:
resources/src/landing/ingest_neon_to_landing.py
E esse caminho não existe.
Parte 8 — Validar, implantar e executar o Job
8.1 Validar o Bundle
databricks bundle validate --target dev --profile hotelaria-free
Se der certo, a autenticação e os arquivos YAML estão corretos.
8.2 Fazer deploy
databricks bundle deploy --target dev --profile hotelaria-free
Esse comando publica os arquivos do projeto no workspace Databricks.
8.3 Ver resumo do Bundle
databricks bundle summary --target dev --profile hotelaria-free
Procure pelo job:
ingest_neon_to_landing
Também observe o caminho do workspace publicado, algo como:
/Workspace/Users/<usuario>/.bundle/laboratorio-etl-hotelaria/dev
8.4 Executar o Job
databricks bundle run ingest_neon_to_landing --target dev --profile hotelaria-free
Esse comando dispara o job no Databricks.
Parte 9 — Validações após execução do Job
9.1 Validar watermark
No Databricks SQL Editor:
select
source_table,
watermark_column,
last_watermark_value,
last_status,
last_success_at,
updated_at
from hotelaria_dev.control.ingestion_watermark
order by source_table;
Resultado esperado:
last_status = success
para as tabelas processadas.
9.2 Validar diretórios da Landing
list '/Volumes/hotelaria_dev/landing/neon_postgresql/';
Depois validar uma tabela:
list '/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/';
Depois validar a partição por data:
list '/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/ingestion_date=YYYY-MM-DD/';
Substitua YYYY-MM-DD pela data da execução.
9.3 Ler arquivos da Landing
Em notebook PySpark:
df = spark.read.parquet("/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/")
display(df)
Ou via SQL:
select *
from parquet.`/Volumes/hotelaria_dev/landing/neon_postgresql/hoteis/`
limit 10;
Parte 10 — Resultado alcançado
Ao final deste passo, temos:
Databricks CLI instalada no Linux
Profile hotelaria-free configurado
Projeto local estruturado
databricks.yml criado e validado
Schemas e volume criados no Unity Catalog
Tabela de watermark criada
Tabela de arquivos processados criada
Script de ingestão Source → Landing criado
Job do Asset Bundle criado
Bundle validado
Bundle implantado
Job executado com sucesso
Arquivos Parquet gerados na Landing
Watermark atualizada com sucesso
Fluxo final desta etapa:
neon_hotelaria.public.hoteis
neon_hotelaria.public.hospedes
neon_hotelaria.public.quartos
neon_hotelaria.public.reservas
neon_hotelaria.public.consumos
neon_hotelaria.public.faturas
neon_hotelaria.public.reservas_ota
↓
/Volumes/hotelaria_dev/landing/neon_postgresql/<tabela>/ingestion_date=<data>/batch_timestamp=<timestamp>/
Parte 11 — Decisão arquitetural desta etapa
A decisão adotada foi:
Source:
Neon PostgreSQL refletido no Databricks via Foreign Catalog
Landing:
Arquivos Parquet em Unity Catalog Volume
Controle incremental:
Tabela Delta hotelaria_dev.control.ingestion_watermark
Incremento:
update_date com lookback de 1440 minutos
Retenção:
Histórico físico da Landing por até 15 dias
Orquestração:
Databricks Job gerenciado por Asset Bundle
Deploy:
Databricks CLI via target dev
Essa abordagem mantém a Landing simples, auditável, reprocessável e aderente a um cenário real de engenharia de dados.
O próximo passo será construir a carga Landing → Bronze, mantendo a Bronze como append-only, sem merge, sem update e sem delete.

![5 _ [LAB - Hotelaria] Construção da Camada Bronze Append-Only com Databricks Asset Bundles](https://static.wixstatic.com/media/430b63_5f763c5f8c4c43cdb3871366ca98f195~mv2.png/v1/fill/w_980,h_980,al_c,q_90,usm_0.66_1.00_0.01,enc_avif,quality_auto/430b63_5f763c5f8c4c43cdb3871366ca98f195~mv2.png)
![4 _ [LAB - Hotelaria] Ajuste de Fundação: Contratos, CI/CD e Padronização antes da Bronze](https://static.wixstatic.com/media/430b63_f69a815994ad4c4188695cc393807f09~mv2.png/v1/fill/w_980,h_1225,al_c,q_90,usm_0.66_1.00_0.01,enc_avif,quality_auto/430b63_f69a815994ad4c4188695cc393807f09~mv2.png)
![2 _ [LAB - Hotelaria] Integração do Neon PostgreSQL com Databricks Free Edition](https://static.wixstatic.com/media/430b63_0f94c120084448168f97c7ded696ce8a~mv2.png/v1/fill/w_980,h_653,al_c,q_90,usm_0.66_1.00_0.01,enc_avif,quality_auto/430b63_0f94c120084448168f97c7ded696ce8a~mv2.png)
Comentários