Parameters gebruiken met Delta Live Tables-pijplijnen
In dit artikel wordt uitgelegd hoe u pijplijnconfiguraties van Delta Live Tables kunt gebruiken om pijplijncode te parameteriseren.
Referentieparameters
Tijdens updates heeft uw pijplijnbroncode toegang tot pijplijnparameters met behulp van de syntaxis om waarden voor Spark-configuraties op te halen.
U verwijst naar pijplijnparameters met behulp van de sleutel. De waarde wordt als tekenreeks in uw broncode geïnjecteerd voordat de broncodelogica wordt geëvalueerd.
In de volgende voorbeeldsyntaxis wordt een parameter met sleutel source_catalog
en waarde dev_catalog
gebruikt om de gegevensbron voor een gerealiseerde weergave op te geven:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Parameters instellen
Parameters doorgeven aan pijplijnen door willekeurige sleutel-waardeparen door te geven als configuraties voor de pijplijn. U kunt parameters instellen tijdens het definiëren of bewerken van een pijplijnconfiguratie met behulp van de gebruikersinterface van de werkruimte of JSON. Zie Een Delta Live Tables-pijplijn configureren.
Pijplijnparametersleutels mogen alleen alfanumerieke tekens bevatten _ - .
. Parameterwaarden worden ingesteld als tekenreeksen.
Pijplijnparameters bieden geen ondersteuning voor dynamische waarden. U moet de waarde bijwerken die is gekoppeld aan een sleutel in de pijplijnconfiguratie.
Belangrijk
Gebruik geen trefwoorden die conflicteren met gereserveerde pijplijn of Apache Spark-configuratiewaarden.
Gegevenssetdeclaraties parameteriseren in Python of SQL
De Python- en SQL-code waarmee uw gegevenssets worden gedefinieerd, kunnen worden geparameteriseerd door de instellingen van de pijplijn. Met parameterisatie worden de volgende use cases ingeschakeld:
- Lange paden en andere variabelen scheiden van uw code.
- Het verminderen van de hoeveelheid gegevens die in ontwikkel- of faseringsomgevingen worden verwerkt om het testen te versnellen.
- Dezelfde transformatielogica hergebruiken om te verwerken vanuit meerdere gegevensbronnen.
In het volgende voorbeeld wordt de startDate
configuratiewaarde gebruikt om de ontwikkelingspijplijn te beperken tot een subset van de invoergegevens:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Gegevensbronnen beheren met parameters
U kunt pijplijnparameters gebruiken om verschillende gegevensbronnen op te geven in verschillende configuraties van dezelfde pijplijn.
U kunt bijvoorbeeld verschillende paden opgeven voor ontwikkelings-, test- en productieconfiguraties voor een pijplijn met behulp van de variabele data_source_path
en deze vervolgens verwijzen met behulp van de volgende code:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Dit patroon is nuttig voor het testen van de manier waarop opnamelogica schema- of onjuiste gegevens kan verwerken tijdens de eerste opname. U kunt de identieke code in alle omgevingen in alle omgevingen gebruiken terwijl u gegevenssets uitschakelt.