什么是自动加载程序文件通知模式?

在文件通知模式下,自动加载程序可以自动设置从输入目录中订阅文件事件的通知服务和队列服务。 可以使用文件通知来缩放自动加载程序,以每小时引入数百万个文件。 使用大型输入目录或大量文件时,文件通知模式相对于目录列表模式而言具有更高的性能和可伸缩性,但需要额外的云权限。

可以随时在文件通知和目录列表之间切换,并且只要数据处理保证,你仍然可以精确地维护。

注意

Azure 高级存储帐户不支持文件通知模式,因为高级帐户不支持队列存储。

警告

文件通知模式不支持更改自动加载程序的源路径。 如果启用文件通知模式并更改了路径,您可能会在目录更改为 update时无法引入新目录中已存在的文件。

仅单用户计算环境支持文件通知模式。

自动加载程序文件通知模式中使用的云资源

重要

需要提升的权限才能自动为文件通知模式配置云基础结构。 请联系云管理员或工作区管理员。详见:

如果将 cloudFiles.useNotifications 选项设置为 true 并提供创建云资源所需的权限,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供附加选项,以进行创建这些资源所需的自动加载程序授权。

以下 table 总结了由 Auto Loader 创建的资源。

云存储 订阅服务 队列服务 前缀 * Limit **
Amazon S3 AWS SNS AWS SQS databricks-auto-ingest 每个 S3 存储桶 100 个
ADLS Gen2 Azure 事件网格 Azure Queue Storage databricks 每存储帐户 500
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest 每个 GCS 存储桶有 100 个
Azure Blob 存储 Azure 事件网格 Azure Queue Storage databricks 每存储帐户 500
  • 自动加载程序使用此前缀命名资源。

** 可以启动多少个并发文件通知管道

如果需要为给定存储帐户运行超过有限数量的文件通知管道,则可以:

  • 利用 AWS Lambda、Azure Functions 或 Google Cloud Functions 等服务,将来自单个队列(该队列侦听整个容器或存储桶)的通知扇出,并放入目录特定的队列。

文件通知事件

当文件上传到 S3 存储桶时,Amazon S3 都会提供一个 ObjectCreated 事件,无论该文件的上传方式是通过 put 上传还是通过多部分上传。

ADLS Gen2 为 Gen2 容器中显示的文件提供不同的事件通知。

  • 自动加载程序会侦听 FlushWithClose 事件,以便处理某个文件。
  • 自动加载程序流支持用于发现文件的 RenameFile 操作。 RenameFile 操作需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。
  • 使用 Databricks Runtime 9.0 及更高版本创建的自动加载程序流支持 RenameDirectory 操作,以便发现文件。 RenameDirectory 操作需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。

Google Cloud Storage 会在上传文件时提供一个 OBJECT_FINALIZE 事件,其中包括覆盖和文件副本。 失败的上传不会生成此事件。

注意

云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。

为 ADLS Gen2 和 Azure Blob 存储配置文件通知所需的权限

你必须具有对输入目录的读取权限。 请参阅 Azure Blob 存储

若要使用文件通知模式,必须提供身份验证 credentials 来设置和访问事件通知服务。

只需创建用于身份验证的服务主体。

  • 服务主体 - 使用 Azure 内置角色

    以客户端 ID 和客户端密码的形式创建 Microsoft Entra ID(前 Azure Active Directory)应用和服务主体

    为此应用分配输入路径所在的存储帐户的以下角色:

    • 参与者:此角色用于设置存储帐户中的资源,例如队列和事件订阅。
    • 存储队列数据参与者:此角色用于执行队列操作,例如检索和删除队列中的消息。 仅当在没有连接字符串的情况下提供服务主体时,才需要此角色。

    为此应用分配相关资源组的以下角色:

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

  • 服务主体 - 使用自定义角色

    如果担心上述角色所需的执行权限,则可以创建一个至少具有以下权限的自定义角色,下面以 Azure 角色 JSON 格式列出:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    然后,可以将此自定义角色分配给应用。

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

自动加载程序权限设置

Amazon S3 配置文件通知所需的权限

你必须具有对输入目录的读取权限。 有关更多详细信息,请参阅 S3 连接详细信息

若要使用文件通知模式,请将以下 JSON 策略文档附加到 IAM 用户或角色。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

where:

  • <bucket-name>:流将在其中读取文件的 S3 存储桶名称,例如 auto-logs。 可以使用 * 作为通配符,例如 databricks-*-logs。 若要找出 DBFS 路径的底层 S3 存储桶,可以通过运行 %fs mounts 列出笔记本中的所有 DBFS 装入点。
  • <region>:S3 存储桶所在的 AWS 区域,例如 us-west-2。 如果不想指定区域,请使用 *
  • <account-number>:拥有 S3 存储桶的 AWS 帐号,例如 123456789012。 如果不想指定帐号,请使用 *

SQS 和 SNS ARN 规范中的字符串 databricks-auto-ingest-*cloudFiles 源在创建 SQS 和 SNS 服务时使用的名称前缀。 由于 Azure Databricks 会在流的初始运行期间设置通知服务,因此你可以在初始运行后(例如,停止并重启流)使用权限降低的策略。

注意

上述策略仅涉及设置文件通知服务(即 S3 存储桶通知、SNS 和 SQS 服务)所需的权限,并假设你已经拥有对 S3 存储桶的读取访问权限。 如果需要添加 S3 只读权限,请将以下内容添加到 JSON 文档中 DatabricksAutoLoaderSetup 语句中的 Actionlist:

  • s3:ListBucket
  • s3:GetObject

初始设置后权限减少

上文中所述的资源设置权限仅在流的初始运行期间才需要。 首次运行后,可以切换到以下权限降低的 IAM 策略。

重要

由于权限减少,在发生故障时无法启动新的流式处理查询或重新创建资源(例如,SQS 队列已被意外删除):也不能使用云资源管理 API 来 list 或拆解资源。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:PurgeQueue"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

为 GCS 配置文件通知所需的权限

你必须对你的 GCS Bucket 和所有对象具有 listget 权限。 有关详细信息,请参阅有关 IAM 权限的 Google 文档。

若要使用文件通知模式,需要为 GCS 服务帐户以及用于访问 Google Cloud Pub/Sub 资源的帐户添加权限。

Pub/Sub Publisher 角色添加到 GCS 服务帐户。 这样,该帐户就可以将事件通知消息从 GCS 桶发布到 Google Cloud Pub/Sub。

对于用于 Google Cloud Pub/Sub 资源的服务帐户,需要添加以下权限:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

为此,可以创建拥有这些权限的 IAM 自定义角色,或分配预先存在的 GCP 角色来涵盖这些权限。

查找 GCS 服务帐户

在相应项目的“Google Cloud Console”(Google 云控制台)中,导航到 Cloud Storage > Settings。 “云存储服务帐号”部分包含 GCS 服务帐号的电子邮件。

GCS 服务帐户

创建用于文件通知模式的自定义 Google Cloud IAM 角色

在相应项目的“Google Cloud Console”(Google 云控制台)中,导航到 IAM & Admin > Roles。 然后,在顶部创建一个角色或更新现有角色。 在角色创建或编辑屏幕中,单击 Add Permissions。 此时会弹出一个菜单,你可以在其中向角色添加所需的权限。

GCP IAM 自定义角色

手动配置或管理文件通知资源

特权用户可以手动配置或管理文件通知资源。

  • 通过云提供商手动设置文件通知服务,并手动指定队列标识符。 有关更多详细信息,请参阅文件通知选项
  • 使用 Scala API 创建或管理通知和队列服务,如以下示例所示:

注意

必须要具有适当权限才能配置或修改云基础结构。 请参阅 AzureS3GCS 的权限文档。

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

使用 setUpNotificationServices(<resource-suffix>) 创建名为 <prefix>-<resource-suffix> 的队列和订阅(前缀取决于自动加载程序文件通知模式中使用的云资源中总结的存储系统)。 如果已存在具有相同名称的资源,Azure Databricks 会重用已存在的资源,而不是创建新资源。 此函数返回一个队列标识符,可以使用文件通知选项中的标识符将该标识符传递给 cloudFiles 源。 这使得 cloudFiles 源用户拥有的权限少于创建资源的用户的权限。

只有调用 "path" 时才需要提供 newManagersetUpNotificationServices 选项;对于 listNotificationServicestearDownNotificationServices,则不需要提供。 这是你运行流式处理查询时使用的同一 path

以下矩阵指出了每种类型的存储在哪个 Databricks 运行时中支持哪些 API 方法:

云存储 安装程序 API List API 拆解 API
Amazon S3 所有版本 所有版本 所有版本
ADLS Gen2 所有版本 所有版本 所有版本
GCS Databricks Runtime 9.1 及更高版本 Databricks Runtime 9.1 及更高版本 Databricks Runtime 9.1 及更高版本
Azure Blob 存储 所有版本 所有版本 所有版本
ADLS Gen1 不支持 不支持 不支持

排查常见错误

本部分介绍将自动加载程序与文件通知模式配合使用时出现的常见错误,以及如何解决这些问题。

未能创建事件网格订阅

如果在首次运行自动加载程序时看到以下错误消息,则事件网格不会在 Azure 订阅中注册为资源提供程序。

java.lang.RuntimeException: Failed to create event grid subscription.

若要将事件网格注册为资源提供程序,请执行以下操作:

  1. 在 Azure 门户中,转到订阅。
  2. 单击“设置”部分下的 资源 Providers
  3. 注册提供程序 Microsoft.EventGrid

执行事件网格订阅操作所需的授权

如果在首次运行 Auto Loader 时看到以下错误消息,请确认已将 参与者 角色分配给事件网格和存储帐户的服务主体。

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

事件网格客户端绕过代理

在 Databricks Runtime 15.2 及更高版本中,自动加载程序中的事件网格 connections 默认使用系统属性中的代理设置。 在 Databricks Runtime 13.3 LTS、14.3 LTS 和 15.0 到 15.2 中,可以通过设置 Spark 配置 属性 spark.databricks.cloudFiles.eventGridClient.useSystemProperties true来手动配置事件网格 connections 以使用代理。 请参阅 Azure Databricks上的 Spark 配置属性。