Co to jest tryb powiadamiania o pliku automatycznego ładowania?
W trybie powiadamiania plików automatyczne moduł ładujący automatycznie konfiguruje usługę powiadomień i usługę kolejki, która subskrybuje zdarzenia plików z katalogu wejściowego. Powiadomienia dotyczące plików umożliwiają skalowanie automatycznego modułu ładującego w celu pozyskiwania milionów plików na godzinę. W porównaniu z trybem listy katalogów tryb powiadomień plików jest bardziej wydajny i skalowalny dla dużych katalogów wejściowych lub dużej liczby plików, ale wymaga dodatkowych uprawnień do chmury.
Możesz przełączać się między powiadomieniami o plikach i listami katalogów w dowolnym momencie i nadal utrzymywać dokładnie jednokrotne gwarancje przetwarzania danych.
Uwaga
Tryb powiadamiania o plikach nie jest obsługiwany dla kont usługi Azure Premium Storage, ponieważ konta w warstwie Premium nie obsługują magazynu kolejek.
Ostrzeżenie
Zmiana ścieżki źródłowej dla automatycznego modułu ładującego nie jest obsługiwana w trybie powiadomień plików. Jeśli używany jest tryb powiadamiania o plikach i zmieniona zostanie ścieżka, może dojść do błędu w przypadku pobierania plików, które już znajdują się w nowym katalogu w chwili jego aktualizacji.
Tryb powiadomień plików jest obsługiwany tylko na komputerze jednego użytkownika.
Zasoby w chmurze używane w trybie powiadamiania o plikach automatycznego ładowania
Ważne
Aby automatycznie skonfigurować infrastrukturę chmury na potrzeby trybu powiadomień o plikach, potrzebne są podwyższone uprawnienia. Skontaktuj się z administratorem chmury lub administratorem obszaru roboczego. Widzieć:
Automatyczne ładowanie może automatycznie konfigurować powiadomienia o plikach po ustawieniu opcji cloudFiles.useNotifications
na true
i podać niezbędne uprawnienia do tworzenia zasobów w chmurze. Ponadto może być konieczne podanie dodatkowych opcji, aby umożliwić Auto Loaderowi autoryzację do utworzenia tych zasobów.
Poniższa tabela zawiera podsumowanie zasobów tworzonych przez moduł automatycznego ładowania.
Magazyn w chmurze | Usługa subskrypcji | Queue Service | Przedrostek* | Ograniczać** |
---|---|---|---|---|
Amazon S3 | AWS SNS | AWS SQS | databricks-auto-pozyskiwanie | 100 na zasobnik S3 |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 na konto magazynu |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-pozyskiwanie | 100 na zasobnik GCS |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | 500 na konto magazynu |
- Automatyczne moduł ładujący nazywa zasoby tym prefiksem.
** Ile współbieżnych potoków powiadomień o plikach można uruchomić
Jeśli potrzebujesz więcej niż ograniczona liczba potoków powiadomień dotyczących plików dla danego konta magazynu, możesz:
- Skorzystaj z usługi, takiej jak AWS Lambda, Azure Functions lub Google Cloud Functions, aby rozsyłać powiadomienia z jednej kolejki, która nasłuchuje całego kontenera lub zasobnika do kolejek specyficznych dla katalogu.
Zdarzenia powiadomień o plikach
Usługa Amazon S3 udostępnia zdarzenie ObjectCreated
, gdy plik jest ładowany do zasobnika S3, niezależnie od tego, czy był przesłany za pomocą operacji 'put', czy za pomocą 'multi-part upload'.
Usługa ADLS Gen2 udostępnia różne powiadomienia o zdarzeniach dla plików wyświetlanych w kontenerze usługi Gen2.
- Moduł automatycznego ładowania nasłuchuje zdarzenia
FlushWithClose
do przetwarzania pliku. - Strumienie automatycznego modułu ładującego obsługują
RenameFile
akcję odnajdywania plików.RenameFile
akcje wymagają żądania interfejsu API do systemu pamięci masowej w celu uzyskania rozmiaru zmienionego pliku. - Strumienie automatycznego modułu ładującego utworzone za pomocą środowiska Databricks Runtime 9.0 i po obsłudze
RenameDirectory
akcji odnajdywania plików.RenameDirectory
akcje wymagają, aby żądania interfejsu API do systemu przechowywania wymieniły zawartość zmienionego katalogu.
Usługa Google Cloud Storage udostępnia OBJECT_FINALIZE
zdarzenie podczas przekazywania pliku, w tym zastępowania i kopiowania plików. Nieudane przesyłania nie generują tego zdarzenia.
Uwaga
Dostawcy usług w chmurze nie gwarantują 100% dostarczania wszystkich zdarzeń plików w bardzo rzadkich warunkach i nie zapewniają ścisłych umów SLA dotyczących opóźnienia zdarzeń plików. Usługa Databricks zaleca, aby wyzwalać regularne wypełnianie za pomocą modułu automatycznego ładującego przy użyciu cloudFiles.backfillInterval
opcji zagwarantowania, że wszystkie pliki zostaną odnalezione w ramach danej umowy SLA, jeśli wymagana jest kompletność danych. Wyzwalanie regularnych wypełniania nie powoduje duplikatów.
Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usług ADLS Gen2 i Azure Blob Storage
Musisz mieć uprawnienia do odczytu dla katalogu wejściowego. Zobacz Azure Blob Storage.
Aby użyć trybu powiadomień plików, należy podać poświadczenia uwierzytelniania na potrzeby konfigurowania usług powiadomień zdarzeń i uzyskiwania do nich dostępu.
Do uwierzytelniania potrzebna jest tylko jednostka usługi.
Jednostka usługi — korzystanie z wbudowanych ról platformy Azure
Utwórz aplikację i jednostkę usługi Microsoft Entra ID (dawniej Azure Active Directory) w postaci identyfikatora klienta i klucza tajnego klienta.
Przypisz tę aplikację następujące role do konta magazynu, w którym znajduje się ścieżka wejściowa:
- Współautor: Ta rola służy do konfigurowania zasobów na koncie magazynu, takich jak kolejki i subskrypcje zdarzeń.
- Współautor danych kolejki usługi Storage: ta rola służy do wykonywania operacji kolejki, takich jak pobieranie i usuwanie komunikatów z kolejek. Ta rola jest wymagana tylko w przypadku podania jednostki usługi bez parametry połączenia.
Przypisz tę aplikację następującą rolę do powiązanej grupy zasobów:
- EventGrid EventSubscription Contributor: ta rola służy do wykonywania operacji subskrypcji usługi Azure Event Grid (Event Grid), takich jak tworzenie lub wyświetlanie subskrypcji zdarzeń.
Aby uzyskać więcej informacji, zobacz przypisywanie ról Azure za pomocą portalu Azure.
Jednostka usługi — używanie roli niestandardowej
Jeśli interesuje Cię nadmierne uprawnienia wymagane dla poprzednich ról, możesz utworzyć rolę niestandardową z co najmniej następującymi uprawnieniami wymienionymi poniżej w formacie JSON roli platformy Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Następnie możesz przypisać tę rolę niestandardową do aplikacji.
Aby uzyskać więcej informacji, zobacz przypisywanie ról Azure za pomocą portalu Azure.
ustawienia uprawnień Auto Loader
Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usługi Amazon S3
Musisz mieć uprawnienia do odczytu dla katalogu wejściowego. Aby uzyskać więcej informacji, zobacz szczegóły połączenia S3.
Aby użyć trybu powiadomień plików, dołącz następujący dokument zasad JSON do użytkownika lub roli IAM.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
gdzie:
-
<bucket-name>
: nazwa zasobnika S3, z którego strumień będzie odczytywał pliki, na przykładauto-logs
. Można użyć*
jako symbolu wieloznacznych, na przykładdatabricks-*-logs
. Aby dowiedzieć się podstawowego zasobnika S3 dla ścieżki systemu plików DBFS, możesz wyświetlić listę wszystkich punktów instalacji systemu plików DBFS w notesie, uruchamiając%fs mounts
. -
<region>
: region platformy AWS, w którym znajduje się zasobnik S3, na przykładus-west-2
. Jeśli nie chcesz określać regionu, użyj polecenia*
. -
<account-number>
: numer konta platformy AWS, który jest właścicielem zasobnika S3, na przykład123456789012
. Jeśli nie chcesz określać numeru konta, użyj polecenia*
.
Ciąg databricks-auto-ingest-*
w specyfikacji SQS i SNS ARN jest prefiksem nazwy używanym cloudFiles
przez źródło podczas tworzenia usług SQS i SNS. Ponieważ usługa Azure Databricks konfiguruje usługi powiadomień w początkowym uruchomieniu strumienia, możesz użyć zasad z ograniczonymi uprawnieniami po początkowym uruchomieniu (na przykład zatrzymać strumień, a następnie uruchomić go ponownie).
Uwaga
Powyższe zasady dotyczą tylko uprawnień wymaganych do konfigurowania usług powiadomień plików, a mianowicie powiadomień zasobników S3, SNS i SQS oraz zakłada, że masz już dostęp do odczytu do zasobnika S3. Jeśli musisz dodać uprawnienia tylko do odczytu S3, dodaj następujące elementy do listy Action
w instrukcji DatabricksAutoLoaderSetup
w dokumencie JSON:
s3:ListBucket
s3:GetObject
Ograniczone uprawnienia po początkowej konfiguracji
Uprawnienia konfiguracji zasobów opisane powyżej są wymagane tylko podczas początkowego uruchomienia strumienia. Po pierwszym uruchomieniu możesz przełączyć się na następujące zasady zarządzania dostępem i tożsamościami z ograniczonymi uprawnieniami.
Ważne
Przy ograniczonych uprawnieniach nie można uruchomić nowych zapytań przesyłania strumieniowego ani odtworzyć zasobów w przypadku awarii (na przykład kolejka SQS została przypadkowo usunięta); nie można również użyć interfejsu API zarządzania zasobami w chmurze do wyświetlania listy lub usuwania zasobów.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usługi GCS
Musisz mieć list
uprawnienia i get
w zasobniku GCS oraz na wszystkich obiektach. Aby uzyskać szczegółowe informacje, zobacz dokumentację firmy Google dotyczącą uprawnień do zarządzania dostępem i tożsamościami.
Aby użyć trybu powiadomień dotyczących plików, musisz dodać uprawnienia do konta usługi GCS i konta używanego do uzyskiwania dostępu do zasobów usługi Google Cloud Pub/Sub.
Pub/Sub Publisher
Dodaj rolę do konta usługi GCS. Dzięki temu konto może publikować komunikaty powiadomień o zdarzeniach z zasobników GCS do usługi Google Cloud Pub/Sub.
Jeśli chodzi o konto usługi używane dla zasobów Google Cloud Pub/Sub, należy dodać następujące uprawnienia:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
W tym celu możesz utworzyć rolę niestandardową IAM z tymi uprawnieniami lub przypisać istniejące role GCP, aby uwzględnić te uprawnienia.
Znajdowanie konta usługi GCS
W konsoli Google Cloud Console dla odpowiedniego projektu przejdź do Cloud Storage > Settings
strony .
Sekcja "Konto usługi magazynu w chmurze" zawiera adres e-mail konta usługi GCS.
Tworzenie niestandardowej roli IAM w chmurze Google dla trybu powiadomień plików
W konsoli Google Cloud dla odpowiedniego projektu przejdź do IAM & Admin > Roles
strony . Następnie utwórz rolę u góry lub zaktualizuj istniejącą rolę. Na ekranie tworzenia lub edytowania roli kliknij pozycję Add Permissions
. Zostanie wyświetlone menu, w którym można dodać odpowiednie uprawnienia do roli.
Ręczne konfigurowanie zasobów powiadomień o plikach lub zarządzanie nimi
Użytkownicy uprzywilejowani mogą ręcznie konfigurować zasoby powiadomień o plikach lub zarządzać nimi.
- Ręcznie skonfiguruj usługi powiadomień o plikach za pośrednictwem dostawcy usług w chmurze i ręcznie określ identyfikator kolejki. Aby uzyskać więcej informacji, zobacz Opcje powiadomień o plikach.
- Użyj interfejsów API języka Scala, aby utworzyć powiadomienia i usługi kolejkowania oraz zarządzać nimi, jak pokazano w poniższym przykładzie:
Uwaga
Musisz mieć odpowiednie uprawnienia do konfigurowania lub modyfikowania infrastruktury chmury. Zobacz dokumentację uprawnień dla platformy Azure, S3 lub GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Służy setUpNotificationServices(<resource-suffix>)
do tworzenia kolejki i subskrypcji o nazwie <prefix>-<resource-suffix>
(prefiks zależy od systemu magazynu podsumowanego w zasobach w chmurze używanych w trybie powiadomień pliku automatycznego ładowania. Jeśli istnieje zasób o tej samej nazwie, usługa Azure Databricks ponownie użyje istniejącego zasobu zamiast utworzyć nowy. Ta funkcja zwraca identyfikator kolejki, który można przekazać do źródła cloudFiles
, korzystając z identyfikatora w opcjach powiadomień pliku . Dzięki cloudFiles
temu użytkownik źródłowy może mieć mniej uprawnień niż użytkownik tworzący zasoby.
Podaj opcję "path"
tylko w przypadku wywołania newManager
metody ; nie jest ona wymagana dla setUpNotificationServices
elementu lub listNotificationServices
.tearDownNotificationServices
Jest to samo path
, które jest używane podczas uruchamiania zapytania przesyłania strumieniowego.
Poniższa macierz wskazuje, które metody interfejsu API są obsługiwane w środowisku Databricks Runtime dla każdego typu magazynu:
Magazyn w chmurze | Konfigurowanie interfejsu API | API listy | Zrywanie interfejsu API |
---|---|---|---|
Amazon S3 | Wszystkie wersje | Wszystkie wersje | Wszystkie wersje |
ADLS Gen2 | Wszystkie wersje | Wszystkie wersje | Wszystkie wersje |
GCS | Databricks Runtime 9.1 i nowsze | Databricks Runtime 9.1 i nowsze | Databricks Runtime 9.1 i nowsze |
Azure Blob Storage | Wszystkie wersje | Wszystkie wersje | Wszystkie wersje |
ADLS Gen1 | Nieobsługiwane | Nieobsługiwane | Nieobsługiwane |
Rozwiązywanie typowych błędów
W tej sekcji opisano typowe błędy występujące podczas korzystania z automatycznego modułu ładującego z trybem powiadamiania plików i sposobu ich rozwiązywania.
Nie można utworzyć subskrypcji usługi Event Grid
Jeśli podczas pierwszego uruchamiania automatycznego modułu ładującego zostanie wyświetlony następujący komunikat o błędzie, usługa Event Grid nie jest zarejestrowana jako dostawca zasobów w subskrypcji platformy Azure.
java.lang.RuntimeException: Failed to create event grid subscription.
Aby zarejestrować usługę Event Grid jako dostawcę zasobów, wykonaj następujące czynności:
- W witrynie Azure Portal przejdź do subskrypcji.
- Kliknij pozycję Dostawcy zasobów w sekcji Ustawienia.
- Zarejestruj dostawcę
Microsoft.EventGrid
.
Autoryzacja wymagana do wykonywania operacji subskrypcji usługi Event Grid
Jeśli podczas pierwszego uruchamiania Auto Loader'a wyświetli się następujący komunikat o błędzie, upewnij się, że rola Współautora jest przypisana do głównej usługi dla Event Grid i konta magazynu.
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Klient usługi Event Grid pomija serwer proxy
W środowisku Databricks Runtime 15.2 lub nowszym połączenia usługi Event Grid w usłudze Auto Loader domyślnie używają ustawień serwera proxy z właściwości systemowych. W Databricks Runtime 13.3 LTS, 14.3 LTS i od 15.0 do 15.2 można ręcznie skonfigurować połączenia Event Grid, aby korzystać z serwera proxy, ustawiając właściwość Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true
. Zobacz Ustawianie właściwości konfiguracji platformy Spark w usłudze Azure Databricks.