Delen via


Parameters gebruiken met Delta Live Tables-pijplijnen

In dit artikel wordt uitgelegd hoe u pijplijnconfiguraties van Delta Live Tables kunt gebruiken om pijplijncode te parameteriseren.

Referentieparameters

Tijdens updates heeft uw pijplijnbroncode toegang tot pijplijnparameters met behulp van de syntaxis om waarden voor Spark-configuraties op te halen.

U verwijst naar pijplijnparameters met behulp van de sleutel. De waarde wordt als tekenreeks in uw broncode geïnjecteerd voordat de broncodelogica wordt geëvalueerd.

In de volgende voorbeeldsyntaxis wordt een parameter met sleutel source_catalog en waarde dev_catalog gebruikt om de gegevensbron voor een gerealiseerde weergave op te geven:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Parameters instellen

Parameters doorgeven aan pijplijnen door willekeurige sleutel-waardeparen door te geven als configuraties voor de pijplijn. U kunt parameters instellen tijdens het definiëren of bewerken van een pijplijnconfiguratie met behulp van de gebruikersinterface van de werkruimte of JSON. Zie Een Delta Live Tables-pijplijn configureren.

Pijplijnparametersleutels mogen alleen alfanumerieke tekens bevatten _ - . . Parameterwaarden worden ingesteld als tekenreeksen.

Pijplijnparameters bieden geen ondersteuning voor dynamische waarden. U moet de waarde bijwerken die is gekoppeld aan een sleutel in de pijplijnconfiguratie.

Belangrijk

Gebruik geen trefwoorden die conflicteren met gereserveerde pijplijn of Apache Spark-configuratiewaarden.

Gegevenssetdeclaraties parameteriseren in Python of SQL

De Python- en SQL-code waarmee uw gegevenssets worden gedefinieerd, kunnen worden geparameteriseerd door de instellingen van de pijplijn. Met parameterisatie worden de volgende use cases ingeschakeld:

  • Lange paden en andere variabelen scheiden van uw code.
  • Het verminderen van de hoeveelheid gegevens die in ontwikkel- of faseringsomgevingen worden verwerkt om het testen te versnellen.
  • Dezelfde transformatielogica hergebruiken om te verwerken vanuit meerdere gegevensbronnen.

In het volgende voorbeeld wordt de startDate configuratiewaarde gebruikt om de ontwikkelingspijplijn te beperken tot een subset van de invoergegevens:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Gegevensbronnen beheren met parameters

U kunt pijplijnparameters gebruiken om verschillende gegevensbronnen op te geven in verschillende configuraties van dezelfde pijplijn.

U kunt bijvoorbeeld verschillende paden opgeven voor ontwikkelings-, test- en productieconfiguraties voor een pijplijn met behulp van de variabele data_source_path en deze vervolgens verwijzen met behulp van de volgende code:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Dit patroon is nuttig voor het testen van de manier waarop opnamelogica schema- of onjuiste gegevens kan verwerken tijdens de eerste opname. U kunt de identieke code in alle omgevingen in alle omgevingen gebruiken terwijl u gegevenssets uitschakelt.