Dela via


SKAPA DIREKTUPPSPELNINGSTABELL

Gäller för: markerad ja Databricks SQL

Skapar en strömmande tabell, en Delta-tabell med extra stöd för direktuppspelning eller inkrementell databearbetning.

Direktuppspelningstabeller stöds endast i Delta Live Tables och i Databricks SQL med Unity Catalog. Om du kör det här kommandot på Databricks Runtime-beräkning som stöds parsas endast syntaxen. Se Utveckla pipelinekod med SQL.

Syntax

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parametrar

  • SVALKA

    Om det anges uppdaterar tabellen med de senaste tillgängliga data från källorna som definierats i frågan. Endast nya data som tas emot innan frågan startar bearbetas. Nya data som läggs till i källorna under körningen av kommandot ignoreras till nästa uppdatering. Uppdateringsåtgärden från CREATE OR REFRESH är helt deklarativ. Om ett uppdateringskommando inte anger alla metadata från den ursprungliga tabellskapandet tas de ospecificerade metadata bort.

  • OM INTE FINNS

    Skapar strömningstabellen om den inte finns. Om det redan finns en tabell med det här namnet ignoreras -instruktionen CREATE STREAMING TABLE .

    Du kan ange högst en av IF NOT EXISTS eller OR REFRESH.

  • table_name

    Namnet på tabellen som ska skapas. Namnet får inte innehålla en temporal specifikation eller alternativspecifikation. Om namnet inte är kvalificerat skapas tabellen i det aktuella schemat.

  • table_specification

    Den här valfria satsen definierar listan över kolumner, deras typer, egenskaper, beskrivningar och kolumnbegränsningar.

    Om du inte definierar kolumner i tabellschemat måste du ange AS query.

    • column_identifier

      Ett unikt namn för kolumnen.

      • column_type

        Anger kolumnens datatyp .

      • INTE NULL

        Om det anges accepterar NULL kolumnen inte värden.

      • KOMMENTAR column_comment

        En strängliteral som beskriver kolumnen.

      • column_constraint

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en primärnyckel eller sekundärnyckelbegränsning i kolumnen i en strömmande tabell. Begränsningar stöds inte för tabeller i hive_metastore katalogen.

      • MASK-sats

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en kolumnmaskfunktion för att anonymisera känsliga data. Alla efterföljande frågor från den kolumnen får resultatet av utvärderingen av funktionen över kolumnen i stället för kolumnens ursprungliga värde. Detta kan vara användbart för detaljerad åtkomstkontroll där funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om värdet ska redigeras.

      • BEGRÄNSNING expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | SLÄPP RAD } ]

        Lägger till datakvalitetsförväntningar i tabellen. Dessa förväntningar på datakvalitet kan spåras över tid och nås via strömningstabellens händelselogg. En FAIL UPDATE förväntan gör att bearbetningen misslyckas när både tabellen skapas och tabellen uppdateras. En DROP ROW förväntan gör att hela raden tas bort om förväntningarna inte uppfylls.

        expectation_expr kan bestå av literaler, kolumnidentifierare i tabellen och deterministiska, inbyggda SQL-funktioner eller operatorer förutom:

        Får inte heller expr innehålla någon underfråga.

      • table_constraint

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en informations primärnyckel eller informationsmässiga begränsningar för sekundärnyckel i en strömmande tabell. Viktiga begränsningar stöds inte för tabeller i hive_metastore katalogen.

  • table_clauses

    Du kan också ange partitionering, kommentarer, användardefinierade egenskaper och ett uppdateringsschema för den nya tabellen. Varje undersats kan bara anges en gång.

    • PARTITIONERAD AV

      En valfri lista över kolumner i tabellen som tabellen ska partitioneras efter.

    • KOMMENTAR table_comment

      En STRING literal för att beskriva tabellen.

    • TBLPROPERTIES

      Du kan också ange en eller flera användardefinierade egenskaper.

      Använd den här inställningen om du vill ange den Delta Live Tables-körningskanal som används för att köra den här instruktionen. Ange värdet för pipelines.channel egenskapen till "PREVIEW" eller "CURRENT". Standardvärdet är "CURRENT". Mer information om Delta Live Tables-kanaler finns i Delta Live Tables-körningskanaler.

    • SCHEMA [ UPPDATERA ] schedule_clause

      • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Om du vill schemalägga en uppdatering som sker regelbundet använder du EVERY syntax. Om EVERY syntax anges uppdateras strömningstabellen eller den materialiserade vyn regelbundet med det angivna intervallet baserat på det angivna värdet, till exempel HOUR, HOURS, DAY, DAYS, WEEKeller WEEKS. I följande tabell visas godkända heltalsvärden för number.

        Time unit Heltalsvärde
        HOUR or HOURS 1 <= H <= 72
        DAY or DAYS 1 <= D <= 31
        WEEK or WEEKS 1 <= W <= 8

        Kommentar

        Singular- och pluralformerna i den inkluderade tidsenheten är semantiskt likvärdiga.

      • CRON cron_string [ AT TIME ZONE timezone_id ]

        Så här schemalägger du en uppdatering med hjälp av ett quartz cron-värde . Giltiga time_zone_values accepteras. AT TIME ZONE LOCAL stöds inte.

        Om AT TIME ZONE den saknas används tidszonen för sessionen. Om AT TIME ZONE är frånvarande och sessionens tidszon inte har angetts utlöses ett fel. SCHEDULE är semantiskt likvärdigt med SCHEDULE REFRESH.

      Schemat kan anges som en del av CREATE kommandot. Använd ALTER STREAMING TABLE eller kör CREATE OR REFRESH kommandot med SCHEDULE -satsen för att ändra schemat för en strömmande tabell när du har skapat den.

    • MED ROW FILTER-sats

      Viktigt!

      Den här funktionen finns som allmänt tillgänglig förhandsversion.

      Lägger till en radfilterfunktion i tabellen. Alla efterföljande frågor från tabellen tar emot en delmängd av de rader där funktionen utvärderas till boolesk TRUE. Detta kan vara användbart för detaljerad åtkomstkontroll där funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om vissa rader ska filtreras.

  • AS-fråga

    Den här satsen fyller i tabellen med hjälp av data från query. Den här frågan måste vara en direktuppspelningsfråga . Detta kan uppnås genom att lägga till nyckelordet i STREAM valfri relation som du vill bearbeta stegvis. När du anger en query och en table_specification tillsammans måste tabellschemat som anges i table_specification innehålla alla kolumner som returneras av query, annars får du ett fel. Alla kolumner som anges i table_specification men inte returneras av query returvärden null när du frågar.

Skillnader mellan strömmande tabeller och andra tabeller

Direktuppspelningstabeller är tillståndskänsliga tabeller som endast är utformade för att hantera varje rad en gång när du bearbetar en växande datauppsättning. Eftersom de flesta datauppsättningar växer kontinuerligt över tid är strömmande tabeller bra för de flesta inmatningsarbetsbelastningar. Strömmande tabeller är optimala för pipelines som kräver data färskhet och låg svarstid. Strömningstabeller kan också vara användbara för omfattande skalningstransformeringar, eftersom resultaten kan beräknas stegvis när nya data anländer, vilket håller resultaten uppdaterade utan att helt omkomplera alla källdata med varje uppdatering. Strömmande tabeller är utformade för datakällor som endast är tilläggstabeller.

Strömmande tabeller accepterar ytterligare kommandon, till exempel REFRESH, som bearbetar de senaste data som är tillgängliga i de källor som anges i frågan. Ändringar i den angivna frågan återspeglas bara på nya data genom att anropa en REFRESH, inte tidigare bearbetade data. Om du även vill tillämpa ändringarna på befintliga data måste du köra REFRESH TABLE <table_name> FULL för att utföra en FULL REFRESH. Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du anropar fullständiga uppdateringar på källor som inte behåller hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom den fullständiga uppdateringen trunkerar befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.

Radfilter och kolumnmasker

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Med radfilter kan du ange en funktion som tillämpas som ett filter när en tabellgenomsökning hämtar rader. Dessa filter säkerställer att efterföljande frågor endast returnerar rader som filterpredikatet utvärderas till sant för.

Med kolumnmasker kan du maskera en kolumns värden när en tabellgenomsökning hämtar rader. Alla framtida frågor som rör den kolumnen får resultatet av utvärderingen av funktionen över kolumnen och ersätter kolumnens ursprungliga värde.

Mer information om hur du använder radfilter och kolumnmasker finns i Filtrera känsliga tabelldata med hjälp av radfilter och kolumnmasker.

Hantera radfilter och kolumnmasker

Radfilter och kolumnmasker i strömmande tabeller ska läggas till, uppdateras eller tas bort via -instruktionen CREATE OR REFRESH .

Funktionssätt

  • Uppdatera som definierare: När -instruktionen CREATE OR REFRESH eller REFRESH uppdaterar en strömmande tabell körs radfilterfunktionerna med definierarens rättigheter (som tabellägare). Det innebär att tabelluppdateringen använder säkerhetskontexten för den användare som skapade strömningstabellen.
  • Fråga: De flesta filter körs med definierarens rättigheter, men funktioner som kontrollerar användarkontexten (till exempel CURRENT_USER och IS_MEMBER) är undantag. Dessa funktioner körs som anropare. Den här metoden tillämpar användarspecifika datasäkerhets- och åtkomstkontroller baserat på den aktuella användarens kontext.

Överskådlighet

Använd DESCRIBE EXTENDED, INFORMATION_SCHEMAeller Katalogutforskaren för att undersöka befintliga radfilter och kolumnmasker som gäller för en viss strömmande tabell. Med den här funktionen kan användare granska och granska dataåtkomst och skyddsåtgärder för strömmande tabeller.

Begränsningar

  • Endast tabellägare kan uppdatera strömmande tabeller för att hämta de senaste data.

  • ALTER TABLE kommandon tillåts inte för strömmande tabeller. Tabellens definition och egenskaper bör ändras via instruktionen CREATE OR REFRESH ELLER ALTER STREAMING TABLE .

  • Frågor om tidsresor stöds inte.

  • Utveckla tabellschemat via DML-kommandon som INSERT INTO, och MERGE stöds inte.

  • Följande kommandon stöds inte i strömmande tabeller:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Deltadelning stöds inte.

  • Det går inte att byta namn på tabellen eller ändra ägaren.

  • Tabellbegränsningar som PRIMARY KEY och FOREIGN KEY stöds inte.

  • Genererade kolumner, identitetskolumner och standardkolumner stöds inte.

Exempel

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')