Partilhar via


Desenvolva código de pipeline com Python

DLT introduz várias novas construções de código Python para definir visualizações materializadas e tabelas de streaming em pipelines. O suporte Python para o desenvolvimento de pipelines baseia-se nos conceitos básicos do PySpark DataFrame e APIs de Streaming Estruturado.

Para usuários não familiarizados com Python e DataFrames, o Databricks recomenda o uso da interface SQL. Consulte Desenvolver código de pipeline com SQL.

Para obter uma referência completa da sintaxe DLT Python, consulte Referência da linguagem DLT Python.

Noções básicas de Python para desenvolvimento de pipeline

O código Python que cria conjuntos de dados DLT deve retornar DataFrames.

Todas as APIs Python DLT são implementadas no módulo dlt. Seu código de pipeline DLT implementado com Python deve importar explicitamente o módulo dlt na parte superior dos blocos de anotações e arquivos Python.

Lê e grava o padrão no catálogo e no esquema especificados durante a configuração do pipeline. Consulte Definir o catálogo de destino e o esquema.

O código Python específico do DLT difere de outros tipos de código Python de uma maneira crítica: o código de pipeline Python não chama diretamente as funções que executam a ingestão e a transformação de dados para criar conjuntos de dados DLT. Em vez disso, a DLT interpreta as funções do decorador a partir do módulo dlt em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados.

Importante

Para evitar um comportamento inesperado quando o pipeline é executado, não inclua código que possa ter efeitos colaterais em suas funções que definem conjuntos de dados. Para saber mais, consulte a referência de Python .

Crie uma visualização materializada ou uma tabela de streaming com Python

O decorador de @dlt.table diz à DLT para criar uma vista materializada ou uma tabela de streaming com base nos resultados retornados por uma função. Os resultados de uma leitura em lote criam uma exibição materializada, enquanto os resultados de uma leitura de streaming criam uma tabela de streaming.

Por padrão, os nomes de tabela de exibição materializada e streaming são inferidos a partir de nomes de funções. O exemplo de código a seguir mostra a sintaxe básica para criar uma exibição materializada e uma tabela de streaming:

Observação

Ambas as funções fazem referência à mesma tabela no catálogo de samples e usam a mesma função decoradora. Esses exemplos destacam que a única diferença na sintaxe básica para exibições materializadas e tabelas de streaming é usar spark.read versus spark.readStream.

Nem todas as fontes de dados suportam leituras de streaming. Algumas fontes de dados devem ser sempre processadas com semântica de streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcionalmente, você pode especificar o nome da tabela usando o argumento name no decorador de @dlt.table. O exemplo a seguir demonstra esse padrão para uma exibição materializada e uma tabela de streaming:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Carregar dados do armazenamento de objetos

A DLT suporta o carregamento de dados de todos os formatos suportados pelo Azure Databricks. Consulte Opções de formato de dados.

Observação

Estes exemplos usam dados disponíveis sob o /databricks-datasets montados automaticamente em seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos em nuvem. Consulte O que são os volumes do Catálogo Unity?.

O Databricks recomenda o uso do Auto Loader e de tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Veja O que é Auto Loader?.

O exemplo a seguir cria uma tabela de streaming a partir de arquivos JSON usando o Auto Loader:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

O exemplo a seguir usa semântica em lote para ler um diretório JSON e criar uma exibição materializada:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Valide dados com expectativas

Você pode usar as expectativas para definir e impor restrições de qualidade de dados. Consulte Gerenciar a qualidade dos dados com as expectativas do pipeline.

O código a seguir usa @dlt.expect_or_drop para definir uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Consultar visões materializadas e tabelas de streaming definidas no seu pipeline

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma exibição materializada chamada customers que carrega dados CSV.
  • Uma vista materializada chamada customer_orders que une registos dos conjuntos de dados orders e customers, converte o carimbo de data/hora da encomenda numa data e seleciona os campos customer_id, order_number, statee order_date.
  • Uma visão materializada denominada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.

Observação

Ao consultar exibições ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar os padrões configurados em seu pipeline. Neste exemplo, as tabelas orders, customerse customer_orders são escritas e lidas do catálogo e esquema padrão configurados para seu pipeline.

O modo de publicação herdado usa o esquema LIVE para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline. Em novos pipelines, a sintaxe do esquema LIVE é silenciosamente ignorada. Consulte o esquema legado LIVE .

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Criar tabelas em um loop de for

Você pode usar loops Python for para criar várias tabelas programaticamente. Isso pode ser útil quando você tem muitas fontes de dados ou conjuntos de dados de destino que variam em apenas alguns parâmetros, resultando em menos código total para manter e menos redundância de código.

O loop for avalia a lógica em ordem serial, mas assim que o planejamento for concluído para os conjuntos de dados, o pipeline executa a lógica em paralelo.

Importante

Ao usar esse padrão para definir conjuntos de dados, certifique-se de que a lista de valores passados para o loop for seja sempre aditiva. Se um conjunto de dados definido anteriormente em um pipeline for omitido de uma execução de pipeline futura, esse conjunto de dados será descartado automaticamente do esquema de destino.

O exemplo a seguir cria cinco tabelas que filtram pedidos de clientes por região. Aqui, o nome da região é usado para definir o nome das exibições materializadas de destino e filtrar os dados de origem. As visualizações temporárias são usadas para definir junções das tabelas de origem usadas na construção das exibições materializadas finais.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Segue-se um exemplo do gráfico de fluxo de dados para este pipeline:

Um gráfico de fluxo de dados de duas visualizações que levam a cinco tabelas regionais.

Solução de problemas: for loop cria muitas tabelas com os mesmos valores

O modelo de execução lento que os pipelines usam para avaliar o código Python requer que sua lógica faça referência direta a valores individuais quando a função decorada por @dlt.table() é invocada.

O exemplo a seguir demonstra duas abordagens corretas para definir tabelas com um loop for. Em ambos os exemplos, cada nome de tabela da lista de tables é explicitamente referenciado dentro da função decorada por @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

O exemplo a seguir não faz referência aos valores corretamente. Este exemplo cria tabelas com nomes distintos, mas todas as tabelas carregam dados do último valor no loop for:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)