Dela via


CREATE STREAMING TABLE

Gäller för:markerad ja Databricks SQL

Skapar en strömmande table, en Delta-table med extra stöd för direktuppspelning eller inkrementell databearbetning.

Strömmande tables stöds endast i Delta Live Tables och i Databricks SQL med Unity Catalog. Om du kör det här kommandot på Databricks Runtime-beräkning som stöds parsas endast syntaxen. Se Utveckla pipelinekod med SQL.

Syntax

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parameters

  • REFRESH

    Om det är angivet, uppdaterar table med de senaste tillgängliga data från de källor som definierats i frågan. Endast nya data som tas emot innan frågan startar bearbetas. Nya data som läggs till i källorna under körningen av kommandot ignoreras tills nästa refresh. Åtgärden refresh från CREATE OR REFRESH är helt deklarativ. Om ett refresh-kommando inte anger alla metadata från den ursprungliga table skapande-instruktionen tas de ospecificerade metadata bort.

  • OM INTE FINNS

    Skapar strömmen table om den inte finns. Om det redan finns en table med det här namnet ignoreras CREATE STREAMING TABLE-instruktionen.

    Du kan ange högst en av IF NOT EXISTS eller OR REFRESH.

  • table_name

    Namnet på table som ska skapas. Namnet får inte innehålla en temporal specifikation eller alternativspecifikation. Om namnet inte är kvalificerat skapas table i den aktuella schema.

  • table_specification

    Den här valfria satsen definierar list för columns, deras typer, egenskaper, beskrivningar och column begränsningar.

    Om du inte definierar columns i tableschema måste du ange AS query.

    • column_identifier

      Ett unikt namn för column.

      • column_type

        Anger datatyp för column.

      • INTE NULL

        Om det anges accepterar column inte NULLvalues.

      • KOMMENTAR column_comment

        En textsträng som beskriver column.

      • column_constraint

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en primärnyckel eller främmande nyckel constraint till column i en strömmande table. Begränsningar stöds inte för tables i hive_metastorecatalog.

      • MASK-sats

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en column maskfunktion för att anonymisera känsliga data. Alla efterföljande frågor från den column får resultatet av att utvärdera funktionen över column, istället för column:s ursprungliga värde. Detta kan vara användbart i detaljerade åtkomstkontrollsyften where funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om värdet ska redigeras.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | TA BORT RAD } ]

        Lägger till datakvalitetsförväntningar i table. Dessa förväntningar på datakvalitet kan spåras över tid och nås via strömningens table:s -händelselogg. En FAIL UPDATE förväntan gör att bearbetningen misslyckas när både table skapas och tableuppdateras. En DROP ROW förväntan gör att hela raden tas bort om förväntningarna inte uppfylls.

        expectation_expr kan bestå av literaler, column identifierare inom tableoch deterministiska, inbyggda SQL-funktioner eller operatorer förutom:

        Får inte heller expr innehålla någon underfråga.

      • table_constraint

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en informativ primärnyckel eller informativa främmande nyckelbegränsningar till en strömmande table. Viktiga begränsningar stöds inte för tables i hive_metastorecatalog.

  • table_clauses

    Du kan också ange partitionering, kommentarer, användardefinierade egenskaper och ett refresh schema för den nya table. Varje undersats kan bara anges en gång.

    • PARTITIONERAD AV

      En valfri list av columns av table att partitiontable av.

    • KOMMENTAR table_comment

      En STRING literal som beskriver table.

    • TBLPROPERTIES

      Du kan också ange en eller flera användardefinierade egenskaper.

      Använd den här inställningen om du vill ange delta live-Tables-körningskanal som används för att köra den här instruktionen. Ange Set värdet på egenskapen pipelines.channel till "PREVIEW" eller "CURRENT". Standardvärdet är "CURRENT". Mer information om Delta Live Tables kanaler finns i Delta Live Tables runtime-kanaler.

    • SCHEDULE [ REFRESH ] schema_klausul

    • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

      För att schemalägga en refresh som inträffar regelbundet, använd syntaxen EVERY. Om EVERY syntax anges uppdateras den strömmande table- eller materialiserade vyn regelbundet med det angivna intervallet baserat på det angivna värdet, till exempel HOUR, HOURS, DAY, DAYS, WEEKeller WEEKS. Följande table visar godkända heltal values för number.

      Time unit Heltalsvärde
      HOUR or HOURS 1 <= H <= 72
      DAY or DAYS 1 <= D <= 31
      WEEK or WEEKS 1 <= W <= 8

      Kommentar

      Singular- och pluralformerna i den inkluderade tidsenheten är semantiskt likvärdiga.

    • CRON cron_string [ AT TIME ZONE timezone_id ]

      Så här schemalägger du en refresh med hjälp av ett kvarts cron- värde. Giltiga time_zone_values accepteras. AT TIME ZONE LOCAL stöds inte.

      Om AT TIME ZONE den saknas används tidszonen för sessionen. Om AT TIME ZONE saknas och sessionens tidszon inte är setutlöses ett fel. SCHEDULE är semantiskt likvärdigt med SCHEDULE REFRESH.

    Schemat kan anges som en del av CREATE kommandot. Använd kommandot ALTER STREAMING TABLE eller kör CREATE OR REFRESH med SCHEDULE-satsen för att ändra schemat för en strömmande table efter skapandet.

  • WITH ROW FILTER-klausul

    Viktigt!

    Den här funktionen finns som allmänt tillgänglig förhandsversion.

    Lägger till en radfilterfunktion i table. Alla efterföljande frågor från den table tar emot en delmängd av raderna where som utvärderas av funktionen till boolesk TRUE. Detta kan vara användbart för detaljerad åtkomstkontroll where funktionen kan inspektera identitets- eller gruppmedlemskap för den anropande användaren för att avgöra om vissa rader ska filtreras.

  • AS-fråga

    Den här satsen fyller i table med hjälp av data från query. Den här frågan måste vara en direktuppspelningsfråga . Detta kan uppnås genom att lägga till nyckelordet i STREAM valfri relation som du vill bearbeta stegvis. När du anger en query och en table_specification tillsammans måste tableschema som anges i table_specification innehålla alla columns som returneras av query, annars get du ett fel. Alla columns som anges i table_specification men som inte returneras av query returnerar nullvalues när de efterfrågas.

Skillnader mellan strömmande tables och andra tables

Strömmande tables är tillståndsorienterade tables, utformade för att endast bearbeta varje rad en gång när du hanterar ett expanderande dataset. Eftersom de flesta datauppsättningar växer kontinuerligt över tid är strömmande tables bra för de flesta inmatningsarbetsbelastningar. Strömmande tables är optimala för pipelines som kräver data färskhet och låg svarstid. Streaming tables kan också vara användbart för omfattande transformationer i stor skala, eftersom resultaten kan beräknas inkrementellt när nya data anländer, vilket håller resultaten uppdaterade utan att behöva beräkna om alla källdata med varje update. Strömmande tables är utformade för datakällor som endast läggs till.

Strömmande tables accepterar ytterligare kommandon, till exempel REFRESH, som bearbetar de senaste tillgängliga data i källorna i frågan. Ändringar i den angivna frågan get återspeglas på nya data genom att anropa REFRESHsom inte tidigare bearbetats. Om du även vill tillämpa ändringarna på befintliga data måste du köra REFRESH TABLE <table_name> FULL för att utföra en FULL REFRESH. Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du utför total uppdatering på källor som inte bevarar hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom hela refresh kan radera befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.

Radfilter och column-maskerna

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Med radfilter kan du ange en funktion som används som ett filter när en table genomsökning hämtar rader. Dessa filter säkerställer att efterföljande frågor endast returnerar rader som filterpredikatet utvärderas till sant för.

Column masker låter dig maskera en column:s values när en table genomsökning hämtar rader. Alla framtida frågor som rör den column får resultatet av utvärderingen av funktionen via columnoch ersätter columnursprungliga värde.

Mer information om hur du använder radfilter och column masker finns i Filtrera känsliga table data med hjälp av radfilter och column masker.

Hantera radfilter och Column masker

Radfilter och column-masker på direktuppspelning tables ska läggas till, uppdateras eller tas bort via CREATE OR REFRESH-instruktionen.

Funktionssätt

  • Refresh som Definer: När CREATE OR REFRESH- eller REFRESH-uttrycken refresh en strömmande table, körs radfilterfunktionerna med definierarens rättigheter (som table-ägare). Det innebär att tablerefresh använder säkerhetskontexten för den användare som skapade det strömmande table.
  • Fråga: De flesta filter körs med definierarens rättigheter, men funktioner som kontrollerar användarkontexten (till exempel CURRENT_USER och IS_MEMBER) är undantag. Dessa funktioner körs som anropare. Den här metoden tillämpar användarspecifika datasäkerhets- och åtkomstkontroller baserat på den aktuella användarens kontext.

Överskådlighet

Använd DESCRIBE EXTENDED, INFORMATION_SCHEMAeller Catalog Explorer för att undersöka befintliga radfilter och column-masker som gäller för en viss strömmande table. Den här funktionen gör det möjligt för användare att kontrollera och granska dataåtkomst och skyddsåtgärder för strömning tables.

Begränsningar

  • Endast table innehavare kan refresh strömma tables till get de senaste uppgifterna.
  • ALTER TABLE-kommandon är inte tillåtna för direktuppspelning tables. Definitionen och egenskaperna för table bör ändras via instruktionen CREATE OR REFRESH eller ALTER STREAMING TABLE.
  • Utveckling av tableschema via DML-kommandon som INSERT INTOoch MERGE stöds inte.
  • Följande kommandon stöds inte vid direktuppspelning tables:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Deltadelning stöds inte.
  • Det går inte att byta namn på table eller ändra ägaren.
  • Table begränsningar som PRIMARY KEY och FOREIGN KEY stöds inte.
  • Genererade columns, identitet columnsoch standard columns stöds inte.

Exempel

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')