迁移应用程序以将无密码连接用于适用于 Kafka 的 Azure 事件中心

本文介绍如何从传统身份验证方法迁移到更安全的无密码连接,以及适用于 Kafka 的 Azure 事件中心。

必须对 Kafka Azure 事件中心的应用程序请求进行身份验证。 适用于 Kafka 的Azure 事件中心为应用提供安全连接的不同方式。 其中一种方法是使用连接字符串。 但是,应尽可能确定应用程序中无密码连接的优先级。

自 Spring Cloud Azure 4.3.0 以来支持无密码连接。 本文是从 Spring Cloud Stream Kafka 应用程序中删除凭据的迁移指南。

比较身份验证选项

当应用程序使用 Kafka 的 Azure 事件中心进行身份验证时,它提供连接事件中心命名空间的授权实体。 Apache Kafka 协议提供多个用于身份验证的简单身份验证和安全层 (SASL) 机制。 根据 SASL 机制,可以使用两种身份验证选项来授权访问安全资源:Microsoft Entra 身份验证和共享访问签名(SAS)身份验证。

Microsoft Entra 身份验证

Microsoft Entra 身份验证是使用 Microsoft Entra ID 中定义的标识连接到 Kafka Azure 事件中心的机制。 使用 Microsoft Entra 身份验证,可以在中心位置管理服务主体标识和其他Microsoft 服务,从而简化权限管理。

使用 Microsoft Entra ID 进行身份验证可提供以下优势:

  • 以统一的方式跨 Azure 服务对用户进行身份验证。
  • 在统一的位置管理密码策略和密码轮换。
  • Microsoft Entra ID 支持的多种形式的身份验证,因此无需存储密码。
  • 客户可以使用外部(Microsoft Entra ID)组管理事件中心权限。
  • 对连接到 Kafka Azure 事件中心的应用程序支持基于令牌的身份验证。

SAS 身份验证

事件中心还提供共享访问签名(SAS),用于对 Kafka 资源的事件中心进行委派访问。

尽管可以使用 SAS 连接到 Kafka Azure 事件中心,但应谨慎使用。 你必须勤奋地永远不会在不安全的位置公开连接字符串。 获得连接字符串访问权限的任何人都可以进行身份验证。 例如,如果意外将连接字符串签入源代码管理、通过不安全的电子邮件发送、粘贴到错误的聊天中或由不应具有权限的人员查看,恶意用户可能会访问该应用程序。 相反,使用基于 OAuth 2.0 令牌的机制授权访问可提供优于 SAS 的安全性和易用性。 请考虑将应用程序更新为使用无密码连接。

引入无密码连接

使用无密码连接,无需在应用程序代码、配置文件或环境变量中存储任何凭据即可连接到 Azure 服务。

许多 Azure 服务支持无密码连接,例如通过 Azure 托管标识。 这些技术提供可靠的安全功能,可以使用 Azure 标识客户端库中的 DefaultAzureCredential 实现。 本教程介绍如何更新现有应用程序以使用DefaultAzureCredential,而不是连接字符串等替代方法。

DefaultAzureCredential 支持多种身份验证方法,并自动确定应在运行时使用哪种方法。 应用通过此方法能够在不同环境(本地开发与生产)中使用不同的身份验证方法,而无需实现特定于环境的代码。

可在 Azure 标识库概述中找到搜索凭据的顺序和位置DefaultAzureCredential。 例如,在本地工作时, DefaultAzureCredential 通常会使用开发人员用于登录 Visual Studio 的帐户进行身份验证。 将应用部署到 Azure 后,DefaultAzureCredential 将自动切换为使用托管标识。 此转换不需要进行任何代码更改。

若要确保连接无密码,必须同时考虑本地开发和生产环境。 如果任一位置都需要连接字符串,则应用程序不是无密码的。

在本地开发环境中,可以使用适用于 Visual Studio Code 或 IntelliJ 的 Azure CLI、Azure PowerShell、Visual Studio 或 Azure 插件进行身份验证。 在这种情况下,可以在应用程序中使用该凭据,而不是配置属性。

将应用程序部署到 Azure 托管环境(例如虚拟机)时,可以在该环境中分配托管标识。 然后,无需提供凭据即可连接到 Azure 服务。

注意

托管标识提供用于表示应用或服务的安全标识。 标识由 Azure 平台托管,无需设置或转交任何机密。 可以在概述文档中详细了解托管标识。

迁移现有应用程序以使用无密码连接

以下步骤说明如何迁移现有应用程序以使用无密码连接而不是 SAS 解决方案。

0) 为本地开发身份验证准备工作环境

首先,使用以下命令设置一些环境变量。

export AZ_RESOURCE_GROUP=<YOUR_RESOURCE_GROUP>
export AZ_EVENTHUBS_NAMESPACE_NAME=<YOUR_EVENTHUBS_NAMESPACE_NAME>
export AZ_EVENTHUB_NAME=<YOUR_EVENTHUB_NAME>

使用以下值替换占位符,在本文中将使用这些值:

  • <YOUR_RESOURCE_GROUP>:要使用的资源组的名称。
  • <YOUR_EVENTHUBS_NAMESPACE_NAME>:要使用的Azure 事件中心命名空间的名称。
  • <YOUR_EVENTHUB_NAME>:将使用的事件中心的名称。

1) 授予Azure 事件中心的权限

如果要使用 Microsoft Entra 身份验证在本地运行此示例,请确保用户帐户已通过用于 IntelliJ 的 Azure 工具包、Visual Studio Code Azure 帐户插件或 Azure CLI 进行身份验证。 此外,请确保帐户已授予足够的权限。

  1. 在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。

  2. 在“事件中心概述”页上, 从左侧菜单中选择“访问控制”(IAM )。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择“ 添加 ”,然后 从生成的下拉菜单中添加角色分配

    事件中心命名空间资源的Azure 门户 访问控制(IAM)页的屏幕截图,其中突出显示了“添加角色分配”。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,搜索Azure 事件中心数据发送方Azure 事件中心数据接收器,然后选择匹配的结果,然后选择“下一步”。

  6. 在“分配访问权限”下,选择“用户”、“组”或服务主体,然后选择“选择成员”。

  7. 在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。

  8. 选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。

有关授予访问角色的详细信息,请参阅 使用 Microsoft Entra ID 授权访问事件中心资源。

2) 登录并迁移应用代码以使用无密码连接

对于本地开发,请确保使用为事件中心分配了角色的相同Microsoft Entra 帐户进行身份验证。 可通过 Azure CLI、Visual Studio、Azure PowerShell 或其他工具(如 IntelliJ)进行身份验证。

使用以下命令通过 Azure CLI 登录到 Azure:

az login

接下来,使用以下步骤更新 Spring Kafka 应用程序以使用无密码连接。 尽管从概念上讲相似,但每个框架使用不同的实现详细信息。

  1. 在项目中,打开 pom.xml 文件并添加以下引用:

    <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-identity</artifactId>
       <version>1.6.0</version>
    </dependency>
    
  2. 迁移后,在项目中为 OAuth2 身份验证实现 AuthenticateCallbackHandlerOAuthBearerToken ,如以下示例所示。

    public class KafkaOAuth2AuthenticateCallbackHandler implements AuthenticateCallbackHandler {
    
       private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(30);
       private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
    
       private Function<TokenCredential, Mono<OAuthBearerTokenImp>> resolveToken;
       private final TokenCredential credential = new DefaultAzureCredentialBuilder().build();
    
       @Override
       public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
          TokenRequestContext request = buildTokenRequestContext(configs);
          this.resolveToken = tokenCredential -> tokenCredential.getToken(request).map(OAuthBearerTokenImp::new);
       }
    
       private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
          URI uri = buildEventHubsServerUri(configs);
          String tokenAudience = buildTokenAudience(uri);
    
          TokenRequestContext request = new TokenRequestContext();
          request.addScopes(tokenAudience);
          return request;
       }
    
       @SuppressWarnings("unchecked")
       private URI buildEventHubsServerUri(Map<String, ?> configs) {
          String bootstrapServer = Arrays.asList(configs.get(BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();
          bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
          URI uri = URI.create("https://" + bootstrapServer);
          return uri;
       }
    
       private String buildTokenAudience(URI uri) {
          return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost());
       }
    
       @Override
       public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
          for (Callback callback : callbacks) {
             if (callback instanceof OAuthBearerTokenCallback) {
                OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
                this.resolveToken
                        .apply(credential)
                        .doOnNext(oauthCallback::token)
                        .doOnError(throwable -> oauthCallback.error("invalid_grant", throwable.getMessage(), null))
                        .block(ACCESS_TOKEN_REQUEST_BLOCK_TIME);
             } else {
                throw new UnsupportedCallbackException(callback);
             }
          }
       }
    
       @Override
       public void close() {
          // NOOP
       }
    }
    
    public class OAuthBearerTokenImp implements OAuthBearerToken {
        private final AccessToken accessToken;
        private final JWTClaimsSet claims;
    
        public OAuthBearerTokenImp(AccessToken accessToken) {
            this.accessToken = accessToken;
            try {
                claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet();
            } catch (ParseException exception) {
                throw new SaslAuthenticationException("Unable to parse the access token", exception);
            }
        }
    
        @Override
        public String value() {
            return accessToken.getToken();
        }
    
        @Override
        public Long startTimeMs() {
            return claims.getIssueTime().getTime();
        }
    
        @Override
        public long lifetimeMs() {
            return claims.getExpirationTime().getTime();
        }
    
        @Override
        public Set<String> scope() {
            // Referring to https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the scp
            // claim is a String, which is presented as a space separated list.
            return Optional.ofNullable(claims.getClaim("scp"))
                    .map(s -> Arrays.stream(((String) s)
                    .split(" "))
                    .collect(Collectors.toSet()))
                    .orElse(null);
        }
    
        @Override
        public String principalName() {
            return (String) claims.getClaim("upn");
        }
    
        public boolean isExpired() {
            return accessToken.isExpired();
        }
    }
    
  3. 创建 Kafka 生成者或使用者时,请添加支持 SASL/OAUTHBEARER 机制所需的配置。 以下示例演示代码在迁移前后的外观。 在这两个示例中,将 <eventhubs-namespace> 占位符替换为事件中心命名空间的名称。

    在迁移之前,代码应如以下示例所示:

    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093");
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG,
            String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString));
    return new KafkaProducer<>(properties);
    

    迁移后,代码应如以下示例所示。 在此示例中,将 <path-to-your-KafkaOAuth2AuthenticateCallbackHandler> 占位符替换为实现 KafkaOAuth2AuthenticateCallbackHandler的全类名。

    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093");
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required");
    properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "<path-to-your-KafkaOAuth2AuthenticateCallbackHandler>");
    return new KafkaProducer<>(properties);
    

在本地运行应用

进行这些代码更改后,在本地运行应用程序。 新配置应选取本地凭据,前提是已登录到兼容的 IDE 或命令行工具,例如 Azure CLI、Visual Studio 或 IntelliJ。 分配给 Azure 中本地开发用户的角色将允许应用在本地连接到 Azure 服务。

3) 配置 Azure 托管环境

将应用程序配置为使用无密码连接并在本地运行后,相同的代码可以在部署到 Azure 后向 Azure 服务进行身份验证。 例如,部署到已分配托管标识的 Azure Spring Apps 实例的应用程序可以连接到 Kafka 的 Azure 事件中心。

在本部分中,你将执行两个步骤,使应用程序能够以无密码方式在 Azure 托管环境中运行:

  • 为 Azure 托管环境分配托管标识。
  • 将角色分配给托管标识。

注意

Azure 还提供 服务连接器,可帮助将托管服务与事件中心连接。 使用服务连接器配置托管环境时,可以省略将角色分配给托管标识的步骤,因为服务连接器会为你执行此操作。 以下部分介绍如何通过两种方式配置 Azure 托管环境:一种是通过服务连接器,另一种是通过直接配置每个托管环境。

重要

服务连接器的命令需要 Azure CLI 2.41.0 或更高版本。

为 Azure 托管环境分配托管标识

以下步骤演示如何为各种 Web 托管服务分配系统分配的托管标识。 托管标识可以使用之前设置的应用配置安全地连接到其他 Azure 服务。

  1. 在Azure App 服务实例的主概述页上,从导航窗格中选择“标识”。

  2. “系统分配 ”选项卡上,确保将 “状态 ”字段设置为 “打开”。 系统分配的标识由 Azure 在内部进行管理,并为你处理管理任务。 标识的详细信息和 ID 永远不会在代码中公开。

还可以使用 Azure CLI 在 Azure 托管环境中分配托管标识。

可以使用 az webapp identity assign 命令将托管标识分配给 Azure App 服务 实例,如以下示例所示。

export AZURE_MANAGED_IDENTITY_ID=$(az webapp identity assign \
    --resource-group $AZ_RESOURCE_GROUP \
    --name <app-service-name> \
    --query principalId \
    --output tsv)

为托管标识分配角色

接下来,向创建的托管标识授予访问事件中心命名空间的权限。 可以通过向托管标识分配角色来授予权限,就像对本地开发用户所做的那样。

如果使用服务连接器连接服务,则无需完成此步骤。 已为你处理以下必要的配置:

  • 如果在创建连接时选择了托管标识,则会为应用创建系统分配的托管标识,并在事件中心上分配了Azure 事件中心数据发送方Azure 事件中心数据接收器角色。

  • 如果选择使用连接字符串,则连接字符串已添加为应用环境变量。

测试应用程序

进行这些代码更改后,在浏览器中浏览到托管应用程序。 应用应能够成功连接到 Kafka 的Azure 事件中心。 请记住,角色分配通过 Azure 环境传播最长可能需要五分钟的时间。 应用程序现在配置为在本地和生产环境中运行,开发人员无需管理应用程序本身的机密。

后续步骤

本教程介绍了如何将应用程序迁移到无密码连接。

可以阅读以下资源,更深入地了解本文中讨论的概念: