Che cos'è la modalità di notifica file del caricatore automatico?
In modalità di notifica file, il caricatore automatico configura automaticamente un servizio di notifica e un servizio di accodamento che sottoscrive gli eventi di file dalla directory di input. È possibile usare le notifiche dei file per ridimensionare il caricatore automatico per inserire milioni di file un'ora. Rispetto alla modalità elenco directory, la modalità di notifica dei file è più efficiente e scalabile per directory di input di grandi dimensioni o un volume elevato di file, ma richiede autorizzazioni cloud aggiuntive.
È possibile passare tra le notifiche dei file e l'elenco di directory in qualsiasi momento e mantenere comunque garanzie di elaborazione dei dati esattamente una volta.
Nota
La modalità di notifica file non è supportata per gli account di archiviazione Premium di Azure perché gli account Premium non supportano l'archiviazione code.
Avviso
La modifica del percorso di origine per il caricatore automatico non è supportata per la modalità di notifica file. Se viene usata la modalità di notifica file e il percorso viene modificato, potrebbe non essere possibile inserire file già presenti nella nuova directory al momento dell'aggiornamento della directory.
La modalità di notifica file è supportata solo nelle risorse di calcolo a utente singolo.
Risorse cloud usate nella modalità di notifica file del caricatore automatico
Importante
Sono necessarie autorizzazioni elevate per configurare automaticamente l'infrastruttura cloud per la modalità di notifica file. Contattare l'amministratore del cloud o l'amministratore dell'area di lavoro. Vedere:
Il caricatore automatico può configurare automaticamente le notifiche dei file quando si imposta l'opzione cloudFiles.useNotifications
su true
e si forniscono le autorizzazioni necessarie per creare risorse cloud. Potrebbe anche essere necessario fornire opzioni aggiuntive per concedere l'autorizzazione del caricatore automatico per creare queste risorse.
La tabella seguente riepiloga le risorse create dal caricatore automatico.
Archiviazione cloud | Servizio di sottoscrizione | Servizio di accodamento | Prefisso* | Limite** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | inserimento automatico di databricks | 100 per bucket S3 |
ADLS Gen2 | Griglia di eventi di Azure | Archiviazione code di Azure | databricks | 500 per account di archiviazione |
GCS | Google Pub/Sub | Google Pub/Sub | inserimento automatico di databricks | 100 per bucket GCS |
Archiviazione BLOB di Azure | Azure Event Grid | Archiviazione code di Azure | databricks | 500 per account di archiviazione |
- Il caricatore automatico assegna un nome alle risorse con questo prefisso.
** Quante pipeline di notifica file simultanee possono essere avviate
Se è necessario eseguire più di un numero limitato di pipeline di notifica file per un determinato account di archiviazione, è possibile:
- Sfruttare un servizio come AWS Lambda, Funzioni di Azure o Google Cloud Functions per visualizzare le notifiche da una singola coda in ascolto di un intero contenitore o bucket in code specifiche della directory.
Eventi di notifica dei file
AWS S3 fornisce un ObjectCreated
evento quando un file viene caricato in un bucket S3 indipendentemente dal fatto che sia stato caricato da un caricamento in put o in più parti.
ADLS Gen2 fornisce notifiche di eventi diverse per i file visualizzati nel contenitore Gen2.
- Il caricatore automatico è in ascolto dell'evento per l'elaborazione
FlushWithClose
di un file. - I flussi del caricatore automatico supportano l'azione per l'individuazione
RenameFile
dei file.RenameFile
le azioni richiedono una richiesta API al sistema di archiviazione per ottenere le dimensioni del file rinominato. - Flussi del caricatore automatico creati con Databricks Runtime 9.0 e dopo aver supportato l'azione per l'individuazione
RenameDirectory
dei file.RenameDirectory
le azioni richiedono richieste API al sistema di archiviazione per elencare il contenuto della directory rinominata.
Google Cloud Storage fornisce un OBJECT_FINALIZE
evento quando viene caricato un file, che include sovrascrizioni e copie di file. I caricamenti non riusciti non generano questo evento.
Nota
I provider di servizi cloud non garantiscono il 100% di recapito di tutti gli eventi di file in condizioni molto rare e non forniscono contratti di servizio rigorosi sulla latenza degli eventi di file. Databricks consiglia di attivare normali backfill con il caricatore automatico usando l'opzione cloudFiles.backfillInterval
per garantire che tutti i file vengano individuati all'interno di un determinato contratto di servizio se il completamento dei dati è un requisito. L'attivazione di backfill regolari non causa duplicati.
Autorizzazioni necessarie per la configurazione della notifica file per ADLS Gen2 e Archiviazione BLOB di Azure
È necessario disporre delle autorizzazioni di lettura per la directory di input. Vedere Archiviazione BLOB di Azure.
Per usare la modalità di notifica dei file, è necessario fornire le credenziali di autenticazione per la configurazione e l'accesso ai servizi di notifica degli eventi. Per l'autenticazione è necessaria solo un'entità servizio.
Entità servizio: uso dei ruoli predefiniti di Azure
Creare un'app e un'entità servizio Microsoft Entra ID (in precedenza Azure Active Directory) sotto forma di ID client e segreto client.
Assegnare questa app ai ruoli seguenti all'account di archiviazione in cui risiede il percorso di input:
- Collaboratore: questo ruolo è per la configurazione delle risorse nell'account di archiviazione, ad esempio code e sottoscrizioni di eventi.
- Collaboratore ai dati della coda di archiviazione: questo ruolo è per l'esecuzione di operazioni di accodamento, ad esempio il recupero e l'eliminazione di messaggi dalle code. Questo ruolo è obbligatorio solo quando si fornisce un'entità servizio senza un stringa di connessione.
Assegnare questa app al gruppo di risorse correlato:
- Collaboratore EventGrid EventSubscription: questo ruolo è per l'esecuzione di operazioni di sottoscrizione di Griglia di eventi, ad esempio la creazione o l'elenco di sottoscrizioni di eventi.
Per ulteriori informazioni, vedi Assegnare ruoli di Azure usando il portale di Azure.
Entità servizio : uso di un ruolo personalizzato
Se si riguardano le autorizzazioni eccessive necessarie per i ruoli precedenti, è possibile creare un ruolo personalizzato con almeno le autorizzazioni seguenti, elencate di seguito in formato JSON del ruolo di Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
È quindi possibile assegnare questo ruolo personalizzato all'app.
Per ulteriori informazioni, vedi Assegnare ruoli di Azure usando il portale di Azure.
Risoluzione dei problemi comuni
Errore:
java.lang.RuntimeException: Failed to create event grid subscription.
Se viene visualizzato questo messaggio di errore quando si esegue l'Autoloader per la prima volta, Griglia di eventi non viene registrata come provider di risorse nella sottoscrizione di Azure. Per registrare ciò nel portale di Azure:
- Vai alla tua sottoscrizione.
- Fare clic su Provider di risorse nella sezione Impostazioni.
- Registrazione del provider
Microsoft.EventGrid
.
Errore:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Se viene visualizzato questo messaggio di errore quando si esegue l'Autoloader per la prima volta, assicurarsi di avere assegnato il ruolo Collaboratore all'entità servizio per Griglia di eventi e all'account di archiviazione.
Autorizzazioni necessarie per la configurazione della notifica file per AWS S3
È necessario disporre delle autorizzazioni di lettura per la directory di input. Per altri dettagli, vedere Dettagli sulla connessione S3.
Per usare la modalità di notifica dei file, allegare il documento dei criteri JSON seguente all'utente o al ruolo IAM.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
dove:
<bucket-name>
: il nome del bucket S3 in cui il flusso leggerà i file,auto-logs
ad esempio . È possibile usare*
come carattere jolly, ad esempiodatabricks-*-logs
. Per individuare il bucket S3 sottostante per il percorso DBFS, è possibile elencare tutti i punti di montaggio DBFS in un notebook eseguendo%fs mounts
.<region>
: l'area AWS in cui risiede il bucket S3, ad esempious-west-2
. Se non si vuole specificare l'area, usare*
.<account-number>
: numero di account AWS proprietario del bucket S3,123456789012
ad esempio . Se non si vuole specificare il numero di account, usare*
.
La stringa databricks-auto-ingest-*
nella specifica SQS e ARN SNS è il prefisso del nome usato dall'origine durante la cloudFiles
creazione di servizi SQS e SNS. Poiché Azure Databricks configura i servizi di notifica nell'esecuzione iniziale del flusso, è possibile usare un criterio con autorizzazioni ridotte dopo l'esecuzione iniziale, ad esempio arrestare il flusso e quindi riavviarlo.
Nota
Il criterio precedente riguarda solo le autorizzazioni necessarie per configurare i servizi di notifica file, ovvero la notifica bucket S3, SNS e SQS e presuppone che l'accesso in lettura al bucket S3 sia già disponibile. Se è necessario aggiungere autorizzazioni di sola lettura S3, aggiungere quanto segue all'elenco nell'istruzione Action
DatabricksAutoLoaderSetup
nel documento JSON:
s3:ListBucket
s3:GetObject
Autorizzazioni ridotte dopo l'installazione iniziale
Le autorizzazioni di installazione delle risorse descritte in precedenza sono necessarie solo durante l'esecuzione iniziale del flusso. Dopo la prima esecuzione, è possibile passare ai criteri IAM seguenti con autorizzazioni ridotte.
Importante
Con le autorizzazioni ridotte, non è possibile avviare nuove query di streaming o ricreare risorse in caso di errori( ad esempio, la coda SQS è stata eliminata accidentalmente); non è anche possibile usare l'API di gestione delle risorse cloud per elencare o eliminare le risorse.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Autorizzazioni necessarie per la configurazione della notifica file per GCS
È necessario disporre list
di autorizzazioni e get
per il bucket GCS e per tutti gli oggetti. Per informazioni dettagliate, vedere la documentazione di Google sulle autorizzazioni IAM.
Per usare la modalità di notifica file, è necessario aggiungere autorizzazioni per l'account del servizio GCS e l'account usato per accedere alle risorse Google Cloud Pub/Sub.
Aggiungere il Pub/Sub Publisher
ruolo all'account del servizio GCS. Ciò consente all'account di pubblicare messaggi di notifica degli eventi dai bucket GCS a Google Cloud Pub/Sub.
Per quanto riguarda l'account del servizio usato per le risorse Google Cloud Pub/Sub, è necessario aggiungere le autorizzazioni seguenti:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
A tale scopo, è possibile creare un ruolo personalizzato IAM con queste autorizzazioni o assegnare ruoli GCP preesistenti per coprire queste autorizzazioni.
Ricerca dell'account del servizio GCS
In Google Cloud Console per il progetto corrispondente passare a Cloud Storage > Settings
.
La sezione "Account del servizio di archiviazione cloud" contiene l'indirizzo di posta elettronica dell'account del servizio GCS.
Creazione di un ruolo IAM di Google Cloud personalizzato per la modalità di notifica file
Nella console di Google Cloud per il progetto corrispondente passare a IAM & Admin > Roles
. Creare quindi un ruolo nella parte superiore o aggiornare un ruolo esistente. Nella schermata relativa alla creazione o alla modifica del ruolo fare clic su Add Permissions
. Viene visualizzato un menu in cui è possibile aggiungere le autorizzazioni desiderate al ruolo.
Configurare o gestire manualmente le risorse di notifica file
Gli utenti con privilegi possono configurare o gestire manualmente le risorse di notifica dei file.
- Configurare manualmente i servizi di notifica file tramite il provider di servizi cloud e specificare manualmente l'identificatore della coda. Per altri dettagli, vedere Opzioni di notifica file.
- Usare le API Scala per creare o gestire le notifiche e i servizi di accodamento, come illustrato nell'esempio seguente:
Nota
È necessario disporre delle autorizzazioni appropriate per configurare o modificare l'infrastruttura cloud. Vedere la documentazione relativa alle autorizzazioni per Azure, S3 o GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Usare setUpNotificationServices(<resource-suffix>)
per creare una coda e una sottoscrizione con il nome <prefix>-<resource-suffix>
(il prefisso dipende dal sistema di archiviazione riepilogato nelle risorse cloud usate nella modalità di notifica del file del caricatore automatico. Se è presente una risorsa esistente con lo stesso nome, Azure Databricks riutilizza la risorsa esistente anziché crearne una nuova. Questa funzione restituisce un identificatore della coda che è possibile passare all'origine cloudFiles
usando l'identificatore nelle opzioni di notifica file. Ciò consente all'utente cloudFiles
di origine di avere meno autorizzazioni rispetto all'utente che crea le risorse.
Specificare l'opzione "path"
solo newManager
se si chiama setUpNotificationServices
; non è necessario per listNotificationServices
o tearDownNotificationServices
. Questa operazione è la stessa path
usata durante l'esecuzione di una query di streaming.
La matrice seguente indica i metodi API supportati in cui Databricks Runtime per ogni tipo di archiviazione:
Archiviazione cloud | API di installazione | API Elenco | API di disinstallazione |
---|---|---|---|
AWS S3 | Tutte le versioni | Tutte le versioni | Tutte le versioni |
ADLS Gen2 | Tutte le versioni | Tutte le versioni | Tutte le versioni |
GCS | Databricks Runtime 9.1 e versioni successive | Databricks Runtime 9.1 e versioni successive | Databricks Runtime 9.1 e versioni successive |
Archiviazione BLOB di Azure | Tutte le versioni | Tutte le versioni | Tutte le versioni |
ADLS Gen1 | Non supportato | Non supportato | Non supportato |