Compartilhar via


Tutorial: Executar um pipeline de análise do lakehouse de ponta a ponta

Este tutorial mostra como configurar um pipeline de análise de ponta a ponta para um lakehouse do Azure Databricks.

Importante

Este tutorial usa notebooks interativos para concluir tarefas ETL comuns no Python em clusters habilitados para o Catálogo do Unity. Se você não estiver usando o Catálogo do Unity, confira Executar sua primeira carga de trabalho de ETL no Azure Databricks.

Tarefas neste tutorial

Ao final deste artigo, você se sentirá confortável:

  1. Inicializar um cluster de cálculo habilitado para Catálogo do Unity.
  2. Criar um notebook do Databricks.
  3. Gravar e fazer a leitura d dados a partir de um local externo do Catálogo do Unity.
  4. Configurar a ingestão de dados incremental para uma tabela do Catálogo do Unity com o Carregador Automático.
  5. Executar células de notebook para processar, consultar e visualizar dados.
  6. Agendar um notebook como um trabalho do Databricks.
  7. Consultar tabelas do Catálogo do Unity do SQL do Databricks

O Azure Databricks fornece um conjunto de ferramentas prontas para produção que permitem aos profissionais de dados desenvolver e implantar rapidamente pipelines de ETL (extração, transformação e carregamento). O Catálogo do Unity permite que os administradores de dados configurem e protejam credenciais de armazenamento, locais externos e objetos de banco de dados para usuários em toda uma organização. O SQL do Databricks permite que os analistas executem consultas SQL nas mesmas tabelas usadas nas cargas de trabalho de ETL de produção, permitindo o business intelligence em tempo real em escala.

Use também o Delta Live Tables para criar pipelines de ETL. O Databricks criou o Delta Live Tables para reduzir a complexidade da criação, da implantação e da manutenção de pipelines de ETL de produção. Consulte Tutorial: como executar seu primeiro pipeline do Delta Live Tables.

Requisitos

Observação

Se você não tiver privilégios de controle de cluster, ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um cluster.

Etapa 1: criar um cluster

Para fazer a análise de dados exploratória e a engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.

  1. Clique em compute iconComputação na barra lateral.
  2. Clique em Novo íconeNovo na barra lateral e selecione Cluster. Isso abre a página Novo cluster/computação.
  3. Especifique um nome exclusivo para o cluster.
  4. Selecione o botão de opção Nó Único.
  5. Selecione Usuário Único na lista suspensa do modo de acesso.
  6. Verifique se o endereço de email está visível no campo de Usuário Único.
  7. Selecione a versão de runtime do Databricks desejada, 11.1 ou superior para usar o Catálogo do Unity.
  8. Clique em Criar computação para criar o cluster.

Para saber mais sobre clusters do Databricks, confira Computação.

Etapa 2: criar um notebook do Databricks

Para criar um bloco de anotações em seu workspace, clique em ícone NovoNovo na barra lateral e clique em Notebook. Um notebook em branco é aberto no workspace.

Para saber mais sobre como criar e gerenciar notebooks, consulte Gerenciar notebooks.

Etapa 3: gravar e ler dados de um local externo gerenciado pelo Catálogo do Unity

O Databricks recomenda o uso do Carregador Automático para ingestão de dados incremental. O Carregador Automático detecta e processa automaticamente arquivos novos confirme eles chegam no armazenamento de objeto da nuvem.

Use o Catálogo do Unity para gerenciar o acesso seguro a locais externos. Usuários ou entidades de serviço com permissões READ FILES em um local externo podem usar o Carregador Automático para ingerir dados.

Normalmente, os dados chegarão em um local externo devido a gravações de outros sistemas. Nesta demonstração, você pode simular a chegada de dados gravando arquivos JSON em um local externo.

Copie o código abaixo em uma célula do notebook. Substitua o valor da cadeia de caracteres por catalog com o nome de um catálogo com as permissões CREATE CATALOG e USE CATALOG. Substitua o valor da cadeia de caracteres por external_location com o caminho de um local externo pelas permissões READ FILES, WRITE FILES e CREATE EXTERNAL TABLE.

Os locais externos podem ser definidos como um contêiner de armazenamento inteiro, mas geralmente apontam para um diretório aninhado em um contêiner.

O formato correto para um caminho de local externo é "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

A execução dessa célula deve imprimir uma linha em que se lê "12 bytes", imprimir a cadeia de caracteres "Olá, mundo!", e exibir todos os bancos de dados presentes no catálogo fornecido. Se você não conseguir que essa célula seja executada, confirme se você está em um workspace habilitado para o Catálogo do Unity e solicite permissões adequadas do administrador do workspace para concluir este tutorial.

O código Python abaixo usa seu endereço de email para criar um banco de dados exclusivo no catálogo fornecido e um local de armazenamento exclusivo no local externo fornecido. A execução dessa célula removerá todos os dados associados a este tutorial, permitindo que você execute este exemplo de maneira idempotente. Uma classe é definida e criará uma instância que você usará para simular lotes de dados que chegam de um sistema conectado ao seu local externo de origem.

Copie esse código para uma nova célula em seu notebook e execute-o para configurar seu ambiente.

Observação

As variáveis definidas nesse código devem permitir que você o execute com segurança sem risco de conflito com ativos de workspace existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código, por isso contate o administrador do workspace para solucionar problemas dessas restrições.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Agora você pode colocar um lote de dados copiando o código a seguir em uma célula e executando-o. Você pode executar manualmente essa célula até 60 vezes para disparar a chegada de novos dados.

RawData.land_batch()

Etapa 4: configurar o Carregador Automático para ingerir dados no Catálogo do Unity

O Databricks recomenda armazenar dados com o Delta Lake. O Delta Lake é uma camada de armazenamento código aberto que fornece transações ACID e habilita o data lakehouse. O Delta Lake é o formato padrão para tabelas criadas no Databricks.

A fim de configurar o Carregador Automático para ingerir dados em uma tabela do Catálogo do Unity, copie e cole o seguinte código em uma célula vazia em seu notebook:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Para saber mais sobre o Carregador Automático, confira O que é o Carregador Automático?.

Para saber mais sobre o fluxo estruturado com o Catálogo do Unity, consulte Usando o Catálogo do Unity com fluxo estruturado.

Etapa 5: processar e interagir com os dados

Os notebooks executam célula por célula lógica. Use estas etapas para executar a lógica em sua célula:

  1. Para executar a célula concluída na etapa anterior, selecione a célula e pressione SHIFT+ENTER.

  2. Para consultar a tabela que você acabou de criar, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    df = spark.read.table(table)
    
  3. Para visualizar os dados em seu DataFrame, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    display(df)
    

Para saber mais sobre as opções interativas para visualização de dados, consulte Visualizações em notebooks de Databricks.

Etapa 6: agendar um trabalho

Você pode executar notebooks do Databricks como scripts de produção adicionando-os como uma tarefa em um trabalho do Databricks. Nesta etapa, você criará um novo trabalho que poderá disparar manualmente.

Para agendar seu notebook como uma tarefa:

  1. Clique em Agendamento no lado direito da barra de cabeçalho.
  2. Insira um nome exclusivo para o nome do trabalho.
  3. Clique em Manual.
  4. Na lista suspensa cluster, selecione o cluster que você criou na etapa 1.
  5. Clique em Criar.
  6. Na janela exibida, clique em Executar agora.
  7. Para consultar os resultados de execução de trabalho, clique no ícone Link Externo próximo ao timestamp de Última execução.

Para obter mais informações sobre trabalhos, consulte O que são trabalhos?.

Etapa 7: tabela de consulta do SQL do Databricks

Qualquer pessoa com a permissão USE CATALOG no catálogo atual, a permissão USE SCHEMA no esquema atual e a permissão SELECT na tabela pode consultar o conteúdo da tabela de sua API do Databricks preferida.

Você precisa de acesso a um SQL Warehouse em execução para executar consultas no SQL do Databricks.

A tabela que você criou anteriormente neste tutorial tem o nome target_table. Você pode consultá-lo usando o catálogo fornecido na primeira célula e o banco de dados com o patern e2e_lakehouse_<your-username>. Você pode usar o Explorador de Catálogos para localizar os objetos de dados criados.

Integrações adicionais

Saiba mais sobre integrações e ferramentas para engenharia de dados com o Azure Databricks: