迁移应用程序以将无密码连接用于适用于 Kafka 的 Azure 事件中心
本文介绍如何从传统身份验证方法迁移到更安全的无密码连接,以及适用于 Kafka 的 Azure 事件中心。
必须对 Kafka Azure 事件中心的应用程序请求进行身份验证。 适用于 Kafka 的Azure 事件中心为应用提供安全连接的不同方式。 其中一种方法是使用连接字符串。 但是,应尽可能确定应用程序中无密码连接的优先级。
自 Spring Cloud Azure 4.3.0 以来支持无密码连接。 本文是从 Spring Cloud Stream Kafka 应用程序中删除凭据的迁移指南。
比较身份验证选项
当应用程序使用 Kafka 的 Azure 事件中心进行身份验证时,它提供连接事件中心命名空间的授权实体。 Apache Kafka 协议提供多个用于身份验证的简单身份验证和安全层 (SASL) 机制。 根据 SASL 机制,可以使用两种身份验证选项来授权访问安全资源:Microsoft Entra 身份验证和共享访问签名(SAS)身份验证。
Microsoft Entra 身份验证
Microsoft Entra 身份验证是使用 Microsoft Entra ID 中定义的标识连接到 Kafka Azure 事件中心的机制。 使用 Microsoft Entra 身份验证,可以在中心位置管理服务主体标识和其他Microsoft 服务,从而简化权限管理。
使用 Microsoft Entra ID 进行身份验证可提供以下优势:
- 以统一的方式跨 Azure 服务对用户进行身份验证。
- 在统一的位置管理密码策略和密码轮换。
- Microsoft Entra ID 支持的多种形式的身份验证,因此无需存储密码。
- 客户可以使用外部(Microsoft Entra ID)组管理事件中心权限。
- 对连接到 Kafka Azure 事件中心的应用程序支持基于令牌的身份验证。
SAS 身份验证
事件中心还提供共享访问签名(SAS),用于对 Kafka 资源的事件中心进行委派访问。
尽管可以使用 SAS 连接到 Kafka Azure 事件中心,但应谨慎使用。 你必须勤奋地永远不会在不安全的位置公开连接字符串。 获得连接字符串访问权限的任何人都可以进行身份验证。 例如,如果意外将连接字符串签入源代码管理、通过不安全的电子邮件发送、粘贴到错误的聊天中或由不应具有权限的人员查看,恶意用户可能会访问该应用程序。 相反,使用基于 OAuth 2.0 令牌的机制授权访问可提供优于 SAS 的安全性和易用性。 请考虑将应用程序更新为使用无密码连接。
引入无密码连接
使用无密码连接,无需在应用程序代码、配置文件或环境变量中存储任何凭据即可连接到 Azure 服务。
许多 Azure 服务支持无密码连接,例如通过 Azure 托管标识。 这些技术提供可靠的安全功能,可以使用 Azure 标识客户端库中的 DefaultAzureCredential 实现。 本教程介绍如何更新现有应用程序以使用DefaultAzureCredential
,而不是连接字符串等替代方法。
DefaultAzureCredential
支持多种身份验证方法,并自动确定应在运行时使用哪种方法。 应用通过此方法能够在不同环境(本地开发与生产)中使用不同的身份验证方法,而无需实现特定于环境的代码。
可在 Azure 标识库概述中找到搜索凭据的顺序和位置DefaultAzureCredential
。 例如,在本地工作时, DefaultAzureCredential
通常会使用开发人员用于登录 Visual Studio 的帐户进行身份验证。 将应用部署到 Azure 后,DefaultAzureCredential
将自动切换为使用托管标识。 此转换不需要进行任何代码更改。
若要确保连接无密码,必须同时考虑本地开发和生产环境。 如果任一位置都需要连接字符串,则应用程序不是无密码的。
在本地开发环境中,可以使用适用于 Visual Studio Code 或 IntelliJ 的 Azure CLI、Azure PowerShell、Visual Studio 或 Azure 插件进行身份验证。 在这种情况下,可以在应用程序中使用该凭据,而不是配置属性。
将应用程序部署到 Azure 托管环境(例如虚拟机)时,可以在该环境中分配托管标识。 然后,无需提供凭据即可连接到 Azure 服务。
注意
托管标识提供用于表示应用或服务的安全标识。 标识由 Azure 平台托管,无需设置或转交任何机密。 可以在概述文档中详细了解托管标识。
迁移现有应用程序以使用无密码连接
以下步骤说明如何迁移现有应用程序以使用无密码连接而不是 SAS 解决方案。
0) 为本地开发身份验证准备工作环境
首先,使用以下命令设置一些环境变量。
export AZ_RESOURCE_GROUP=<YOUR_RESOURCE_GROUP>
export AZ_EVENTHUBS_NAMESPACE_NAME=<YOUR_EVENTHUBS_NAMESPACE_NAME>
export AZ_EVENTHUB_NAME=<YOUR_EVENTHUB_NAME>
使用以下值替换占位符,在本文中将使用这些值:
<YOUR_RESOURCE_GROUP>
:要使用的资源组的名称。<YOUR_EVENTHUBS_NAMESPACE_NAME>
:要使用的Azure 事件中心命名空间的名称。<YOUR_EVENTHUB_NAME>
:将使用的事件中心的名称。
1) 授予Azure 事件中心的权限
如果要使用 Microsoft Entra 身份验证在本地运行此示例,请确保用户帐户已通过用于 IntelliJ 的 Azure 工具包、Visual Studio Code Azure 帐户插件或 Azure CLI 进行身份验证。 此外,请确保帐户已授予足够的权限。
在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。
在“事件中心概述”页上, 从左侧菜单中选择“访问控制”(IAM )。
在“访问控制 (IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择“ 添加 ”,然后 从生成的下拉菜单中添加角色分配 。
使用搜索框将结果筛选为所需角色。 对于此示例,搜索Azure 事件中心数据发送方和Azure 事件中心数据接收器,然后选择匹配的结果,然后选择“下一步”。
在“分配访问权限”下,选择“用户”、“组”或服务主体,然后选择“选择成员”。
在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。
选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。
有关授予访问角色的详细信息,请参阅 使用 Microsoft Entra ID 授权访问事件中心资源。
2) 登录并迁移应用代码以使用无密码连接
对于本地开发,请确保使用为事件中心分配了角色的相同Microsoft Entra 帐户进行身份验证。 可通过 Azure CLI、Visual Studio、Azure PowerShell 或其他工具(如 IntelliJ)进行身份验证。
使用以下命令通过 Azure CLI 登录到 Azure:
az login
接下来,使用以下步骤更新 Spring Kafka 应用程序以使用无密码连接。 尽管从概念上讲相似,但每个框架使用不同的实现详细信息。
在项目中,打开 pom.xml 文件并添加以下引用:
<dependency> <groupId>com.azure</groupId> <artifactId>azure-identity</artifactId> <version>1.6.0</version> </dependency>
迁移后,在项目中为 OAuth2 身份验证实现 AuthenticateCallbackHandler 和 OAuthBearerToken ,如以下示例所示。
public class KafkaOAuth2AuthenticateCallbackHandler implements AuthenticateCallbackHandler { private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(30); private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default"; private Function<TokenCredential, Mono<OAuthBearerTokenImp>> resolveToken; private final TokenCredential credential = new DefaultAzureCredentialBuilder().build(); @Override public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { TokenRequestContext request = buildTokenRequestContext(configs); this.resolveToken = tokenCredential -> tokenCredential.getToken(request).map(OAuthBearerTokenImp::new); } private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) { URI uri = buildEventHubsServerUri(configs); String tokenAudience = buildTokenAudience(uri); TokenRequestContext request = new TokenRequestContext(); request.addScopes(tokenAudience); return request; } @SuppressWarnings("unchecked") private URI buildEventHubsServerUri(Map<String, ?> configs) { String bootstrapServer = Arrays.asList(configs.get(BOOTSTRAP_SERVERS_CONFIG)).get(0).toString(); bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", ""); URI uri = URI.create("https://" + bootstrapServer); return uri; } private String buildTokenAudience(URI uri) { return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost()); } @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { for (Callback callback : callbacks) { if (callback instanceof OAuthBearerTokenCallback) { OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback; this.resolveToken .apply(credential) .doOnNext(oauthCallback::token) .doOnError(throwable -> oauthCallback.error("invalid_grant", throwable.getMessage(), null)) .block(ACCESS_TOKEN_REQUEST_BLOCK_TIME); } else { throw new UnsupportedCallbackException(callback); } } } @Override public void close() { // NOOP } }
public class OAuthBearerTokenImp implements OAuthBearerToken { private final AccessToken accessToken; private final JWTClaimsSet claims; public OAuthBearerTokenImp(AccessToken accessToken) { this.accessToken = accessToken; try { claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet(); } catch (ParseException exception) { throw new SaslAuthenticationException("Unable to parse the access token", exception); } } @Override public String value() { return accessToken.getToken(); } @Override public Long startTimeMs() { return claims.getIssueTime().getTime(); } @Override public long lifetimeMs() { return claims.getExpirationTime().getTime(); } @Override public Set<String> scope() { // Referring to https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the scp // claim is a String, which is presented as a space separated list. return Optional.ofNullable(claims.getClaim("scp")) .map(s -> Arrays.stream(((String) s) .split(" ")) .collect(Collectors.toSet())) .orElse(null); } @Override public String principalName() { return (String) claims.getClaim("upn"); } public boolean isExpired() { return accessToken.isExpired(); } }
创建 Kafka 生成者或使用者时,请添加支持 SASL/OAUTHBEARER 机制所需的配置。 以下示例演示代码在迁移前后的外观。 在这两个示例中,将
<eventhubs-namespace>
占位符替换为事件中心命名空间的名称。在迁移之前,代码应如以下示例所示:
Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093"); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString)); return new KafkaProducer<>(properties);
迁移后,代码应如以下示例所示。 在此示例中,将
<path-to-your-KafkaOAuth2AuthenticateCallbackHandler>
占位符替换为实现KafkaOAuth2AuthenticateCallbackHandler
的全类名。Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093"); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"); properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "<path-to-your-KafkaOAuth2AuthenticateCallbackHandler>"); return new KafkaProducer<>(properties);
在本地运行应用
进行这些代码更改后,在本地运行应用程序。 新配置应选取本地凭据,前提是已登录到兼容的 IDE 或命令行工具,例如 Azure CLI、Visual Studio 或 IntelliJ。 分配给 Azure 中本地开发用户的角色将允许应用在本地连接到 Azure 服务。
3) 配置 Azure 托管环境
将应用程序配置为使用无密码连接并在本地运行后,相同的代码可以在部署到 Azure 后向 Azure 服务进行身份验证。 例如,部署到已分配托管标识的 Azure Spring Apps 实例的应用程序可以连接到 Kafka 的 Azure 事件中心。
在本部分中,你将执行两个步骤,使应用程序能够以无密码方式在 Azure 托管环境中运行:
- 为 Azure 托管环境分配托管标识。
- 将角色分配给托管标识。
注意
Azure 还提供 服务连接器,可帮助将托管服务与事件中心连接。 使用服务连接器配置托管环境时,可以省略将角色分配给托管标识的步骤,因为服务连接器会为你执行此操作。 以下部分介绍如何通过两种方式配置 Azure 托管环境:一种是通过服务连接器,另一种是通过直接配置每个托管环境。
重要
服务连接器的命令需要 Azure CLI 2.41.0 或更高版本。
为 Azure 托管环境分配托管标识
以下步骤演示如何为各种 Web 托管服务分配系统分配的托管标识。 托管标识可以使用之前设置的应用配置安全地连接到其他 Azure 服务。
在Azure App 服务实例的主概述页上,从导航窗格中选择“标识”。
在 “系统分配 ”选项卡上,确保将 “状态 ”字段设置为 “打开”。 系统分配的标识由 Azure 在内部进行管理,并为你处理管理任务。 标识的详细信息和 ID 永远不会在代码中公开。
还可以使用 Azure CLI 在 Azure 托管环境中分配托管标识。
可以使用 az webapp identity assign 命令将托管标识分配给 Azure App 服务 实例,如以下示例所示。
export AZURE_MANAGED_IDENTITY_ID=$(az webapp identity assign \
--resource-group $AZ_RESOURCE_GROUP \
--name <app-service-name> \
--query principalId \
--output tsv)
为托管标识分配角色
接下来,向创建的托管标识授予访问事件中心命名空间的权限。 可以通过向托管标识分配角色来授予权限,就像对本地开发用户所做的那样。
如果使用服务连接器连接服务,则无需完成此步骤。 已为你处理以下必要的配置:
如果在创建连接时选择了托管标识,则会为应用创建系统分配的托管标识,并在事件中心上分配了Azure 事件中心数据发送方和Azure 事件中心数据接收器角色。
如果选择使用连接字符串,则连接字符串已添加为应用环境变量。
测试应用程序
进行这些代码更改后,在浏览器中浏览到托管应用程序。 应用应能够成功连接到 Kafka 的Azure 事件中心。 请记住,角色分配通过 Azure 环境传播最长可能需要五分钟的时间。 应用程序现在配置为在本地和生产环境中运行,开发人员无需管理应用程序本身的机密。
后续步骤
本教程介绍了如何将应用程序迁移到无密码连接。
可以阅读以下资源,更深入地了解本文中讨论的概念: