Sdílet prostřednictvím


Co je režim oznámení souboru automatického zavaděče?

V režimu oznámení souborů automaticky zavaděč nastaví službu oznámení a službu fronty, která se přihlásí k odběru událostí souborů ze vstupního adresáře. Oznámení o souborech můžete použít ke škálování automatického zavaděče na ingestování milionů souborů za hodinu. Ve srovnání s režimem výpisu adresářů je režim oznámení souboru výkonnější a škálovatelný pro velké vstupní adresáře nebo velký objem souborů, ale vyžaduje další cloudová oprávnění.

Mezi oznámeními o souborech a výpisem adresářů můžete kdykoli přepínat a přitom zachovat záruky zpracování dat přesně jednou.

Poznámka:

Režim oznámení souborů není pro účty Azure Premium Storage podporovaný, protože účty Premium nepodporují službu Queue Storage.

Upozorňující

Změna zdrojové cesty pro automatický zavaděč není podporována v režimu oznámení souboru. Pokud se použije režim oznámení souboru a cesta se změní, může se stát, že se nepodaří načíst soubory, které jsou již v novém adresáři v době změny adresáře update.

Režim oznámení o souboru se podporuje jenom na výpočetních prostředcích jednoho uživatele.

Cloudové prostředky používané v režimu oznámení souborů automatického zavaděče

Důležité

K automatické konfiguraci cloudové infrastruktury pro režim oznámení souborů potřebujete zvýšená oprávnění. Obraťte se na správce cloudu nebo správce pracovního prostoru. Vidět:

Automatický zavaděč může automaticky set oznámení o souborech, když set možnost cloudFiles.useNotificationstrue a poskytne potřebná oprávnění k vytváření cloudových prostředků. Kromě toho možná budete muset poskytnout další možnosti pro grant autorizaci automatického zavaděče pro vytvoření těchto prostředků.

Následující table shrnuje, které prostředky jsou vytvořeny Auto Loaderem.

Cloudové úložiště Předplatná služba Služba front Předpona* Limit **
Amazon S3 AWS SNS AWS SQS Automatické ingestování databricks 100 na kbelík S3
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks 500 na účet úložiště
GCS Google Pub/Sub Google Pub/Sub Automatické ingestování databricks 100 na kbelík GCS
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks 500 na účet úložiště
  • Automaticky zavaděč pojmenuje prostředky s touto předponou.

** Kolik souběžných kanálů oznámení o souborech je možné spustit

Pokud potřebujete pro daný účet úložiště spustit více než omezený počet kanálů oznámení o souborech, můžete:

  • Využijte službu, jako je AWS Lambda, Azure Functions nebo Google Cloud Functions, k rozdělení oznámení z jedné fronty, která naslouchá celému kontejneru nebo kbelíku, do front, které jsou specifické pro jednotlivé adresáře.

Události oznámení souboru

Amazon S3 poskytuje událost ObjectCreated, když se soubor nahraje do S3 bucketu, bez ohledu na to, zda byl nahrán jednou operací nebo vícedílným nahráváním.

ADLS Gen2 poskytuje různá oznámení událostí pro soubory, které se zobrazují v kontejneru Gen2.

  • Automatické zavaděče naslouchá FlushWithClose události pro zpracování souboru.
  • Streamy automatického zavaděče RenameFile podporují akci zjišťování souborů. RenameFile akce vyžadují, aby byl na systém úložiště zaslán požadavek rozhraní API k get velikost přejmenovaného souboru.
  • Automatické streamy zavaděče vytvořené pomocí Databricks Runtime 9.0 a po podpoře RenameDirectory akce zjišťování souborů. API požadavky na systém úložiště jsou požadovány pro RenameDirectory akce k list obsahu přejmenovaného adresáře.

Google Cloud Storage poskytuje OBJECT_FINALIZE událost při nahrání souboru, který zahrnuje přepsání a kopie souborů. Neúspěšná nahrání nezpůsobí tuto událost generate.

Poznámka:

Cloud providers nezaručuje 100% doručení všech událostí souborů za velmi vzácných podmínek a neposkytuje přísné smlouvy SLA týkající se latence událostí souboru. Databricks doporučuje aktivovat pravidelné zavaděče automatického zavaděče pomocí cloudFiles.backfillInterval možnosti zaručit, že se všechny soubory v rámci dané smlouvy SLA zjistí, pokud je požadavek na dokončení dat. Aktivace pravidelných backfillů nezpůsobí duplicity.

Požadovaná oprávnění pro konfiguraci oznámení o souboru pro ADLS Gen2 a Azure Blob Storage

Pro vstupní adresář musíte mít oprávnění ke čtení. Viz Azure Blob Storage.

Pokud chcete použít režim oznámení souboru, musíte poskytnout ověřovací credentials pro nastavení a přístup ke službám oznámení událostí.

K ověřování potřebujete pouze instanční objekt.

  • Instanční objekt – použití předdefinovaných rolí Azure

    Vytvořte aplikaci a instanční objekt Microsoft Entra ID (dříve Azure Active Directory) ve formě ID klienta a tajného klíče klienta.

    Přiřaďte této aplikaci následující role k účtu úložiště, ve kterém se nachází vstupní cesta:

    • Přispěvatel: Tato role slouží k nastavení prostředků ve vašem účtu úložiště, jako jsou fronty a odběry událostí.
    • Přispěvatel dat fronty úložiště: Tato role slouží k provádění operací front, jako je načítání a odstraňování zpráv z front. Tato role se vyžaduje pouze v případě, že zadáte instanční objekt bez připojovací řetězec.

    Přiřaďte této aplikaci následující roli ke související skupině prostředků:

    Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.

  • Hlavní služba – použití vlastní role

    Pokud máte obavy o nadměrná oprávnění požadovaná pro předchozí role, můžete vytvořit vlastní roli s alespoň následujícími oprávněními, která jsou uvedená níže ve formátu JSON role Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Potom můžete této vlastní roli přiřadit k aplikaci.

    Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.

nastavení oprávnění automatického nakladače

Požadovaná oprávnění pro konfiguraci oznámení o souboru pro Amazon S3

Pro vstupní adresář musíte mít oprávnění ke čtení. Další podrobnosti najdete v podrobnostech o připojení S3.

Pokud chcete použít režim oznámení souboru, připojte k uživateli nebo roli IAM následující dokument zásad JSON.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

where:

  • <bucket-name>: Název kontejneru S3 where stream bude číst soubory, například auto-logs. Můžete použít * například databricks-*-logsjako zástupný znak . Pokud chcete zjistit základní kontejner S3 pro cestu DBFS, můžete list všechny přípojné body DBFS v poznámkovém bloku spuštěním %fs mounts.
  • <region>: Oblast AWS where kontejnerU S3, například us-west-2. Pokud nechcete zadávat oblast, použijte *.
  • <account-number>: Číslo účtu AWS, které vlastní kbelík S3, 123456789012například . Pokud nechcete zadat číslo účtu, použijte *.

Řetězec databricks-auto-ingest-* ve specifikaci SQS a SNS ARN je předpona názvu, kterou cloudFiles zdroj používá při vytváření služeb SQS a SNS. Vzhledem k tomu, že Azure Databricks nastaví služby oznámení v počátečním spuštění streamu, můžete po počátečním spuštění použít zásadu s omezenými oprávněními (například zastavit stream a poté ho restartovat).

Poznámka:

Předchozí zásada se zabývá pouze oprávněními potřebnými k nastavení služeb oznámení souborů, konkrétně služby S3 bucket notification, SNS a SQS a předpokládá, že už máte přístup pro čtení do kontejneru S3. Pokud potřebujete přidat oprávnění jen pro čtení S3, přidejte do Actionlist v příkazu DatabricksAutoLoaderSetup v dokumentu JSON následující:

  • s3:ListBucket
  • s3:GetObject

Omezená oprávnění po počátečním nastavení

Výše popsaná oprávnění k nastavení prostředků se vyžadují pouze při počátečním spuštění datového proudu. Po prvním spuštění můžete přepnout na následující zásady IAM s omezenými oprávněními.

Důležité

S omezenými oprávněními nemůžete v případě selhání spustit nové dotazy streamování ani znovu vytvořit prostředky (například fronta SQS byla omylem odstraněna); Nemůžete také použít rozhraní API pro správu cloudových prostředků k list ani k odstraňování prostředků.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Požadovaná oprávnění ke konfiguraci oznámení o souboru pro službu GCS

Ke kontejneru GCS a ke všem objektům musíte mít list oprávnění get . Podrobnosti najdete v dokumentaci Google k oprávněním IAM.

Pokud chcete použít režim oznámení souborů, musíte přidat oprávnění pro účet služby GCS a účet použitý pro přístup k prostředkům Google Cloud Pub/Sub.

Pub/Sub Publisher Přidejte roli do účtu služby GCS. Účet tak může publikovat zprávy oznámení o událostech z kontejnerů GCS do Google Cloud Pub/Sub.

Pokud jde o účet služby používaný pro prostředky Google Cloud Pub/Sub, musíte přidat následující oprávnění:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

K tomu můžete buď vytvořit vlastní roli IAM s těmito oprávněními, nebo přiřadit existující role GCP k pokrytí těchto oprávnění.

Vyhledání účtu služby GCS

V konzole Google Cloud Console pro odpovídající projekt přejděte na Cloud Storage > Settings. Část "Účet služby cloudového úložiště" obsahuje e-mail účtu služby GCS.

Účet služby GCS

Vytvoření vlastní role IAM cloudu Google pro režim oznámení souborů

V konzole Google Cloud pro odpovídající projekt přejděte na IAM & Admin > Roles. Pak buď vytvořte roli v horní části, nebo update existující roli. Na obrazovce pro vytvoření nebo úpravu role klikněte na Add Permissions. Zobrazí se nabídka, ve které můžete přidat požadovaná oprávnění k roli.

Vlastní role GCP IAM

Ruční konfigurace nebo správa prostředků oznámení o souborech

Privilegovaní uživatelé můžou ručně konfigurovat nebo spravovat prostředky oznámení o souborech.

  • Set nastavte služby oznámení o souborech ručně prostřednictvím poskytovatele cloudu a následně ručně určete frontu identifier. Další podrobnosti najdete v tématu Možnosti oznámení o souboru.
  • Rozhraní SCALA API slouží k vytvoření nebo správě oznámení a služeb řazení do front, jak je znázorněno v následujícím příkladu:

Poznámka:

Ke konfiguraci nebo úpravě cloudové infrastruktury musíte mít příslušná oprávnění. Prohlédni si dokumentaci k oprávněním pro Azure, S3 nebo GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Slouží setUpNotificationServices(<resource-suffix>) k vytvoření fronty a předplatného s názvem <prefix>-<resource-suffix> (předpona závisí na systému úložiště shrnutého v cloudových prostředcích používaných v režimu oznámení souboru automatického zavaděče. Pokud existuje existující prostředek se stejným názvem, Azure Databricks znovu použije existující prostředek místo vytvoření nového prostředku. Tato funkce vrátí frontu identifier, kterou můžete předat zdroji cloudFiles použitím identifier v oznámení možností souboru . To umožňuje zdrojovému cloudFiles uživateli mít méně oprávnění než uživatel, který prostředky vytvoří.

Zadejte možnost "path" pouze v případě, že volání newManagernení nutné nebo setUpNotificationServiceslistNotificationServices.tearDownNotificationServices To je stejné path , jaké používáte při spouštění streamovacího dotazu.

Následující matice označuje, které metody rozhraní API jsou podporované v tom, ve kterých databricks Runtime pro každý typ úložiště:

Cloudové úložiště Nastavení rozhraní API List API Roztrhání rozhraní API
Amazon S3 Všechny verze Všechny verze Všechny verze
ADLS Gen2 Všechny verze Všechny verze Všechny verze
GCS Databricks Runtime 9.1 a novější Databricks Runtime 9.1 a novější Databricks Runtime 9.1 a novější
Azure Blob Storage Všechny verze Všechny verze Všechny verze
ADLS Gen1 Nepodporované Nepodporované Nepodporované

Řešení běžných chyb

Tato část popisuje běžné chyby při použití Auto Loader v režimu upozornění na soubory a jejich řešení.

Vytvoření odběru Event Gridu se nezdařilo.

Pokud při prvním spuštění Auto Loaderu uvidíte následující chybovou zprávu, znamená to, že Event Grid není v rámci předplatného Azure registrován jako Poskytovatel prostředků.

java.lang.RuntimeException: Failed to create event grid subscription.

Pokud chcete službu Event Grid zaregistrovat jako poskytovatele prostředků, postupujte takto:

  1. Na webu Azure Portal přejděte do svého předplatného.
  2. V části Nastavení klikněte na Prostředek Providers.
  3. Zaregistrujte poskytovatele Microsoft.EventGrid.

Autorizace požadovaná k provádění operací odběru služby Event Grid

Pokud se při prvním spuštění nástroje Auto Loader zobrazí následující chybová zpráva, ověřte, že je role přispěvatele přiřazena k hlavní instanci pro Event Grid a úložišti.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Klient Event Grid obchází proxy server

Ve Databricks Runtime 15.2 a novějších používá Event Grid connections ve službě Auto Loader ve výchozím nastavení nastavení proxy serveru z vlastností systému. V Databricks Runtime 13.3 LTS, 14.3 LTS a 15.0 až 15.2 můžete ručně nakonfigurovat Event Grid connections k použití proxy tím, že nastavíte vlastnost Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true. Viz Set vlastnosti konfigurace Sparku ve službě Azure Databricks.