Skip to main content

Ingestão no Fabric

Este guia demonstra como realizar a ingestão de dados no Microsoft Fabric utilizando exemplos práticos. São apresentadas quatro abordagens disponíveis na plataforma.


Quando usar cada abordagem?

Copy JobCópia simples e rápida, uma origem → um destino
Tabela únicaRápido
Data PipelineOrquestração de múltiplas tabelas e etapas
SQL ServerOracleMySQL
Dataflow Gen2Ingestão com transformações visuais (Power Query)
ExcelSharePointCSV
Notebook PySparkLógica complexa, APIs, transformações customizadas
APIs RESTPaginaçãoOAuth

Matriz de Decisão

BANCO DE DADOS
Copy Job1 tabela, sem orquestração
PipelineMúltiplas tabelas, sequência
ARQUIVO / API
DataflowExcel, CSV, SharePoint
NotebookAPI REST, lógica complexa

CritérioCopy JobPipelineDataflow Gen2Notebook
ComplexidadeMuito baixaMédiaBaixaAlta
OrquestraçãoNãoSimNãoNão
TransformaçãoNãoBásicaVisual (Power Query)Código
Gateway On-premises✅ Sim✅ Sim✅ Sim❌ Não
AgendamentoBásicoAvançadoBásicoVia Pipeline
Melhor paraCópia rápida de 1 tabelaETL de múltiplas tabelasExcel/SharePointAPIs e lógica customizada

Data Pipeline (Copy Activity)

Ideal para: Orquestração de múltiplas tabelas, cópia com dependências e agendamento avançado.

Cenário de Exemplo

Objetivo:Ingerir a tabela SB1 (Produtos) do Protheus para a camada Bronze
ItemValor
FonteTOTVS Protheus (SQL Server)
Tabela OrigemSB1010
Destinolh_corp_bronze
Tabela Destinotb_raw_protheus_sb1

Passo 1 — Criar o Pipeline

  1. Acesse o Workspace qan_br_td_data_dev
  2. Clique em + NovoPipeline de dados
  3. Nomeie como pl_bronze_protheus_sb1

Passo 2 — Adicionar Copy Activity

  1. Na aba Início, arraste a atividade Copiar dados para o canvas
  2. Renomeie para Ingestao SB1 Protheus

Passo 3 — Aba Geral

Configurações básicas de identificação e comportamento da atividade.

Nome

Identificador único da atividade dentro do pipeline. Use nomes descritivos que indiquem a origem e o propósito.

RecomendaçãoExemplo
PadrãoIngestao [TABELA] [FONTE]
AplicadoIngestao SB1 Protheus

Tempo limite

Define o tempo máximo de execução da atividade antes de ser cancelada automaticamente. Formato: D.HH:MM:SS

Volume de dadosTempo recomendado
Pequeno (< 100 mil registros)0.01:00:00 (1 hora)
Médio (100 mil - 1 milhão)0.04:00:00 (4 horas)
Grande (> 1 milhão)0.12:00:00 (12 horas)
Como calcular?

Monitore as primeiras execuções e defina o timeout como 2x o tempo médio de execução. Isso dá margem para variações sem desperdiçar recursos.

Tentar novamente

Número de tentativas automáticas em caso de falha. O Fabric aguarda o Intervalo de repetição entre cada tentativa.

CenárioValor recomendadoJustificativa
Fonte estável (banco interno)0 ou 1Falhas são raras e geralmente persistentes
Fonte instável (API externa)2 a 3Falhas podem ser transientes (timeout, rede)
Carga crítica3Maximiza chances de sucesso

Intervalo de repetição (segundos)

Tempo de espera entre tentativas. Valores maiores evitam sobrecarregar a fonte em caso de instabilidade.

CenárioValor recomendado
Banco de dados30 segundos
API com rate limit60 a 120 segundos

Passo 4 — Aba Origem

Configurações de conexão e extração dos dados da fonte.

Conexão

Selecione uma conexão existente ou crie uma nova. A conexão armazena as credenciais de acesso ao banco de dados de forma segura.

Usar consulta

Define como os dados serão extraídos da fonte.

OpçãoQuando usarExemplo
TabelaCarga completa de uma tabela sem filtrosExtrai todos os registros da SB1010
ConsultaPrecisa de filtros, joins ou colunas específicasSELECT * FROM SB1010 WHERE R_E_C_D_E_L_ = 0
Procedimento armazenadoLógica complexa já existe no bancoEXEC sp_extrai_produtos
Boas Práticas na Query
  • Filtre registros deletados: WHERE R_E_C_D_E_L_ = 0 (padrão Protheus)
  • Selecione apenas colunas necessárias: Reduz volume e tempo de transferência
  • Adicione filtros de data para incremental: WHERE D_E_L_E_T_ = '' AND R_E_C_N_O_ > @ultimo_id

Passo 5 — Aba Destino

Configurações de onde e como os dados serão gravados.

Conexão

Selecione o Lakehouse de destino. O Lakehouse deve estar no mesmo Workspace ou em um Workspace com permissões configuradas.

Pasta raiz

Define o tipo de armazenamento no Lakehouse.

OpçãoQuando usarResultado
TabelasDados estruturados para análiseCria tabela Delta gerenciada (recomendado)
ArquivosStaging de arquivos brutosGrava em Files/landing/ como Parquet/CSV

Tabela

Nome da tabela de destino. Siga o padrão de nomenclatura da camada Bronze: tb_raw_[fonte]_[tabela]

Ação de tabela

Define como os dados serão inseridos na tabela de destino.

AçãoComportamentoQuando usar
AcrescentarAdiciona registros ao final da tabela existenteCarga incremental diária — novos registros são adicionados sem afetar os existentes
SubstituirApaga todos os registros e recarrega do zeroCarga full — tabelas pequenas ou quando a fonte não suporta incremental
Executar upsertAtualiza registros existentes e insere novos (requer chave)Sincronização — quando registros na fonte podem ser alterados
Cuidado com Substituir

A opção Substituir apaga todos os dados antes de carregar. Se a carga falhar no meio, você perde os dados anteriores. Para tabelas críticas, prefira Acrescentar com controle de duplicatas no Silver.

Aplicar V-Order

Otimização de armazenamento que melhora a performance de leitura no Power BI e queries SQL.

OpçãoRecomendação
Habilitado✅ Sempre habilite para tabelas que serão consumidas por relatórios
DesabilitadoApenas para staging temporário que será reprocessado

Habilitar partições

Divide a tabela em partições físicas para melhorar performance em tabelas grandes.

VolumeParticionamento
< 1 milhão de registros❌ Não necessário
> 1 milhão de registros✅ Particionar por data (ano, mes)

Passo 6 — Aba Mapeamento

Configurações de conversão de tipos entre origem e destino.

Permitir truncamento de dados

Permite que valores maiores que o tamanho do campo destino sejam truncados automaticamente.

OpçãoQuando usar
HabilitadoQuando prefere perder dados parciais a falhar a carga
DesabilitadoQuando integridade total é crítica (campos financeiros, códigos)
Recomendação

Habilite para campos texto descritivos. Desabilite para campos-chave como códigos e valores numéricos.

Importar esquemas

Clique para mapear automaticamente as colunas origem → destino. Revise o mapeamento para garantir compatibilidade de tipos.


Passo 7 — Aba Configurações

Configurações de performance e comportamento de execução.

Otimização de taxa de transferência inteligente

Define como o Fabric otimiza a velocidade de cópia.

OpçãoDescrição
AutomáticoFabric ajusta dinamicamente (recomendado)
Valor personalizadoDefine DIUs fixas (Data Integration Units)

Grau de paralelismo de cópia

Número de conexões paralelas para leitura da fonte.

OpçãoQuando usar
AutomáticoFabric determina baseado na fonte (recomendado)
Valor fixoQuando a fonte tem limitações de conexões simultâneas

Tolerância a falhas

Define o comportamento quando encontra erros em registros individuais.

OpçãoComportamentoQuando usar
(vazio)Falha no primeiro erroDados críticos — prefere não carregar nada a carregar incompleto
Ignorar linhas incompatíveisPula registros com erroDados tolerantes — aceita perda parcial

Habilitar o registro em log

Grava log detalhado de linhas ignoradas ou com erro.

OpçãoRecomendação
HabilitadoQuando usa tolerância a falhas — permite auditar o que foi ignorado
DesabilitadoQuando não há tolerância a falhas

Passo 8 — Validar e Executar

  1. Clique em Validar para verificar configurações
  2. Clique em Executar para testar manualmente
  3. Acompanhe o status na aba Saída

Resultado

Tabela criada com sucesso

lh_corp_bronze.dbo.tb_raw_protheus_sb1


Dataflow Gen2

Ideal para: Fontes que precisam de tratamento visual (Excel, CSV, SharePoint) ou transformações leves.

Cenário de Exemplo

Objetivo:Ingerir a planilha de cotação do dólar (BCB) para a camada Bronze
ItemValor
FontePlanilha Excel do Banco Central
Arquivobcdata.csv
Série10813 - Taxa de câmbio - Livre - Dólar americano (compra)
Destinolh_corp_bronze
Tabela Destinotb_raw_bcb_dollar_rate

Sobre a Planilha

A planilha bcdata.csv contém a série histórica da taxa de câmbio do dólar (compra) disponibilizada pelo Banco Central do Brasil através do Sistema Gerenciador de Séries Temporais (SGS).

Estrutura do arquivo:

ColunaTipoDescrição
DataDataData da cotação (DD/MM/AAAA)
ValorNúmeroTaxa de câmbio em R$/US$

Passo 1 — Baixar o Arquivo

Download do arquivo de exemplo:

bcdata.csv


Passo 2 — Fazer Upload para o Lakehouse

  1. Acesse o Workspace qan_br_td_data_dev
  2. Abra o Lakehouse lh_corp_bronze
  3. No painel esquerdo, clique em Files
  4. Navegue ou crie a pasta landing/bcb/
  5. Clique em UploadUpload files
  6. Selecione o arquivo bcdata.csv

Caminho final do arquivo:

lh_corp_bronze/Files/landing/bcb/bcdata.csv


Passo 3 — Criar o Dataflow

  1. Acesse o Workspace qan_br_td_data_dev
  2. Clique em + NovoDataflow Gen2
  3. Nomeie como df_bronze_bcb_dollar_rate

Passo 4 — Conectar à Fonte (Lakehouse Files)

  1. Clique em Obter dados
  2. Selecione Lakehouse
  3. Selecione o Lakehouse lh_corp_bronze
  4. Navegue até Fileslandingbcb
  5. Selecione o arquivo bcdata.csv
  6. Escolha a planilha que contém os dados

Passo 5 — Aplicar Transformações

No editor Power Query, aplique as seguintes transformações:

5.1 — Promover Cabeçalhos

Se a primeira linha contém os nomes das colunas:

  1. Clique em TransformarUsar a primeira linha como cabeçalho

5.2 — Renomear Colunas

Padronize os nomes das colunas para snake_case em inglês:

Coluna OriginalNovo Nome
Datadt_rate
Valorvl_rate
  1. Clique com botão direito na coluna → Renomear

5.3 — Alterar Tipos

Garanta que os tipos estão corretos:

ColunaTipo
dt_rateData
vl_rateNúmero Decimal
  1. Clique no ícone de tipo na coluna → Selecione o tipo correto

5.4 — Filtrar Linhas Vazias

Remova linhas onde a data está vazia:

  1. Clique na seta da coluna dt_rate
  2. Desmarque (nulo) e (vazio)

5.5 — Adicionar Colunas de Metadados

Adicione informações de controle:

  1. Clique em Adicionar ColunaColuna Personalizada
  2. Adicione as colunas:
NomeFórmula
ds_source"BCB_SGS_10813"
ds_currency_from"USD"
ds_currency_to"BRL"
dt_loadDateTime.LocalNow()

Passo 6 — Configurar Destino

  1. Clique em Adicionar destino de dadosLakehouse
  2. Selecione o Lakehouse lh_corp_bronze
  3. Defina o nome da tabela: tb_raw_bcb_dollar_rate
  4. Método de atualização:
    • Substituir: Para recarregar todo o histórico
    • Acrescentar: Para adicionar apenas novos registros

Passo 7 — Publicar e Executar

  1. Clique em Publicar
  2. Aguarde a publicação finalizar
  3. No Workspace, localize o Dataflow df_bronze_bcb_dollar_rate
  4. Clique em Atualizar agora
  5. Acompanhe o status de execução

Passo 8 — Otimizar o Código M (Recomendado)

O Dataflow gera código M automaticamente conforme você aplica transformações na interface. No entanto, esse código pode ser otimizado para melhor performance e legibilidade.

Acessar o Editor Avançado

  1. No editor do Dataflow, clique em ExibirEditor Avançado
  2. O código M completo será exibido

Código Original (Gerado pelo Dataflow)

let
Origem = Lakehouse.Contents(null),
Navegação = Origem{[workspaceId = "c3fc2dea-8363-44b6-8bc4-446d0be8c18e"]}[Data],
#"Navegação 1" = Navegação{[lakehouseId = "87b01831-605a-4b18-8bfa-24d0aed7d89d"]}[Data],
#"Navegação 2" = #"Navegação 1"{[Id = "Files", ItemKind = "Folder"]}[Data],
#"Navegação 3" = #"Navegação 2"{[Name = "landing"]}[Content],
#"Navegação 4" = #"Navegação 3"{[Name = "bcb"]}[Content],
#"Navegação 5" = #"Navegação 4"{[Name = "bcdata.csv"]}[Content],
#"CSV Importado" = Csv.Document(#"Navegação 5", [Delimiter = ";", Columns = 2, QuoteStyle = QuoteStyle.None]),
#"Cabeçalhos promovidos" = Table.PromoteHeaders(#"CSV Importado", [PromoteAllScalars = true]),
#"Tipo de coluna alterado" = Table.TransformColumnTypes(#"Cabeçalhos promovidos", {{"data", type date}, {"valor", type number}}),
#"Colunas renomeadas" = Table.RenameColumns(#"Tipo de coluna alterado", {{"data", "dt_rate"}, {"valor", "vl_rate"}}),
#"Linhas filtradas" = Table.SelectRows(#"Colunas renomeadas", each [dt_rate] <> null and [dt_rate] <> ""),
#"Personalização adicionada" = Table.TransformColumnTypes(Table.AddColumn(#"Linhas filtradas", "ds_source", each "BCB_SGS_10813"), {{"ds_source", type text}}),
#"Personalização adicionada 1" = Table.TransformColumnTypes(Table.AddColumn(#"Personalização adicionada", "ds_currency_from", each "USD"), {{"ds_currency_from", type text}}),
#"Personalização adicionada 2" = Table.TransformColumnTypes(Table.AddColumn(#"Personalização adicionada 1", "ds_currency_to", each "BRL"), {{"ds_currency_to", type text}}),
#"Personalização adicionada 3" = Table.TransformColumnTypes(Table.AddColumn(#"Personalização adicionada 2", "dt_load", each DateTime.LocalNow()), {{"dt_load", type datetime}})
in
#"Personalização adicionada 3"

Problemas Identificados

ProblemaImpacto
Múltiplas chamadas de TransformColumnTypesCada chamada processa toda a tabela novamente
Navegação em muitos passos separadosCódigo verboso e difícil de manter
Nomes genéricos (Navegação 1, Personalização adicionada 1)Dificulta entendimento e debug
Tipagem após cada AddColumnProcessamento redundante

Código Otimizado

let
// =====================================================
// ORIGEM DOS DADOS
// =====================================================
Origem = Lakehouse.Contents(null),
Navegacao = Origem{[workspaceId = "c3fc2dea-8363-44b6-8bc4-446d0be8c18e"]}[Data],
Lakehouse = Navegacao{[lakehouseId = "87b01831-605a-4b18-8bfa-24d0aed7d89d"]}[Data],

// Navegação encadeada até o arquivo
Arquivo = Lakehouse
{[Id = "Files", ItemKind = "Folder"]}[Data]
{[Name = "landing"]}[Content]
{[Name = "bcb"]}[Content]
{[Name = "bcdata.csv"]}[Content],

// =====================================================
// IMPORTAÇÃO E LIMPEZA
// =====================================================
CSV = Csv.Document(Arquivo, [Delimiter = ";", Columns = 2, QuoteStyle = QuoteStyle.None]),
Cabecalhos = Table.PromoteHeaders(CSV, [PromoteAllScalars = true]),
Renomeado = Table.RenameColumns(Cabecalhos, {
{"data", "dt_rate"},
{"valor", "vl_rate"}
}),
Filtrado = Table.SelectRows(Renomeado, each [dt_rate] <> null and [dt_rate] <> ""),

// =====================================================
// METADADOS DE CONTROLE
// =====================================================
Meta_Source = Table.AddColumn(Filtrado, "ds_source", each "BCB_SGS_10813"),
Meta_CurrencyFrom = Table.AddColumn(Meta_Source, "ds_currency_from", each "USD"),
Meta_CurrencyTo = Table.AddColumn(Meta_CurrencyFrom, "ds_currency_to", each "BRL"),
Meta_Load = Table.AddColumn(Meta_CurrencyTo, "dt_load", each DateTime.LocalNow()),

// =====================================================
// TIPAGEM FINAL (única chamada)
// =====================================================
TiposDefinidos = Table.TransformColumnTypes(Meta_Load, {
{"dt_rate", type date},
{"vl_rate", type number},
{"ds_source", type text},
{"ds_currency_from", type text},
{"ds_currency_to", type text},
{"dt_load", type datetime}
})
in
TiposDefinidos

Benefícios da Otimização

AspectoAntesDepoisMelhoria
Passos1510-33%
Chamadas de TransformColumnTypes51-80%
LegibilidadeNomes genéricosNomes descritivosFacilita manutenção
ComentáriosNenhumSeções organizadasDocumentação inline
PerformanceTipagem repetidaTipagem única no finalMenos processamento

Por que tipar apenas no final?

Cada chamada de Table.TransformColumnTypes percorre todas as linhas da tabela para converter os tipos. Quando você chama essa função 5 vezes (uma para cada coluna), está processando a tabela 5 vezes.

Ao consolidar em uma única chamada no final, a tabela é processada apenas uma vez, reduzindo significativamente o tempo de execução em tabelas com muitos registros.

Boas Práticas de Código M

  1. Nomes descritivos: Use nomes que indiquem a ação (Filtrado, Renomeado, Meta_Source)
  2. Comentários por seção: Divida o código em blocos lógicos com comentários
  3. Tipagem consolidada: Defina todos os tipos em uma única chamada no final
  4. Navegação encadeada: Combine navegações simples em uma única expressão
  5. Consistência: Mantenha o mesmo padrão em todos os Dataflows do projeto
Dica

Você pode usar ferramentas de IA (como o Copilot ou Claude) para otimizar o código M gerado automaticamente pelo Dataflow. Basta copiar o código do Editor Avançado e solicitar a otimização.


Resultado

Tabela criada com sucesso

lh_corp_bronze.files.bcb.tb_raw_bcb_dollar_rate

Estrutura Final da Tabela

ColunaTipoExemplo
dt_rateDATE2025-01-03
vl_rateDECIMAL6.0150
ds_sourceSTRINGBCB_SGS_10813
ds_currency_fromSTRINGUSD
ds_currency_toSTRINGBRL
dt_loadTIMESTAMP2025-01-06 08:30:00

Opção 4: Notebook PySpark

Ideal para: APIs REST, lógica complexa, paginação, autenticação OAuth, ou quando você precisa de controle total.


Limitação: Servidores On-Premises

Importante

O Notebook PySpark roda na infraestrutura de nuvem do Microsoft Fabric. Diferente do Pipeline e Dataflow que utilizam o Data Gateway para acessar servidores on-premises, o Notebook não possui integração nativa com Gateway.

ArtefatoAcessa On-Premises via Gateway?
Copy Job✅ Sim
Data Pipeline✅ Sim
Dataflow Gen2✅ Sim
Notebook PySpark❌ Não

Cenários de Uso do Notebook para Ingestão

CenárioFunciona?Solução
Banco de dados na nuvem (Azure SQL, AWS RDS)✅ SimConexão JDBC direta
APIs REST públicas✅ SimBiblioteca requests
APIs REST com OAuth✅ SimBiblioteca requests + autenticação
Banco de dados on-premises❌ NãoUsar Copy Job/Pipeline/Dataflow

Cenário de Exemplo: Cotação do Dólar (API Banco Central)

Objetivo:Ingerir a cotação do dólar em tempo real via API do Banco Central
ItemValor
APIPTAX - Banco Central do Brasil
URL Basehttps://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata
AutenticaçãoNão requer
Destinolh_corp_bronze
Tabela Destinotb_raw_api_dollar_rate

Passo 1 — Criar o Notebook

  1. Acesse o Workspace qan_br_td_data_dev
  2. Clique em + NovoNotebook
  3. Nomeie como nb_bronze_dollar_rate
  4. Anexe ao Lakehouse lh_corp_bronze

Passo 2 — Importar Bibliotecas

# Célula 1: Importar bibliotecas
import requests
import json
from datetime import datetime, timedelta

Passo 3 — Criar Função de Busca com Fallback

A API do Banco Central só retorna cotações em dias úteis. Esta função busca a cotação de hoje e, se não houver (fim de semana/feriado), busca automaticamente o último dia útil.

# Célula 2: Função para buscar cotação com fallback
def buscar_cotacao_dolar():
"""
Busca cotação do dólar do dia atual.
Se não houver (fim de semana/feriado), busca o último dia útil.
"""

# Tentar data de hoje
data_consulta = datetime.now().strftime("%m-%d-%Y")
url = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarDia(dataCotacao=@dataCotacao)?@dataCotacao='{data_consulta}'&$format=json"

response = requests.get(url)
data = response.json()

# Se retornou dados, usa eles
if data['value']:
print(f"Cotacao encontrada para hoje: {data_consulta}")
return data['value']

# Se nao retornou, busca ultimos 7 dias e pega o mais recente
print(f"Sem cotacao para hoje ({data_consulta}). Buscando ultimo dia util...")

data_fim = datetime.now()
data_inicio = data_fim - timedelta(days=7)

data_inicio_str = data_inicio.strftime("%m-%d-%Y")
data_fim_str = data_fim.strftime("%m-%d-%Y")

url_periodo = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)?@dataInicial='{data_inicio_str}'&@dataFinalCotacao='{data_fim_str}'&$format=json&$orderby=dataHoraCotacao%20desc&$top=1"

response_periodo = requests.get(url_periodo)
data_periodo = response_periodo.json()

if data_periodo['value']:
ultima_data = data_periodo['value'][0]['dataHoraCotacao']
print(f"Ultima cotacao disponivel: {ultima_data}")
return data_periodo['value']

print("Nenhuma cotacao encontrada nos ultimos 7 dias.")
return []

Lógica de fallback:

┌─────────────────────────────────────────────────────────────────────────────┐
│ LÓGICA DE BUSCA │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Tenta buscar cotação de HOJE │
│ │ │
│ ├── Se encontrar: Retorna os dados │
│ │ │
│ └── Se vazio (fim de semana/feriado): │
│ │ │
│ ▼ │
│ 2. Busca últimos 7 dias, ordenado por data DESC, LIMIT 1 │
│ │ │
│ └── Retorna a cotação mais recente disponível │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Passo 4 — Executar Busca

# Célula 3: Executar busca
cotacoes = buscar_cotacao_dolar()

if cotacoes:
print(f"\nRegistros retornados: {len(cotacoes)}")
print(json.dumps(cotacoes, indent=2))
else:
print("Nenhum dado retornado.")

Retorno esperado:

[
{
"cotacaoCompra": 6.0150,
"cotacaoVenda": 6.0180,
"dataHoraCotacao": "2025-01-03 13:09:08.512"
}
]

Passo 5 — Converter para DataFrame

# Célula 4: Converter para DataFrame
if cotacoes:
dados_cotacao = []

for cotacao in cotacoes:
dados_cotacao.append({
"ds_currency_from": "USD",
"ds_currency_to": "BRL",
"vl_rate_buy": float(cotacao["cotacaoCompra"]),
"vl_rate_sell": float(cotacao["cotacaoVenda"]),
"dt_rate": cotacao["dataHoraCotacao"],
"ds_bulletin_type": cotacao.get("tipoBoletim", "N/A")
})

df_cotacao = spark.createDataFrame(dados_cotacao)
print(f"Total de registros: {df_cotacao.count()}")
df_cotacao.show(truncate=False)
else:
print("Sem dados para processar.")
Nota sobre ds_bulletin_type

O campo tipoBoletim existe apenas no endpoint de período. Usamos .get("tipoBoletim", "N/A") para evitar erro quando o campo não existir.


Passo 6 — Adicionar Metadados e Gravar

# Célula 5: Adicionar metadados e gravar
from pyspark.sql.functions import current_timestamp, lit, from_utc_timestamp

df_final = df_cotacao \
.withColumn("dt_load", from_utc_timestamp(current_timestamp(), "America/Sao_Paulo")) \
.withColumn("ds_source", lit("API_BCB_PTAX"))

df_final.write \
.format("delta") \
.mode("append") \
.saveAsTable("lh_corp_bronze.tb_raw_api_dollar_rate")

print("Ingestao concluida com sucesso!")

Passo 7 — Validar

# Célula 6: Validação
spark.sql("""
SELECT
ds_currency_from,
ds_currency_to,
vl_rate_buy,
vl_rate_sell,
dt_rate,
dt_load
FROM lh_corp_bronze.tb_raw_api_dollar_rate
ORDER BY dt_rate DESC
LIMIT 10
""").show(truncate=False)

Resultado

Tabela criada com sucesso

lh_corp_bronze.dbo.tb_raw_api_dollar_rate


Transformações na Camada Bronze

Na arquitetura Medallion, a camada Bronze armazena dados "brutos". Porém, isso não significa que nenhuma transformação pode ser feita. Existe uma distinção importante entre transformações técnicas e transformações de negócio.

O que é permitido na Bronze?

🥉Bronze
Transformações Técnicas
  • Padronização de nomes de colunas
  • Definição de tipos de dados
  • Adição de metadados (dt_load, ds_source)
  • Remoção de linhas completamente vazias
  • Conversão de encoding
🥈Silver
Transformações de Negócio
  • Regras de negócio e validações
  • Joins entre tabelas
  • Deduplicação inteligente
  • Cálculos derivados
  • Tratamento de valores inválidos
🥇Gold
Transformações Analíticas
  • Agregações e sumarizações
  • KPIs e métricas de negócio
  • Modelagem dimensional
  • Dados prontos para relatórios
  • Visões específicas por área

Comparativo: Técnico vs Negócio

TipoCamadaExemploJustificativa
TécnicoBronzeRenomear datadt_ratePadronização de nomenclatura, não altera o dado
TécnicoBronzeAdicionar dt_loadMetadado de rastreabilidade, não é dado de negócio
TécnicoBronzeDefinir tipo DATEGarantir consistência, não altera o valor
TécnicoBronzeRemover linhas 100% vaziasLimpeza estrutural, não é regra de negócio
NegócioSilverFiltrar status = 'ATIVO'Regra de negócio que exclui dados
NegócioSilverCalcular vl_total = qtd * precoCriação de dado derivado
NegócioSilverJoin cliente + pedidoRelacionamento entre entidades
AnalíticoGoldSUM(vl_total) por mêsAgregação para análise

Por que fazemos isso na Bronze?

┌─────────────────────────────────────────────────────────────────────────────┐
│ PRINCÍPIO: Dado bruto ≠ Dado bagunçado │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ A Bronze preserva a FIDELIDADE dos dados originais. │
│ Transformações técnicas não alteram o CONTEÚDO, apenas a ESTRUTURA. │
│ │
│ ✅ PERMITIDO NA BRONZE: │
│ • Renomear colunas (padronização) │
│ • Definir tipos de dados │
│ • Adicionar metadados de controle │
│ • Remover linhas completamente vazias │
│ │
│ ❌ NÃO PERMITIDO NA BRONZE: │
│ • Filtrar dados por regras de negócio │
│ • Fazer cálculos ou derivações │
│ • Aplicar joins entre tabelas │
│ • Remover duplicatas (exceto técnicas) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Exemplo Prático: Cotação do Dólar

Veja como as transformações se distribuem entre as camadas:

CamadaTransformaçãoDado OriginalDado Transformado
BronzeRenomear colunadatadt_rate
BronzeAdicionar metadado(não existe)ds_source = "BCB_SGS_10813"
BronzeDefinir tipo"6.0150" (texto)6.0150 (decimal)
SilverValidar range6.01506.0150 (válido) ou NULL (inválido)
SilverCalcular variação6.0150vl_variacao_dia = 0.0023
GoldAgregar por mês6.0150vl_media_mensal = 5.89
Regra de Ouro

Se a transformação altera o significado ou exclui dados de negócio, ela pertence à Silver. Se apenas organiza ou adiciona contexto técnico, pode ficar na Bronze.


Dúvidas, Sugestões ou Problemas?

Entre em contato com o Time de Transformação Digital (TD) ou o Comitê de Dados.