Che cos'è la modalità di notifica file del caricatore automatico?
In modalità di notifica file, il caricatore automatico configura automaticamente un servizio di notifica e un servizio di accodamento che sottoscrive gli eventi di file dalla directory di input. È possibile usare le notifiche dei file per ridimensionare il caricatore automatico per inserire milioni di file un'ora. Rispetto alla modalità elenco directory, la modalità di notifica dei file è più efficiente e scalabile per directory di input di grandi dimensioni o un volume elevato di file, ma richiede autorizzazioni cloud aggiuntive.
È possibile passare tra le notifiche dei file e l'elenco di directory in qualsiasi momento e mantenere comunque garanzie di elaborazione dei dati esattamente una volta.
Nota
La modalità di notifica file non è supportata per gli account di archiviazione Premium di Azure perché gli account Premium non supportano l'archiviazione code.
Avviso
La modifica del percorso di origine per il caricatore automatico non è supportata per la modalità di notifica file. Se viene usata la modalità di notifica file e il percorso viene modificato, è possibile che non sia possibile inserire file già presenti nella nuova directory al momento della directory update.
La modalità di notifica file è supportata solo nel calcolo utente singolo.
Risorse cloud usate nella modalità di notifica file del caricatore automatico
Importante
Sono necessarie autorizzazioni elevate per configurare automaticamente l'infrastruttura cloud per la modalità di notifica file. Contattare l'amministratore del cloud o l'amministratore dell'area di lavoro. Vedere:
Il caricatore automatico può set automaticamente le notifiche sui file quando si set l'opzione cloudFiles.useNotifications
per true
e fornire le autorizzazioni necessarie per creare risorse cloud. Inoltre, potrebbe essere necessario fornire opzioni aggiuntive per grant autorizzazione del caricatore automatico per creare queste risorse.
Il seguente table riepiloga le risorse create da Auto Loader.
Archiviazione cloud | Servizio di sottoscrizione | Servizio di accodamento | Prefisso* | Limit ** |
---|---|---|---|---|
Amazon S3 | AWS SNS | AWS SQS | inserimento automatico di databricks | 100 per bucket S3 |
ADLS Gen2 | Griglia di eventi di Azure | Archiviazione code di Azure | databricks | 500 per account di archiviazione |
GCS | Google Pub/Sub | Google Pub/Sub | inserimento automatico di databricks | 100 per bucket GCS |
Archiviazione BLOB di Azure | Griglia di eventi di Azure | Archiviazione code di Azure | databricks | 500 per account di archiviazione |
- Il caricatore automatico assegna un nome alle risorse con questo prefisso.
** Quante pipeline di notifica file simultanee possono essere avviate
Se è necessario eseguire più di un numero limitato di pipeline di notifica file per un determinato account di archiviazione, è possibile:
- Utilizzare un servizio come AWS Lambda, Funzioni di Azure o Google Cloud Functions per distribuire notifiche da una singola coda che monitora un intero contenitore o bucket verso code specifiche di directory.
Eventi di notifica dei file
Amazon S3 fornisce un evento ObjectCreated
quando un file viene caricato in un bucket S3 indipendentemente dal fatto che sia stato caricato da un caricamento put o in più parti.
ADLS Gen2 fornisce notifiche di eventi diverse per i file visualizzati nel contenitore Gen2.
- Il caricatore automatico è in ascolto dell'evento per l'elaborazione
FlushWithClose
di un file. - I flussi del caricatore automatico supportano l'azione per l'individuazione
RenameFile
dei file.RenameFile
azioni richiedono una richiesta API al sistema di archiviazione per get le dimensioni del file rinominato. - Flussi del caricatore automatico creati con Databricks Runtime 9.0 e dopo aver supportato l'azione per l'individuazione
RenameDirectory
dei file. Le azioniRenameDirectory
richiedono richieste API al sistema di archiviazione per list il contenuto della directory rinominata.
Google Cloud Storage fornisce un OBJECT_FINALIZE
evento quando viene caricato un file, che include sovrascrizioni e copie di file. I caricamenti non riusciti non generate questo evento.
Nota
Il cloud providers non garantisce il 100% recapito di tutti gli eventi dei file in condizioni estremamente rare e non fornisce SLA rigorosi sulla latenza degli eventi dei file. Databricks consiglia di attivare normali backfill con il caricatore automatico usando l'opzione cloudFiles.backfillInterval
per garantire che tutti i file vengano individuati all'interno di un determinato contratto di servizio se il completamento dei dati è un requisito. L'attivazione di backfill regolari non causa duplicati.
Autorizzazioni necessarie per la configurazione della notifica file per ADLS Gen2 e Archiviazione BLOB di Azure
È necessario disporre delle autorizzazioni di lettura per la directory di input. Vedere Archiviazione BLOB di Azure.
Per usare la modalità di notifica file, è necessario fornire l'autenticazione credentials per la configurazione e l'accesso ai servizi di notifica degli eventi.
Per l'autenticazione è necessaria solo un'entità servizio.
Entità servizio: uso dei ruoli predefiniti di Azure
Creare un'app e un'entità servizio Microsoft Entra ID (in precedenza Azure Active Directory) sotto forma di ID client e segreto client.
Assegnare questa app ai ruoli seguenti all'account di archiviazione in cui risiede il percorso di input:
- Collaboratore: questo ruolo è per la configurazione delle risorse nell'account di archiviazione, ad esempio code e sottoscrizioni di eventi.
- Collaboratore ai dati della coda di archiviazione: questo ruolo è per l'esecuzione di operazioni di accodamento, ad esempio il recupero e l'eliminazione di messaggi dalle code. Questo ruolo è obbligatorio solo quando si fornisce un'entità servizio senza un stringa di connessione.
Assegnare questa app al gruppo di risorse correlato:
- EventGrid EventSubscription Collaboratore: Questo ruolo consiste nell'eseguire operazioni di sottoscrizione di Azure Event Grid, ad esempio la creazione o l'elenco delle sottoscrizioni di eventi.
Per ulteriori informazioni, vedi Assegnare ruoli di Azure usando il portale di Azure.
Principale del servizio - uso di un ruolo personalizzato
Se si riguardano le autorizzazioni eccessive necessarie per i ruoli precedenti, è possibile creare un ruolo personalizzato con almeno le autorizzazioni seguenti, elencate di seguito in formato JSON del ruolo di Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
È quindi possibile assegnare questo ruolo personalizzato all'app.
Per ulteriori informazioni, vedi Assegnare ruoli di Azure usando il portale di Azure.
Autorizzazioni necessarie per la configurazione della notifica dei file per Amazon S3
È necessario disporre delle autorizzazioni di lettura per la directory di input. Per altri dettagli, vedere Dettagli sulla connessione S3.
Per usare la modalità di notifica dei file, allegare il documento dei criteri JSON seguente all'utente o al ruolo IAM.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
where:
-
<bucket-name>
: il nome del bucket S3 where, da cui il flusso leggerà i file, ad esempioauto-logs
. È possibile usare*
come carattere jolly, ad esempiodatabricks-*-logs
. Per individuare il bucket S3 sottostante per il percorso DBFS, è possibile list tutti i punti di montaggio DBFS in un notebook eseguendo%fs mounts
. -
<region>
: l'area AWS where il bucket S3 risiede, ad esempio,us-west-2
. Se non si vuole specificare l'area, usare*
. -
<account-number>
: numero di account AWS proprietario del bucket S3,123456789012
ad esempio . Se non si vuole specificare il numero di account, usare*
.
La stringa databricks-auto-ingest-*
nella specifica SQS e ARN SNS è il prefisso del nome usato dall'origine durante la cloudFiles
creazione di servizi SQS e SNS. Poiché Azure Databricks configura i servizi di notifica nell'esecuzione iniziale del flusso, è possibile usare un criterio con autorizzazioni ridotte dopo l'esecuzione iniziale, ad esempio arrestare il flusso e quindi riavviarlo.
Nota
Il criterio precedente riguarda solo le autorizzazioni necessarie per configurare i servizi di notifica file, ovvero la notifica bucket S3, SNS e SQS e presuppone che l'accesso in lettura al bucket S3 sia già disponibile. Se è necessario aggiungere autorizzazioni di sola lettura S3, aggiungere quanto segue alla Action
list nell'istruzione DatabricksAutoLoaderSetup
nel documento JSON:
s3:ListBucket
s3:GetObject
Autorizzazioni ridotte dopo l'installazione iniziale
Le autorizzazioni di installazione delle risorse descritte in precedenza sono necessarie solo durante l'esecuzione iniziale del flusso. Dopo la prima esecuzione, è possibile passare ai criteri IAM seguenti con autorizzazioni ridotte.
Importante
Con le autorizzazioni ridotte, non è possibile avviare nuove query di streaming o ricreare risorse in caso di errori( ad esempio, la coda SQS è stata eliminata accidentalmente); non è anche possibile usare l'API di gestione delle risorse cloud per list o eliminare le risorse.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Autorizzazioni necessarie per la configurazione della notifica file per GCS
È necessario disporre list
di autorizzazioni e get
per il bucket GCS e per tutti gli oggetti. Per informazioni dettagliate, vedere la documentazione di Google sulle autorizzazioni IAM.
Per usare la modalità di notifica file, è necessario aggiungere autorizzazioni per l'account del servizio GCS e l'account usato per accedere alle risorse Google Cloud Pub/Sub.
Aggiungere il Pub/Sub Publisher
ruolo all'account del servizio GCS. Ciò consente all'account di pubblicare messaggi di notifica degli eventi dai bucket GCS a Google Cloud Pub/Sub.
Per quanto riguarda l'account del servizio usato per le risorse Google Cloud Pub/Sub, è necessario aggiungere le autorizzazioni seguenti:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
A tale scopo, è possibile creare un ruolo personalizzato IAM con queste autorizzazioni o assegnare ruoli GCP preesistenti per coprire queste autorizzazioni.
Ricerca dell'account del servizio GCS
In Google Cloud Console per il progetto corrispondente passare a Cloud Storage > Settings
.
La sezione "Account del servizio di archiviazione cloud" contiene l'indirizzo di posta elettronica dell'account del servizio GCS.
Creazione di un ruolo IAM di Google Cloud personalizzato per la modalità di notifica file
Nella console di Google Cloud per il progetto corrispondente passare a IAM & Admin > Roles
. Creare quindi un ruolo nella parte superiore o update un ruolo esistente. Nella schermata relativa alla creazione o alla modifica del ruolo fare clic su Add Permissions
. Viene visualizzato un menu in cui è possibile aggiungere le autorizzazioni desiderate al ruolo.
Configurare o gestire manualmente le risorse di notifica file
Gli utenti con privilegi possono configurare o gestire manualmente le risorse di notifica dei file.
- Set i servizi di notifica file manualmente tramite il provider di servizi cloud e specificare manualmente la coda identifier. Per altri dettagli, vedere Opzioni di notifica file.
- Usare le API Scala per creare o gestire le notifiche e i servizi di accodamento, come illustrato nell'esempio seguente:
Nota
È necessario disporre delle autorizzazioni appropriate per configurare o modificare l'infrastruttura cloud. Vedere la documentazione relativa alle autorizzazioni per Azure, S3 o GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Usare setUpNotificationServices(<resource-suffix>)
per creare una coda e una sottoscrizione con il nome <prefix>-<resource-suffix>
(il prefisso dipende dal sistema di archiviazione riepilogato nelle risorse cloud usate nella modalità di notifica del file del caricatore automatico. Se è presente una risorsa esistente con lo stesso nome, Azure Databricks riutilizza la risorsa esistente anziché crearne una nuova. Questa funzione restituisce una coda identifier che è possibile passare all'origine cloudFiles
usando il identifier in Opzioni di notifica file. Ciò consente all'utente cloudFiles
di origine di avere meno autorizzazioni rispetto all'utente che crea le risorse.
Specificare l'opzione "path"
solo newManager
se si chiama setUpNotificationServices
; non è necessario per listNotificationServices
o tearDownNotificationServices
. Questa operazione è la stessa path
usata durante l'esecuzione di una query di streaming.
La matrice seguente indica i metodi API supportati in cui Databricks Runtime per ogni tipo di archiviazione:
Archiviazione cloud | API di installazione | API List | API di disinstallazione |
---|---|---|---|
Amazon S3 | Tutte le versioni | Tutte le versioni | Tutte le versioni |
ADLS Gen2 | Tutte le versioni | Tutte le versioni | Tutte le versioni |
GCS | Databricks Runtime 9.1 e versioni successive | Databricks Runtime 9.1 e versioni successive | Databricks Runtime 9.1 e versioni successive |
Archiviazione BLOB di Azure | Tutte le versioni | Tutte le versioni | Tutte le versioni |
ADLS Gen1 | Non supportato | Non supportato | Non supportato |
Risolvere gli errori comuni
Questa sezione descrive gli errori comuni quando si usa il caricatore automatico con la modalità di notifica file e come risolverli.
Impossibile creare una sottoscrizione di Griglia di eventi
Se viene visualizzato il messaggio di errore seguente quando si esegue il caricatore automatico per la prima volta, Griglia di eventi non viene registrata come provider di risorse nella sottoscrizione di Azure.
java.lang.RuntimeException: Failed to create event grid subscription.
Per registrare Griglia di eventi come provider di risorse, eseguire le operazioni seguenti:
- Nel portale di Azure, vai alla tua sottoscrizione.
- Fare clic su Risorsa Providers nella sezione Impostazioni.
- Registrazione del provider
Microsoft.EventGrid
.
Autorizzazione necessaria per eseguire operazioni di sottoscrizione di Griglia di eventi
Se visualizzi il seguente messaggio di errore quando esegui Auto Loader per la prima volta, verifica che il ruolo di Collaboratore sia assegnato all'entità servizio per Event Grid e all'account di archiviazione.
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Il client di Griglia di eventi ignora il proxy
In Databricks Runtime 15.2 e versioni successive, Event Grid connections in Auto Loader utilizza le impostazioni proxy dalle proprietà di sistema per impostazione predefinita. In Databricks Runtime 13.3 LTS, 14.3 LTS e da 15.0 a 15.2, è possibile configurare manualmente Griglia di Eventi connections per usare un proxy impostando la proprietà Spark Config spark.databricks.cloudFiles.eventGridClient.useSystemProperties true
. Vedere Set proprietà di configurazione di Spark in Azure Databricks.