Carga de datos mediante tablas de streaming en Databricks SQL
Databricks recomienda usar tablas de streaming para ingerir datos mediante Databricks SQL. Una tabla de streaming es una tabla registrada en el Unity Catalog con compatibilidad adicional con el procesamiento de datos incremental o streaming. Se crea automáticamente una canalización de Delta Live Tables para cada tabla de streaming. Puede usar tablas de streaming para la carga incremental de datos desde Kafka y el almacenamiento de objetos en la nube.
En este artículo se muestra el uso de tablas de streaming para cargar datos desde el almacenamiento de objetos en la nube configurado como un volumen de Unity Catalog (recomendado) o una ubicación externa.
Nota
Para obtener información sobre el modo de uso de las tablas de Delta Lake como orígenes y receptores de streaming, consulte Lecturas y escrituras de streaming de tablas de Delta.
Importante
Las tablas de streaming creadas en Databricks SQL están respaldadas por una canalización de Delta Live Tables sin servidor. El área de trabajo debe admitir canalizaciones sin servidor para usar esta funcionalidad.
Antes de empezar
Antes de comenzar, deberá cumplir los siguientes requisitos.
Requisitos del área de trabajo:
- Una cuenta de Azure Databricks con el almacén sin servidor habilitado. Para obtener más información, consulte Habilitar almacenes de SQL sin servidor.
- Un área de trabajo con Unity Catalog habilitado. Para más información, consulte Configuración y administración de Unity Catalog.
Requisitos de proceso:
Debe usar uno de los siguientes enfoques:
Un almacén de SQL que usa el canal
Current
.Proceso con modo de acceso compartido en Databricks Runtime 13.3 LTS o superior.
Proceso con modo de acceso de usuario único en Databricks Runtime 15.4 LTS o superior.
En Databricks Runtime 15.3 y versiones posteriores, no se puede usar el proceso de usuario único para consultar tablas de streaming que son propiedad de otros usuarios. Puede usar el proceso de un solo usuario en Databricks Runtime 15.3 y versiones posteriores solo si posee la tabla de streaming. El creador de la tabla es el propietario.
Databricks Runtime 15.4 LTS y versiones posteriores admiten consultas en tablas generadas por Delta Live Tables en un proceso de usuario único, independientemente de la propiedad de la tabla. Para aprovechar el filtrado de datos proporcionado en Databricks Runtime 15.4 LTS y versiones posteriores, debe confirmar que el área de trabajo está habilitada para el proceso sin servidor, ya que la funcionalidad de filtrado de datos que admite tablas generadas por Delta Live Tables se ejecuta en un proceso sin servidor. Es posible que se le cobre por los recursos de proceso sin servidor al usar el proceso de un solo usuario para ejecutar operaciones de filtrado de datos. Consulte control de acceso específico en el proceso de un solo usuario.
Requisitos de permisos:
- El privilegio
READ FILES
en una ubicación externa de Unity Catalog. Para más información, consulte Creación de una ubicación externa para conectar el almacenamiento en la nube a Azure Databricks. - El privilegio
USE CATALOG
en el catálogo en el que se crea la tabla de streaming. - El privilegio
USE SCHEMA
en el esquema en el que se crea la tabla de streaming. - El privilegio
CREATE TABLE
en el esquema en el que se crea la tabla de streaming.
Otros requisitos:
La ruta de acceso a los datos de origen.
Ejemplo de ruta de acceso del volumen:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Ejemplo de ruta de acceso de ubicación externa:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Nota
En este artículo se da por supuesto que los datos que quiere cargar están en una ubicación de almacenamiento en la nube que se corresponde con un volumen de Unity Catalog o con una ubicación externa a la que tiene acceso.
Descubrir y previsualizar datos de origen
En la barra lateral del área de trabajo, haga clic en Consultas y, a continuación, en Crear consulta.
En el editor de consultas, seleccione un almacén de SQL que use el canal
Current
en la lista desplegable.Pegue lo siguiente en el editor, sustituyendo los valores entre corchetes angulares (
<>
) por la información que identifica los datos de origen y, a continuación, haga clic en Ejecutar.Nota
Es posible que encuentre errores de inferencia de esquema al ejecutar la función con valores de tabla
read_files
si los valores predeterminados de la función no pueden analizar los datos. Por ejemplo, es posible que tenga que configurar el modo de varias líneas para archivos CSV o JSON de varias líneas. Para obtener una lista de las opciones del analizador, consulte función con valores de tabla read_files./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Cargar datos en una tabla de streaming
Para crear una tabla de streaming a partir de datos en el almacenamiento de objetos en la nube, pegue lo siguiente en el editor de consultas y, a continuación, haga clic en Ejecutar:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Establecer el canal en tiempo de ejecución
Las tablas de streaming creadas mediante almacenes de SQL se actualizan automáticamente mediante una canalización de Delta Live Tables. Las canalizaciones de Delta Live Tables usan el tiempo de ejecución en el canal de current
de forma predeterminada. Consulte Notas de la versión de Delta Live Tables y el proceso de actualización de la versión para obtener información sobre el proceso de versión.
Databricks recomienda usar el canal de current
para cargas de trabajo de producción. Las nuevas características se publican por primera vez en el canal de preview
. Puede establecer una canalización en el canal delta Live Tables en versión preliminar para probar las nuevas características especificando preview
como una propiedad de tabla. Puede especificar esta propiedad al crear la tabla o después de crear la tabla mediante una instrucción ALTER.
En el ejemplo de código siguiente se muestra cómo establecer el canal en versión preliminar en una instrucción CREATE:
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
Actualizar una tabla de streaming mediante una canalización DLT
En esta sección se describen los patrones para actualizar una tabla de streaming con los datos más recientes disponibles de los orígenes definidos en la consulta.
Al CREATE
o REFRESH
una tabla de streaming, la actualización procesa mediante una canalización delta Live Tables sin servidor. Cada tabla de streaming que defina tiene una canalización de Delta Live Tables asociada.
Después de ejecutar el comando REFRESH
, se devuelve el vínculo de canalización DLT. Puede usar el vínculo de canalización DLT para comprobar el estado de la actualización.
Nota
Solo el propietario de la tabla puede actualizar una tabla de streaming para obtener los datos más recientes. El usuario que crea la tabla es el propietario y el propietario no se puede cambiar. Es posible que tenga que actualizar la tabla de streaming antes de usar consultas de viaje en el tiempo.
Consulte ¿Qué es Delta Live Tables?
Ingerir solo nuevos datos
De forma predeterminada, la función read_files
lee todos los datos existentes en el directorio de origen durante la creación de la tabla y, a continuación, procesa los registros recién llegados con cada actualización.
Para evitar la ingesta de datos que ya existen en el directorio de origen en el momento de la creación de la tabla, establezca la opción includeExistingFiles
en false
. Esto significa que solo se procesan los datos que llegan al directorio después de que se procese la creación de la tabla. Por ejemplo:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
Actualización completa de una tabla de streaming
Las actualizaciones completas vuelven a procesar todos los datos disponibles en el origen con la definición más reciente. No se recomienda llamar a actualizaciones completas en orígenes que no mantengan todo el historial de los datos o tengan períodos de retención cortos, como Kafka, ya que la actualización completa trunca los datos existentes. Es posible que no pueda recuperar datos antiguos si los datos ya no están disponibles en el origen.
Por ejemplo:
REFRESH STREAMING TABLE my_bronze_table FULL
Programar una tabla de streaming para la actualización automática
Para configurar una tabla de streaming de modo que se actualice automáticamente según una programación definida, pegue lo siguiente en el editor de consultas y, a continuación, haga clic en Ejecutar:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Para ver consultas de programación de actualización de ejemplo, consulte ALTER STREAMING TABLE.
Seguir el estado de una actualización
Para ver el estado de una actualización de tabla de streaming, puede ver la canalización que administra la tabla de streaming en la interfaz de usuario de Delta Live Tables o ver la Información de actualización devuelta por el comando DESCRIBE EXTENDED
para la tabla de streaming.
DESCRIBE EXTENDED <table-name>
Ingesta de streaming desde Kafka
Para obtener un ejemplo de ingesta de streaming desde Kafka, consulte read_kafka.
Conceder a los usuarios acceso a una tabla de streaming
Para conceder a los usuarios el privilegio SELECT
en la tabla de streaming para que puedan consultarla, pegue lo siguiente en el editor de consultas y, a continuación, haga clic en Ejecutar:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Para más información sobre la concesión de privilegios en objetos protegibles de Unity Catalog, consulte Privilegios de Unity Catalog y objetos protegibles.
eliminar permanentemente registros de una tabla de streaming
Importante
La compatibilidad con la instrucción REORG
con tablas de streaming se encuentra en versión preliminar pública.
Nota
- El uso de una instrucción
REORG
con una tabla de streaming requiere Databricks Runtime 15.4 y versiones posteriores. - Aunque puede usar la instrucción
REORG
con cualquier tabla de transmisión, solo es necesario al eliminar registros de una tabla de transmisión cuando están habilitados los vectores de eliminación . El comando no tiene ningún efecto cuando se usa con una tabla de streaming sin vectores de eliminación habilitados.
Para eliminar físicamente los registros del almacenamiento subyacente para una tabla de streaming con vectores de eliminación habilitados, como en el caso del cumplimiento del RGPD, se deben llevar a cabo tareas adicionales para asegurarse de que una operación de VACUUM se ejecute en los datos de la tabla de streaming.
A continuación se describen estos pasos con más detalle:
- Actualice los registros o elimine los registros de la tabla de streaming.
- Ejecute una instrucción
REORG
en la tabla de streaming y especifique el parámetroAPPLY (PURGE)
. Por ejemplo,REORG TABLE <streaming-table-name> APPLY (PURGE);
. - Espere a que pase el período de retención de datos de la tabla de streaming. El período de retención de datos predeterminado es siete días, pero se puede configurar con la propiedad
delta.deletedFileRetentionDuration
tabla. Vea Configuración de la retención de datos para las consultas de viaje en el tiempo. REFRESH
la tabla de streaming. Consulte Actualizar una tabla de streaming mediante una canalización DLT. En un plazo de 24 horas de la operación deREFRESH
, las tareas de mantenimiento de Delta Live Tables, incluida la operación deVACUUM
necesaria para asegurarse de que los registros se eliminan permanentemente, se ejecutan automáticamente. Consulte Tareas de mantenimiento realizadas por Delta Live Tables.
Supervisar ejecuciones mediante el historial de consultas
Puede usar la página del historial de consultas para acceder a los detalles de la consulta y a los perfiles de consulta que pueden ayudarle a identificar consultas con un rendimiento deficiente y cuellos de botella en la canalización delta Live Tables que se usa para ejecutar las actualizaciones de la tabla de streaming. Para obtener información general sobre el tipo de información disponible en los historiales de consultas y los perfiles de consulta, consulte Historial de consultas y Perfil de consulta.
Importante
Esta característica está en versión preliminar pública. Los administradores de áreas de trabajo pueden habilitar esta característica desde la página Versiones preliminares. Consulte Administración de las versiones preliminares de Azure Databricks.
Todas las instrucciones relacionadas con las tablas de streaming aparecen en el historial de consultas. Puede usar el filtro desplegable de instrucción para seleccionar cualquier comando e inspeccionar las consultas relacionadas. Todas las instrucciones CREATE
van seguidas de una instrucción REFRESH
que se ejecuta de forma asincrónica en una canalización de Delta Live Tables. Las instrucciones REFRESH
suelen incluir planes de consulta detallados que proporcionan información sobre la optimización del rendimiento.
Para acceder a las instrucciones REFRESH
en la interfaz de usuario del historial de consultas, siga estos pasos:
- Haga clic en
en la barra lateral izquierda para abrir la interfaz de usuario Historial de consultas.
- Seleccione la casilla de verificación REFRESH del filtro desplegable de la Instrucción.
- Haga clic en el nombre de la instrucción de consulta para ver los detalles de resumen, como la duración de la consulta y las métricas agregadas.
- Haga clic en Ver perfil de consulta para abrir el perfil de consulta. Consulte Perfil de consulta para obtener más información sobre cómo navegar por el perfil de consulta.
- Opcionalmente, puede usar los vínculos de la sección Origen de consulta para abrir la consulta o canalización relacionada.
También puede acceder a los detalles de la consulta mediante vínculos en el editor de SQL o desde un cuaderno asociado a un almacén de SQL.