Skip to content

Instantly share code, notes, and snippets.

@VonLisboa
Last active February 1, 2025 03:57
Show Gist options
  • Select an option

  • Save VonLisboa/c84d56aa5f1a5af9947d5d93f1fb8a3b to your computer and use it in GitHub Desktop.

Select an option

Save VonLisboa/c84d56aa5f1a5af9947d5d93f1fb8a3b to your computer and use it in GitHub Desktop.
Databricks Resumido na Pratica

COMANDOS ESSENCIAIS PARA DELTA TABLE NO DATABRICKS

  1. Comandos para Delta Tables

    1. CREATE TABLE ... USING DELTA: Cria uma Delta Table. Pode ser criada a partir de uma consulta ou apenas a estrutura da tabela.
    2. ALTER TABLE ... ADD CONSTRAINT: Adiciona uma constraint (restrição) à Delta Table, como CHECK ou NOT NULL.
    3. ALTER TABLE ... DROP CONSTRAINT: Remove uma constraint específica de uma Delta Table.
    4. DROP TABLE: Remove uma Delta Table. Se a tabela for gerenciada, os dados serão excluídos junto com a definição da tabela.
    5. RENAME TABLE: Renomeia uma Delta Table.
    6. DESCRIBE HISTORY: Mostra o histórico de versões e operações realizadas na Delta Table. Útil para fazer auditoria ou rollback.
    7. DESCRIBE DETAIL: Exibe detalhes sobre a Delta Table, como o número de arquivos, o tamanho total dos dados e outras informações estatísticas.
    8. VACUUM: Remove arquivos antigos que não são mais referenciados pela Delta Table. Isso é útil para liberar espaço e otimizar a performance.
    9. OPTIMIZE: Otimizar os dados na Delta Table, por exemplo, re-escrevendo arquivos para melhorar a performance das consultas.
    10. ALTER TABLE ... SET PROPERTIES: Modifica as propriedades da Delta Table, como ativar ou desativar delta.autoOptimize.optimizeWrite.
    11. MERGE INTO: Realiza operações UPSERT (update + insert) entre uma tabela de origem e uma Delta Table, com base em uma condição específica.
    12. CONVERT TO DELTA: Converte uma tabela existente em uma Delta Table. Pode ser aplicada em tabelas Parquet ou Hive.
    13. GENERATE: Cria um manifesto ou arquivo de snapshot da Delta Table, que pode ser utilizado para integração com outros sistemas.
  2. Comandos para Views

    1. CREATE VIEW: Cria uma view com base em uma consulta SQL. As views não armazenam dados, apenas a definição da consulta.
    2. CREATE OR REPLACE VIEW: Cria ou substitui uma view existente.
    3. ALTER VIEW: Modifica a definição de uma view existente.
    4. DROP VIEW: Remover uma view do catálogo.
  3. Comandos de Gerenciamento de Esquemas e Catálogos

    1. CREATE SCHEMA: Cria um novo esquema (namespace) para organizar as tabelas e views.
    2. DROP SCHEMA: Remove um esquema e todo o seu conteúdo, se o esquema não estiver vazio.
    3. CREATE CATALOG: Cria um novo catálogo para organizar esquemas, tabelas e views.
    4. DROP CATALOG: Remove um catálogo e todo o seu conteúdo.
  4. Comandos Adicionais

    1. SHOW TABLES: Lista todas as tabelas e views em um determinado esquema.
    2. SHOW COLUMNS: Lista as colunas de uma tabela ou view.
    3. SHOW SCHEMAS: Lista todos os esquemas disponíveis no catálogo.
    4. SHOW CREATE TABLE: Mostra o comando SQL usado para criar uma tabela, útil para entender a estrutura.

MERGE NO DATABRICKS

A operação MERGE no Databricks é usada para realizar upserts (combinação de update e insert) entre uma tabela de origem e uma Delta Table de destino. Ela permite inserir, atualizar ou excluir dados com base em uma condição.

Benefícios:

  • Upsert simplificado: Combina inserções e atualizações em uma única operação.
  • Desempenho: Mais eficiente do que realizar operações UPDATE e INSERT separadas.
  • Gerenciamento de dados: Ideal para manter tabelas em Delta Lake sempre atualizadas com dados mais recentes.

Sintaxe Geral do MERGE

MERGE INTO target_table AS target  
USING source_table AS source  
ON target.id = source.id  
WHEN MATCHED THEN UPDATE SET target.column1 = source.column1  
WHEN NOT MATCHED THEN INSERT (column1, column2) VALUES (source.column1, source.column2);

Explicação:

  • MERGE INTO target_table: Define a tabela de destino que será atualizada.
  • USING source_table: Define a tabela de origem com os dados que serão mesclados.
  • ON target.id = source.id: Condição para combinar as tabelas (pode ser qualquer coluna).
  • WHEN MATCHED: O que fazer quando a condição é atendida (normalmente UPDATE).
  • WHEN NOT MATCHED: O que fazer quando a condição não é atendida (normalmente INSERT).

Exemplo de MERGE

  • Exemplo 1: Atualizar dados existentes e inserir novos dados

    MERGE INTO customers AS t  
    USING new_customers AS s  
    ON t.customer_id = source.customer_id  
    WHEN MATCHED THEN   
      UPDATE SET target.name = source.name, target.address = source.address  
    WHEN NOT MATCHED THEN   
      INSERT (customer_id, name, address)   
      VALUES (source.customer_id, source.name, source.address);  
  • Neste exemplo:
    • Se o customer_id já existir na tabela customers, o registro será atualizado.
    • Se o customer_id não existir, um novo registro será inserido.
  • Exemplo 2: Excluir dados em uma correspondência
    MERGE INTO customers AS target  
    USING old_customers AS source  
    ON target.customer_id = source.customer_id  
    WHEN MATCHED THEN DELETE;
  • Neste exemplo:
    • Se houver uma correspondência de customer_id, o registro correspondente na tabela customers será excluído.

STRUCTURED STREAMING

No Databricks, Structured Streaming permite o processamento contínuo de dados em tempo real de maneira escalável e tolerante a falhas. Ele é baseado no conceito de DataFrames e DataSets e utiliza os métodos readStream para leitura de dados em fluxo e writeStream para gravação dos resultados em uma saída contínua.

1. readStream – Leitura de Dados em Streaming

O método readStream permite que você leia dados de uma fonte de streaming, como Kafka, arquivos, sockets, entre outros.

Exemplo:

# Leitura de dados de um diretório de arquivos JSON em streaming  
streaming_df = spark.readStream \  
  .format("json") \  
  .option("path", "/caminho/para/diretorio/streaming") \
  .load()

Neste exemplo:

  • format("json"): Especifica o formato dos arquivos de entrada (json, csv, delta, parquet, console, memory, kafka).
  • option("path", ...): Define o caminho para os arquivos que estão sendo gerados continuamente (caminho, tópico Kafka, etc).
  • load(): Começa a ler os dados da fonte em tempo real.

2. writeStream – Escrita de Dados em Streaming

Após processar os dados em streaming, você utiliza o método writeStream para gravar os resultados continuamente em um destino, como tabelas, arquivos, Kafka, console, etc.

Exemplo:

# Escreve os dados lidos em uma tabela Delta em tempo real  
streaming_query = streaming_df.writeStream \
  .format("delta") \
  .option("checkpointLocation", "/caminho/para/checkpoint") \
  .start("/caminho/para/tabela/delta")

Neste exemplo:

  • format("delta"): Define o formato de saída (pode ser delta, parquet, console, etc.).
  • option("checkpointLocation", ...): Define um local para checkpoints, o que garante a tolerância a falhas e controle de estado.
  • start(): Inicia a gravação contínua dos dados processados no destino.

Exemplo Completo de Streaming:

# Leitura de dados JSON em streaming  
streaming_df = spark.readStream \
  .format("json") \
  .option("path", "/input/dir") \
  .load()

# Escreve os dados para uma tabela Delta  
streaming_query = streaming_df.writeStream \
  .format("delta") \
  .option("checkpointLocation", "/checkpoint/dir") \
  .start("/output/delta-table")

Principais Considerações:

  • Tolerância a falhas: O uso de checkpoints garante que, em caso de falhas, o streaming possa ser retomado do último ponto processado.
  • Latência: O Structured Streaming permite o processamento de dados com baixa latência, desde que as fontes e destinos sejam otimizados.
  • Modo de Saída: O Structured Streaming suporta diferentes modos de saída: append, update, e complete.

Exemplo com trigger e outputMode

streaming_query = streaming_df.writeStream \
  .outputMode("append") \
  .trigger(processingTime='5 seconds') \
  .option("checkpointLocation", "/checkpoint/path") \
  .table("output_table")
  • trigger: Define quando o processamento de streaming será executado.

    • trigger(continuous=...): Processa os dados de forma contínua, com latência muito baixa (para aplicações de baixa latência).
    • trigger(processingTime='interval'): Define um intervalo fixo para a execução de cada batch (ex: processingTime='10 seconds').
    • trigger(once=True): Processa todos os dados disponíveis e depois para o streaming. Útil para carregar grandes volumes de dados em batch, mas com a simplicidade de um stream.
  • outputMode: Define como os dados são gravados no destino.

    • append: Apenas novos dados são gravados (nada é atualizado). Ideal para operações de ingestão contínua.
    • update: Atualiza os dados que mudaram desde o último trigger. Funciona com consultas agregadas.
    • complete: Grava todo o resultado da consulta a cada batch, substituindo todos os dados. Útil para consultas agregadas em que você deseja recalcular todo o conjunto.
  • table(): O uso de table() no writeStream escreve os dados de streaming diretamente em uma tabela no Databricks, como uma Delta Table, com suporte a controle de estado e transações. O uso de Delta Tables oferece suporte a operações de viagem no tempo, auditoria, e rollback, aproveitando as funcionalidades de otimização do Delta Lake.

AUTOLOADER

O Autoloader no Databricks é uma ferramenta para ingestão automática e escalável de dados de arquivos em diretórios de armazenamento como o S3, Azure Blob Storage, ou ADLS. Ele é ideal para processar arquivos novos ou modificados em tempo real, sem a necessidade de configurar manualmente triggers ou monitoramentos de arquivos, permitindo que você processe grandes volumes de arquivos de forma eficiente e automatizada!

Como Funciona:

O Auto Loader utiliza notificações ou o monitoremento de diretórios para identificar novos arquivos, e ingeri-los em tabelas Delta ou outras saídas.

Principais Características:

  • Processamento em Streaming: Auto Loader lê novos arquivos automaticamente em um fluxo de dados contínuo.

  • Escalabilidade: Lida com grandes volumes de arquivos em armazenamento distribuído.

  • Monitoramento Automático: Detecta automaticamente novos arquivos ou mudanças sem a necessidade de escrever lógica de controle manual.

  • Tolerância a falhas: Com uso de checkpoints, o Auto Loader garante que nenhum dado seja perdido ou processado duas vezes.

  • Facilidade de uso: Basta configurar a fonte e destino, e ele lida com o resto.

    Modos de Detecção:

  • Notificação de eventos: Utiliza eventos de sistema (como notificações S3 ou Azure) para detectar novos arquivos (mais eficiente).

  • Listagem de diretórios: Varre periodicamente o diretório para verificar novos arquivos (menos eficiente, mas funciona sem configurações adicionais).

    Sintaxe Básica:

    Leitura com Auto Loader:

  streaming_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/caminho/para/esquema") \
    .load("/caminho/para/diretorio")

Neste exemplo:

  • format("cloudFiles"): Indica o uso do Auto Loader.

  • cloudFiles.format: Especifica o formato dos arquivos (JSON, CSV, Parquet, etc.).

  • cloudFiles.schemaLocation: Define o local onde o Auto Loader armazena o esquema dos dados (necessário para inferir novos arquivos).

    Escrita em Tabelas Delta:

streaming_query = streaming_df.writeStream \
   .format("delta") \
   .option("checkpointLocation", "/checkpoint/path") \
   .start("/output/delta-table")

Neste exemplo, os dados são escritos em uma Delta Table em tempo real.

DELTA LIVE TABLES

DLT no databricks sao uma maneira simplificada e eficiente de construir pipelines ETL declarativos, para dados em streaming ou batch. Uma DLT integra comandos SQL e PySpark para criar, transformar e gerenciar tabelas Delta de forma incremental e confiavel.

Criação de Tabela com Streaming e LIVE (SQL)

  • O que é LIVE?
    • LIVE é usado para referenciar tabelas já criadas no pipeline DLT .
    • Ele não tem relação direta com streaming (dados em tempo real).
    • Serve para encadear transformações dentro do mesmo pipeline.

Exemplo:

--- vendas é uma tabela já definida no pipeline.
--- vendas_filtradas é uma nova tabela que usa`LIVE.vendas` como fonte.
CREATE LIVE TABLE vendas_filtradas AS
SELECT * FROM LIVE.vendas
WHERE status = 'concluída';
  • O que é STREAMING?
    • STREAMING é usado quando você quer processar dados em tempo real .
    • Ele permite que o DLT consuma dados continuamente de uma fonte (ex: arquivos, Kafka, etc.).

Exemplo:

--- vendas_stream está ingerindo dados em tempo real de arquivos JSON que chegam na pasta /mnt/vendas
CREATE STREAMING LIVE TABLE vendas_stream AS
SELECT * FROM cloud_files("/mnt/vendas", "json");

EXEMPLOS DLT:

Aqui estão exemplos práticos de uso de Delta Live Tables (DLT) no Databricks com comandos SQL e funções como STREAMING, LIVE, APPLY CHANGES, e o uso de cloud_files() para ingestão de dados.

1. Criação de Tabelas Delta Live Usando SQL

Tabela em Streaming: Cria uma tabela que lê dados em tempo real a partir de uma fonte, como arquivos em nuvem.

CREATE OR REFRESH STREAMING LIVE TABLE raw_data
AS SELECT * FROM cloud_files("/input/directory", "json");
  • STREAMING: Indica que a tabela é alimentada continuamente em modo streaming.
  • cloud_files(): Detecta novos arquivos em um diretório (funciona com JSON, CSV, Parquet, etc.).
  • CREATE OR REFRESH: Cria ou atualiza a tabela incrementalmente.

2. Transformações com Live Tables

Permite aplicar filtros, agregações ou qualquer lógica SQL. Usamos o comando LIVE para referenciar outra tabela criada no pipeline.

CREATE OR REFRESH LIVE TABLE transformed_data AS SELECT id, name, timestamp, value
FROM LIVE.raw_data
WHERE value > 10;

LIVE.raw_data: Referencia a tabela live raw_data criada anteriormente no pipeline.

3. Aplicar Mudanças Incrementais com APPLY CHANGES

O comando APPLY CHANGES é usado para capturar mudanças (CDC - Change Data Capture) em dados.

APPLY CHANGES INTO LIVE.customer_data FROM STREAM(LIVE.raw_data)
KEYS (customer_id)
SEQUENCE BY timestamp
COLUMNS * EXCEPT (metadata)
STORED AS SCD TYPE 1;
  • STREAM(LIVE.raw_data): Processa a tabela raw_data em modo streaming.

  • KEYS: Coluna-chave usada para identificar registros únicos.

  • SEQUENCE BY: Coluna que define a ordenação dos eventos.

  • STORED AS SCD TYPE 1: Aplica a lógica de Slowly Changing Dimension Type 1, onde os registros antigos são sobrescritos.

    • SCD TYPE

      SCD Type Descrição *Quando Usar
      SCD Type 1 Sobrescreve dados antigos (sem histórico) Quando o histórico não é necessário (ex: correção de erros)
      SCD Type 2 Mantém o histórico com múltiplas versões Quando é importante rastrear alterações ao longo do tempo.
      SCD Type 3 Mantém apenas o valor atual e o anterior Quando você só precisa de um histórico limitado.

4. Tabela com Dados em Batch e Streaming

DLT permite o processamento híbrido (batch e streaming). O DLT trata automaticamente a ingestão de arquivos novos como streaming, mesmo que os arquivos venham em lotes (batch).

CREATE OR REFRESH LIVE TABLE hybrid_data
AS SELECT * FROM cloud_files("/input/hybrid", "csv", map("cloudFiles.inferSchema", "true"));
  • cloud_files(): Garante que arquivos novos e modificados sejam automaticamente identificados.
  • map(): Passa opções para o carregamento, como cloudFiles.inferSchema.

5. Exemplo Completo em SQL

-- Tabela raw em streaming
CREATE OR REFRESH STREAMING LIVE TABLE raw_data
AS SELECT * FROM cloud_files("/data/input/", "json");

-- Transformação na tabela raw
CREATE OR REFRESH LIVE TABLE cleaned_data
AS SELECT id, name, value
FROM LIVE.raw_data
WHERE value IS NOT NULL;

-- Aplicar mudanças incrementais
APPLY CHANGES INTO LIVE.final_data
FROM STREAM(LIVE.cleaned_data)
KEYS (id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

VERSIONAMENTO NO DATABRICKS

No Databricks, o conceito de viagem no tempo em Delta Tables permite consultar, restaurar ou inspecionar uma tabela em versões ou pontos anteriores no tempo. Isso é extremamente útil para auditoria de dados, análises de mudanças ou para desfazer operações

  1. Consultar por versão:

    SELECT * FROM table_name VERSION AS OF 5;  
    SELECT * FROM table_name@v5;
  2. Restaurar por versão:

    RESTORE TABLE table_name TO VERSION AS OF 5;
  3. Consultar por timestamp:

    SELECT * FROM table_name TIMESTAMP AS OF 'YYYY-MM-DDTHH:MM:SSZ';
  4. Restaurar por timestamp:

    RESTORE TABLE table_name TO TIMESTAMP AS OF 'YYYY-MM-DDTHH:MM:SSZ';
  5. Ver o histórico da tabela:

    DESCRIBE HISTORY table_name;
  • Benefícios da Viagem no Tempo
    1. Auditoria: Rastrear alterações na tabela.
    2. Recuperação de Dados: Restaurar dados após uma operação incorreta.
    3. Análise de Mudanças: Comparar versões anteriores da tabela para entender mudanças ao longo do tempo.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment