Partager via


Interroger des données de diffusion en continu

Vous pouvez utiliser Azure Databricks pour interroger des sources de données de diffusion en continu à l’aide de Structured Streaming. Azure Databricks fournit une prise en charge étendue des charges de travail de diffusion en continu dans Python et Scala, et prend en charge la plupart des fonctionnalités de diffusion en continu structurée avec SQL.

Les exemples suivants illustrent l’utilisation d’un récepteur de mémoire pour l’inspection manuelle des données de streaming pendant le développement interactif dans les notebooks. En raison des limites de sortie de ligne dans l’interface utilisateur du notebook, il se peut que vous n’observiez pas toutes les données lues par les requêtes de diffusion en continu. Dans les charges de travail de production, vous devriez déclencher des requêtes en streaming uniquement en les écrivant dans une table cible ou un système externe.

Remarque

La prise en charge de SQL pour les requêtes interactives sur les données de diffusion en continu est limitée aux notebooks s’exécutant sur des calculs polyvalents. Vous pouvez également utiliser SQL lorsque vous déclarez des tables de diffusion en continu dans Databricks SQL ou Delta Live Tables. Consultez Charger des données à l’aide de tables de streaming dans Databricks SQL et Qu’est-ce que Delta Live Tables ?

Interroger des données à partir de systèmes de diffusion en continu

Azure Databricks fournit des lecteurs de données de streaming pour les systèmes de diffusion en continu suivants :

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

Vous devez fournir des détails de configuration lorsque vous initialisez des requêtes sur ces systèmes, ce qui varie en fonction de votre environnement configuré et du système que vous choisissez pour la lecture. Consultez Configurer des sources de données de diffusion en continu.

Les charges de travail courantes qui impliquent des systèmes de diffusion en continu incluent l’ingestion des données vers le lakehouse et le traitement en continu pour envoyer des données vers des systèmes externes. Pour plus d’informations sur les charges de travail de diffusion en continu, consultez Streaming sur Azure Databricks.

Les exemples suivants illustrent une lecture interactive en continu à partir de Kafka :

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Interroger une table en tant que lecture en continu

Azure Databricks crée toutes les tables à l’aide de Delta Lake par défaut. Lorsque vous effectuez une requête de diffusion en continu sur une table Delta, la requête récupère automatiquement de nouveaux enregistrements lorsqu’une version de la table est validée. Par défaut, les requêtes de diffusion en continu s’attendent à ce que les tables sources contiennent uniquement des enregistrements ajoutés. Si vous avez besoin de travailler avec des données de diffusion en continu qui contiennent des mises à jour et des suppressions, Databricks recommande d’utiliser Delta Live Tables et APPLY CHANGES INTO. Consultez les API APPLIQUER LES MODIFICATIONS : Simplifiez la capture des données modifiées avec Delta Live Tables.

Les exemples suivants démontrent comment effectuer une lecture interactive en continu à partir d'une table :

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Interroger des données dans le stockage d’objets cloud avec le chargeur automatique

Vous pouvez diffuser en continu des données à partir du stockage d’objets cloud à l’aide du chargeur automatique, le connecteur de données cloud Azure Databricks. Vous pouvez utiliser le connecteur avec des fichiers stockés dans des volumes de catalogue Unity ou d’autres emplacements de stockage d’objets cloud. Databricks recommande d’utiliser des volumes pour gérer l’accès aux données dans le stockage d’objets cloud. Consultez Se connecter aux sources de données.

Azure Databricks optimise ce connecteur pour l’ingestion en continu de données dans le stockage d’objets cloud stockés dans des formats structurés, semi-structurés et non structurés populaires. Databricks recommande de stocker les données ingérées dans un format presque brut pour optimiser le débit et réduire la perte de données potentielle en raison d’enregistrements endommagés ou de modifications de schéma.

Pour plus de recommandations sur l’ingestion de données à partir du stockage d’objets cloud, consultez Ingérer des données dans un lakehouse Databricks.

Les exemples suivants illustrent une lecture interactive en continu à partir d’un répertoire de fichiers JSON dans un volume :

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')