Co je režim oznámení souboru automatického zavaděče?
V režimu oznámení souborů automaticky zavaděč nastaví službu oznámení a službu fronty, která se přihlásí k odběru událostí souborů ze vstupního adresáře. Oznámení o souborech můžete použít ke škálování automatického zavaděče na ingestování milionů souborů za hodinu. Ve srovnání s režimem výpisu adresářů je režim oznámení souboru výkonnější a škálovatelný pro velké vstupní adresáře nebo velký objem souborů, ale vyžaduje další cloudová oprávnění.
Mezi oznámeními o souborech a výpisem adresářů můžete kdykoli přepínat a přitom zachovat záruky zpracování dat přesně jednou.
Poznámka:
Režim oznámení souborů není pro účty Azure Premium Storage podporovaný, protože účty Premium nepodporují službu Queue Storage.
Upozorňující
Změna zdrojové cesty pro automatický zavaděč není podporována v režimu oznámení souboru. Pokud se použije režim oznámení souboru a cesta se změní, může se stát, že se nepodaří načíst soubory, které jsou již v novém adresáři v době změny adresáře update.
Režim oznámení o souboru se podporuje jenom na výpočetních prostředcích jednoho uživatele.
Cloudové prostředky používané v režimu oznámení souborů automatického zavaděče
Důležité
K automatické konfiguraci cloudové infrastruktury pro režim oznámení souborů potřebujete zvýšená oprávnění. Obraťte se na správce cloudu nebo správce pracovního prostoru. Vidět:
Automatický zavaděč může automaticky set oznámení o souborech, když set možnost cloudFiles.useNotifications
true
a poskytne potřebná oprávnění k vytváření cloudových prostředků. Kromě toho možná budete muset poskytnout další možnosti pro grant autorizaci automatického zavaděče pro vytvoření těchto prostředků.
Následující table shrnuje, které prostředky jsou vytvořeny Auto Loaderem.
Cloudové úložiště | Předplatná služba | Služba front | Předpona* | Limit ** |
---|---|---|---|---|
Amazon S3 | AWS SNS | AWS SQS | Automatické ingestování databricks | 100 na kbelík S3 |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 na účet úložiště |
GCS | Google Pub/Sub | Google Pub/Sub | Automatické ingestování databricks | 100 na kbelík GCS |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | 500 na účet úložiště |
- Automaticky zavaděč pojmenuje prostředky s touto předponou.
** Kolik souběžných kanálů oznámení o souborech je možné spustit
Pokud potřebujete pro daný účet úložiště spustit více než omezený počet kanálů oznámení o souborech, můžete:
- Využijte službu, jako je AWS Lambda, Azure Functions nebo Google Cloud Functions, k rozdělení oznámení z jedné fronty, která naslouchá celému kontejneru nebo kbelíku, do front, které jsou specifické pro jednotlivé adresáře.
Události oznámení souboru
Amazon S3 poskytuje událost ObjectCreated
, když se soubor nahraje do S3 bucketu, bez ohledu na to, zda byl nahrán jednou operací nebo vícedílným nahráváním.
ADLS Gen2 poskytuje různá oznámení událostí pro soubory, které se zobrazují v kontejneru Gen2.
- Automatické zavaděče naslouchá
FlushWithClose
události pro zpracování souboru. - Streamy automatického zavaděče
RenameFile
podporují akci zjišťování souborů.RenameFile
akce vyžadují, aby byl na systém úložiště zaslán požadavek rozhraní API k get velikost přejmenovaného souboru. - Automatické streamy zavaděče vytvořené pomocí Databricks Runtime 9.0 a po podpoře
RenameDirectory
akce zjišťování souborů. API požadavky na systém úložiště jsou požadovány proRenameDirectory
akce k list obsahu přejmenovaného adresáře.
Google Cloud Storage poskytuje OBJECT_FINALIZE
událost při nahrání souboru, který zahrnuje přepsání a kopie souborů. Neúspěšná nahrání nezpůsobí tuto událost generate.
Poznámka:
Cloud providers nezaručuje 100% doručení všech událostí souborů za velmi vzácných podmínek a neposkytuje přísné smlouvy SLA týkající se latence událostí souboru. Databricks doporučuje aktivovat pravidelné zavaděče automatického zavaděče pomocí cloudFiles.backfillInterval
možnosti zaručit, že se všechny soubory v rámci dané smlouvy SLA zjistí, pokud je požadavek na dokončení dat. Aktivace pravidelných backfillů nezpůsobí duplicity.
Požadovaná oprávnění pro konfiguraci oznámení o souboru pro ADLS Gen2 a Azure Blob Storage
Pro vstupní adresář musíte mít oprávnění ke čtení. Viz Azure Blob Storage.
Pokud chcete použít režim oznámení souboru, musíte poskytnout ověřovací credentials pro nastavení a přístup ke službám oznámení událostí.
K ověřování potřebujete pouze instanční objekt.
Instanční objekt – použití předdefinovaných rolí Azure
Vytvořte aplikaci a instanční objekt Microsoft Entra ID (dříve Azure Active Directory) ve formě ID klienta a tajného klíče klienta.
Přiřaďte této aplikaci následující role k účtu úložiště, ve kterém se nachází vstupní cesta:
- Přispěvatel: Tato role slouží k nastavení prostředků ve vašem účtu úložiště, jako jsou fronty a odběry událostí.
- Přispěvatel dat fronty úložiště: Tato role slouží k provádění operací front, jako je načítání a odstraňování zpráv z front. Tato role se vyžaduje pouze v případě, že zadáte instanční objekt bez připojovací řetězec.
Přiřaďte této aplikaci následující roli ke související skupině prostředků:
- Přispěvatel pro odběr událostí Event Gridu: Tato role slouží k provádění operací odběru služby Azure Event Grid, jako je vytváření nebo seznam odběrů událostí.
Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.
Hlavní služba – použití vlastní role
Pokud máte obavy o nadměrná oprávnění požadovaná pro předchozí role, můžete vytvořit vlastní roli s alespoň následujícími oprávněními, která jsou uvedená níže ve formátu JSON role Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Potom můžete této vlastní roli přiřadit k aplikaci.
Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.
nastavení oprávnění automatického nakladače
Požadovaná oprávnění pro konfiguraci oznámení o souboru pro Amazon S3
Pro vstupní adresář musíte mít oprávnění ke čtení. Další podrobnosti najdete v podrobnostech o připojení S3.
Pokud chcete použít režim oznámení souboru, připojte k uživateli nebo roli IAM následující dokument zásad JSON.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
where:
-
<bucket-name>
: Název kontejneru S3 where stream bude číst soubory, napříkladauto-logs
. Můžete použít*
napříkladdatabricks-*-logs
jako zástupný znak . Pokud chcete zjistit základní kontejner S3 pro cestu DBFS, můžete list všechny přípojné body DBFS v poznámkovém bloku spuštěním%fs mounts
. -
<region>
: Oblast AWS where kontejnerU S3, napříkladus-west-2
. Pokud nechcete zadávat oblast, použijte*
. -
<account-number>
: Číslo účtu AWS, které vlastní kbelík S3,123456789012
například . Pokud nechcete zadat číslo účtu, použijte*
.
Řetězec databricks-auto-ingest-*
ve specifikaci SQS a SNS ARN je předpona názvu, kterou cloudFiles
zdroj používá při vytváření služeb SQS a SNS. Vzhledem k tomu, že Azure Databricks nastaví služby oznámení v počátečním spuštění streamu, můžete po počátečním spuštění použít zásadu s omezenými oprávněními (například zastavit stream a poté ho restartovat).
Poznámka:
Předchozí zásada se zabývá pouze oprávněními potřebnými k nastavení služeb oznámení souborů, konkrétně služby S3 bucket notification, SNS a SQS a předpokládá, že už máte přístup pro čtení do kontejneru S3. Pokud potřebujete přidat oprávnění jen pro čtení S3, přidejte do Action
list v příkazu DatabricksAutoLoaderSetup
v dokumentu JSON následující:
s3:ListBucket
s3:GetObject
Omezená oprávnění po počátečním nastavení
Výše popsaná oprávnění k nastavení prostředků se vyžadují pouze při počátečním spuštění datového proudu. Po prvním spuštění můžete přepnout na následující zásady IAM s omezenými oprávněními.
Důležité
S omezenými oprávněními nemůžete v případě selhání spustit nové dotazy streamování ani znovu vytvořit prostředky (například fronta SQS byla omylem odstraněna); Nemůžete také použít rozhraní API pro správu cloudových prostředků k list ani k odstraňování prostředků.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Požadovaná oprávnění ke konfiguraci oznámení o souboru pro službu GCS
Ke kontejneru GCS a ke všem objektům musíte mít list
oprávnění get
. Podrobnosti najdete v dokumentaci Google k oprávněním IAM.
Pokud chcete použít režim oznámení souborů, musíte přidat oprávnění pro účet služby GCS a účet použitý pro přístup k prostředkům Google Cloud Pub/Sub.
Pub/Sub Publisher
Přidejte roli do účtu služby GCS. Účet tak může publikovat zprávy oznámení o událostech z kontejnerů GCS do Google Cloud Pub/Sub.
Pokud jde o účet služby používaný pro prostředky Google Cloud Pub/Sub, musíte přidat následující oprávnění:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
K tomu můžete buď vytvořit vlastní roli IAM s těmito oprávněními, nebo přiřadit existující role GCP k pokrytí těchto oprávnění.
Vyhledání účtu služby GCS
V konzole Google Cloud Console pro odpovídající projekt přejděte na Cloud Storage > Settings
.
Část "Účet služby cloudového úložiště" obsahuje e-mail účtu služby GCS.
Vytvoření vlastní role IAM cloudu Google pro režim oznámení souborů
V konzole Google Cloud pro odpovídající projekt přejděte na IAM & Admin > Roles
. Pak buď vytvořte roli v horní části, nebo update existující roli. Na obrazovce pro vytvoření nebo úpravu role klikněte na Add Permissions
. Zobrazí se nabídka, ve které můžete přidat požadovaná oprávnění k roli.
Ruční konfigurace nebo správa prostředků oznámení o souborech
Privilegovaní uživatelé můžou ručně konfigurovat nebo spravovat prostředky oznámení o souborech.
- Set nastavte služby oznámení o souborech ručně prostřednictvím poskytovatele cloudu a následně ručně určete frontu identifier. Další podrobnosti najdete v tématu Možnosti oznámení o souboru.
- Rozhraní SCALA API slouží k vytvoření nebo správě oznámení a služeb řazení do front, jak je znázorněno v následujícím příkladu:
Poznámka:
Ke konfiguraci nebo úpravě cloudové infrastruktury musíte mít příslušná oprávnění. Prohlédni si dokumentaci k oprávněním pro Azure, S3 nebo GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Slouží setUpNotificationServices(<resource-suffix>)
k vytvoření fronty a předplatného s názvem <prefix>-<resource-suffix>
(předpona závisí na systému úložiště shrnutého v cloudových prostředcích používaných v režimu oznámení souboru automatického zavaděče. Pokud existuje existující prostředek se stejným názvem, Azure Databricks znovu použije existující prostředek místo vytvoření nového prostředku. Tato funkce vrátí frontu identifier, kterou můžete předat zdroji cloudFiles
použitím identifier v oznámení možností souboru . To umožňuje zdrojovému cloudFiles
uživateli mít méně oprávnění než uživatel, který prostředky vytvoří.
Zadejte možnost "path"
pouze v případě, že volání newManager
není nutné nebo setUpNotificationServices
listNotificationServices
.tearDownNotificationServices
To je stejné path
, jaké používáte při spouštění streamovacího dotazu.
Následující matice označuje, které metody rozhraní API jsou podporované v tom, ve kterých databricks Runtime pro každý typ úložiště:
Cloudové úložiště | Nastavení rozhraní API | List API | Roztrhání rozhraní API |
---|---|---|---|
Amazon S3 | Všechny verze | Všechny verze | Všechny verze |
ADLS Gen2 | Všechny verze | Všechny verze | Všechny verze |
GCS | Databricks Runtime 9.1 a novější | Databricks Runtime 9.1 a novější | Databricks Runtime 9.1 a novější |
Azure Blob Storage | Všechny verze | Všechny verze | Všechny verze |
ADLS Gen1 | Nepodporované | Nepodporované | Nepodporované |
Řešení běžných chyb
Tato část popisuje běžné chyby při použití Auto Loader v režimu upozornění na soubory a jejich řešení.
Vytvoření odběru Event Gridu se nezdařilo.
Pokud při prvním spuštění Auto Loaderu uvidíte následující chybovou zprávu, znamená to, že Event Grid není v rámci předplatného Azure registrován jako Poskytovatel prostředků.
java.lang.RuntimeException: Failed to create event grid subscription.
Pokud chcete službu Event Grid zaregistrovat jako poskytovatele prostředků, postupujte takto:
- Na webu Azure Portal přejděte do svého předplatného.
- V části Nastavení klikněte na Prostředek Providers.
- Zaregistrujte poskytovatele
Microsoft.EventGrid
.
Autorizace požadovaná k provádění operací odběru služby Event Grid
Pokud se při prvním spuštění nástroje Auto Loader zobrazí následující chybová zpráva, ověřte, že je role přispěvatele přiřazena k hlavní instanci pro Event Grid a úložišti.
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Klient Event Grid obchází proxy server
Ve Databricks Runtime 15.2 a novějších používá Event Grid connections ve službě Auto Loader ve výchozím nastavení nastavení proxy serveru z vlastností systému. V Databricks Runtime 13.3 LTS, 14.3 LTS a 15.0 až 15.2 můžete ručně nakonfigurovat Event Grid connections k použití proxy tím, že nastavíte vlastnost Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true
. Viz Set vlastnosti konfigurace Sparku ve službě Azure Databricks.