Partilhar via


Tutorial: Executar seu primeiro pipeline de DLT

Este tutorial leva você pelas etapas para configurar seu primeiro pipeline DLT, escrever código ETL básico e executar uma atualização de pipeline.

Todas as etapas neste tutorial são projetadas para espaços de trabalho com o Unity Catalog habilitado. Você também pode configurar pipelines DLT para trabalhar com o metastore herdado do Hive. Consulte Utilizar pipelines DLT com metastore herdado do Hive.

Observação

Este tutorial contém instruções para desenvolver e validar novo código de pipeline usando notebooks do Databricks. Você também pode configurar pipelines usando código-fonte em arquivos Python ou SQL.

Você pode configurar um pipeline para executar seu código se já tiver o código-fonte escrito usando a sintaxe DLT. Consulte Configurar um pipeline de DLT.

Você pode usar a sintaxe SQL totalmente declarativa no Databricks SQL para registrar e definir agendas de atualização para exibições materializadas e tabelas de streaming como objetos gerenciados pelo Unity Catalog. Consulte Utilizar vistas materializadas no Databricks SQL e Carregar dados usando tabelas de streaming no Databricks SQL.

Exemplo: Ingerir e processar dados de nomes de bebés de Nova Iorque

O exemplo neste artigo usa um conjunto de dados disponível publicamente que contém registros de nomes de bebês do Estado de Nova York. Este exemplo demonstra o uso de um pipeline DLT para:

  • Leia dados CSV brutos de um volume em uma tabela.
  • Leia os registros da tabela de ingestão e use DLT expectativas para criar uma nova tabela que contenha dados limpos.
  • Use os registros limpos como entrada para consultas DLT que criam conjuntos de dados derivados.

Este código demonstra um exemplo simplificado da arquitetura em medalhão. Veja O que é a arquitetura do medalhão lakehouse?.

Implementações deste exemplo são fornecidas para Python e SQL. Siga as etapas para criar um novo pipeline e bloco de anotações e, em seguida, copie e cole o código fornecido.

Exemplo blocos de anotações com código completo também são fornecidos.

Requerimentos

  • Para iniciar um pipeline, você deve ter permissão de criação de cluster ou acesso a uma política de cluster que defina um cluster DLT. O runtime da DLT cria um cluster antes de executar a sua pipeline e falha se não tiver a permissão correta.
  • Todos os usuários podem disparar atualizações usando pipelines sem servidor por padrão. O Serverless deve estar habilitado no nível da conta e pode não estar disponível na região do espaço de trabalho. Consulte a secção Ativar computação sem servidor.
  • Os exemplos neste tutorial usam Unity Catalog. O Databricks recomenda a criação de um novo esquema para executar este tutorial, pois vários objetos de banco de dados são criados no esquema de destino.

    • Para criar um novo esquema em um catálogo, você deve ter privilégios de ALL PRIVILEGES ou USE CATALOG e CREATE SCHEMA.
    • Se não for possível criar um novo esquema, execute este tutorial em relação a um esquema existente. Você deve ter os seguintes privilégios:
      • USE CATALOG para o catálogo pai.
      • ALL PRIVILEGES ou USE SCHEMA, CREATE MATERIALIZED VIEWe CREATE TABLE privilégios no esquema de destino.
    • Este tutorial usa um volume para armazenar dados de exemplo. A Databricks recomenda a criação de um novo volume para este tutorial. Se você criar um novo esquema para este tutorial, poderá criar um novo volume nesse esquema.
      • Para criar um novo volume em um esquema existente, você deve ter os seguintes privilégios:
        • USE CATALOG para o catálogo pai.
        • ALL PRIVILEGES ou USE SCHEMA e CREATE VOLUME privilégios no esquema de destino.
      • Opcionalmente, você pode usar um volume existente. Você deve ter os seguintes privilégios:
        • USE CATALOG para o catálogo pai.
        • USE SCHEMA para o esquema pai.
        • ALL PRIVILEGES ou READ VOLUME e WRITE VOLUME no volume alvo.

    Para definir essas permissões, entre em contato com o administrador do Databricks. Para mais informações sobre os privilégios e objetos securitizáveis do Catálogo Unity, consulte .

Passo 0: Transferir dados

Este exemplo carrega dados de um volume do Catálogo Unity. O código a seguir baixa um arquivo CSV e o armazena no volume especificado. Abra um novo bloco de anotações e execute o seguinte código para baixar esses dados para o volume especificado:

import urllib

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

urllib.request.urlretrieve(download_url, volume_path + filename)

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Catálogo Unity. O código fornecido tenta criar o esquema e o volume especificados se esses objetos não existirem. Você deve ter os privilégios apropriados para criar e gravar em objetos no Unity Catalog. Consulte Requisitos.

Observação

Certifique-se de que este bloco de notas foi executado com êxito antes de continuar com o tutorial. Não configure este notebook como parte do seu pipeline.

Etapa 1: Criar um pipeline

A DLT cria pipelines resolvendo dependências definidas em blocos de anotações ou arquivos (chamados código-fonte) usando a sintaxe DLT. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários blocos de anotações ou arquivos específicos do idioma no pipeline.

Importante

Não configure nenhum ativo no campo do código-fonte. Deixar esse campo preto cria e configura um bloco de anotações para a criação do código-fonte.

As instruções neste tutorial usam computação sem servidor e Unity Catalog. Use as configurações padrão para todas as opções de configuração não especificadas nestas instruções.

Observação

Se serverless não estiver habilitado ou suportado em seu espaço de trabalho, você poderá concluir o tutorial conforme escrito usando as configurações de computação padrão. Você deve selecionar manualmente Catálogo Unity em Opções de Armazenamento na seção Destino da interface do utilizador do Criar pipeline.

Para configurar um novo pipeline, faça o seguinte:

  1. Na barra lateral, clique DLT.
  2. Clique Criar pipeline.
  3. Em nome do pipeline, introduza um nome exclusivo para o pipeline.
  4. Marque a caixa de seleção Servidorless.
  5. No Destino, para configurar um local do Catálogo Unity onde as tabelas são publicadas, selecione um Catálogo e um Esquema .
  6. Em Avançado, clique em Adicionar configuração e depois defina parâmetros de pipeline para o catálogo, esquema e volume para os quais descarregou dados usando os seguintes nomes de parâmetro:
    • my_catalog
    • my_schema
    • my_volume
  7. Clique Criar.

A interface de utilizador dos pipelines aparece para o novo pipeline. Um bloco de anotações de código-fonte é criado e configurado automaticamente para o pipeline.

O bloco de anotações é criado em um novo diretório no diretório do usuário. O nome do novo diretório e arquivo correspondem ao nome do seu pipeline. Por exemplo, /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Um link para acessar este bloco de anotações está sob o campo do código-fonte no painel detalhes do Pipeline. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.

Etapa 2: Declarar visualizações materializadas e tabelas de streaming em um bloco de anotações com Python ou SQL

Você pode usar blocos de anotações Datbricks para desenvolver e validar interativamente o código-fonte para pipelines DLT. Você deve ligar o seu notebook ao pipeline para usar esta funcionalidade. Para anexar o seu caderno de notas recém-criado ao pipeline que acabou de criar:

  1. Clique em Connect no canto superior direito para abrir o menu de configuração de computação.
  2. Passe o cursor sobre o nome do pipeline criado na Etapa 1.
  3. Clique Ligar.

A interface do Utilizador muda para incluir os botões Validar e Iniciar no canto superior direito. Para saber mais sobre o suporte de notebooks para o desenvolvimento de código para pipelines, consulte Desenvolvimento e depuração de pipelines DLT em notebooks.

Importante

  • As pipelines DLT avaliam todas as células de um notebook durante a fase de planeamento. Ao contrário dos blocos de anotações que são executados em computação multiuso ou agendados como trabalhos, os pipelines não garantem que as células sejam executadas na ordem especificada.
  • Os blocos de notas só podem conter uma única linguagem de programação. Não misture Python e código SQL em blocos de anotações de código-fonte de pipeline.

Para obter detalhes sobre como desenvolver código com Python ou SQL, consulte Desenvolver código de pipeline com Python ou Desenvolver código de pipeline com SQL.

Exemplo de código de pipeline

Para implementar o exemplo neste tutorial, copie e cole o código a seguir em uma célula do bloco de anotações configurada como código-fonte para seu pipeline.

O código fornecido faz o seguinte:

  • Importa módulos necessários (somente Python).
  • Referencia parâmetros definidos durante a configuração do pipeline.
  • Define uma tabela de streaming chamada baby_names_raw que ingere a partir de um volume.
  • Define uma exibição materializada chamada baby_names_prepared que valida os dados ingeridos.
  • Define uma exibição materializada chamada top_baby_names_2021 que tem uma exibição altamente refinada dos dados.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Etapa 3: Iniciar uma atualização da linha de produção

Para iniciar uma atualização de pipeline, clique no botão Iniciar no canto superior direito da interface do bloco de anotações.

Exemplos de blocos de notas

Os blocos de anotações a seguir contêm os mesmos exemplos de código fornecidos neste artigo. Esses blocos de anotações têm os mesmos requisitos das etapas deste artigo. Consulte Requisitos.

Para importar um bloco de anotações, conclua as seguintes etapas:

  1. Abra a interface do usuário do bloco de anotações.
    • Clique em + Novo>Bloco de Anotações.
    • Um bloco de anotações vazio é aberto.
  2. Clique em Arquivo>Importar.... A caixa de diálogo Importar é exibida.
  3. Selecione a opção URL para Importar de.
  4. Cole o URL do bloco de notas.
  5. Clique Importar.

Este tutorial requer que você execute um bloco de anotações de configuração de dados antes de configurar e executar seu pipeline DLT. Importe o notebook seguinte, ligue o notebook a um recurso de computação, preencha as variáveis necessárias para my_catalog, my_schemae my_volumee clique em Executar tudo.

Tutorial sobre download de dados para pipelines

Obter bloco de notas

Os blocos de anotações a seguir fornecem exemplos em Python ou SQL. Ao importar um caderno, ele é guardado no diretório inicial do utilizador.

Depois de importar um dos notebooks abaixo, conclua as etapas para criar um pipeline, mas use o seletor de ficheiros Source code para selecionar o notebook baixado. Depois de criar o pipeline com um bloco de anotações configurado como código-fonte, clique em Iniciar na interface do usuário do pipeline para disparar uma atualização.

Introdução ao notebook DLT Python

Obter bloco de notas

Comece a utilizar o notebook SQL do DLT

Obter bloco de notas

Recursos adicionais