Wat is de meldingsmodus voor het automatisch laden van bestanden?
In de bestandsmeldingsmodus stelt Automatisch laden automatisch een meldingsservice en wachtrijservice in die zich abonneert op bestandsevenementen uit de invoermap. U kunt bestandsmeldingen gebruiken om automatisch laden te schalen om miljoenen bestanden per uur op te nemen. In vergelijking met de modus mapvermelding is de modus voor bestandsmeldingen beter presterend en schaalbaar voor grote invoermappen of een groot aantal bestanden, maar hiervoor zijn extra cloudmachtigingen vereist.
U kunt op elk gewenst moment schakelen tussen bestandsmeldingen en adreslijstvermeldingen en nog steeds exactly-once gegevensverwerkingsgaranties onderhouden.
Notitie
De bestandsmeldingsmodus wordt niet ondersteund voor Azure Premium-opslagaccounts omdat Premium-accounts geen ondersteuning bieden voor wachtrijopslag.
Waarschuwing
Het wijzigen van het bronpad voor automatisch laden wordt niet ondersteund voor de modus voor bestandsmeldingen. Als de modus voor bestandsmeldingen wordt gebruikt en het pad wordt gewijzigd, kunt u mogelijk geen bestanden opnemen die al aanwezig zijn in de nieuwe map op het moment van de map update.
De modus Voor bestandsmeldingen wordt alleen ondersteund op rekenkracht van één gebruiker.
Cloudresources die worden gebruikt in de meldingsmodus voor het automatisch laden van bestanden
Belangrijk
U hebt verhoogde machtigingen nodig om de cloudinfrastructuur automatisch te configureren voor de modus voor bestandsmeldingen. Neem contact op met uw cloudbeheerder of werkruimtebeheerder. Zien:
Automatisch laden kan voor u automatisch bestandsmeldingen set wanneer u de optie cloudFiles.useNotifications
om te true
set en de benodigde machtigingen verleent voor het creëren van cloudresources. Bovendien moet u mogelijk aanvullende opties opgeven om autoladerautorisatie te grant om deze resources te maken.
In de volgende table wordt samengevat welke resources worden gemaakt door Auto Loader.
Cloudopslag | Abonnementsservice | Wachtrijservice | Voorvoegsel* | Limit ** |
---|---|---|---|---|
Amazon S3 | AWS SNS | AWS SQS | databricks-auto-opname | 100 per S3-bucket |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 per opslagaccount |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-opname | 100 per GCS-bucket |
Azure Blob-opslag | Azure Event Grid | Azure Queue Storage | databricks | 500 per opslagaccount |
- Automatisch laadprogramma noemt de resources met dit voorvoegsel.
** Hoeveel gelijktijdige bestandsmeldingspijplijnen kunnen worden gestart
Als u meer dan het beperkte aantal bestandsmeldingspijplijnen voor een bepaald opslagaccount wilt uitvoeren, kunt u het volgende doen:
- Gebruik een service zoals AWS Lambda, Azure Functions of Google Cloud Functions om meldingen van een enkele wachtrij, die naar een hele container of bucket luistert, te verspreiden naar mapspecifieke wachtrijen.
Gebeurtenissen voor bestandsmeldingen
Amazon S3 biedt een ObjectCreated
gebeurtenis wanneer een bestand wordt geüpload naar een S3-bucket, ongeacht of het is geüpload door een put- of multi-part upload.
ADLS Gen2 biedt verschillende gebeurtenismeldingen voor bestanden die worden weergegeven in uw Gen2-container.
- Auto Loader luistert naar de gebeurtenis voor het
FlushWithClose
verwerken van een bestand. - Automatische laadprogramma's ondersteunen de actie voor het
RenameFile
detecteren van bestanden. Acties metRenameFile
vereisen een API-aanvraag naar het opslagsysteem om de grootte van het hernoemde bestand te get. - Automatisch laden streams gemaakt met Databricks Runtime 9.0 en na ondersteuning van de actie voor het
RenameDirectory
detecteren van bestanden.RenameDirectory
acties vereisen API-verzoeken voor het opslagsysteem om de inhoud van de hernoemde map te list.
Google Cloud Storage biedt een OBJECT_FINALIZE
gebeurtenis wanneer een bestand wordt geüpload, waaronder overschrijven en bestandskopieën. Mislukte uploads veroorzaken generate deze gebeurtenis niet.
Notitie
Cloud providers biedt geen garantie voor 100% levering van alle bestandsgebeurtenissen onder zeer zeldzame omstandigheden en biedt helemaal geen strikte SLA's voor de latentie van de bestandsgebeurtenissen. Databricks raadt u aan om reguliere backfills te activeren met autolader met behulp van de cloudFiles.backfillInterval
optie om te garanderen dat alle bestanden in een bepaalde SLA worden gedetecteerd als volledigheid van gegevens een vereiste is. Het activeren van reguliere backfills veroorzaakt geen duplicaten.
Vereiste machtigingen voor het configureren van bestandsmeldingen voor ADLS Gen2 en Azure Blob Storage
U moet leesmachtigingen hebben voor de invoermap. Zie Azure Blob Storage.
Als u de modus voor bestandsmeldingen wilt gebruiken, moet u verificatie opgeven credentials voor het instellen en openen van de gebeurtenismeldingsservices.
U hebt alleen een service-principal nodig voor verificatie.
Service-principal: ingebouwde Azure-rollen gebruiken
Maak een Microsoft Entra ID-app (voorheen Azure Active Directory) en service-principal in de vorm van client-id en clientgeheim.
Wijs deze app de volgende rollen toe aan het opslagaccount waarin het invoerpad zich bevindt:
- Inzender: Deze rol is bedoeld voor het instellen van resources in uw opslagaccount, zoals wachtrijen en gebeurtenisabonnementen.
- Inzender voor opslagwachtrijgegevens: deze rol is bedoeld voor het uitvoeren van wachtrijbewerkingen, zoals het ophalen en verwijderen van berichten uit de wachtrijen. Deze rol is alleen vereist wanneer u een service-principal zonder verbindingsreeks opgeeft.
Wijs deze app de volgende rol toe aan de gerelateerde resourcegroep:
- EventGrid EventSubscription-inzender: deze rol is voor het uitvoeren van Azure Event Grid-abonnementsbewerkingen (Event Grid), zoals het maken of vermelden van gebeurtenisabonnementen.
Zie voor meer informatie Azure-rollen toewijzen met behulp van de Azure-portal.
Service-principal: een aangepaste rol gebruiken
Als u zich bezig houdt met de overmatige machtigingen die zijn vereist voor de voorgaande rollen, kunt u een aangepaste rol maken met ten minste de volgende machtigingen, die hieronder worden vermeld in de JSON-indeling van De Azure-rol:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Vervolgens kunt u deze aangepaste rol toewijzen aan uw app.
Zie voor meer informatie Azure-rollen toewijzen met behulp van de Azure-portal.
instellingen voor
Vereiste machtigingen voor het configureren van bestandsmeldingen voor Amazon S3
U moet leesmachtigingen hebben voor de invoermap. Zie S3-verbindingsgegevens voor meer informatie.
Als u de modus voor bestandsmeldingen wilt gebruiken, voegt u het volgende JSON-beleidsdocument toe aan uw IAM-gebruiker of -rol.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
where:
-
<bucket-name>
: de naam van de S3-bucket where uw stream bestanden leest, bijvoorbeeldauto-logs
. U kunt bijvoorbeeld*
als jokerteken gebruikendatabricks-*-logs
. Als u de onderliggende S3-bucket voor uw DBFS-pad wilt achterhalen, kunt u alle DBFS-koppelpunten in een notebook list door%fs mounts
uit te voeren. -
<region>
: de AWS-regio waar de S3-bucket (where) zich bevindt, bijvoorbeeldus-west-2
. Als u de regio niet wilt opgeven, gebruikt*
u . -
<account-number>
: het AWS-accountnummer dat eigenaar is van de S3-bucket, bijvoorbeeld123456789012
. Als u het accountnummer niet wilt opgeven, gebruikt u*
.
De tekenreeks databricks-auto-ingest-*
in de SPECIFICATIE SQS en SNS ARN is het naamvoorvoegsel dat de cloudFiles
bron gebruikt bij het maken van SQS- en SNS-services. Omdat Azure Databricks de meldingsservices instelt in de eerste uitvoering van de stream, kunt u een beleid met beperkte machtigingen gebruiken na de eerste uitvoering (bijvoorbeeld de stream stoppen en vervolgens opnieuw opstarten).
Notitie
Het voorgaande beleid heeft alleen betrekking op de machtigingen die nodig zijn voor het instellen van bestandsmeldingsservices, namelijk S3 bucketmeldingen, SNS- en SQS-services en gaat ervan uit dat u al leestoegang hebt tot de S3-bucket. Als u S3 alleen-lezenmachtigingen wilt toevoegen, voegt u het volgende toe aan de Action
list in de DatabricksAutoLoaderSetup
-instructie in het JSON-document:
s3:ListBucket
s3:GetObject
Beperkte machtigingen na de eerste installatie
De hierboven beschreven machtigingen voor het instellen van resources zijn alleen vereist tijdens de eerste uitvoering van de stream. Na de eerste uitvoering kunt u overschakelen naar het volgende IAM-beleid met beperkte machtigingen.
Belangrijk
Met de beperkte machtigingen kunt u geen nieuwe streamingquery's starten of resources opnieuw maken in geval van fouten (de SQS-wachtrij is bijvoorbeeld per ongeluk verwijderd); U kunt de API voor cloudresourcebeheer ook niet gebruiken om resources te list of te verwijderen.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Vereiste machtigingen voor het configureren van bestandsmeldingen voor GCS
U moet beschikken over en list
machtigingen hebben get
voor uw GCS-bucket en voor alle objecten. Zie de Google-documentatie over IAM-machtigingen voor meer informatie.
Als u de modus voor bestandsmeldingen wilt gebruiken, moet u machtigingen toevoegen voor het GCS-serviceaccount en het account dat wordt gebruikt voor toegang tot de Google Cloud Pub/Sub-resources.
Voeg de Pub/Sub Publisher
rol toe aan het GCS-serviceaccount. Hierdoor kan het account meldingen van gebeurtenissen van uw GCS-buckets publiceren naar Google Cloud Pub/Sub.
Wat betreft het serviceaccount dat wordt gebruikt voor de Google Cloud Pub/Sub-resources, moet u de volgende machtigingen toevoegen:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
Hiervoor kunt u een aangepaste IAM-rol maken met deze machtigingen of vooraf bestaande GCP-rollen toewijzen om deze machtigingen te dekken.
Het GCS-serviceaccount zoeken
Navigeer in de Google Cloud Console voor het bijbehorende project naar Cloud Storage > Settings
.
De sectie Cloud Storage-serviceaccount bevat het e-mailadres van het GCS-serviceaccount.
Een aangepaste Google Cloud IAM-rol maken voor de modus Voor bestandsmeldingen
Ga in de Google Cloud-console voor het bijbehorende project naar IAM & Admin > Roles
. Maak vervolgens een rol bovenaan of update een bestaande rol. Klik op Add Permissions
het scherm voor het maken of bewerken van rollen. Er wordt een menu weergegeven waarin u de gewenste machtigingen aan de rol kunt toevoegen.
Bestandsmeldingsbronnen handmatig configureren of beheren
Bevoegde gebruikers kunnen bronnen voor bestandsmeldingen handmatig configureren of beheren.
- Set stel de bestandsmeldingsdiensten handmatig in via de cloudprovider en specificeer handmatig de wachtrij identifier. Zie opties voor bestandsmeldingen voor meer informatie.
- Gebruik Scala-API's om de meldingen en wachtrijservices te maken of te beheren, zoals wordt weergegeven in het volgende voorbeeld:
Notitie
U moet over de juiste machtigingen beschikken om de cloudinfrastructuur te configureren of te wijzigen. Raadpleeg de documentatie over machtigingen voor Azure, S3 of GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Gebruik setUpNotificationServices(<resource-suffix>)
dit diagram om een wachtrij en een abonnement met de naam <prefix>-<resource-suffix>
te maken (het voorvoegsel is afhankelijk van het opslagsysteem dat wordt samengevat in cloudresources die worden gebruikt in de meldingsmodus voor het automatisch laden van bestanden. Als er een bestaande resource met dezelfde naam is, gebruikt Azure Databricks de bestaande resource opnieuw in plaats van een nieuwe resource te maken. Met deze functie wordt een wachtrij identifier geretourneerd die u met identifier kunt doorgeven aan de cloudFiles
bron binnen de bestandsmeldingopties . Hierdoor kan de cloudFiles
brongebruiker minder machtigingen hebben dan de gebruiker die de resources maakt.
Geef de "path"
optie alleen op newManager
als u belt setUpNotificationServices
; dit is niet nodig voor listNotificationServices
of tearDownNotificationServices
. Dit is hetzelfde path
dat u gebruikt bij het uitvoeren van een streamingquery.
De volgende matrix geeft aan welke API-methoden worden ondersteund waarin Databricks Runtime voor elk type opslag wordt ondersteund:
Cloudopslag | API instellen | List API | Api voor uitsplitsen |
---|---|---|---|
Amazon S3 | Alle versies | Alle versies | Alle versies |
ADLS Gen2 | Alle versies | Alle versies | Alle versies |
GCS | Databricks Runtime 9.1 en hoger | Databricks Runtime 9.1 en hoger | Databricks Runtime 9.1 en hoger |
Azure Blob-opslag | Alle versies | Alle versies | Alle versies |
ADLS Gen1 | Niet ondersteund | Niet ondersteund | Niet ondersteund |
Veelvoorkomende fouten oplossen
In deze sectie worden veelvoorkomende fouten beschreven bij het gebruik van automatisch laden met de modus voor bestandsmeldingen en hoe u deze kunt oplossen.
Kan geen Event Grid-abonnement maken
Als u het volgende foutbericht ziet wanneer u AutoLoader voor het eerst uitvoert, wordt Event Grid niet geregistreerd als resourceprovider in het Azure-abonnement.
java.lang.RuntimeException: Failed to create event grid subscription.
Ga als volgt te werk om Event Grid als resourceprovider te registreren:
- Ga in Azure Portal naar uw abonnement.
- Klik op Resource Providers onder de sectie Instellingen.
- Registreer de provider
Microsoft.EventGrid
.
Autorisatie vereist voor het uitvoeren van Event Grid-abonnementsbewerkingen
Als u het volgende foutbericht ziet wanneer u AutoLoader voor het eerst uitvoert, controleert u of de rol Inzender is toegewezen aan de service-principal voor Event Grid en het opslagaccount.
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Event Grid-client omzeilt proxy
In Databricks Runtime 15.2 en hoger gebruikt Event Grid connections in Auto Loader standaard proxy-instellingen uit systeemeigenschappen. In Databricks Runtime 13.3 LTS, 14.3 LTS en 15.0 tot en met 15.2 kunt u Event Grid-connections handmatig configureren om een proxy te gebruiken door de eigenschap van de Spark Configuratie-eigenschapspark.databricks.cloudFiles.eventGridClient.useSystemProperties true
in te stellen. Zie Set Spark-configuratie-eigenschappen in Azure Databricks.