3. Comandos de Gerenciamento de Esquemas e Catálogos
1. readStream – Leitura de Dados em Streaming
2. writeStream – Escrita de Dados em Streaming
A operação MERGE no Databricks é usada para realizar upserts (combinação de update e insert) entre uma tabela de origem e uma Delta Table de destino. Ela permite inserir, atualizar ou excluir dados com base em uma condição.
MERGE INTO target_table AS target
USING source_table AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.column1 = source.column1
WHEN NOT MATCHED THEN INSERT (column1, column2) VALUES (source.column1, source.column2);MERGE INTO customers AS t
USING new_customers AS s
ON t.customer_id = source.customer_id
WHEN MATCHED THEN
UPDATE SET target.name = source.name, target.address = source.address
WHEN NOT MATCHED THEN
INSERT (customer_id, name, address)
VALUES (source.customer_id, source.name, source.address); MERGE INTO customers AS target
USING old_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN DELETE;No Databricks, Structured Streaming permite o processamento contínuo de dados em tempo real de maneira escalável e tolerante a falhas. Ele é baseado no conceito de DataFrames e DataSets e utiliza os métodos readStream para leitura de dados em fluxo e writeStream para gravação dos resultados em uma saída contínua.
O método readStream permite que você leia dados de uma fonte de streaming, como Kafka, arquivos, sockets, entre outros.
# Leitura de dados de um diretório de arquivos JSON em streaming
streaming_df = spark.readStream \
.format("json") \
.option("path", "/caminho/para/diretorio/streaming") \
.load()Neste exemplo:
Após processar os dados em streaming, você utiliza o método writeStream para gravar os resultados continuamente em um destino, como tabelas, arquivos, Kafka, console, etc.
# Escreve os dados lidos em uma tabela Delta em tempo real
streaming_query = streaming_df.writeStream \
.format("delta") \
.option("checkpointLocation", "/caminho/para/checkpoint") \
.start("/caminho/para/tabela/delta")Neste exemplo:
# Leitura de dados JSON em streaming
streaming_df = spark.readStream \
.format("json") \
.option("path", "/input/dir") \
.load()
# Escreve os dados para uma tabela Delta
streaming_query = streaming_df.writeStream \
.format("delta") \
.option("checkpointLocation", "/checkpoint/dir") \
.start("/output/delta-table")streaming_query = streaming_df.writeStream \
.outputMode("append") \
.trigger(processingTime='5 seconds') \
.option("checkpointLocation", "/checkpoint/path") \
.table("output_table")trigger: Define quando o processamento de streaming será executado.
trigger(continuous=...): Processa os dados de forma contínua, com latência muito baixa (para aplicações de baixa latência).trigger(processingTime='interval'): Define um intervalo fixo para a execução de cada batch (ex: processingTime='10 seconds').trigger(once=True): Processa todos os dados disponíveis e depois para o streaming. Útil para carregar grandes volumes de dados em batch, mas com a simplicidade de um stream.outputMode: Define como os dados são gravados no destino.
append: Apenas novos dados são gravados (nada é atualizado). Ideal para operações de ingestão contínua.update: Atualiza os dados que mudaram desde o último trigger. Funciona com consultas agregadas.complete: Grava todo o resultado da consulta a cada batch, substituindo todos os dados. Útil para consultas agregadas em que você deseja recalcular todo o conjunto.table(): O uso de table() no writeStream escreve os dados de streaming diretamente em uma tabela no Databricks, como uma Delta Table, com suporte a controle de estado e transações. O uso de Delta Tables oferece suporte a operações de viagem no tempo, auditoria, e rollback, aproveitando as funcionalidades de otimização do Delta Lake.
O Autoloader no Databricks é uma ferramenta para ingestão automática e escalável de dados de arquivos em diretórios de armazenamento como o S3, Azure Blob Storage, ou ADLS. Ele é ideal para processar arquivos novos ou modificados em tempo real, sem a necessidade de configurar manualmente triggers ou monitoramentos de arquivos, permitindo que você processe grandes volumes de arquivos de forma eficiente e automatizada!
O Auto Loader utiliza notificações ou o monitoremento de diretórios para identificar novos arquivos, e ingeri-los em tabelas Delta ou outras saídas.
Processamento em Streaming: Auto Loader lê novos arquivos automaticamente em um fluxo de dados contínuo.
Escalabilidade: Lida com grandes volumes de arquivos em armazenamento distribuído.
Monitoramento Automático: Detecta automaticamente novos arquivos ou mudanças sem a necessidade de escrever lógica de controle manual.
Tolerância a falhas: Com uso de checkpoints, o Auto Loader garante que nenhum dado seja perdido ou processado duas vezes.
Facilidade de uso: Basta configurar a fonte e destino, e ele lida com o resto.
Notificação de eventos: Utiliza eventos de sistema (como notificações S3 ou Azure) para detectar novos arquivos (mais eficiente).
Listagem de diretórios: Varre periodicamente o diretório para verificar novos arquivos (menos eficiente, mas funciona sem configurações adicionais).
streaming_df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "/caminho/para/esquema") \
.load("/caminho/para/diretorio")Neste exemplo:
format("cloudFiles"): Indica o uso do Auto Loader.
cloudFiles.format: Especifica o formato dos arquivos (JSON, CSV, Parquet, etc.).
cloudFiles.schemaLocation: Define o local onde o Auto Loader armazena o esquema dos dados (necessário para inferir novos arquivos).
streaming_query = streaming_df.writeStream \
.format("delta") \
.option("checkpointLocation", "/checkpoint/path") \
.start("/output/delta-table")Neste exemplo, os dados são escritos em uma Delta Table em tempo real.
DLT no databricks sao uma maneira simplificada e eficiente de construir pipelines ETL declarativos, para dados em streaming ou batch. Uma DLT integra comandos SQL e PySpark para criar, transformar e gerenciar tabelas Delta de forma incremental e confiavel.
LIVE?
LIVE é usado para referenciar tabelas já criadas no pipeline DLT .Exemplo:
--- vendas é uma tabela já definida no pipeline.
--- vendas_filtradas é uma nova tabela que usa`LIVE.vendas` como fonte.
CREATE LIVE TABLE vendas_filtradas AS
SELECT * FROM LIVE.vendas
WHERE status = 'concluída';STREAMING?
STREAMING é usado quando você quer processar dados em tempo real .Exemplo:
--- vendas_stream está ingerindo dados em tempo real de arquivos JSON que chegam na pasta /mnt/vendas
CREATE STREAMING LIVE TABLE vendas_stream AS
SELECT * FROM cloud_files("/mnt/vendas", "json");Aqui estão exemplos práticos de uso de Delta Live Tables (DLT) no Databricks com comandos SQL e funções como STREAMING, LIVE, APPLY CHANGES, e o uso de cloud_files() para ingestão de dados.
CREATE OR REFRESH STREAMING LIVE TABLE raw_data
AS SELECT * FROM cloud_files("/input/directory", "json");STREAMING: Indica que a tabela é alimentada continuamente em modo streaming.cloud_files(): Detecta novos arquivos em um diretório (funciona com JSON, CSV, Parquet, etc.).CREATE OR REFRESH: Cria ou atualiza a tabela incrementalmente.Permite aplicar filtros, agregações ou qualquer lógica SQL. Usamos o comando LIVE para referenciar outra tabela criada no pipeline.
CREATE OR REFRESH LIVE TABLE transformed_data AS SELECT id, name, timestamp, value
FROM LIVE.raw_data
WHERE value > 10;LIVE.raw_data: Referencia a tabela live raw_data criada anteriormente no pipeline.
O comando APPLY CHANGES é usado para capturar mudanças (CDC - Change Data Capture) em dados.
APPLY CHANGES INTO LIVE.customer_data FROM STREAM(LIVE.raw_data)
KEYS (customer_id)
SEQUENCE BY timestamp
COLUMNS * EXCEPT (metadata)
STORED AS SCD TYPE 1;STREAM(LIVE.raw_data): Processa a tabela raw_data em modo streaming.
KEYS: Coluna-chave usada para identificar registros únicos.
SEQUENCE BY: Coluna que define a ordenação dos eventos.
STORED AS SCD TYPE 1: Aplica a lógica de Slowly Changing Dimension Type 1, onde os registros antigos são sobrescritos.
SCD TYPE
| SCD Type | Descrição | *Quando Usar |
|---|---|---|
| SCD Type 1 | Sobrescreve dados antigos (sem histórico) | Quando o histórico não é necessário (ex: correção de erros) |
| SCD Type 2 | Mantém o histórico com múltiplas versões | Quando é importante rastrear alterações ao longo do tempo. |
| SCD Type 3 | Mantém apenas o valor atual e o anterior | Quando você só precisa de um histórico limitado. |
DLT permite o processamento híbrido (batch e streaming). O DLT trata automaticamente a ingestão de arquivos novos como streaming, mesmo que os arquivos venham em lotes (batch).
CREATE OR REFRESH LIVE TABLE hybrid_data
AS SELECT * FROM cloud_files("/input/hybrid", "csv", map("cloudFiles.inferSchema", "true"));cloud_files(): Garante que arquivos novos e modificados sejam automaticamente identificados.map(): Passa opções para o carregamento, como cloudFiles.inferSchema.-- Tabela raw em streaming
CREATE OR REFRESH STREAMING LIVE TABLE raw_data
AS SELECT * FROM cloud_files("/data/input/", "json");
-- Transformação na tabela raw
CREATE OR REFRESH LIVE TABLE cleaned_data
AS SELECT id, name, value
FROM LIVE.raw_data
WHERE value IS NOT NULL;
-- Aplicar mudanças incrementais
APPLY CHANGES INTO LIVE.final_data
FROM STREAM(LIVE.cleaned_data)
KEYS (id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;No Databricks, o conceito de viagem no tempo em Delta Tables permite consultar, restaurar ou inspecionar uma tabela em versões ou pontos anteriores no tempo. Isso é extremamente útil para auditoria de dados, análises de mudanças ou para desfazer operações
SELECT * FROM table_name VERSION AS OF 5;
SELECT * FROM table_name@v5;RESTORE TABLE table_name TO VERSION AS OF 5;SELECT * FROM table_name TIMESTAMP AS OF 'YYYY-MM-DDTHH:MM:SSZ';RESTORE TABLE table_name TO TIMESTAMP AS OF 'YYYY-MM-DDTHH:MM:SSZ';DESCRIBE HISTORY table_name;