Compartir a través de


Consulta de datos de streaming

Puede usar Azure Databricks para consultar orígenes de datos de streaming mediante flujo estructurado. Azure Databricks proporciona una amplia compatibilidad con cargas de trabajo de streaming en Python y Scala, y admite la mayoría de las funcionalidades de flujo estructurado con SQL.

En los ejemplos siguientes, se muestra el uso de un receptor de memoria para la inspección manual de datos de streaming durante el desarrollo interactivo en cuadernos. Debido a los límites de salida de fila en la interfaz de usuario del cuaderno, es posible que no observe todos los datos leídos por consultas de streaming. En las cargas de trabajo de producción, solo se deberían desencadenar consultas de streaming escribiéndolas en una tabla de destino o en un sistema externo.

Nota:

La compatibilidad de SQL con consultas interactivas en datos de streaming se limita a los cuadernos que se ejecuten en un proceso multiuso. También es posible usar SQL al declarar tablas de streaming en Databricks SQL o Delta Live Tables. Consulte Carga de datos mediante tablas de secuencia en Databricks SQL y ¿Qué es Delta Live Tables?.

Consulta de datos de sistemas de streaming

Azure Databricks proporciona lectores de datos de streaming para los siguientes sistemas de streaming:

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

Es necesario proporcionar detalles de configuración al inicializar consultas en estos sistemas, que varían en función del entorno configurado y del sistema del que elija leer. Consulte Configuración de orígenes de datos de streaming.

Las cargas de trabajo comunes que implican sistemas de streaming incluyen la ingesta de datos en almacenes de lago y el procesamiento de transmisiones para receptores de datos en sistemas externos. Para obtener más información sobre las cargas de trabajo de streaming, consulte Streaming en Azure Databricks.

En los ejemplos siguientes, se muestra una lectura interactiva de streaming de Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Consulta de una tabla como lectura de streaming

Azure Databricks crea todas las tablas con Delta Lake de forma predeterminada. Al realizar una consulta de streaming en una tabla Delta, la consulta recoge automáticamente nuevos registros cuando se confirma una versión de la tabla. De manera predeterminada, las consultas de streaming esperan que las tablas de origen solo contengan registros anexados. Si necesita trabajar con datos de streaming que contengan actualizaciones y eliminaciones, Databricks recomienda usar Delta Live Tables y APPLY CHANGES INTO. Consulte API APPLY CHANGES: simplificación de la captura de datos modificados con Delta Live Tables.

En los ejemplos siguientes, se muestra cómo realizar una lectura interactiva de streaming de una tabla:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Consulta de datos en el almacenamiento de objetos en la nube con el cargador automático

Es posible transmitir datos desde el almacenamiento de objetos en la nube con el cargador automático, el conector de datos en la nube de Azure Databricks. Es posible usar el conector con archivos almacenados en volúmenes del catálogo de Unity u otras ubicaciones de almacenamiento de objetos en la nube. Databricks recomienda usar volúmenes para administrar el acceso a los datos en el almacenamiento de objetos en la nube. Consulte Conexión a orígenes de datos.

Azure Databricks optimiza este conector para la ingesta de streaming de datos en el almacenamiento de objetos en la nube que se almacena en formatos estructurados, semiestructurados y no estructurados populares. Databricks recomienda almacenar datos ingeridos en un formato casi sin procesar para maximizar el rendimiento y minimizar la posible pérdida de datos debido a registros dañados o cambios de esquema.

Para obtener más recomendaciones sobre la ingesta de datos desde el almacenamiento de objetos en la nube, consulte Ingesta de datos en una instancia de Databricks Lakehouse.

En los ejemplos siguientes, se muestra una lectura interactiva de streaming desde un directorio de archivos JSON en un volumen:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')