Partilhar via


Execute sua primeira carga de trabalho ETL no Azure Databricks

Saiba como usar ferramentas prontas para produção do Azure Databricks para desenvolver e implantar seus primeiros pipelines de extração, transformação e carregamento (ETL) para orquestração de dados.

Ao final deste artigo, você se sentirá confortável:

  1. Iniciando um cluster de computação multiuso Databricks.
  2. Criando um bloco de anotações Databricks.
  3. Configuração da ingestão incremental de dados para Delta Lake com Auto Loader.
  4. Executar células do bloco de notas para processar, consultar e pré-visualizar dados.
  5. Agendando um bloco de anotações como um trabalho do Databricks.

Este tutorial usa blocos de anotações interativos para concluir tarefas comuns de ETL em Python ou Scala.

Você também pode usar Delta Live Tables para criar pipelines ETL. A Databricks criou Delta Live Tables para reduzir a complexidade da construção, implantação e manutenção de pipelines ETL de produção. Consulte Tutorial: Execute seu primeiro pipeline Delta Live Tables.

Você também pode usar o provedor Databricks Terraform para criar os recursos deste artigo. Consulte Criar clusters, blocos de anotações e trabalhos com o Terraform.

Requisitos

Nota

Se você não tiver privilégios de controle de cluster, ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um cluster.

Etapa 1: Criar um cluster

Para fazer análise de dados exploratória e engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.

  1. Clique em ícone de computaçãoCalcular na barra lateral.
  2. Na página Computação, clique em Criar Cluster. Isso abre a página Novo cluster.
  3. Especifique um nome exclusivo para o cluster, deixe os valores restantes em seu estado padrão e clique em Criar Cluster.

Para saber mais sobre clusters Databricks, consulte Computação.

Etapa 2: Criar um bloco de anotações Databricks

Para criar um bloco de notas na sua área de trabalho, clique Novo íconeem Novo na barra lateral e, em seguida, clique em Bloco de Notas. Um bloco de anotações em branco é aberto no espaço de trabalho.

Para saber mais sobre como criar e gerir blocos de notas, consulte Gerir blocos de notas.

Etapa 3: Configurar o carregador automático para ingerir dados no Delta Lake

O Databricks recomenda o uso do Auto Loader para ingestão incremental de dados. O Auto Loader deteta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.

A Databricks recomenda o armazenamento de dados com o Delta Lake. Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato padrão para tabelas criadas no Databricks.

Para configurar o Auto Loader para ingerir dados em uma tabela Delta Lake, copie e cole o seguinte código na célula vazia do seu bloco de anotações:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Nota

As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de conflito com ativos de espaço de trabalho existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do espaço de trabalho para solucionar essas restrições.

Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?.

Etapa 4: Processar e interagir com os dados

Os blocos de anotações executam lógica célula a célula. Para executar a lógica na célula:

  1. Para executar a célula concluída na etapa anterior, selecione-a e pressione SHIFT+ENTER.

  2. Para consultar a tabela que acabou de criar, copie e cole o código seguinte numa célula vazia e, em seguida, prima SHIFT+ENTER para executar a célula.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Para visualizar os dados em seu DataFrame, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    Python

    display(df)
    

    Scala

    display(df)
    

Para saber mais sobre as opções interativas para visualizar dados, consulte Visualizações em blocos de anotações Databricks.

Etapa 5: Agendar um trabalho

Você pode executar blocos de anotações Databricks como scripts de produção adicionando-os como uma tarefa em um trabalho Databricks. Nesta etapa, você criará um novo trabalho que poderá ser acionado manualmente.

Para agendar o seu bloco de notas como uma tarefa:

  1. Clique em Agendar no lado direito da barra de cabeçalho.
  2. Insira um nome exclusivo para o nome do trabalho.
  3. Clique em Manual.
  4. Na lista suspensa Cluster, selecione o cluster criado na etapa 1.
  5. Clique em Criar.
  6. Na janela apresentada, clique em Executar agora.
  7. Para ver os resultados da execução do trabalho, clique no Link externo ícone ao lado do carimbo de data/hora da última execução .

Para obter mais informações sobre trabalhos, consulte O que são trabalhos do Databricks?.

Integrações adicionais

Saiba mais sobre integrações e ferramentas para engenharia de dados com o Azure Databricks: