Фильтрация данных с помощью ускорения запросов Azure Data Lake Storage
Статья
В этой статье показано, как использовать ускорение запросов для получения подмножества данных из учетной записи хранения.
Ускорение запросов позволяет приложениям и платформам аналитики существенно оптимизировать обработку данных, получая только те данные, которые необходимы для выполнения указанной операции. Дополнительные сведения см. в статье Ускорение запросов Azure Data Lake Storage.
Необходимые компоненты
Для доступа к службе хранилища Azure требуется подписка Azure. Если у вас еще нет подписки, создайте бесплатную учетную запись Azure, прежде чем начинать работу.
В контексте этой статьи предполагается, что вы создали проект Java с помощью Apache Maven. Пример создания проекта с помощью Apache Maven см. в разделе Настройка.
Установите модуль Az версии 4.6.0 или более поздней.
Install-Module -Name Az -Repository PSGallery -Force
Чтобы обновить более старую версию Az, выполните следующую команду:
Update-Module -Name Az
Откройте командную строку и смените каталог (cd) в папке проекта. Например:
cd myProject
Установите версию 12.5.0-preview.6 (или более позднюю) клиентской библиотеки хранилища BLOB-объектов Azure для пакета .NET с помощью команды dotnet add package.
dotnet add package Azure.Storage.Blobs -v 12.8.0
Примеры, представленные в этой статье, посвящены синтаксическому анализу CSV-файла с помощью библиотеки CsvHelper. Чтобы использовать эту библиотеку, используйте следующую команду:
dotnet add package CsvHelper
Откройте файл pom.xml своего проекта в текстовом редакторе. Добавьте приведенные ниже элементы зависимости в группу зависимостей.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Функция ускорения запросов извлекает данные в формате CSV и Json. Таким образом, следует обязательно добавить операторы using для любых библиотек синтаксического анализа CSV или Json, которые вы решите использовать. Примеры, приведенные в этой статье, иллюстрируют синтаксический анализ CSV-файла с помощью библиотеки CsvHelper, доступной в NuGet. Поэтому мы бы добавили эти операторы using в начало файла кода.
using CsvHelper;
using CsvHelper.Configuration;
Чтобы скомпилировать примеры, представленные в этой статье, вам также необходимо добавить эти операторы using.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Добавьте эти операторы import в начало файла кода.
Функция ускорения запросов извлекает данные в формате CSV и Json. Поэтому не забудьте добавить операторы для любых модулей синтаксического анализа CSV или Json, которые вы решите использовать. Примеры, приведенные в этой статье, иллюстрируют синтаксический анализ CSV-файла с помощью модуля fast-csv. Поэтому мы бы добавили этот оператор в начало файла кода.
const csv = require('@fast-csv/parse');
Получение данных с помощью фильтра
Вы можете задавать предикаты фильтра строк и проекции столбцов в запросе ускорения запросов с помощью SQL. Приведенный ниже код запрашивает CSV-файл в хранилище и возвращает все строки данных, в которых третий столбец соответствует значению Hemingway, Ernest.
В SQL-запросе ключевое слово BlobStorage используется для обозначения запрашиваемого файла.
Ссылки на столбцы указываются как _N, где первый столбец — _1. Если исходный файл содержит строку заголовков, то вы можете ссылаться на столбцы по имени, которое указано в ней.
Асинхронный метод BlockBlobClient.QueryAsync отправляет запрос в API ускорения запросов, а затем передает результаты обратно в приложение в виде объекта Stream.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Метод BlockBlobClient.openInputStream() отправляет запрос в API ускорения запросов, а затем передает результаты обратно в приложение в виде объекта InputStream, который можно читать, как любой другой объект InputStream.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Вы можете задать в качестве области для результатов подмножество столбцов. Таким образом, вы получите только те столбцы, которые необходимы для выполнения конкретного расчета. Это улучшает производительность приложения и снижает затраты, поскольку по сети передается меньше данных.
Примечание.
Максимальное количество столбцов, для которых можно получить результаты, равно 49. Если вам нужно, чтобы результаты содержали более 49 столбцов, используйте подстановочный знак (*) для выражения SELECT (например: SELECT *).
Этот код извлекает только столбец BibNum для всех книг в наборе данных. Он также использует информацию из строки заголовков в исходном файле для ссылки на столбцы в запросе.