CREATE STREAMING TABLE
Se aplica a: Databricks SQL
Crea una tabla de flujo de datoses una tabla delta con compatibilidad adicional para el procesamiento de datos incremental o de transmisión.
Las tablas de streaming solo se admiten en Delta Live Tables y en Databricks SQL con el catálogo de Unity. Al ejecutar este comando en el proceso de Databricks Runtime compatible, solo se analiza la sintaxis. Consulte Desarrollo de código de canalización con SQL.
Sintaxis
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parámetros
REFRESH
Si se especifica, actualiza la tabla con los datos más recientes disponibles de los orígenes definidos en la consulta. Solo se procesan los nuevos datos que llegan antes de que se inicie la consulta. Los nuevos datos que se agregan a los orígenes durante la ejecución del comando se omiten hasta la siguiente actualización. La operación de actualización de CREATE OR REFRESH es totalmente declarativa. Si un comando refresh no especifica todos los metadatos de la instrucción original de creación de la tabla, se eliminan los metadatos no especificados.
IF NOT EXISTS
Crea la tabla de streaming si no existe. Si ya existe una tabla con este nombre, se omitirá la instrucción
CREATE STREAMING TABLE
.Puede especificar como máximo uno de
IF NOT EXISTS
oOR REFRESH
.-
Nombre de la tabla que se va a crear. El nombre no debe incluir una especificación temporal ni una especificación de opciones. Si el nombre no está completo, la tabla se crea en el esquema actual.
table_specification
Esta cláusula opcional define la lista de columnas y sus tipos, propiedades, descripciones y restricciones de columnas.
Si no define columnas en el esquema de la tabla, debe especificar
AS query
.-
Nombre único para la columna.
-
Especifica el tipo de datos de la columna.
NOT NULL
Si se especifica, la columna no acepta valores
NULL
.COMMENT column_comment
Literal de cadena para describir la columna.
-
Importante
Esta característica está en versión preliminar pública.
Agrega una clave principal o una restricción de clave externa a la columna de una tabla de transmisión. No se admiten restricciones para las tablas del catálogo
hive_metastore
. -
Importante
Esta característica está en versión preliminar pública.
Agrega una función de máscara de columna para anonimizar datos confidenciales. Todas las consultas siguientes desde esa columna recibirán el resultado de evaluar esa función sobre la columna en lugar del valor original de la columna. Esto puede ser útil para fines de control de acceso específicos en los que la función puede inspeccionar la identidad o las pertenencias a grupos del usuario que realiza la invocación para decidir si expurga el valor.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Agrega expectativas de calidad de datos a la tabla. Estas expectativas de calidad de los datos se pueden realizar con el tiempo y acceder a ellas a través del registro de eventos de la tabla de streaming. Una
FAIL UPDATE
expectativa hace que se produzca un error en el procesamiento al crear la tabla, así como al actualizar la tabla. UnaDROP ROW
expectativa hace que se quite toda la fila si no se cumple la expectativa.expectation_expr
puede estar compuesto por literales, identificadores de columna dentro de la tabla y funciones u operadores SQL integrados y deterministas, a excepción de lo siguiente:- Funciones de agregado
- Funciones de ventana analítica
- Funciones de ventana de categoría
- Funciones de generador con valores de tabla
expr
tampoco debe contener ninguna subconsulta.- Funciones de agregado
-
Importante
Esta característica está en versión preliminar pública.
Agrega una clave principal informativa o restricciones de clave externa informativas a una tabla de transmisión. No se admiten restricciones de clave para las tablas del catálogo
hive_metastore
.
-
-
table_clauses
Opcionalmente, especifique la creación de particiones, los comentarios, las propiedades definidas por el usuario y una programación de actualización para la nueva tabla. Cada subcláusula solo se puede especificar una vez.
-
Lista opcional de columnas de la tabla por la que se va a particionar la tabla.
COMMENT table_comment
Una
STRING
literal para describir la tabla.-
Este parámetro opcional le permite establecer una o más propiedades que defina el usuario.
Use esta configuración para especificar el canal en tiempo de ejecución de Delta Live Tables que se usa para ejecutar esta instrucción. Establezca el valor de la
pipelines.channel
propiedad en"PREVIEW"
o"CURRENT"
. El valor predeterminado es"CURRENT"
. Para obtener más información sobre los canales de Delta Live Tables, consulte Canales en tiempo de ejecución de Delta Live Tables. SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
Importante
Esta característica está en versión preliminar pública.
Para programar una actualización que se produce periódicamente, use
EVERY
la sintaxis . SiEVERY
se especifica la sintaxis, la tabla de streaming o la vista materializada se actualiza periódicamente en el intervalo especificado según el valor proporcionado, comoHOUR
,HOURS
,DAY
,DAYS
,WEEK
oWEEKS
. En la tabla siguiente se enumeran los valores enteros aceptados paranumber
.Time unit Valor entero HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Nota:
Las formas singulares y plurales de la unidad de tiempo incluida son semánticamente equivalentes.
CRON cron_string [ AT TIME ZONE timezone_id ]
Para programar una actualización utilizando un valor cron de cuarzo. Se aceptan time_zone_values válidos. No se admite
AT TIME ZONE LOCAL
.Si
AT TIME ZONE
no está presente, se usa la zona horaria de la sesión. SiAT TIME ZONE
no está presente y no se establece la zona horaria de la sesión, se produce un error.SCHEDULE
es equivalente semánticamente aSCHEDULE REFRESH
.
La programación se puede proporcionar como parte del
CREATE
comando. Use ALTER STREAMING TABLE o ejecute el comandoCREATE OR REFRESH
con la cláusulaSCHEDULE
para modificar la programación de una tabla de streaming después de la creación.WITH Cláusula ROW FILTER
Importante
Esta característica está en versión preliminar pública.
Agrega una función de filtro de fila a la tabla. Todas las consultas siguientes desde esa tabla recibirán un subconjunto de filas para las que la función se evalúa como un valor TRUE booleano. Esto puede ser útil para fines de control de acceso específicos en los que la función puede inspeccionar la identidad o las pertenencias a grupos del usuario que realiza la invocación para decidir si se filtran determinadas filas.
-
AS query
Esta cláusula rellena la tabla con los datos de
query
. Esta consulta debe ser una consulta de streaming. Esto se puede lograr agregando laSTREAM
palabra clave a cualquier relación que desee procesar de forma incremental. Al especificar unquery
elemento y un elementotable_specification
juntos, el esquema de tabla especificado entable_specification
debe contener todas las columnas devueltas porquery
, de lo contrario, obtendrá un error. Todas las columnas especificadas entable_specification
pero no devueltas porquery
valores devueltosnull
cuando se consultan.
Diferencias entre tablas de streaming y otras tablas
Las tablas de streaming son tablas con estado, diseñadas para controlar cada fila solo una vez a medida que se procesa un conjunto de datos creciente. Dado que la mayoría de los conjuntos de datos crecen continuamente con el tiempo, las tablas de streaming son adecuadas para la mayoría de las cargas de trabajo de ingesta. Las tablas de streaming son óptimas para las canalizaciones que requieren actualización de datos y baja latencia. Las tablas de streaming también pueden ser útiles para las transformaciones de escala masiva, ya que los resultados se pueden calcular incrementalmente a medida que llegan nuevos datos, manteniendo los resultados actualizados sin necesidad de volver a calcular completamente todos los datos de origen con cada actualización. Las tablas de streaming están diseñadas para orígenes de datos que son de solo anexión.
Las tablas de streaming aceptan comandos adicionales, como REFRESH
, que procesa los datos más recientes disponibles en los orígenes proporcionados en la consulta. Los cambios en la consulta proporcionada solo se reflejan en los nuevos datos mediante una llamada a, REFRESH
no los datos procesados previamente. Para aplicar también los cambios en los datos existentes, debe ejecutar REFRESH TABLE <table_name> FULL
para realizar una FULL REFRESH
. Las actualizaciones completas vuelven a procesar todos los datos disponibles en el origen con la definición más reciente. No se recomienda llamar a actualizaciones completas en orígenes que no mantengan todo el historial de los datos o tengan períodos de retención cortos, como Kafka, ya que la actualización completa trunca los datos existentes. Es posible que no pueda recuperar datos antiguos si los datos ya no están disponibles en el origen.
Filtros de fila y máscaras de columna
Importante
Esta característica está en versión preliminar pública.
Filtros de fila permiten especificar una función que se aplica como filtro cada vez que un recorrido de tabla captura filas. Estos filtros garantizan que las consultas posteriores solo devuelven filas para las que el predicado de filtro se evalúa como true.
Las máscaras de columna permiten enmascarar los valores de una columna cada vez que un examen de tabla captura filas. Todas las consultas futuras que implican esa columna recibirán el resultado de evaluar la función sobre la columna, reemplazando el valor original de la columna’.
Para obtener más información sobre cómo usar filtros de fila y máscaras de columna, vea Filtrar datos confidenciales de la tabla mediante filtros de fila y máscaras de columna.
Administración de filtros de fila y máscaras de columna
Los filtros de fila y las máscaras de columna de las tablas de streaming deben agregarse, actualizarse o quitarse a través de la instrucción CREATE OR REFRESH
.
Comportamiento
- Actualizar como definidor: Cuando las instrucciones
CREATE OR REFRESH
oREFRESH
actualizan una tabla de streaming, las funciones de filtros de fila se ejecutan con los derechos del definidor’(como propietario de la tabla). Esto significa que la actualización de la tabla usa el contexto de seguridad del usuario que creó la tabla de streaming. - Consulta: aunque la mayoría de los filtros se ejecutan con los derechos del definidor, las funciones que comprueban el contexto del usuario (como
CURRENT_USER
yIS_MEMBER
) son excepciones. Estas funciones se ejecutan como invocador. Este enfoque aplica controles de acceso y seguridad de datos específicos del usuario en función del contexto del usuario actual.
Observabilidad
Use DESCRIBE EXTENDED
, INFORMATION_SCHEMA
o el Explorador de catálogos para examinar los filtros de fila y las máscaras de columna existentes que se aplican a una tabla de streaming determinada. Esta funcionalidad permite a los usuarios auditar y revisar las medidas de acceso y protección de datos en tablas de streaming.
Limitaciones
Solo los propietarios de tablas pueden actualizar las tablas de streaming para obtener los datos más recientes.
ALTER TABLE
Los comandos no se permiten en las tablas de streaming. La definición y las propiedades de la tabla se deben modificar mediante las instruccionesCREATE OR REFRESH
o ALTER STREAMING TABLE.No se admiten las consultas de viaje de tiempo.
No se admite la evolución del esquema de tabla a través de comandos DML como
INSERT INTO
, yMERGE
.Los siguientes comandos no se admiten en tablas de streaming:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
No se admite el uso compartido delta.
No se admite cambiar el nombre de la tabla ni cambiar el propietario.
No se admiten restricciones de tabla como
PRIMARY KEY
yFOREIGN KEY
.No se admiten columnas generadas, columnas de identidad y columnas predeterminadas.
Ejemplos
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')