Compartir vía


CREATE STREAMING TABLE

Se aplica a: casilla marcada como Sí Databricks SQL

Crea una tabla de flujo de datoses una tabla delta con compatibilidad adicional para el procesamiento de datos incremental o de transmisión.

Las tablas de streaming solo se admiten en Delta Live Tables y en Databricks SQL con el catálogo de Unity. Al ejecutar este comando en el proceso de Databricks Runtime compatible, solo se analiza la sintaxis. Consulte Desarrollo de código de canalización con SQL.

Sintaxis

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parámetros

  • REFRESH

    Si se especifica, actualiza la tabla con los datos más recientes disponibles de los orígenes definidos en la consulta. Solo se procesan los nuevos datos que llegan antes de que se inicie la consulta. Los nuevos datos que se agregan a los orígenes durante la ejecución del comando se omiten hasta la siguiente actualización. La operación de actualización de CREATE OR REFRESH es totalmente declarativa. Si un comando refresh no especifica todos los metadatos de la instrucción original de creación de la tabla, se eliminan los metadatos no especificados.

  • IF NOT EXISTS

    Crea la tabla de streaming si no existe. Si ya existe una tabla con este nombre, se omitirá la instrucción CREATE STREAMING TABLE.

    Puede especificar como máximo uno de IF NOT EXISTS o OR REFRESH.

  • table_name

    Nombre de la tabla que se va a crear. El nombre no debe incluir una especificación temporal ni una especificación de opciones. Si el nombre no está completo, la tabla se crea en el esquema actual.

  • table_specification

    Esta cláusula opcional define la lista de columnas y sus tipos, propiedades, descripciones y restricciones de columnas.

    Si no define columnas en el esquema de la tabla, debe especificar AS query.

    • column_identifier

      Nombre único para la columna.

      • column_type

        Especifica el tipo de datos de la columna.

      • NOT NULL

        Si se especifica, la columna no acepta valores NULL.

      • COMMENT column_comment

        Literal de cadena para describir la columna.

      • column_constraint

        Importante

        Esta característica está en versión preliminar pública.

        Agrega una clave principal o una restricción de clave externa a la columna de una tabla de transmisión. No se admiten restricciones para las tablas del catálogo hive_metastore.

      • Cláusula MASK

        Importante

        Esta característica está en versión preliminar pública.

        Agrega una función de máscara de columna para anonimizar datos confidenciales. Todas las consultas siguientes desde esa columna recibirán el resultado de evaluar esa función sobre la columna en lugar del valor original de la columna. Esto puede ser útil para fines de control de acceso específicos en los que la función puede inspeccionar la identidad o las pertenencias a grupos del usuario que realiza la invocación para decidir si expurga el valor.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Agrega expectativas de calidad de datos a la tabla. Estas expectativas de calidad de los datos se pueden realizar con el tiempo y acceder a ellas a través del registro de eventos de la tabla de streaming. Una FAIL UPDATE expectativa hace que se produzca un error en el procesamiento al crear la tabla, así como al actualizar la tabla. Una DROP ROW expectativa hace que se quite toda la fila si no se cumple la expectativa.

        expectation_expr puede estar compuesto por literales, identificadores de columna dentro de la tabla y funciones u operadores SQL integrados y deterministas, a excepción de lo siguiente:

        expr tampoco debe contener ninguna subconsulta.

      • table_constraint

        Importante

        Esta característica está en versión preliminar pública.

        Agrega una clave principal informativa o restricciones de clave externa informativas a una tabla de transmisión. No se admiten restricciones de clave para las tablas del catálogo hive_metastore.

  • table_clauses

    Opcionalmente, especifique la creación de particiones, los comentarios, las propiedades definidas por el usuario y una programación de actualización para la nueva tabla. Cada subcláusula solo se puede especificar una vez.

    • PARTITIONED BY

      Lista opcional de columnas de la tabla por la que se va a particionar la tabla.

    • COMMENT table_comment

      Una STRING literal para describir la tabla.

    • TBLPROPERTIES

      Este parámetro opcional le permite establecer una o más propiedades que defina el usuario.

      Use esta configuración para especificar el canal en tiempo de ejecución de Delta Live Tables que se usa para ejecutar esta instrucción. Establezca el valor de la pipelines.channel propiedad en "PREVIEW" o "CURRENT". El valor predeterminado es "CURRENT". Para obtener más información sobre los canales de Delta Live Tables, consulte Canales en tiempo de ejecución de Delta Live Tables.

    • SCHEDULE [ REFRESH ] schedule_clause

      • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

        Importante

        Esta característica está en versión preliminar pública.

        Para programar una actualización que se produce periódicamente, use EVERY la sintaxis . Si EVERY se especifica la sintaxis, la tabla de streaming o la vista materializada se actualiza periódicamente en el intervalo especificado según el valor proporcionado, como HOUR, HOURS, DAY, DAYS, WEEKo WEEKS. En la tabla siguiente se enumeran los valores enteros aceptados para number.

        Time unit Valor entero
        HOUR or HOURS 1 <= H <= 72
        DAY or DAYS 1 <= D <= 31
        WEEK or WEEKS 1 <= W <= 8

        Nota:

        Las formas singulares y plurales de la unidad de tiempo incluida son semánticamente equivalentes.

      • CRON cron_string [ AT TIME ZONE timezone_id ]

        Para programar una actualización utilizando un valor cron de cuarzo. Se aceptan time_zone_values válidos. No se admite AT TIME ZONE LOCAL.

        Si AT TIME ZONE no está presente, se usa la zona horaria de la sesión. Si AT TIME ZONE no está presente y no se establece la zona horaria de la sesión, se produce un error. SCHEDULE es equivalente semánticamente a SCHEDULE REFRESH.

      La programación se puede proporcionar como parte del CREATE comando. Use ALTER STREAMING TABLE o ejecute el comando CREATE OR REFRESH con la cláusula SCHEDULE para modificar la programación de una tabla de streaming después de la creación.

    • WITH Cláusula ROW FILTER

      Importante

      Esta característica está en versión preliminar pública.

      Agrega una función de filtro de fila a la tabla. Todas las consultas siguientes desde esa tabla recibirán un subconjunto de filas para las que la función se evalúa como un valor TRUE booleano. Esto puede ser útil para fines de control de acceso específicos en los que la función puede inspeccionar la identidad o las pertenencias a grupos del usuario que realiza la invocación para decidir si se filtran determinadas filas.

  • AS query

    Esta cláusula rellena la tabla con los datos de query. Esta consulta debe ser una consulta de streaming. Esto se puede lograr agregando la STREAM palabra clave a cualquier relación que desee procesar de forma incremental. Al especificar un query elemento y un elemento table_specification juntos, el esquema de tabla especificado en table_specification debe contener todas las columnas devueltas por query, de lo contrario, obtendrá un error. Todas las columnas especificadas en table_specification pero no devueltas por query valores devueltos null cuando se consultan.

Diferencias entre tablas de streaming y otras tablas

Las tablas de streaming son tablas con estado, diseñadas para controlar cada fila solo una vez a medida que se procesa un conjunto de datos creciente. Dado que la mayoría de los conjuntos de datos crecen continuamente con el tiempo, las tablas de streaming son adecuadas para la mayoría de las cargas de trabajo de ingesta. Las tablas de streaming son óptimas para las canalizaciones que requieren actualización de datos y baja latencia. Las tablas de streaming también pueden ser útiles para las transformaciones de escala masiva, ya que los resultados se pueden calcular incrementalmente a medida que llegan nuevos datos, manteniendo los resultados actualizados sin necesidad de volver a calcular completamente todos los datos de origen con cada actualización. Las tablas de streaming están diseñadas para orígenes de datos que son de solo anexión.

Las tablas de streaming aceptan comandos adicionales, como REFRESH, que procesa los datos más recientes disponibles en los orígenes proporcionados en la consulta. Los cambios en la consulta proporcionada solo se reflejan en los nuevos datos mediante una llamada a, REFRESHno los datos procesados previamente. Para aplicar también los cambios en los datos existentes, debe ejecutar REFRESH TABLE <table_name> FULL para realizar una FULL REFRESH. Las actualizaciones completas vuelven a procesar todos los datos disponibles en el origen con la definición más reciente. No se recomienda llamar a actualizaciones completas en orígenes que no mantengan todo el historial de los datos o tengan períodos de retención cortos, como Kafka, ya que la actualización completa trunca los datos existentes. Es posible que no pueda recuperar datos antiguos si los datos ya no están disponibles en el origen.

Filtros de fila y máscaras de columna

Importante

Esta característica está en versión preliminar pública.

Filtros de fila permiten especificar una función que se aplica como filtro cada vez que un recorrido de tabla captura filas. Estos filtros garantizan que las consultas posteriores solo devuelven filas para las que el predicado de filtro se evalúa como true.

Las máscaras de columna permiten enmascarar los valores de una columna cada vez que un examen de tabla captura filas. Todas las consultas futuras que implican esa columna recibirán el resultado de evaluar la función sobre la columna, reemplazando el valor original de la columna’.

Para obtener más información sobre cómo usar filtros de fila y máscaras de columna, vea Filtrar datos confidenciales de la tabla mediante filtros de fila y máscaras de columna.

Administración de filtros de fila y máscaras de columna

Los filtros de fila y las máscaras de columna de las tablas de streaming deben agregarse, actualizarse o quitarse a través de la instrucción CREATE OR REFRESH.

Comportamiento

  • Actualizar como definidor: Cuando las instrucciones CREATE OR REFRESH o REFRESH actualizan una tabla de streaming, las funciones de filtros de fila se ejecutan con los derechos del definidor’(como propietario de la tabla). Esto significa que la actualización de la tabla usa el contexto de seguridad del usuario que creó la tabla de streaming.
  • Consulta: aunque la mayoría de los filtros se ejecutan con los derechos del definidor, las funciones que comprueban el contexto del usuario (como CURRENT_USER y IS_MEMBER) son excepciones. Estas funciones se ejecutan como invocador. Este enfoque aplica controles de acceso y seguridad de datos específicos del usuario en función del contexto del usuario actual.

Observabilidad

Use DESCRIBE EXTENDED, INFORMATION_SCHEMAo el Explorador de catálogos para examinar los filtros de fila y las máscaras de columna existentes que se aplican a una tabla de streaming determinada. Esta funcionalidad permite a los usuarios auditar y revisar las medidas de acceso y protección de datos en tablas de streaming.

Limitaciones

  • Solo los propietarios de tablas pueden actualizar las tablas de streaming para obtener los datos más recientes.

  • ALTER TABLE Los comandos no se permiten en las tablas de streaming. La definición y las propiedades de la tabla se deben modificar mediante las instrucciones CREATE OR REFRESH o ALTER STREAMING TABLE.

  • No se admiten las consultas de viaje de tiempo.

  • No se admite la evolución del esquema de tabla a través de comandos DML como INSERT INTO, y MERGE .

  • Los siguientes comandos no se admiten en tablas de streaming:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • No se admite el uso compartido delta.

  • No se admite cambiar el nombre de la tabla ni cambiar el propietario.

  • No se admiten restricciones de tabla como PRIMARY KEY y FOREIGN KEY.

  • No se admiten columnas generadas, columnas de identidad y columnas predeterminadas.

Ejemplos

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')