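Manage Azure Data Lake Analytics using a Java app
Important
Azure Data Lake Analytics was retired on 29 February 2024. Learn more with this announcement.
For data analytics, your organization can use Azure Synapse Analytics or Microsoft Fabric.
This article describes how to manage Azure Data Lake Analytics accounts, data sources, users, and jobs using an app written with the Azure Java SDK.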
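Prerequisites
- Java Development Kit (JDK) 8 (using Java version 1.8).
- IntelliJ or another suitable Java development environment. The instructions in this document use IntelliJ.
- Create a Microsoft Entra application and retrieve its client ID, tenant ID, and key. For more information about Microsoft Entra applications and instructions on how to get a client ID, see Create Active Directory application and service principal using portal. After you create the application and generate the key, you can retrieve the reply URI and key from the portal.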
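Authenticating using Microsoft Entra ID
The code in this article uses non-interactive authentication, in which the application provides its own credentials (client ID, tenant ID, and client secret).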
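Create a Java application
- Open IntelliJ and create a Java project using the Command-Line App template.
- Right-click the project on the left side of your screen and select Add Framework Support. Select Maven, and then select OK.
- Open the newly created "pom.xml" file and add the following snippet of text between the </version> tag and the </project> tag: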
<dependencies>
    <dependency>
        <groupId>com.azure.resourcemanager</groupId>
        <artifactId>azure-resourcemanager-datalakeanalytics</artifactId>
        <version>1.0.0-beta.1</version>
    </dependency>
    <dependency>
        <groupId>com.azure.resourcemanager</groupId>
        <artifactId>azure-resourcemanager-datalakestore</artifactId>
        <version>1.0.0-beta.1</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-storage-file-datalake</artifactId>
        <version>12.7.2</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-identity</artifactId>
        <version>1.4.1</version>
    </dependency>
</dependencies>
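Go to File > Settings > Build, Execution, Deployment. Select Build Tools > Maven > Importing. Then check Import Maven projects automatically.
Open Main.java and replace the existing code block with the following code: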
import com.azure.core.credential.TokenCredential;
import com.azure.core.management.AzureEnvironment;
import com.azure.core.management.profile.AzureProfile;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.resourcemanager.datalakeanalytics.DataLakeAnalyticsManager;
import com.azure.resourcemanager.datalakeanalytics.models.DataLakeAnalyticsAccount;
import com.azure.resourcemanager.datalakestore.DataLakeStoreManager;
import com.azure.resourcemanager.datalakestore.models.DataLakeStoreAccount;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import com.azure.storage.file.datalake.models.PathAccessControl;
import com.azure.storage.file.datalake.models.PathPermissions;

import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.UUID;

public class Main {
    private static String adlsAccountName;
    private static String adlaAccountName;
    private static String resourceGroupName;
    private static String location;
    private static String tenantId;
    private static String subscriptionId;
    private static String clientId;
    private static String clientSecret;
    private static String fileSystemName;
    private static String localFolderPath;

    private static DataLakeAnalyticsManager analyticsManager;
    private static DataLakeStoreManager storeManager;
    private static DataLakeStoreAccount storeAccount;
    private static DataLakeAnalyticsAccount analyticsAccount;
    private static DataLakeServiceClient serviceClient;
    private static DataLakeFileSystemClient fileSystemClient;
    private static DataLakeFileClient fileClient;

    public static void main(String[] args) throws Exception {
        adlsAccountName = "<DATA-LAKE-STORE-NAME>";
        adlaAccountName = "<DATA-LAKE-ANALYTICS-NAME>";
        resourceGroupName = "<RESOURCE-GROUP-NAME>";
        location = "East US 2";
        tenantId = "<TENANT-ID>";
        subscriptionId = "<SUBSCRIPTION-ID>";
        clientId = "<CLIENT-ID>";
        clientSecret = "<CLIENT-SECRET>";
        fileSystemName = "<DATALAKE-FILE-SYSTEM-NAME>";
        localFolderPath = "C:\\local_path\\";

        // ----------------------------------------
        // Authenticate
        // ----------------------------------------
        // Pass the tenant and subscription explicitly so the managers do not
        // depend on AZURE_TENANT_ID / AZURE_SUBSCRIPTION_ID being set.
        AzureProfile profile = new AzureProfile(tenantId, subscriptionId, AzureEnvironment.AZURE);
        ClientSecretCredential creds = new ClientSecretCredentialBuilder()
            .clientId(clientId).tenantId(tenantId).clientSecret(clientSecret)
            // Authority host only; the tenant is supplied through tenantId().
            .authorityHost("https://login.microsoftonline.com/")
            .build();
        setupClients(creds, profile);

        // ----------------------------------------
        // List Data Lake Store and Analytics accounts that this app can access
        // ----------------------------------------
        System.out.println(String.format("All ADL Store accounts that this app can access in subscription %s:", subscriptionId));
        storeManager.accounts().list().forEach(acct -> System.out.println(acct.name()));
        System.out.println(String.format("All ADL Analytics accounts that this app can access in subscription %s:", subscriptionId));
        analyticsManager.accounts().list().forEach(acct -> System.out.println(acct.name()));
        waitForNewline("Accounts displayed.", "Creating files.");

        // ----------------------------------------
        // Create a file in Data Lake Store: input1.csv
        // ----------------------------------------
        createFile("input1.csv", "123,abc", true);
        waitForNewline("File created.", "Submitting a job.");

        // ----------------------------------------
        // Submit a job to Data Lake Analytics
        // ----------------------------------------
        String script = "@input = EXTRACT Row1 string, Row2 string FROM \"/input1.csv\" USING Extractors.Csv(); OUTPUT @input TO @\"/output1.csv\" USING Outputters.Csv();";
        UUID jobId = submitJobByScript(script, "testJob", creds);
        waitForNewline("Job submitted.", "Getting job status.");

        // ----------------------------------------
        // Download job output from Data Lake Store
        // ----------------------------------------
        downloadFile("output1.csv", localFolderPath + "output1.csv");
        waitForNewline("Job output downloaded.", "Deleting file.");
        deleteFile("output1.csv");
        waitForNewline("File deleted.", "Done.");
    }

    // Create the management clients, the accounts, and the file system client
    public static void setupClients(TokenCredential creds, AzureProfile profile) {
        analyticsManager = DataLakeAnalyticsManager.authenticate(creds, profile);
        storeManager = DataLakeStoreManager.authenticate(creds, profile);
        createAccounts();
        serviceClient = new DataLakeServiceClientBuilder().endpoint(storeAccount.endpoint()).credential(creds).buildClient();
        fileSystemClient = serviceClient.createFileSystem(fileSystemName);
    }

    // Print a status message and pause until the user presses ENTER
    public static void waitForNewline(String reason, String nextAction) {
        if (nextAction == null) {
            nextAction = "";
        }
        System.out.println(reason + "\r\nPress ENTER to continue...");
        try {
            System.in.read();
        } catch (Exception e) {
            // Ignored: a failed console read should not stop the walkthrough.
        }
        if (!nextAction.isEmpty()) {
            System.out.println(nextAction);
        }
    }

    // Create accounts
    public static void createAccounts() {
        // Create ADLS account
        storeAccount = storeManager.accounts().define(adlsAccountName)
            .withRegion(location)
            .withExistingResourceGroup(resourceGroupName)
            .create();
        // Create ADLA account that uses the ADLS account as its default store
        analyticsAccount = analyticsManager.accounts().define(adlaAccountName)
            .withRegion(location).withExistingResourceGroup(resourceGroupName)
            .withDefaultDataLakeStoreAccount(adlsAccountName)
            .withDataLakeStoreAccounts(Collections.emptyList())
            .create();
    }

    // Submit a job.
    // Placeholder: the original listing calls this method but does not define it.
    // No job-submission API is assumed here; replace the body with your own implementation.
    private static UUID submitJobByScript(String script, String jobName, TokenCredential creds) {
        throw new UnsupportedOperationException("Job submission is not implemented in this sample.");
    }

    // Create a file
    public static void createFile(String path, String contents, boolean force) {
        byte[] bytesContents = contents.getBytes(StandardCharsets.UTF_8);
        DataLakeFileClient newFile = fileSystemClient.createFile(path, force);
        PathAccessControl accessControl = newFile.getAccessControl();
        newFile.setPermissions(PathPermissions.parseOctal("744"), accessControl.getGroup(), accessControl.getOwner());
        // The file already exists after createFile, so the upload must overwrite it.
        newFile.upload(new ByteArrayInputStream(bytesContents), bytesContents.length, true);
    }

    // Delete a file
    public static void deleteFile(String filePath) {
        fileSystemClient.getFileClient(filePath).delete();
    }

    // Download a file
    private static void downloadFile(String srcPath, String destPath) throws IOException {
        fileClient = fileSystemClient.getFileClient(srcPath);
        // try-with-resources closes the stream even if the read fails
        try (OutputStream outputStream = new FileOutputStream(destPath)) {
            fileClient.read(outputStream);
        }
    }
}
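The U-SQL script in main is written as a single escaped string literal, which is easy to mistype when the input or output path changes. As a minimal sketch, the same script could be assembled from the two paths. The class UsqlScripts and the method copyCsvScript are hypothetical names introduced for illustration and are not part of any Azure SDK; the script text itself is copied from the listing above.

```java
// Hypothetical helper (not part of the Azure SDK): builds the same U-SQL
// script that Main.java hard-codes, from an input path and an output path.
class UsqlScripts {

    // Rejects paths that would terminate the U-SQL string literal early.
    private static void requireNoQuote(String path) {
        if (path == null || path.indexOf('"') >= 0) {
            throw new IllegalArgumentException("Path must be non-null and must not contain a double quote: " + path);
        }
    }

    // Returns a script that extracts two string columns from a CSV file
    // and writes them unchanged to another CSV file.
    static String copyCsvScript(String inputPath, String outputPath) {
        requireNoQuote(inputPath);
        requireNoQuote(outputPath);
        return String.format(
            "@input = EXTRACT Row1 string, Row2 string FROM \"%s\" USING Extractors.Csv(); "
                + "OUTPUT @input TO @\"%s\" USING Outputters.Csv();",
            inputPath, outputPath);
    }

    public static void main(String[] args) {
        System.out.println(copyCsvScript("/input1.csv", "/output1.csv"));
    }
}
```

Calling copyCsvScript("/input1.csv", "/output1.csv") yields the same text as the script variable in main, so the helper could replace the literal without changing what is submitted.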
Provide values for the following parameters in Main.java:
- adlsAccountName
- adlaAccountName
- resourceGroupName
- location
- tenantId
- subscriptionId
- clientId
- clientSecret
- fileSystemName
- localFolderPath
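Hard-coding the client secret in source code makes it easy to leak. One possible alternative, sketched here under stated assumptions, is to read the service principal values from environment variables and fail fast when one is missing. The variable names AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_CLIENT_SECRET match the names the azure-identity library reads for its environment-based credential; the class Settings and its methods are hypothetical and use only the Java standard library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the Azure SDK): loads required settings
// from a key/value source such as System.getenv().
class Settings {

    // Returns the trimmed value for key, or throws if it is missing or blank.
    static String require(Map<String, String> source, String key) {
        String value = source.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + key);
        }
        return value.trim();
    }

    // Collects the three service principal values in a fixed order.
    static Map<String, String> loadCredentials(Map<String, String> source) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String key : new String[] {"AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"}) {
            result.put(key, require(source, key));
        }
        return result;
    }

    public static void main(String[] args) {
        try {
            Map<String, String> creds = loadCredentials(System.getenv());
            // Print only the names, never the secret values.
            System.out.println("Loaded settings: " + creds.keySet());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In Main.java, the assignments to tenantId, clientId, and clientSecret could then take their values from the returned map instead of from string literals.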
Next steps
- To learn U-SQL, see Get started with Azure Data Lake Analytics U-SQL language and U-SQL language reference.
- For management tasks, see Manage Azure Data Lake Analytics using Azure portal.
- For an overview of Data Lake Analytics, see Azure Data Lake Analytics overview.