Ingestão no Fabric
Este guia demonstra como realizar a ingestão de dados no Microsoft Fabric utilizando exemplos práticos. São apresentadas quatro abordagens disponíveis na plataforma.
Quando usar cada abordagem?
Matriz de Decisão
| Critério | Copy Job | Pipeline | Dataflow Gen2 | Notebook |
|---|---|---|---|---|
| Complexidade | Muito baixa | Média | Baixa | Alta |
| Orquestração | Não | Sim | Não | Não |
| Transformação | Não | Básica | Visual (Power Query) | Código |
| Gateway On-premises | ✅ Sim | ✅ Sim | ✅ Sim | ❌ Não |
| Agendamento | Básico | Avançado | Básico | Via Pipeline |
| Melhor para | Cópia rápida de 1 tabela | ETL de múltiplas tabelas | Excel/SharePoint | APIs e lógica customizada |
Data Pipeline (Copy Activity)
Ideal para: Orquestração de múltiplas tabelas, cópia com dependências e agendamento avançado.
Cenário de Exemplo
| Item | Valor |
|---|---|
| Fonte | TOTVS Protheus (SQL Server) |
| Tabela Origem | SB1010 |
| Destino | lh_corp_bronze |
| Tabela Destino | tb_raw_protheus_sb1 |
Passo 1 — Criar o Pipeline
- Acesse o Workspace
qan_br_td_data_dev - Clique em + Novo → Pipeline de dados
- Nomeie como
pl_bronze_protheus_sb1
Passo 2 — Adicionar Copy Activity
- Na aba Início, arraste a atividade Copiar dados para o canvas
- Renomeie para
Ingestao SB1 Protheus
Passo 3 — Aba Geral
Configurações básicas de identificação e comportamento da atividade.
Nome
Identificador único da atividade dentro do pipeline. Use nomes descritivos que indiquem a origem e o propósito.
| Recomendação | Exemplo |
|---|---|
| Padrão | Ingestao [TABELA] [FONTE] |
| Aplicado | Ingestao SB1 Protheus |
Tempo limite
Define o tempo máximo de execução da atividade antes de ser cancelada automaticamente. Formato: D.HH:MM:SS
| Volume de dados | Tempo recomendado |
|---|---|
| Pequeno (< 100 mil registros) | 0.01:00:00 (1 hora) |
| Médio (100 mil - 1 milhão) | 0.04:00:00 (4 horas) |
| Grande (> 1 milhão) | 0.12:00:00 (12 horas) |
Monitore as primeiras execuções e defina o timeout como 2x o tempo médio de execução. Isso dá margem para variações sem desperdiçar recursos.
Tentar novamente
Número de tentativas automáticas em caso de falha. O Fabric aguarda o Intervalo de repetição entre cada tentativa.
| Cenário | Valor recomendado | Justificativa |
|---|---|---|
| Fonte estável (banco interno) | 0 ou 1 | Falhas são raras e geralmente persistentes |
| Fonte instável (API externa) | 2 a 3 | Falhas podem ser transientes (timeout, rede) |
| Carga crítica | 3 | Maximiza chances de sucesso |
Intervalo de repetição (segundos)
Tempo de espera entre tentativas. Valores maiores evitam sobrecarregar a fonte em caso de instabilidade.
| Cenário | Valor recomendado |
|---|---|
| Banco de dados | 30 segundos |
| API com rate limit | 60 a 120 segundos |
Passo 4 — Aba Origem
Configurações de conexão e extração dos dados da fonte.
Conexão
Selecione uma conexão existente ou crie uma nova. A conexão armazena as credenciais de acesso ao banco de dados de forma segura.
Usar consulta
Define como os dados serão extraídos da fonte.
| Opção | Quando usar | Exemplo |
|---|---|---|
| Tabela | Carga completa de uma tabela sem filtros | Extrai todos os registros da SB1010 |
| Consulta | Precisa de filtros, joins ou colunas específicas | SELECT * FROM SB1010 WHERE R_E_C_D_E_L_ = 0 |
| Procedimento armazenado | Lógica complexa já existe no banco | EXEC sp_extrai_produtos |
- Filtre registros deletados:
WHERE R_E_C_D_E_L_ = 0(padrão Protheus) - Selecione apenas colunas necessárias: Reduz volume e tempo de transferência
- Adicione filtros de data para incremental:
WHERE D_E_L_E_T_ = '' AND R_E_C_N_O_ > @ultimo_id
Passo 5 — Aba Destino
Configurações de onde e como os dados serão gravados.
Conexão
Selecione o Lakehouse de destino. O Lakehouse deve estar no mesmo Workspace ou em um Workspace com permissões configuradas.
Pasta raiz
Define o tipo de armazenamento no Lakehouse.
| Opção | Quando usar | Resultado |
|---|---|---|
| Tabelas | Dados estruturados para análise | Cria tabela Delta gerenciada (recomendado) |
| Arquivos | Staging de arquivos brutos | Grava em Files/landing/ como Parquet/CSV |
Tabela
Nome da tabela de destino. Siga o padrão de nomenclatura da camada Bronze: tb_raw_[fonte]_[tabela]
Ação de tabela
Define como os dados serão inseridos na tabela de destino.
| Ação | Comportamento | Quando usar |
|---|---|---|
| Acrescentar | Adiciona registros ao final da tabela existente | Carga incremental diária — novos registros são adicionados sem afetar os existentes |
| Substituir | Apaga todos os registros e recarrega do zero | Carga full — tabelas pequenas ou quando a fonte não suporta incremental |
| Executar upsert | Atualiza registros existentes e insere novos (requer chave) | Sincronização — quando registros na fonte podem ser alterados |
A opção Substituir apaga todos os dados antes de carregar. Se a carga falhar no meio, você perde os dados anteriores. Para tabelas críticas, prefira Acrescentar com controle de duplicatas no Silver.
Aplicar V-Order
Otimização de armazenamento que melhora a performance de leitura no Power BI e queries SQL.
| Opção | Recomendação |
|---|---|
| Habilitado | ✅ Sempre habilite para tabelas que serão consumidas por relatórios |
| Desabilitado | Apenas para staging temporário que será reprocessado |
Habilitar partições
Divide a tabela em partições físicas para melhorar performance em tabelas grandes.
| Volume | Particionamento |
|---|---|
| < 1 milhão de registros | ❌ Não necessário |
| > 1 milhão de registros | ✅ Particionar por data (ano, mes) |
Passo 6 — Aba Mapeamento
Configurações de conversão de tipos entre origem e destino.
Permitir truncamento de dados
Permite que valores maiores que o tamanho do campo destino sejam truncados automaticamente.
| Opção | Quando usar |
|---|---|
| Habilitado | Quando prefere perder dados parciais a falhar a carga |
| Desabilitado | Quando integridade total é crítica (campos financeiros, códigos) |
Habilite para campos texto descritivos. Desabilite para campos-chave como códigos e valores numéricos.
Importar esquemas
Clique para mapear automaticamente as colunas origem → destino. Revise o mapeamento para garantir compatibilidade de tipos.
Passo 7 — Aba Configurações
Configurações de performance e comportamento de execução.
Otimização de taxa de transferência inteligente
Define como o Fabric otimiza a velocidade de cópia.
| Opção | Descrição |
|---|---|
| Automático | Fabric ajusta dinamicamente (recomendado) |
| Valor personalizado | Define DIUs fixas (Data Integration Units) |
Grau de paralelismo de cópia
Número de conexões paralelas para leitura da fonte.
| Opção | Quando usar |
|---|---|
| Automático | Fabric determina baseado na fonte (recomendado) |
| Valor fixo | Quando a fonte tem limitações de conexões simultâneas |
Tolerância a falhas
Define o comportamento quando encontra erros em registros individuais.
| Opção | Comportamento | Quando usar |
|---|---|---|
| (vazio) | Falha no primeiro erro | Dados críticos — prefere não carregar nada a carregar incompleto |
| Ignorar linhas incompatíveis | Pula registros com erro | Dados tolerantes — aceita perda parcial |
Habilitar o registro em log
Grava log detalhado de linhas ignoradas ou com erro.
| Opção | Recomendação |
|---|---|
| Habilitado | Quando usa tolerância a falhas — permite auditar o que foi ignorado |
| Desabilitado | Quando não há tolerância a falhas |
Passo 8 — Validar e Executar
- Clique em Validar para verificar configurações
- Clique em Executar para testar manualmente
- Acompanhe o status na aba Saída
Resultado
lh_corp_bronze.dbo.tb_raw_protheus_sb1
Dataflow Gen2
Ideal para: Fontes que precisam de tratamento visual (Excel, CSV, SharePoint) ou transformações leves.
Cenário de Exemplo
| Item | Valor |
|---|---|
| Fonte | Planilha Excel do Banco Central |
| Arquivo | bcdata.csv |
| Série | 10813 - Taxa de câmbio - Livre - Dólar americano (compra) |
| Destino | lh_corp_bronze |
| Tabela Destino | tb_raw_bcb_dollar_rate |
Sobre a Planilha
A planilha bcdata.csv contém a série histórica da taxa de câmbio do dólar (compra) disponibilizada pelo Banco Central do Brasil através do Sistema Gerenciador de Séries Temporais (SGS).
Estrutura do arquivo:
| Coluna | Tipo | Descrição |
|---|---|---|
| Data | Data | Data da cotação (DD/MM/AAAA) |
| Valor | Número | Taxa de câmbio em R$/US$ |
Passo 1 — Baixar o Arquivo
Passo 2 — Fazer Upload para o Lakehouse
- Acesse o Workspace
qan_br_td_data_dev - Abra o Lakehouse
lh_corp_bronze - No painel esquerdo, clique em Files
- Navegue ou crie a pasta
landing/bcb/ - Clique em Upload → Upload files
- Selecione o arquivo
bcdata.csv
Caminho final do arquivo:
lh_corp_bronze/Files/landing/bcb/bcdata.csv
Passo 3 — Criar o Dataflow
- Acesse o Workspace
qan_br_td_data_dev - Clique em + Novo → Dataflow Gen2
- Nomeie como
df_bronze_bcb_dollar_rate
Passo 4 — Conectar à Fonte (Lakehouse Files)
- Clique em Obter dados
- Selecione Lakehouse
- Selecione o Lakehouse
lh_corp_bronze - Navegue até Files → landing → bcb
- Selecione o arquivo
bcdata.csv - Escolha a planilha que contém os dados
Passo 5 — Aplicar Transformações
No editor Power Query, aplique as seguintes transformações:
5.1 — Promover Cabeçalhos
Se a primeira linha contém os nomes das colunas:
- Clique em Transformar → Usar a primeira linha como cabeçalho
5.2 — Renomear Colunas
Padronize os nomes das colunas para snake_case em inglês:
| Coluna Original | Novo Nome |
|---|---|
| Data | dt_rate |
| Valor | vl_rate |
- Clique com botão direito na coluna → Renomear
5.3 — Alterar Tipos
Garanta que os tipos estão corretos:
| Coluna | Tipo |
|---|---|
| dt_rate | Data |
| vl_rate | Número Decimal |
- Clique no ícone de tipo na coluna → Selecione o tipo correto
5.4 — Filtrar Linhas Vazias
Remova linhas onde a data está vazia:
- Clique na seta da coluna
dt_rate - Desmarque (nulo) e (vazio)
5.5 — Adicionar Colunas de Metadados
Adicione informações de controle:
- Clique em Adicionar Coluna → Coluna Personalizada
- Adicione as colunas:
| Nome | Fórmula |
|---|---|
| ds_source | "BCB_SGS_10813" |
| ds_currency_from | "USD" |
| ds_currency_to | "BRL" |
| dt_load | DateTime.LocalNow() |
Passo 6 — Configurar Destino
- Clique em Adicionar destino de dados → Lakehouse
- Selecione o Lakehouse
lh_corp_bronze - Defina o nome da tabela:
tb_raw_bcb_dollar_rate - Método de atualização:
- Substituir: Para recarregar todo o histórico
- Acrescentar: Para adicionar apenas novos registros
Passo 7 — Publicar e Executar
- Clique em Publicar
- Aguarde a publicação finalizar
- No Workspace, localize o Dataflow
df_bronze_bcb_dollar_rate - Clique em ⋮ → Atualizar agora
- Acompanhe o status de execução
Passo 8 — Otimizar o Código M (Recomendado)
O Dataflow gera código M automaticamente conforme você aplica transformações na interface. No entanto, esse código pode ser otimizado para melhor performance e legibilidade.
Acessar o Editor Avançado
- No editor do Dataflow, clique em Exibir → Editor Avançado
- O código M completo será exibido
Código Original (Gerado pelo Dataflow)
let
Origem = Lakehouse.Contents(null),
Navegação = Origem{[workspaceId = "c3fc2dea-8363-44b6-8bc4-446d0be8c18e"]}[Data],
#"Navegação 1" = Navegação{[lakehouseId = "87b01831-605a-4b18-8bfa-24d0aed7d89d"]}[Data],
#"Navegação 2" = #"Navegação 1"{[Id = "Files", ItemKind = "Folder"]}[Data],
#"Navegação 3" = #"Navegação 2"{[Name = "landing"]}[Content],
#"Navegação 4" = #"Navegação 3"{[Name = "bcb"]}[Content],
#"Navegação 5" = #"Navegação 4"{[Name = "bcdata.csv"]}[Content],
#"CSV Importado" = Csv.Document(#"Navegação 5", [Delimiter = ";", Columns = 2, QuoteStyle = QuoteStyle.None]),
#"Cabeçalhos promovidos" = Table.PromoteHeaders(#"CSV Importado", [PromoteAllScalars = true]),
#"Tipo de coluna alterado" = Table.TransformColumnTypes(#"Cabeçalhos promovidos", {{"data", type date}, {"valor", type number}}),
#"Colunas renomeadas" = Table.RenameColumns(#"Tipo de coluna alterado", {{"data", "dt_rate"}, {"valor", "vl_rate"}}),
#"Linhas filtradas" = Table.SelectRows(#"Colunas renomeadas", each [dt_rate] <> null and [dt_rate] <> ""),
#"Personalização adicionada" = Table.TransformColumnTypes(Table.AddColumn(#"Linhas filtradas", "ds_source", each "BCB_SGS_10813"), {{"ds_source", type text}}),
#"Personalização adicionada 1" = Table.TransformColumnTypes(Table.AddColumn(#"Personalização adicionada", "ds_currency_from", each "USD"), {{"ds_currency_from", type text}}),
#"Personalização adicionada 2" = Table.TransformColumnTypes(Table.AddColumn(#"Personalização adicionada 1", "ds_currency_to", each "BRL"), {{"ds_currency_to", type text}}),
#"Personalização adicionada 3" = Table.TransformColumnTypes(Table.AddColumn(#"Personalização adicionada 2", "dt_load", each DateTime.LocalNow()), {{"dt_load", type datetime}})
in
#"Personalização adicionada 3"
Problemas Identificados
| Problema | Impacto |
|---|---|
Múltiplas chamadas de TransformColumnTypes | Cada chamada processa toda a tabela novamente |
| Navegação em muitos passos separados | Código verboso e difícil de manter |
Nomes genéricos (Navegação 1, Personalização adicionada 1) | Dificulta entendimento e debug |
Tipagem após cada AddColumn | Processamento redundante |
Código Otimizado
let
// =====================================================
// ORIGEM DOS DADOS
// =====================================================
Origem = Lakehouse.Contents(null),
Navegacao = Origem{[workspaceId = "c3fc2dea-8363-44b6-8bc4-446d0be8c18e"]}[Data],
Lakehouse = Navegacao{[lakehouseId = "87b01831-605a-4b18-8bfa-24d0aed7d89d"]}[Data],
// Navegação encadeada até o arquivo
Arquivo = Lakehouse
{[Id = "Files", ItemKind = "Folder"]}[Data]
{[Name = "landing"]}[Content]
{[Name = "bcb"]}[Content]
{[Name = "bcdata.csv"]}[Content],
// =====================================================
// IMPORTAÇÃO E LIMPEZA
// =====================================================
CSV = Csv.Document(Arquivo, [Delimiter = ";", Columns = 2, QuoteStyle = QuoteStyle.None]),
Cabecalhos = Table.PromoteHeaders(CSV, [PromoteAllScalars = true]),
Renomeado = Table.RenameColumns(Cabecalhos, {
{"data", "dt_rate"},
{"valor", "vl_rate"}
}),
Filtrado = Table.SelectRows(Renomeado, each [dt_rate] <> null and [dt_rate] <> ""),
// =====================================================
// METADADOS DE CONTROLE
// =====================================================
Meta_Source = Table.AddColumn(Filtrado, "ds_source", each "BCB_SGS_10813"),
Meta_CurrencyFrom = Table.AddColumn(Meta_Source, "ds_currency_from", each "USD"),
Meta_CurrencyTo = Table.AddColumn(Meta_CurrencyFrom, "ds_currency_to", each "BRL"),
Meta_Load = Table.AddColumn(Meta_CurrencyTo, "dt_load", each DateTime.LocalNow()),
// =====================================================
// TIPAGEM FINAL (única chamada)
// =====================================================
TiposDefinidos = Table.TransformColumnTypes(Meta_Load, {
{"dt_rate", type date},
{"vl_rate", type number},
{"ds_source", type text},
{"ds_currency_from", type text},
{"ds_currency_to", type text},
{"dt_load", type datetime}
})
in
TiposDefinidos
Benefícios da Otimização
| Aspecto | Antes | Depois | Melhoria |
|---|---|---|---|
| Passos | 15 | 10 | -33% |
Chamadas de TransformColumnTypes | 5 | 1 | -80% |
| Legibilidade | Nomes genéricos | Nomes descritivos | Facilita manutenção |
| Comentários | Nenhum | Seções organizadas | Documentação inline |
| Performance | Tipagem repetida | Tipagem única no final | Menos processamento |
Por que tipar apenas no final?
Cada chamada de Table.TransformColumnTypes percorre todas as linhas da tabela para converter os tipos. Quando você chama essa função 5 vezes (uma para cada coluna), está processando a tabela 5 vezes.
Ao consolidar em uma única chamada no final, a tabela é processada apenas uma vez, reduzindo significativamente o tempo de execução em tabelas com muitos registros.
Boas Práticas de Código M
- Nomes descritivos: Use nomes que indiquem a ação (
Filtrado,Renomeado,Meta_Source) - Comentários por seção: Divida o código em blocos lógicos com comentários
- Tipagem consolidada: Defina todos os tipos em uma única chamada no final
- Navegação encadeada: Combine navegações simples em uma única expressão
- Consistência: Mantenha o mesmo padrão em todos os Dataflows do projeto
Você pode usar ferramentas de IA (como o Copilot ou Claude) para otimizar o código M gerado automaticamente pelo Dataflow. Basta copiar o código do Editor Avançado e solicitar a otimização.
Resultado
lh_corp_bronze.files.bcb.tb_raw_bcb_dollar_rate
Estrutura Final da Tabela
| Coluna | Tipo | Exemplo |
|---|---|---|
| dt_rate | DATE | 2025-01-03 |
| vl_rate | DECIMAL | 6.0150 |
| ds_source | STRING | BCB_SGS_10813 |
| ds_currency_from | STRING | USD |
| ds_currency_to | STRING | BRL |
| dt_load | TIMESTAMP | 2025-01-06 08:30:00 |
Opção 4: Notebook PySpark
Ideal para: APIs REST, lógica complexa, paginação, autenticação OAuth, ou quando você precisa de controle total.
Limitação: Servidores On-Premises
O Notebook PySpark roda na infraestrutura de nuvem do Microsoft Fabric. Diferente do Pipeline e Dataflow que utilizam o Data Gateway para acessar servidores on-premises, o Notebook não possui integração nativa com Gateway.
| Artefato | Acessa On-Premises via Gateway? |
|---|---|
| Copy Job | ✅ Sim |
| Data Pipeline | ✅ Sim |
| Dataflow Gen2 | ✅ Sim |
| Notebook PySpark | ❌ Não |
Cenários de Uso do Notebook para Ingestão
| Cenário | Funciona? | Solução |
|---|---|---|
| Banco de dados na nuvem (Azure SQL, AWS RDS) | ✅ Sim | Conexão JDBC direta |
| APIs REST públicas | ✅ Sim | Biblioteca requests |
| APIs REST com OAuth | ✅ Sim | Biblioteca requests + autenticação |
| Banco de dados on-premises | ❌ Não | Usar Copy Job/Pipeline/Dataflow |
Cenário de Exemplo: Cotação do Dólar (API Banco Central)
| Item | Valor |
|---|---|
| API | PTAX - Banco Central do Brasil |
| URL Base | https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata |
| Autenticação | Não requer |
| Destino | lh_corp_bronze |
| Tabela Destino | tb_raw_api_dollar_rate |
Passo 1 — Criar o Notebook
- Acesse o Workspace
qan_br_td_data_dev - Clique em + Novo → Notebook
- Nomeie como
nb_bronze_dollar_rate - Anexe ao Lakehouse
lh_corp_bronze
Passo 2 — Importar Bibliotecas
# Célula 1: Importar bibliotecas
import requests
import json
from datetime import datetime, timedelta
Passo 3 — Criar Função de Busca com Fallback
A API do Banco Central só retorna cotações em dias úteis. Esta função busca a cotação de hoje e, se não houver (fim de semana/feriado), busca automaticamente o último dia útil.
# Célula 2: Função para buscar cotação com fallback
def buscar_cotacao_dolar():
"""
Busca cotação do dólar do dia atual.
Se não houver (fim de semana/feriado), busca o último dia útil.
"""
# Tentar data de hoje
data_consulta = datetime.now().strftime("%m-%d-%Y")
url = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarDia(dataCotacao=@dataCotacao)?@dataCotacao='{data_consulta}'&$format=json"
response = requests.get(url)
data = response.json()
# Se retornou dados, usa eles
if data['value']:
print(f"Cotacao encontrada para hoje: {data_consulta}")
return data['value']
# Se nao retornou, busca ultimos 7 dias e pega o mais recente
print(f"Sem cotacao para hoje ({data_consulta}). Buscando ultimo dia util...")
data_fim = datetime.now()
data_inicio = data_fim - timedelta(days=7)
data_inicio_str = data_inicio.strftime("%m-%d-%Y")
data_fim_str = data_fim.strftime("%m-%d-%Y")
url_periodo = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)?@dataInicial='{data_inicio_str}'&@dataFinalCotacao='{data_fim_str}'&$format=json&$orderby=dataHoraCotacao%20desc&$top=1"
response_periodo = requests.get(url_periodo)
data_periodo = response_periodo.json()
if data_periodo['value']:
ultima_data = data_periodo['value'][0]['dataHoraCotacao']
print(f"Ultima cotacao disponivel: {ultima_data}")
return data_periodo['value']
print("Nenhuma cotacao encontrada nos ultimos 7 dias.")
return []
Lógica de fallback:
┌─────────────────────────────────────────────────────────────────────────────┐
│ LÓGICA DE BUSCA │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Tenta buscar cotação de HOJE │
│ │ │
│ ├── Se encontrar: Retorna os dados │
│ │ │
│ └── Se vazio (fim de semana/feriado): │
│ │ │
│ ▼ │
│ 2. Busca últimos 7 dias, ordenado por data DESC, LIMIT 1 │
│ │ │
│ └── Retorna a cotação mais recente disponível │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Passo 4 — Executar Busca
# Célula 3: Executar busca
cotacoes = buscar_cotacao_dolar()
if cotacoes:
print(f"\nRegistros retornados: {len(cotacoes)}")
print(json.dumps(cotacoes, indent=2))
else:
print("Nenhum dado retornado.")
Retorno esperado:
[
{
"cotacaoCompra": 6.0150,
"cotacaoVenda": 6.0180,
"dataHoraCotacao": "2025-01-03 13:09:08.512"
}
]
Passo 5 — Converter para DataFrame
# Célula 4: Converter para DataFrame
if cotacoes:
dados_cotacao = []
for cotacao in cotacoes:
dados_cotacao.append({
"ds_currency_from": "USD",
"ds_currency_to": "BRL",
"vl_rate_buy": float(cotacao["cotacaoCompra"]),
"vl_rate_sell": float(cotacao["cotacaoVenda"]),
"dt_rate": cotacao["dataHoraCotacao"],
"ds_bulletin_type": cotacao.get("tipoBoletim", "N/A")
})
df_cotacao = spark.createDataFrame(dados_cotacao)
print(f"Total de registros: {df_cotacao.count()}")
df_cotacao.show(truncate=False)
else:
print("Sem dados para processar.")
ds_bulletin_typeO campo tipoBoletim existe apenas no endpoint de período. Usamos .get("tipoBoletim", "N/A") para evitar erro quando o campo não existir.
Passo 6 — Adicionar Metadados e Gravar
# Célula 5: Adicionar metadados e gravar
from pyspark.sql.functions import current_timestamp, lit, from_utc_timestamp
df_final = df_cotacao \
.withColumn("dt_load", from_utc_timestamp(current_timestamp(), "America/Sao_Paulo")) \
.withColumn("ds_source", lit("API_BCB_PTAX"))
df_final.write \
.format("delta") \
.mode("append") \
.saveAsTable("lh_corp_bronze.tb_raw_api_dollar_rate")
print("Ingestao concluida com sucesso!")
Passo 7 — Validar
# Célula 6: Validação
spark.sql("""
SELECT
ds_currency_from,
ds_currency_to,
vl_rate_buy,
vl_rate_sell,
dt_rate,
dt_load
FROM lh_corp_bronze.tb_raw_api_dollar_rate
ORDER BY dt_rate DESC
LIMIT 10
""").show(truncate=False)
Resultado
lh_corp_bronze.dbo.tb_raw_api_dollar_rate
Transformações na Camada Bronze
Na arquitetura Medallion, a camada Bronze armazena dados "brutos". Porém, isso não significa que nenhuma transformação pode ser feita. Existe uma distinção importante entre transformações técnicas e transformações de negócio.
O que é permitido na Bronze?
- Padronização de nomes de colunas
- Definição de tipos de dados
- Adição de metadados (dt_load, ds_source)
- Remoção de linhas completamente vazias
- Conversão de encoding
- Regras de negócio e validações
- Joins entre tabelas
- Deduplicação inteligente
- Cálculos derivados
- Tratamento de valores inválidos
- Agregações e sumarizações
- KPIs e métricas de negócio
- Modelagem dimensional
- Dados prontos para relatórios
- Visões específicas por área
Comparativo: Técnico vs Negócio
| Tipo | Camada | Exemplo | Justificativa |
|---|---|---|---|
| Técnico | Bronze | Renomear data → dt_rate | Padronização de nomenclatura, não altera o dado |
| Técnico | Bronze | Adicionar dt_load | Metadado de rastreabilidade, não é dado de negócio |
| Técnico | Bronze | Definir tipo DATE | Garantir consistência, não altera o valor |
| Técnico | Bronze | Remover linhas 100% vazias | Limpeza estrutural, não é regra de negócio |
| Negócio | Silver | Filtrar status = 'ATIVO' | Regra de negócio que exclui dados |
| Negócio | Silver | Calcular vl_total = qtd * preco | Criação de dado derivado |
| Negócio | Silver | Join cliente + pedido | Relacionamento entre entidades |
| Analítico | Gold | SUM(vl_total) por mês | Agregação para análise |
Por que fazemos isso na Bronze?
┌─────────────────────────────────────────────────────────────────────────────┐
│ PRINCÍPIO: Dado bruto ≠ Dado bagunçado │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ A Bronze preserva a FIDELIDADE dos dados originais. │
│ Transformações técnicas não alteram o CONTEÚDO, apenas a ESTRUTURA. │
│ │
│ ✅ PERMITIDO NA BRONZE: │
│ • Renomear colunas (padronização) │
│ • Definir tipos de dados │
│ • Adicionar metadados de controle │
│ • Remover linhas completamente vazias │
│ │
│ ❌ NÃO PERMITIDO NA BRONZE: │
│ • Filtrar dados por regras de negócio │
│ • Fazer cálculos ou derivações │
│ • Aplicar joins entre tabelas │
│ • Remover duplicatas (exceto técnicas) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Exemplo Prático: Cotação do Dólar
Veja como as transformações se distribuem entre as camadas:
| Camada | Transformação | Dado Original | Dado Transformado |
|---|---|---|---|
| Bronze | Renomear coluna | data | dt_rate |
| Bronze | Adicionar metadado | (não existe) | ds_source = "BCB_SGS_10813" |
| Bronze | Definir tipo | "6.0150" (texto) | 6.0150 (decimal) |
| Silver | Validar range | 6.0150 | 6.0150 (válido) ou NULL (inválido) |
| Silver | Calcular variação | 6.0150 | vl_variacao_dia = 0.0023 |
| Gold | Agregar por mês | 6.0150 | vl_media_mensal = 5.89 |
Se a transformação altera o significado ou exclui dados de negócio, ela pertence à Silver. Se apenas organiza ou adiciona contexto técnico, pode ficar na Bronze.
Entre em contato com o Time de Transformação Digital (TD) ou o Comitê de Dados.