Filtern von Daten mithilfe der Abfragebeschleunigung für Azure Data Lake Storage
Artikel
In diesem Artikel wird gezeigt, wie Sie mithilfe der Abfragebeschleunigung eine Teilmenge von Daten aus Ihrem Speicherkonto abrufen.
Die Abfragebeschleunigung ermöglicht es Anwendungen und Analyseframeworks, die Datenverarbeitung drastisch zu optimieren. Dabei werden nur die Daten abgerufen, die für die Durchführung eines bestimmten Vorgangs erforderlich sind. Weitere Informationen finden Sie unter Abfragebeschleunigung für Azure Data Lake Storage.
Voraussetzungen
Sie benötigen ein Azure-Abonnement, um auf Azure Storage zuzugreifen. Wenn Sie noch kein Abonnement haben, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.
In diesem Artikel wird davon ausgegangen, dass Sie ein Java-Projekt mithilfe von Apache Maven erstellt haben. Ein Beispiel zum Erstellen eines Projekts mithilfe von Apache Maven finden Sie unter Einrichten.
Installieren Sie das Az-Modul, Version 4.6.0 oder höher.
Install-Module -Name Az -Repository PSGallery -Force
Führen Sie zum Aktualisieren von einer älteren Version von Az den folgenden Befehl aus:
Update-Module -Name Az
Öffnen Sie eine Eingabeaufforderung, und wechseln Sie mit dem Befehl cd in Ihren Projektordner. Beispiel:
cd myProject
Installieren Sie Version 12.5.0-preview.6 oder höher der Azure Blob Storage-Clientbibliothek für das .NET-Paket mit dem Befehl dotnet add package.
dotnet add package Azure.Storage.Blobs -v 12.8.0
In den Beispielen in diesem Artikel wird eine CSV-Datei mithilfe der Bibliothek CsvHelper analysiert. Führen Sie den folgenden Befehl aus, um diese Bibliothek zu verwenden.
dotnet add package CsvHelper
Öffnen Sie die Datei pom.xml des Projekts in einem Text-Editor. Fügen Sie der Gruppe der Abhängigkeiten das folgende Abhängigkeitselement hinzu:
Fügen Sie die folgenden using-Anweisungen am Anfang Ihrer Codedatei hinzu.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Bei der Abfragebeschleunigung werden CSV- und JSON-formatierte Daten abgerufen. Stellen Sie deshalb sicher, dass Sie die using-Anweisungen für alle CSV- oder JSON-Analysebibliotheken hinzufügen, die Sie verwenden möchten. In den Beispielen in diesem Artikel wird eine CSV-Datei mithilfe der Bibliothek CsvHelper analysiert, die auf NuGet verfügbar ist. Daher werden die folgenden using-Anweisungen am Anfang der Codedatei hinzugefügt.
using CsvHelper;
using CsvHelper.Configuration;
Zum Kompilieren der Beispiele, die in diesem Artikel gezeigt werden, müssen Sie auch die folgenden using-Anweisungen hinzufügen.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Fügen Sie die folgenden import-Anweisungen am Anfang Ihrer Codedatei hinzu.
Bei der Abfragebeschleunigung werden CSV- und JSON-formatierte Daten abgerufen. Stellen Sie deshalb sicher, dass Sie Anweisungen für alle CSV- oder JSON-Analysemodule hinzufügen, die Sie verwenden möchten. In den Beispielen in diesem Artikel wird eine CSV-Datei mithilfe des Moduls fast-csv analysiert. Daher wird diese Anweisung am Anfang der Codedatei hinzugefügt.
const csv = require('@fast-csv/parse');
Abrufen von Daten mithilfe eines Filters
Sie können SQL verwenden, um die Zeilenfilterprädikate und Spaltenprojektionen in einer Abfragebeschleunigungsanforderung anzugeben. Der folgende Code fragt eine CSV-Datei im Speicher ab und gibt alle Datenzeilen zurück, bei denen die dritte Spalte mit dem Wert Hemingway, Ernest übereinstimmt.
In der SQL-Abfrage wird das Schlüsselwort BlobStorage verwendet, um die abzufragende Datei anzugeben.
Spaltenverweise werden als _N angegeben, wobei die erste Spalte _1 ist. Wenn die Quelldatei eine Kopfzeile enthält, können Sie mit dem Namen, der in der Kopfzeile angegeben ist, auf Spalten verweisen.
Die asynchrone Methode BlockBlobClient.QueryAsync sendet die Abfrage an die Abfragebeschleunigungs-API und streamt dann die Ergebnisse als Stream-Objekt an die Anwendung zurück.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Die Methode BlockBlobClient.openInputStream() sendet die Abfrage an die Abfragebeschleunigungs-API und streamt dann die Ergebnisse als InputStream-Objekt, das wie jedes andere InputStream-Objekt gelesen werden kann, an die Anwendung zurück.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Sie können die Ergebnisse auf eine Teilmenge der Spalten beschränken. Auf diese Weise rufen Sie nur die Spalten ab, die zum Ausführen einer bestimmten Berechnung benötigt werden. Dadurch werden die Anwendungsleistung verbessert und Kosten reduziert, da weniger Daten über das Netzwerk übertragen werden.
Hinweis
Die maximale Anzahl von Spalten, auf die Sie ihre Ergebnisse beschränken können, ist 49. Wenn Ihre Ergebnisse mehr als 49 Spalten enthalten müssen, verwenden Sie für den SELECT-Ausdruck ein Platzhalterzeichen (*) (Beispiel: SELECT *).
Dieser Code ruft nur die Spalte BibNum für alle Bücher im Dataset ab. Außerdem werden die Informationen aus der Kopfzeile in der Quelldatei verwendet, um auf Spalten in der Abfrage zu verweisen.