Partager via


Charger des données à l’aide de tables de streaming dans Databricks SQL

Databricks recommande d’utiliser des tables de diffusion en continu pour ingérer des données à l’aide de Databricks SQL. Une table de diffusion en continu est une table inscrite dans le catalogue Unity avec une prise en charge supplémentaire de la diffusion en continu ou du traitement incrémentiel des données. Un pipeline Delta Live Tables est automatiquement créé pour chaque table de streaming. Vous pouvez utiliser des tables de diffusion en continu pour le chargement incrémentiel des données à partir de Kafka et du stockage d’objets cloud.

Cet article montre comment utiliser des tables de streaming pour charger des données à partir du stockage d’objets cloud configurés en tant que volume Unity Catalog (recommandé) ou emplacement externe.

Remarque

Pour en savoir plus sur comment utiliser des tables Delta Lake comme sources et récepteurs de diffusion en continu, consultez Lectures et écritures en continu de table Delta.

Important

Les tables de diffusion en continu créées dans Databricks SQL sont prises en charge par un pipeline Delta Live Tables serverless. Votre espace de travail doit prendre en charge les pipelines serverless pour utiliser cette fonctionnalité.

Avant de commencer

Avant de commencer, vous devez satisfaire aux exigences suivantes.

Exigences pour l’espace de travail :

Exigences de calcul :

Vous devez utiliser une des options suivantes :

  • Un entrepôt SQL qui utilise le canal Current.

  • Calcul avec mode d’accès partagé sur Databricks Runtime 13.3 LTS ou ultérieur.

  • Calcul avec mode d’accès mono-utilisateur sur Databricks Runtime 15.4 LTS ou une version ultérieure.

    Sur Databricks Runtime 15.3 et ci-dessous, vous ne pouvez pas utiliser le calcul d’un seul utilisateur pour interroger des tables de diffusion en continu appartenant à d’autres utilisateurs. Vous pouvez utiliser le calcul mono-utilisateur sur Databricks Runtime 15.3 et antérieur seulement si vous êtes propriétaire de la table de diffusion en continu. Le créateur de la table est le propriétaire.

    Databricks Runtime 15.4 LTS et versions ultérieures prennent en charge les requêtes sur les tables générées par Delta Live Tables sur le calcul mono-utilisateur, quelle que soit la propriété de la table. Pour tirer parti du filtrage des données fourni dans Databricks Runtime 15.4 LTS et versions ultérieures, vous devez confirmer que votre espace de travail est activé pour le calcul serverless, car la fonctionnalité de filtrage des données qui prend en charge les tables générées par Delta Live Tables s’exécute sur un calcul serverless. Vous pouvez donc être facturé pour des ressources de calcul serverless quand vous utilisez un calcul mono-utilisateur pour exécuter des opérations de filtrage de données. Consulter Contrôle d’accès affiné sur le calcul mono-utilisateur.

Conditions requises pour les autorisations :

  • Privilège READ FILES sur un emplacement externe Unity Catalog. Pour plus d’informations, consultez Créer un emplacement externe pour connecter le stockage cloud à Azure Databricks.
  • Privilège USE CATALOG sur le catalogue dans lequel vous créez la table de streaming.
  • Privilège USE SCHEMA sur le schéma dans lequel vous créez la table de streaming.
  • Privilège CREATE TABLE sur le schéma dans lequel vous créez la table de streaming.

Autres exigences :

  • Chemin d’accès à vos données sources.

    Exemple de chemin d’accès au volume : /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Exemple de chemin d’accès à l’emplacement externe : abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Remarque

    Cet article suppose que les données à charger se trouvent dans un emplacement de stockage cloud qui correspond à un volume de catalogue Unity ou à un emplacement externe auquel vous avez accès.

Découvrir et afficher un aperçu des données sources

  1. Dans la barre latérale de votre espace de travail, cliquez sur Requêtes, puis sur Créer une requête.

  2. Dans l’éditeur de requête, sélectionnez un entrepôt SQL qui utilise le canal Current dans la liste déroulante.

  3. Collez ce qui suit dans l’éditeur, en remplaçant les valeurs entre crochets (<>) pour les informations identifiant vos données sources, puis cliquez sur Exécuter.

    Remarque

    Vous pouvez rencontrer des erreurs d’inférence de schéma lors de l’exécution de la fonction table read_files si les valeurs par défaut de la fonction ne peuvent pas analyser vos données. Par exemple, vous devrez peut-être configurer le mode multiligne pour les fichiers CSV ou JSON multilignes. Pour obtenir la liste des options de l’analyseur, consultez read_files fonction table.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Charger des données dans une table de streaming

Pour créer une table de streaming à partir de données dans le stockage d’objets cloud, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Consultez Configurer le canal d’exécution

Les tables de streaming créées à l’aide d’entrepôts SQL sont automatiquement actualisées à l’aide d’un pipeline Delta Live Tables. Les pipelines Delta Live Tables utilisent le runtime dans le canal current par défaut. Consultez les notes de publication Delta Live Tables et le processus de mise à niveau des versions pour en savoir plus sur le processus de mise en production.

Databricks recommande d’utiliser le canal current pour les charges de travail de production. Les nouvelles fonctionnalités sont d’abord publiées sur le canal preview. Vous pouvez configurer un pipeline vers la chaîne Delta Live Tables en préversion pour tester de nouvelles fonctionnalités, par la spécification de preview comme propriété de table. Vous pouvez spécifier cette propriété lorsque vous créez la table ou après la création de la table à l'aide d'une instruction ALTER.

L’exemple de code suivant montre comment configurer le canal à afficher en préversion dans une instruction CREATE :

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

Actualiser une table de streaming à l’aide d’un pipeline DLT

Cette section décrit les modèles d’actualisation d’une table de diffusion en continu avec les données les plus récentes disponibles à partir des sources définies dans la requête.

Lorsque vous CREATE ou REFRESH une table de diffusion en continu, les processus de mise à jour utilisent un pipeline Delta Live Tables serverless. Chaque table de diffusion en continu que vous définissez a un pipeline Delta Live Tables associé.

Après avoir exécuté la commande REFRESH, le lien de pipeline DLT est retourné. Vous pouvez utiliser le lien de pipeline DLT pour vérifier l’état de l’actualisation.

Remarque

Seul le propriétaire de la table peut actualiser une table de flux pour obtenir les données les plus récentes. L’utilisateur qui crée la table est le propriétaire et le propriétaire ne peut pas être modifié. Vous devrez peut-être rafraîchir votre table de diffusion en continu avant d’utiliser des requêtes de voyage dans le temps.

Consultez Qu’est-ce que Delta Live Tables ?.

Ingérer de nouvelles données uniquement

Par défaut, la fonction read_files lit toutes les données existantes dans le répertoire source lors de la création de la table, puis traite les enregistrements nouvellement arrivés avec chaque actualisation.

Pour éviter d’ingérer des données qui existent déjà dans le répertoire source au moment de la création de la table, définissez l’option includeExistingFiles sur false. Cela signifie que seules les données qui arrivent dans le répertoire après la création de la table sont traitées. Exemple :

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Actualiser entièrement une table de diffusion en continu

Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique entier des données ou qui ont des périodes de rétention courtes, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.

Exemple :

REFRESH STREAMING TABLE my_bronze_table FULL

Planifier une table de diffusion en continu pour l’actualisation automatique

Pour configurer une table de diffusion en continu pour qu’elle s’actualise automatiquement en fonction d’une planification définie, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Pour obtenir des exemples de requêtes de planification d’actualisation, consultez ALTER STREAMING TABLE.

Suivre l’état d’une actualisation

Vous pouvez afficher l’état d’une actualisation d’une table de diffusion en continu en consultant le pipeline qui gère la table de diffusion en continu dans l’interface utilisateur Delta Live Tables ou en affichant les informations d’actualisation retournées par la commande DESCRIBE EXTENDED pour la table de diffusion en continu.

DESCRIBE EXTENDED <table-name>

Ingestion en streaming à partir de Kafka

Pour obtenir un exemple d’ingestion en streaming à partir de Kafka, consultez read_kafka.

Accorder aux utilisateurs l’accès à une table de diffusion en continu

Pour accorder aux utilisateurs le privilège SELECT sur la table de diffusion en continu afin qu’ils puissent l’interroger, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Pour plus d’informations sur les privilèges Unity Catalog, consultez les privilèges et objets sécurisables Unity Catalog.

Supprimer définitivement les enregistrements d’une table de diffusion en continu

Important

La prise en charge de l’instruction REORG avec des tables de diffusion en continu est en préversion publique.

Remarque

  • L’utilisation d’une instruction REORG avec une table de diffusion en continu nécessite Databricks Runtime 15.4 et versions ultérieures.
  • Bien que vous puissiez utiliser l’instruction REORG avec n’importe quelle table de diffusion en continu, il n’est nécessaire que lors de la suppression d’enregistrements d’une table de diffusion en continu avec vecteurs de suppression activés. La commande n’a aucun effet lorsqu’elle est utilisée avec une table de diffusion en continu sans vecteurs de suppression activés.

Pour supprimer physiquement les enregistrements du stockage sous-jacent d'une table de streaming avec des vecteurs de suppression activés, comme pour la conformité RGPD, des étapes supplémentaires doivent être prises pour s'assurer qu'une opération de type VACUUM s'exécute sur les données de la table de streaming.

Les étapes suivantes décrivent ces étapes plus en détail :

  1. Mettez à jour les enregistrements ou supprimez des enregistrements de la table de diffusion en continu.
  2. Exécutez une instruction REORG sur la table de streaming, en spécifiant le paramètre APPLY (PURGE). Par exemple, REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. Attendez que la période de conservation des données de la table de streaming soit écoulée. La période de rétention des données par défaut est de sept jours, mais elle peut être configurée avec la propriété de table delta.deletedFileRetentionDuration. Voir Configurer la conservation des données pour des requêtes de voyage dans le temps.
  4. REFRESH la table de diffusion en continu. Consultez Actualiser une table de streaming à l’aide d’un pipeline DLT. Dans les 24 heures de l’opération de REFRESH, les tâches de maintenance Delta Live Tables, y compris l’opération de VACUUM requise pour s’assurer que les enregistrements sont supprimés définitivement, sont exécutées automatiquement. Consultez Tâches de maintenance effectuées par Delta Live Tables.

Le moniteur s'exécute à l'aide de l'historique des requêtes

Vous pouvez utiliser la page d'historique des requêtes pour accéder aux détails des requêtes et aux profils de requête qui peuvent vous aider à identifier les requêtes peu performantes et les goulots d'étranglement dans le pipeline Delta Live Tables utilisé pour exécuter vos mises à jour de table en streaming. Pour obtenir une vue d’ensemble du type d’informations disponibles dans les historiques de requête et les profils de requête, consultez Historique des requêtes et Profil des requêtes.

Important

Cette fonctionnalité est disponible en préversion publique. Les administrateurs d’espace de travail peuvent activer cette fonctionnalité à partir de la page Aperçus. Consultez Gérer les préversions d'Azure Databricks.

Toutes les déclarations liées aux tables de lecture en continu apparaissent dans l’historique des requêtes. Vous pouvez utiliser le filtre déroulant Instruction pour sélectionner n’importe quelle commande et inspecter les requêtes associées. Toutes les instructions CREATE sont suivies d’une instruction REFRESH qui s’exécute de manière asynchrone sur un pipeline Delta Live Tables. Les instructions REFRESH incluent généralement des plans de requête détaillés qui fournissent des insights sur l’optimisation des performances.

Pour accéder aux instructions REFRESH de l’interface utilisateur de l’historique des requêtes, procédez comme suit :

  1. Cliquez sur Icône Historique dans la barre latérale gauche pour ouvrir l’interface utilisateur Historique des requêtes.
  2. Sélectionnez la case à cocher REFRESH dans le filtre déroulant de la déclaration .
  3. Cliquez sur le nom de l'instruction de requête pour afficher les détails récapitulatifs tels que la durée de la requête et les métriques agrégées.
  4. Cliquez sur Afficher le profil de requête pour ouvrir le profil de requête. Pour plus d’informations sur la navigation dans le profil de requête, consultez Profil de requête.
  5. Si vous le souhaitez, vous pouvez utiliser les liens dans la section Source de requête pour ouvrir la requête ou le pipeline associé.

Vous pouvez également accéder aux détails de la requête à l’aide de liens dans l’éditeur SQL ou à partir d’un bloc-notes attaché à un entrepôt SQL.

Ressources supplémentaires