Dela via


Referens för DLT SQL-språk

Den här artikeln innehåller information om programmeringsgränssnittet för DLT SQL.

Du kan använda användardefinierade Python-funktioner (UDF: er) i dina SQL-frågor, men du måste definiera dessa UDF:er i Python-filer innan du anropar dem i SQL-källfiler. Se användardefinierade skalärfunktioner – Python.

Begränsningar

Satsen PIVOT stöds inte. Den pivot åtgärden i Spark kräver ivrig inläsning av indata för att beräkna utdataschemat. Den här funktionen stöds inte i DLT.

Skapa en materialiserad DLT-vy eller strömningstabell

Note

Syntaxen CREATE OR REFRESH LIVE TABLE för att skapa en materialiserad vy är inaktuell. Använd i stället CREATE OR REFRESH MATERIALIZED VIEW.

Du använder samma grundläggande SQL-syntax när du deklarerar en strömmande tabell eller en materialiserad vy.

Deklarera en materialiserad DLT-vy med SQL

Följande beskriver syntaxen för att deklarera en materialiserad vy i DLT med SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  CLUSTER BY clause
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Deklarera en DLT-direktuppspelningstabell med SQL

Du kan bara deklarera strömmande tabeller med hjälp av frågor som läser från en strömmande källa. Databricks rekommenderar att du använder Auto Loader för strömmande inmatning av filer från molnobjektlagring. Se SQL-syntaxen för Auto Loader.

När du anger andra tabeller eller vyer i pipelinen som strömningskällor måste du ta med funktionen STREAM() runt ett datauppsättningsnamn.

Följande beskriver syntaxen för att deklarera en strömningstabell i DLT med SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [CLUSTER BY clause]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Skapa en DLT-vy

Följande beskriver syntaxen för att deklarera vyer med SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

SQL-syntax för automatisk inläsning

Följande beskriver syntaxen för att arbeta med automatisk inläsning i SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value>",
      "<option-key>", "<option_value>",
      ...
    )
  )

Du kan använda formatalternativ som stöds med Auto Loader. Med hjälp av funktionen map() kan du skicka alternativ till metoden read_files(). Alternativen är nyckel/värde-par, där nycklar och värden är strängar. Mer information om stödformat och alternativ finns i Filformatalternativ.

Exempel: Definiera tabeller

Du kan skapa en datauppsättning genom att läsa från en extern datakälla eller från datauppsättningar som definierats i en pipeline. Om du vill läsa från en intern datauppsättning anger du tabellnamnet som ska använda de konfigurerade pipelinestandarderna för katalog och schema. I följande exempel definieras två olika datauppsättningar: en tabell med namnet taxi_raw som tar en JSON-fil som indatakälla och en tabell med namnet filtered_data som tar tabellen taxi_raw som indata:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

Exempel: Läsa från en strömmande källa

Om du vill läsa data från en strömmande källa, till exempel Automatisk inläsare eller en intern datauppsättning, definierar du en STREAMING tabell:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Mer information om strömmande data finns i Transformera data med pipelines.

Ta bort poster permanent från en materialiserad vy eller direktuppspelningstabell

Om du vill ta bort poster permanent från en materialiserad vy eller strömningstabell med borttagningsvektorer aktiverade, till exempel för GDPR-efterlevnad, måste ytterligare åtgärder utföras på objektets underliggande Delta-tabeller. Information om hur du tar bort poster från en materialiserad vy finns i Ta bort poster permanent från en materialiserad vy med borttagningsvektorer aktiverade. Information om hur du tar bort poster från en strömmande tabell finns i Ta bort poster permanent från en strömmande tabell.

Kontrollera hur tabeller materialiseras

Tabeller ger också ytterligare kontroll över materialiseringen:

Not

För tabeller som är mindre än 1 TB i storlek rekommenderar Databricks att DLT kontrollerar dataorganisationen. Om du inte förväntar dig att tabellen ska växa utöver en terabyte rekommenderar Databricks att du inte anger partitionskolumner.

Exempel: Ange ett schema och klusterkolumner

Du kan också ange ett schema när du definierar en tabell. I följande exempel specificeras schemat för måltabellen, inklusive användning av kolumner genererade av Delta Lake och definiering av klustringskolumner för tabellen.

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Som standard härleder DLT schemat från table definition om du inte anger något schema.

exempel: Ange partitionskolumner

Du kan också ange partitionskolumner för tabellen:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Flytande klustring ger en flexibel, optimerad lösning för klustring. Överväg att använda CLUSTER BY i stället för PARTITIONED BY för DLT.

Exempel: Definiera tabellbegränsningar

Not

DLT-stöd för tabellbegränsningar finns i offentlig förhandsversion. För att definiera tabellbegränsningar måste pipelinen vara en Unity Catalog-aktiverad pipeline och vara konfigurerad för att använda kanalen preview.

När du anger ett schema kan du definiera primära och externa nycklar. Begränsningarna är informationsmässiga och tillämpas inte. Se CONSTRAINT-satsen i SQL-språkreferensen.

I följande exempel definieras en tabell med en primär och sekundär nyckelbegränsning:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parameterisera värden som används vid deklarering av tabeller eller vyer med SQL

Använd SET för att ange ett konfigurationsvärde i en fråga som deklarerar en tabell eller vy, inklusive Spark-konfigurationer. Alla tabeller eller vyer som du definierar i en notebook-fil efter att SET-instruktionen har åtkomst till det definierade värdet. Alla Spark-konfigurationer som anges med instruktionen SET används vid körning av Spark-frågan för en tabell eller vy som följer SET-instruktionen. Om du vill läsa ett konfigurationsvärde i en fråga använder du syntaxen för stränginterpolation ${}. I följande exempel anges ett Spark-konfigurationsvärde med namnet startDate och det värdet används i en fråga:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Om du vill ange flera konfigurationsvärden använder du en separat SET-instruktion för varje värde.

Exempel: Definiera ett radfilter och en kolumnmask

Viktig

Radfilter och kolumnmasker finns i offentlig förhandsversion.

Om du vill skapa en materialiserad vy eller en strömningstabell med ett radfilter och en kolumnmask använder du ROW FILTER-satsen och MASK-satsen. I följande exempel visas hur du definierar en materialiserad vy och en strömningstabell med både ett radfilter och en kolumnmask:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

Mer information om radfilter och kolumnmasker finns i Publicera tabeller med radfilter och kolumnmasker.

SQL-egenskaper

CREATE TABLE eller VIEW
TEMPORARY
Skapa en tabell men publicera inte metadata för tabellen. Satsen TEMPORARY instruerar DLT att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att minska bearbetningstiden, bevaras en tillfällig tabell under hela livslängden för den pipeline som skapar den, och inte bara för en enda uppdatering.
STREAMING
Skapa en tabell som läser en indatauppsättning som en dataström. Indatauppsättningen måste vara en strömmande datakälla, till exempel automatisk inläsning eller en STREAMING tabell.
CLUSTER BY
Aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar.
Se Använd flytande klustring för Delta-tabeller.
PARTITIONED BY
En valfri lista över en eller flera kolumner som ska användas för partitionering av tabellen.
LOCATION
En valfri lagringsplats för tabelldata. Om det inte anges kommer systemet att välja pipelinelagringsplatsen som standardinställning.
COMMENT
En valfri beskrivning för tabellen.
column_constraint
En valfri primärnyckel eller utländsk nyckelbegränsning för kolumnen.
MASK clause (offentlig förhandsversion)
Lägger till en kolumnmaskfunktion för att anonymisera känsliga data. Framtida frågor för kolumnen returnerar den utvärderade funktionens resultat i stället för kolumnens ursprungliga värde. Detta är användbart för detaljerad åtkomstkontroll, eftersom funktionen kan kontrollera användarens identitets- och gruppmedlemskap för att avgöra om värdet ska redigeras.
Se Kolumn mask klausul.
table_constraint
En valfri informativ primärnyckel- eller utländsk nyckelbegränsning i tabellen.
TBLPROPERTIES
En valfri lista över tabellegenskaper för tabellen.
WITH ROW FILTER clause (offentlig förhandsversion)
Lägger till en radfilterfunktion i tabellen. Framtida frågor för tabellen tar emot en delmängd av de rader som funktionen utvärderas till TRUE för. Detta är användbart för detaljerad åtkomstkontroll eftersom den gör att funktionen kan kontrollera identitets- och gruppmedlemskapen för den anropande användaren för att avgöra om vissa rader ska filtreras.
Se ROW FILTER-satsen.
select_statement
En DLT-fråga som definierar datauppsättningen för tabellen.
CONSTRAINT-sats
EXPECT expectation_name
Definiera datakvalitetsbegränsningar expectation_name. Om villkoret ON VIOLATION inte har definierats lägger du till rader som bryter mot villkoret i måldatauppsättningen.
ON VIOLATION
Valfri åtgärd att vidta för felaktiga rader:
  • FAIL UPDATE: Stoppa omedelbart pipelinekörningen.
  • DROP ROW: Släpp posten och fortsätt bearbetningen.

Ändra datainsamling med SQL i DLT

Använd APPLY CHANGES INTO-instruktionen för att använda DLT CDC-funktioner enligt beskrivningen i följande:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Du definierar datakvalitetsbegränsningar för ett APPLY CHANGES mål med samma CONSTRAINT-sats som icke-APPLY CHANGES frågor. Se Hantera datakvalitet med pipelineförväntningar.

Not

Standardbeteendet för INSERT och UPDATE händelser är att utföra upsert CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hanteringen av DELETE-händelser kan specificeras med villkoret APPLY AS DELETE WHEN.

Viktig

Du måste deklarera en måltabell för strömning för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för APPLY CHANGES måltabellen måste du även inkludera kolumnerna __START_AT och __END_AT med samma datatyp som fältet sequence_by.

Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med DLT.

Klausuler
KEYS
Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.
Om du vill definiera en kombination av kolumner använder du en kommaavgränsad lista med kolumner.
Den här satsen krävs.
IGNORE NULL UPDATES
Tillåt inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och IGNORE NULL UPDATES har angetts, kommer kolumner med en null att behålla sina befintliga värden i målet. Detta gäller även kapslade kolumner med värdet null.
Den här satsen är valfri.
Standardvärdet är att skriva över befintliga kolumner med null värden.
APPLY AS DELETE WHEN
Anger när en CDC-händelse ska behandlas som en DELETE istället för en upsert. För att hantera oordnade data behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med
pipelines.cdc.tombstoneGCThresholdInSeconds tabellegenskap.
Den här satsen är valfri.
APPLY AS TRUNCATE WHEN
Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE. Eftersom denna klausul utlöser en fullständig trunkering av måltabellen, bör den endast användas för specifika användningsfall som kräver denna funktion.
Satsen APPLY AS TRUNCATE WHEN stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunkeringsåtgärden.
Den här satsen är valfri.
SEQUENCE BY
Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. DLT använder den här sekvenseringen för att hantera ändringshändelser som kommer ur ordning.
Den angivna kolumnen måste vara en sorterbar datatyp.
Den här satsen krävs.
COLUMNS
Anger en delmängd av kolumner som ska inkluderas i måltabellen. Du kan antingen:
  • Ange den fullständiga listan med kolumner som ska inkluderas: COLUMNS (userId, name, city).
  • Ange en lista med kolumner som ska undantas: COLUMNS * EXCEPT (operation, sequenceNum)

Den här satsen är valfri.
Standardvärdet är att inkludera alla kolumner i måltabellen när COLUMNS-satsen inte har angetts.
STORED AS
Om registerposter ska lagras som SCD typ av 1 eller SCD typ av 2.
Den här satsen är valfri.
Standardvärdet är SCD typ 1.
TRACK HISTORY ON
Specificerar en delmängd av utdatakolumner för att generera historikposter när det finns ändringar i de angivna kolumnerna. Du kan antingen:
  • Ange den fullständiga listan med kolumner som ska spåras: COLUMNS (userId, name, city).
  • Ange en lista över kolumner som ska undantas från spårning: COLUMNS * EXCEPT (operation, sequenceNum)

Den här satsen är valfri. Standardvärdet är att spåra historik för alla utdatakolumner när det finns ändringar, vilket motsvarar TRACK HISTORY ON *.