Поделиться через


Что такое режим уведомлений о файлах автозагрузчика?

В режиме уведомлений о файлах автозагрузчик автоматически настраивает службу уведомлений и службу очередей, которые подписываются на события файлов из входного каталога. Вы можете использовать уведомления файлов для масштабирования автозагрузчика для приема миллионов файлов в час. По сравнению с режимом перечисления каталогов режим уведомлений о файлах является более производительным и масштабируемым для больших каталогов ввода или большого объема файлов, но требует дополнительных облачных разрешений.

Вы можете переключаться между уведомлениями о файлах и списком каталогов в любое время и по-прежнему поддерживать гарантии однократной обработки данных.

Примечание.

Режим уведомлений файлов не поддерживается для учетных записей хранения Azure уровня "Премиум", так как учетные записи уровня "Премиум" не поддерживают хранилище очередей.

Предупреждение

Изменение исходного пути для автозагрузчика не поддерживается для режима уведомлений файлов. Если используется режим уведомлений о файлах и путь изменен, возможно, не удается обработать файлы, которые уже присутствуют в новом каталоге во время изменения каталога.

Режим уведомлений файлов поддерживается только в вычислительных ресурсах одного пользователя.

Облачные ресурсы, используемые в режиме уведомления файла автозагрузчика

Внимание

Вам нужны повышенные разрешения для автоматической настройки облачной инфраструктуры для режима уведомлений о файлах. Обратитесь к администратору облака или администратору рабочей области. Видеть:

Автозагрузчик может автоматически настраивать уведомления о файлах при настройке параметра cloudFiles.useNotificationstrue и предоставления необходимых разрешений для создания облачных ресурсов. Кроме того, может потребоваться предоставить дополнительные параметры для того, чтобы автозагрузчик получил разрешение на создание этих ресурсов.

В следующей таблице приведены сведения о том, какие ресурсы создаются автозагрузчиком.

Облачное хранилище Служба подписки Использование хранилища очередей из Python Префикс* Предел**
Amazon S3 AWS SNS AWS SQS databricks-auto-ingest 100 на каждый контейнер S3
ADLS 2-го поколения Сетку событий Azure Хранилище очередей Azure databricks 500 на каждую учетную запись хранения
GCS Публикация и подписка Google Публикация и подписка Google databricks-auto-ingest 100 на каждый контейнер GCS
Хранилище BLOB-объектов Azure Сетку событий Azure Хранилище очередей Azure databricks 500 на каждую учетную запись хранения
  • Автозагрузчик именует ресурсы с использованием этого префикса.

**Количество параллельных конвейеров уведомлений о файлах, которые можно запустить.

Если для конкретной учетной записи хранения требуется запускать больше, чем ограниченное количество конвейеров уведомлений о файлах, можно выполнить следующие действия.

  • Используйте службу, такую как AWS Lambda, Azure Functions или Google Cloud Functions, чтобы распространять уведомления из одной очереди, которая прослушивает весь контейнер или бакет, в очереди, специфичные для каталогов.

События уведомлений о файлах

Amazon S3 предоставляет событие ObjectCreated при загрузке файла в бакет S3, независимо от того, была ли использована однократная или многократная загрузка.

ADLS 2-го поколения предоставляет различные уведомления о событиях для файлов, отображаемых в контейнере 2-го поколения.

  • Автозагрузчик прослушивает на наличие события FlushWithClose для обработки файла.
  • Потоки автозагрузчика поддерживают RenameFile действие обнаружения файлов. RenameFile действия требуют запроса API к системе хранения, чтобы получить размер переименованного файла.
  • Потоки Автозагрузчика, созданные с помощью Databricks Runtime 9.0 и более поздних версий, поддерживают действие RenameDirectory для обнаружения файлов. RenameDirectory действия требуют запросов API к системе хранения для перечисления содержимого переименованного каталога.

Google Cloud Storage предоставляет событие OBJECT_FINALIZE при передаче файла, включая перезапись и копирование файлов. Неудачные загрузки не вызывают это событие.

Примечание.

Поставщики облачных служб не гарантируют доставку 100% всех событий файлов в очень редких случаях и не предоставляют строгие SLA по задержке в обработке событий файла. В Databricks рекомендуется активировать регулярные обратные заполнения с помощью Автозагрузчика, используя параметр cloudFiles.backfillInterval, чтобы гарантировать, что все файлы будут обнаружены в рамках данного соглашения об уровне обслуживания, если полнота данных является обязательным условием. Активация регулярных выполнения задним числом не приведет к появлению дубликатов.

Необходимые разрешения для настройки уведомлений о файлах для ADLS 2-го поколения и Хранилище BLOB-объектов Azure

Необходимо иметь разрешения на чтение для входного каталога. См. раздел Хранилище BLOB-объектов Azure.

Чтобы использовать режим уведомлений файлов, необходимо предоставить учетные данные проверки подлинности для настройки и доступа к службам уведомлений о событиях.

Для проверки подлинности требуется только субъект-служба.

  • Субъект-служба — использование встроенных ролей Azure

    Создайте приложение и субъект-службу Microsoft Entra ID (ранее Azure Active Directory) в виде идентификатора клиента и секрета клиента.

    Присвойте этому приложению указанные ниже роли для учетной записи хранения, в которой находится входной путь.

    • Участник: эта роль предназначена для настройки ресурсов в учетной записи хранения, таких как очереди и подписки на события.
    • Участник данных очереди хранилища: эта роль предназначена для выполнения операций с очередями, таких как извлечение и удаление сообщений из очередей. Эта роль требуется только при предоставлении субъекта-службы без строка подключения.

    Присвойте этому приложению указанную ниже роль в связанной группе ресурсов.

    • Участник подписки на Azure Event Grid. Эта роль предназначена для выполнения операций подписки Azure Event Grid, таких как создание или перечисление подписок на события.

    Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.

  • Основная учетная запись службы — использование пользовательской роли

    Если вы обеспокоены избыточными разрешениями, необходимыми для предыдущих ролей, можно создать пользовательскую роль по крайней мере со следующими разрешениями, перечисленными ниже в формате JSON роли Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Затем вы можете назначить эту настраиваемую роль приложению.

    Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.

параметры разрешений автозагрузчика

Необходимые разрешения для настройки уведомлений о файлах для Amazon S3

Необходимо иметь разрешения на чтение для входного каталога. Дополнительные сведения см. в сведениях о подключении S3.

Чтобы использовать режим уведомления о файлах, подключите следующий JSON-документ политики к пользователю или роли IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

где:

  • <bucket-name>: имя контейнера S3, из которого поток будет считывать файлы, например auto-logs. Можно использовать * в качестве подстановочного знака, например databricks-*-logs. Чтобы узнать базовый контейнер S3 для пути к DBFS, можно перечислить все точки подключения DBFS в записной книжке, выполнив %fs mounts.
  • <region>: регион AWS, в котором находится контейнер S3, например, us-west-2. Если вы не хотите указывать регион, используйте *.
  • <account-number>: номер учетной записи AWS, которой принадлежит контейнер S3, например 123456789012. Если вы не хотите указывать номер учетной записи, используйте *.

Строка databricks-auto-ingest-* в спецификации SQS и SNS ARN — это префикс имени, который источник cloudFiles использует при создании служб SQS и SNS. Так как Azure Databricks настраивает службы уведомлений при первоначальном запуске потока, после первоначального запуска (например, поток был остановлен, а затем перезапущен) вы можете использовать политику с ограниченными разрешениями.

Примечание.

Описанная выше политика связана только с разрешениями, необходимыми для настройки служб уведомлений о файлах, а именно служб уведомлений контейнера S3, SNS и SQS, и предполагается, что у вас уже есть доступ на чтение к контейнеру S3. Если вам нужно добавить разрешения только для чтения S3, добавьте следующее в список Action в инструкции DatabricksAutoLoaderSetup в документе JSON:

  • s3:ListBucket
  • s3:GetObject

Ограниченные разрешения после начальной настройки

Описанные выше разрешения на настройку ресурсов необходимы только во время начального запуска потока. После первого запуска можно переключиться на следующую политику IAM с ограниченными разрешениями.

Внимание

С ограниченными разрешениями вы не сможете запускать новые запросы потоковой передачи или повторно создавать ресурсы в случае сбоев (например, очередь SQS была случайно удалена); Вы также не можете использовать API управления облачными ресурсами для перечисления или отмены ресурсов.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:PurgeQueue"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Необходимые разрешения для настройки уведомлений о файлах для GCS

Необходимо иметь разрешения list и get для контейнера GCS и всех объектов. Дополнительные сведения см. в документации Google по разрешениям IAM.

Чтобы использовать режим уведомления о файлах, необходимо добавить разрешения для учетной записи службы GCS и учетной записи, используемой для доступа к ресурсам службы публикации/подписки Google Cloud.

Добавьте роль Pub/Sub Publisher в учетную запись службы GCS. Это позволит учетной записи публиковать уведомительные сообщения о событиях из контейнеров GCS в службе публикации и подписки Google Cloud.

Как и для учетной записи службы, используемой для ресурсов службы публикации и подписки Google Cloud, вам нужно добавить следующие разрешения.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Для этого можно создать настраиваемую роль IAM с этими разрешениями или назначить уже существующие роли GCP, чтобы обеспечить эти разрешения.

Поиск учетной записи службы GCS

В консоли Google Cloud для соответствующего проекта перейдите к Cloud Storage > Settings. Раздел "Учетная запись службы облачного хранения" содержит адрес электронной почты учетной записи службы GCS.

Учетная запись службы GCS

Создание настраиваемой роли IAM Google Cloud для режима уведомлений о файлах

В консоли Google Cloud для соответствующего проекта перейдите к IAM & Admin > Roles. Затем создайте роль в верхней части или обновите существующую роль. На экране для создания или редактирования ролей щелкните Add Permissions. Появится меню, с помощью которого вы можете добавить требуемые разрешения для роли.

Настраиваемые роли IAM GCP

Настройка ресурсов уведомлений файлов или управление ими вручную

Привилегированные пользователи могут вручную настраивать ресурсы уведомлений о файлах или управлять ими.

  • Настройте службы уведомлений файлов вручную через поставщика облачных служб и вручную укажите идентификатор очереди. Дополнительные сведения см. в разделе Параметры уведомлений о файлах.
  • Используйте API Scala для создания уведомлений и служб очереди или управления ими, как показано в следующем примере:

Примечание.

Необходимо иметь соответствующие разрешения для настройки или изменения облачной инфраструктуры. См. документацию по разрешениям для Azure, S3 или GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Используется setUpNotificationServices(<resource-suffix>) для создания очереди и подписки с именем <prefix>-<resource-suffix> (префикс зависит от системы хранения, суммируемой в облачных ресурсах, используемых в режиме уведомления файла автозагрузчика. Если ресурс с таким же именем уже существует, Azure Databricks будет повторно использовать уже существующий ресурс вместо того, чтобы создавать новый. Эта функция возвращает идентификатор очереди, который можно передать источнику cloudFiles, используя идентификатор в параметрах уведомления файла. Это позволяет исходному пользователю cloudFiles иметь меньше разрешений, чем пользователю, который создает ресурсы.

Укажите для параметра "path" значение newManager только при вызове setUpNotificationServices; это не требуется для listNotificationServices или tearDownNotificationServices. Это тот же путь path, который вы используете при выполнении запроса потоковой передачи.

В следующей матрице указывается, какие методы API поддерживаются в среде выполнения Databricks для каждого типа хранилища:

Облачное хранилище API установки API списка Уничтожение API
Amazon S3 Все версии Все версии Все версии
ADLS 2-го поколения Все версии Все версии Все версии
GCS Databricks Runtime 9.1 и выше Databricks Runtime 9.1 и выше Databricks Runtime 9.1 и выше
Хранилище BLOB-объектов Azure Все версии Все версии Все версии
ADLS 1-го поколения Не поддерживается Не поддерживается Не поддерживается

Устранение распространенных ошибок

В этом разделе описываются распространенные ошибки при использовании автозагрузчика с режимом уведомлений о файлах и их устранении.

Не удалось создать подписку сетки событий

Если при первом запуске автозагрузчика отображается следующее сообщение об ошибке, сетка событий не зарегистрирована в качестве поставщика ресурсов в подписке Azure.

java.lang.RuntimeException: Failed to create event grid subscription.

Чтобы зарегистрировать Сетку событий в качестве поставщика ресурсов, сделайте следующее:

  1. На портале Azure перейдите к подписке.
  2. Кликните "Поставщики ресурсов" в разделе "Параметры".
  3. Зарегистрируйте поставщик Microsoft.EventGrid.

Авторизация, необходимая для выполнения операций подписки сетки событий

Если при первом запуске автозагрузчика отображается следующее сообщение об ошибке, убедитесь, что роль участника назначена субъекту-службе для сетки событий и учетной записи хранения.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Клиент Event Grid обходит прокси-сервер

В Databricks Runtime 15.2 и более поздних версиях подключения Event Grid в Auto Loader по умолчанию используют параметры прокси-сервера из системных свойств. В Databricks Runtime 13.3 LTS, 14.3 LTS и с 15.0 по 15.2 можно вручную настроить подключения Event Grid для использования прокси-сервера, задав свойство Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true. См. статью Настройка свойств конфигурации Spark в Azure Databricks.