Delen via


Sql-taalreferentie voor Delta Live Tables

In dit artikel vindt u meer informatie over de SQL-programmeerinterface van Delta Live Tables.

U kunt door de gebruiker gedefinieerde Python-functies (UDF's) in uw SQL-query's gebruiken, maar u moet deze UDF's definiëren in Python-bestanden voordat u ze aanroept in SQL-bronbestanden. Zie door de gebruiker gedefinieerde scalaire functies - Python.

Beperkingen

De PIVOT component wordt niet ondersteund. De pivot-bewerking in Spark vereist het gretige laden van invoergegevens om het uitvoerschema te berekenen. Deze mogelijkheid wordt niet ondersteund in Delta Live Tables.

Een gerealiseerde weergave of streamingtabel voor Delta Live-tabellen maken

Notitie

De CREATE OR REFRESH LIVE TABLE syntaxis voor het maken van een gerealiseerde weergave is afgeschaft. Gebruik in plaats daarvan CREATE OR REFRESH MATERIALIZED VIEW.

U gebruikt dezelfde eenvoudige SQL-syntaxis bij het declareren van een streamingtabel of een gerealiseerde weergave.

Een gematerialiseerde weergave voor Delta Live Tables definiëren met SQL

Hier volgt een beschrijving van de syntaxis voor het declareren van een gerealiseerde weergave in Delta Live Tables met SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Een streamingtabel voor Delta Live Tables declareren met SQL

U kunt alleen streamingtabellen declareren met behulp van query's die worden gelezen op basis van een streamingbron. Databricks raadt het gebruik van automatische laadprogramma's aan voor het streamen van bestanden uit de opslag van cloudobjecten. Zie de SQL-syntaxis van het automatisch laden.

Wanneer u andere tabellen of weergaven in uw pijplijn opgeeft als streamingbronnen, moet u de STREAM() functie rond een naam van een gegevensset opnemen.

Hieronder wordt de syntaxis beschreven voor het declareren van een streamingtabel in Delta Live Tables met SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Een Delta Live Tables-weergave maken

Hier volgt een beschrijving van de syntaxis voor het declareren van weergaven met SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

SQL-syntaxis voor automatisch laden

Hier volgt een beschrijving van de syntaxis voor het werken met automatisch laden in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

U kunt ondersteunde indelingsopties gebruiken met autolaadprogramma's. Met behulp van de map() functie kunt u opties doorgeven aan de read_files() methode. Opties zijn sleutel-waardeparen, waarbij de sleutels en waarden tekenreeksen zijn. Zie De bestandsindelingsopties voor meer informatie over ondersteuningsindelingen en opties.

Voorbeeld: Tabellen definiëren

U kunt een gegevensset maken door gegevens te lezen uit een externe gegevensbron of uit gegevenssets die zijn gedefinieerd in een pijplijn. Als u wilt lezen uit een interne gegevensset, geeft u de tabelnaam op die gebruikmaakt van de geconfigureerde pijplijnstandaarden voor catalogus en schema. In het volgende voorbeeld worden twee verschillende gegevenssets gedefinieerd: een tabel met de naam taxi_raw die een JSON-bestand als invoerbron gebruikt en een tabel met de naam filtered_data die de taxi_raw tabel als invoer gebruikt:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

Voorbeeld: Lezen uit een streamingbron

Als u gegevens wilt lezen uit een streamingbron, bijvoorbeeld Automatisch laden of een interne gegevensset, definieert u een STREAMING tabel:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Zie Gegevens transformeren met Delta Live Tablesvoor meer informatie over het streamen van gegevens.

Beheersen hoe tabellen worden gematerialiseerd

Tabellen bieden ook extra controle over hun materialisatie:

  • Geef op hoe tabellen worden gepartitioneerd met behulp van PARTITIONED BY. U kunt partitionering gebruiken om query's te versnellen.
  • U kunt tabeleigenschappen instellen met TBLPROPERTIES. Zie tabeleigenschappenvan Delta Live Tables.
  • Stel een opslaglocatie in met behulp van de LOCATION-instelling. Tabelgegevens worden standaard opgeslagen in de opslaglocatie voor pijplijnen als LOCATION niet is ingesteld.
  • U kunt gegenereerde kolommen in uw schemadefinitie gebruiken. Zie Voorbeeld: Een schema en partitiekolommen opgeven.

Notitie

Voor tabellen met een grootte van minder dan 1 TB raadt Databricks aan om delta livetabellen de gegevensorganisatie te laten beheren. Tenzij u verwacht dat uw tabel groter wordt dan een terabyte, raadt Databricks aan dat u geen partitiekolommen opgeeft.

voorbeeld: een schema en partitiekolommen opgeven

U kunt desgewenst een schema opgeven wanneer u een tabel definieert. In het volgende voorbeeld wordt het schema voor de doeltabel opgegeven, waaronder het gebruik van Delta Lake gegenereerde kolommen en het definiëren van partitiekolommen voor de tabel:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Delta Live Tables geeft standaard het schema af van de table definitie als u geen schema opgeeft.

voorbeeld: tabelbeperkingen definiëren

Notitie

Delta Live Tables-ondersteuning voor tabelbeperkingen bevindt zich in publieke preview. Als u tabelbeperkingen wilt definiëren, moet uw pijplijn een Unity Catalog-pijplijn zijn en zijn geconfigureerd voor het gebruik van het preview-kanaal.

Wanneer u een schema opgeeft, kunt u primaire en vreemde sleutels definiëren. De beperkingen zijn informatief en worden niet afgedwongen. Zie de CONSTRAINT clausule in de SQL-taalreferentie.

In het volgende voorbeeld wordt een tabel met een primaire en vreemde-sleutelbeperking gedefinieerd.

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Waarden parameteriseren die worden gebruikt bij het declareren van tabellen of weergaven met SQL

Gebruik SET om een configuratiewaarde op te geven in een query die een tabel of weergave declareert, inclusief Spark-configuraties. Elke tabel of weergave die u in een notitieblok definieert nadat de instructie SET toegang heeft tot de gedefinieerde waarde. Spark-configuraties die zijn opgegeven met behulp van de SET-instructie, worden gebruikt bij het uitvoeren van de Spark-query voor een tabel of weergave na de SET-instructie. Als u een configuratiewaarde in een query wilt lezen, gebruikt u de syntaxis ${}van de tekenreeksinterpolatie. In het volgende voorbeeld wordt een Spark-configuratiewaarde met de naam ingesteld startDate en wordt die waarde in een query gebruikt:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Als u meerdere configuratiewaarden wilt opgeven, gebruikt u een afzonderlijke SET-instructie voor elke waarde.

voorbeeld: Een rijfilter en kolommasker definiëren

Belangrijk

Rijfilters en kolommaskers bevinden zich in openbare testversie.

Als u een gerealiseerde weergave of streamingtabel wilt maken met een rijfilter en kolommasker, gebruikt u de ROW FILTER component en de MASK-component. In het volgende voorbeeld ziet u hoe u een gerealiseerde weergave en een streamingtabel definieert met zowel een rijfilter als een kolommasker:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

Zie Tabellen publiceren met rijfilters en kolommaskersvoor meer informatie over rijfilters en kolommaskers.

SQL-eigenschappen

CREATE TABLE of WEERGAVE
TEMPORARY

Maak een tabel, maar publiceer geen metagegevens voor de tabel. Met de TEMPORARY-component worden Delta Live-tabellen geïnstrueerd om een tabel te maken die beschikbaar is voor de pijplijn, maar die niet buiten de pijplijn mag worden geopend. Om de verwerkingstijd te verminderen, blijft een tijdelijke tabel behouden gedurende de levensduur van de pijplijn die deze maakt, en niet slechts één update.
STREAMING

Maak een tabel die een invoergegevensset leest als een stroom. De invoergegevensset moet een streaminggegevensbron zijn, bijvoorbeeld Automatisch laden of een STREAMING tabel.
CLUSTER BY

Schakel vloeistofclustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clustersleutels.

Zie Liquid Clustering gebruiken voor Delta-tabellen.
PARTITIONED BY

Een optionele lijst met een of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel.
LOCATION

Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn.
COMMENT

Een optionele beschrijving voor de tabel.
column_constraint

Een optionele informatieve primaire sleutel of buitenlandse sleutelbeperking op de kolom.
MASK clause (Openbare preview)

Voegt een kolommaskerfunctie toe om gevoelige gegevens anoniem te maken. Toekomstige query's voor die kolom retourneren het resultaat van de geëvalueerde functie in plaats van de oorspronkelijke waarde van de kolom. Dit is handig voor fijnmazig toegangsbeheer, omdat de functie de identiteit en groepslidmaatschappen van de gebruiker kan controleren om te bepalen of de waarde moet worden bewerkt.

Zie kolommaskerclausule.
table_constraint

Een optionele informatieve primaire of vreemde-sleutelbeperking op de tabel.
TBLPROPERTIES

Een optionele lijst met tabeleigenschappen voor de tabel.
WITH ROW FILTER clause (Openbare preview)

Hiermee voegt u een rijfilterfunctie toe aan de tabel. Toekomstige query's voor die tabel ontvangen een subset van de rijen waarvoor de functie de waarde WAAR oplevert. Dit is handig voor gedetailleerd toegangsbeheer, omdat hiermee de functie de identiteit en groepslidmaatschappen van de aanroepende gebruiker kan inspecteren om te bepalen of bepaalde rijen moeten worden gefilterd.

Zie ROW FILTER clausule.
select_statement

Een Delta Live Tables-query waarmee de gegevensset voor de tabel wordt gedefinieerd.
CONSTRAINT-clausule
EXPECT expectation_name

Beperking voor gegevenskwaliteit definiëren expectation_name. Als de ON VIOLATION beperking niet is gedefinieerd, voegt u rijen toe die de beperking aan de doelgegevensset schenden.
ON VIOLATION

Optionele actie die moet worden uitgevoerd voor mislukte rijen:

- FAIL UPDATE: Stop de uitvoering van pijplijnen onmiddellijk.
- DROP ROW: Verwijder de record en ga verder met verwerken.

Gegevensvastlegging wijzigen met SQL in Delta Live Tables

Gebruik de APPLY CHANGES INTO-instructie om de CDC-functionaliteit van Delta Live Tables te gebruiken, zoals wordt beschreven in het volgende:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

U definieert beperkingen voor gegevenskwaliteit voor een APPLY CHANGES doel met dezelfde CONSTRAINT component als niet-query'sAPPLY CHANGES . Zie Gegevenskwaliteit beheren met de verwachtingen van pijplijnen.

Notitie

Het standaardgedrag voor INSERT- en UPDATE-gebeurtenissen is het -toevoegen of bijwerken van-CDC-gebeurtenissen uit de bron: bijwerken van alle rijen die overeenkomen met de opgegeven sleutel(en) in de doeltabel of een nieuwe rij invoegen wanneer er geen overeenkomstige rij in de doeltabel aanwezig is. Verwerking voor DELETE gebeurtenissen kan worden opgegeven met de APPLY AS DELETE WHEN voorwaarde.

Belangrijk

U moet een streamingdoeltabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de APPLY CHANGES doeltabel opgeeft, moet u ook de kolommen __START_AT en __END_AT met hetzelfde gegevenstype opnemen als het sequence_by veld.

Zie De API's VOOR HET TOEPASSEN VAN WIJZIGINGEN: Het vereenvoudigen van het vastleggen van wijzigingsgegevens met Delta Live Tables.

Clausules
KEYS

De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel.

Als u een combinatie van kolommen wilt definiëren, gebruikt u een door komma's gescheiden lijst met kolommen.

Deze component is vereist.
IGNORE NULL UPDATES

Toestaan dat updates worden ingevoerd die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en IGNORE NULL UPDATES is opgegeven, behouden kolommen met een null hun bestaande waarden in het doel. Dit geldt ook voor geneste kolommen met een waarde van null.

Deze component is optioneel.

De standaardinstelling is om bestaande kolommen te overschrijven met null waarden.
APPLY AS DELETE WHEN

Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een DELETE upsert in plaats van een upsert. Als u niet-volgorde gegevens wilt verwerken, wordt de verwijderde rij tijdelijk als marker bewaard in de onderliggende Delta-tabel, en wordt er een weergave in de metastore gemaakt die deze markers uitfiltert. Het bewaarinterval kan worden geconfigureerd met de
pipelines.cdc.tombstoneGCThresholdInSeconds tabeleigenschap.

Deze component is optioneel.
APPLY AS TRUNCATE WHEN

Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel TRUNCATE. Omdat deze clausule een volledige truncatie van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke gebruiksgevallen die deze functionaliteit vereisen.

De APPLY AS TRUNCATE WHEN component wordt alleen ondersteund voor SCD-type 1. SCD-type 2 biedt geen ondersteuning voor de afkappende bewerking.

Deze component is optioneel.
SEQUENCE BY

De kolomnaam waarmee de logische volgorde van CDC-gebeurtenissen in de brongegevens wordt opgegeven. Delta Live Tables maakt gebruik van deze sequentiëring om wijzigingsgebeurtenissen af te handelen die niet op volgorde aankomen.

De opgegeven kolom moet een sorteerbaar gegevenstype zijn.

Deze component is vereist.
COLUMNS

Hiermee geeft u een subset van kolommen op die moeten worden opgenomen in de doeltabel. U hebt de volgende mogelijkheden:

- Geef de volledige lijst met kolommen op die moeten worden opgenomen: COLUMNS (userId, name, city).
- Geef een lijst met kolommen op die moeten worden uitgesloten: COLUMNS * EXCEPT (operation, sequenceNum)

Deze component is optioneel.

De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer de COLUMNS component niet is opgegeven.
STORED AS

Of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2.

Deze component is optioneel.

De standaardwaarde is SCD type 1.
TRACK HISTORY ON

Hiermee geeft u een subset van uitvoerkolommen op voor het genereren van geschiedenisrecords wanneer er wijzigingen zijn in die opgegeven kolommen. U hebt de volgende mogelijkheden:

- Geef de volledige lijst met kolommen op die moeten worden bijgehouden: COLUMNS (userId, name, city).
- Geef een lijst met kolommen op die moeten worden uitgesloten van het bijhouden: COLUMNS * EXCEPT (operation, sequenceNum)

Deze component is optioneel. De standaardinstelling is het bijhouden van de geschiedenis voor alle uitvoerkolommen wanneer er wijzigingen zijn, gelijk aan TRACK HISTORY ON *.