V tomto článku se dozvíte, jak pomocí akcelerace dotazů načíst podmnožinu dat z účtu úložiště.
Akcelerace dotazů umožňuje aplikacím a analytickým architekturám výrazně optimalizovat zpracování dat načtením pouze dat, která vyžadují k provedení dané operace. Další informace najdete v tématu Akcelerace dotazů Azure Data Lake Storage.
Nainstalujte modul Az verze 4.6.0 nebo vyšší.
Install-Module -Name Az -Repository PSGallery -Force
Pokud chcete aktualizovat ze starší verze Az, spusťte následující příkaz:
Update-Module -Name Az
Otevřete příkazový řádek a změňte adresář (cd
) do složky projektu, například:
cd myProject
12.5.0-preview.6
Pomocí příkazu nainstalujte verzi nebo novější klientskou knihovnu služby Azure Blob Storage pro balíček dotnet add package
.NET.
dotnet add package Azure.Storage.Blobs -v 12.8.0
Příklady, které se zobrazí v tomto článku, analyzují soubor CSV pomocí knihovny CsvHelper . Pokud chcete tuto knihovnu použít, použijte následující příkaz.
dotnet add package CsvHelper
Otevřete soubor pom.xml projektu v textovém editoru. Do skupiny závislostí přidejte následující prvky závislostí.
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Nainstalujte klientskou knihovnu Azure Data Lake Storage pro Python pomocí nástroje pip.
pip install azure-storage-blob==12.4.0
Nainstalujte klientskou knihovnu Data Lake pro JavaScript otevřením okna terminálu a zadáním následujícího příkazu.
npm install @azure/storage-blob
npm install @fast-csv/parse
Tyto using
příkazy přidejte na začátek souboru kódu.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Akcelerace dotazů načítá data ve formátu CSV a JSON. Proto nezapomeňte přidat příkazy using pro všechny knihovny analýzy CSV nebo Json, které se rozhodnete použít. Příklady, které se zobrazí v tomto článku, analyzují soubor CSV pomocí knihovny CsvHelper , která je k dispozici na NuGetu. Proto bychom tyto using
příkazy přidali na začátek souboru kódu.
using CsvHelper;
using CsvHelper.Configuration;
Abyste mohli zkompilovat příklady uvedené v tomto článku, budete také muset přidat tyto using
příkazy.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Tyto import
příkazy přidejte na začátek souboru kódu.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
Tyto příkazy importu přidejte do horní části souboru kódu.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
storage-blob
Zahrňte modul tak, že tento příkaz umístíte do horní části souboru kódu.
const { BlobServiceClient } = require("@azure/storage-blob");
Akcelerace dotazů načítá data ve formátu CSV a JSON. Proto nezapomeňte přidat příkazy pro všechny moduly analýzy CSV nebo Json, které se rozhodnete použít. Příklady, které se zobrazí v tomto článku, analyzují soubor CSV pomocí modulu fast-csv . Proto bychom tento příkaz přidali na začátek souboru kódu.
const csv = require('@fast-csv/parse');
Sql můžete použít k určení predikátů filtru řádků a projekce sloupců v požadavku na zrychlení dotazu. Následující kód se dotazuje na soubor CSV v úložišti a vrátí všechny řádky dat, ve kterých třetí sloupec odpovídá hodnotě Hemingway, Ernest
.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
Asynchronní metoda BlockBlobClient.QueryAsync
odešle dotaz do rozhraní API zrychlení dotazu a pak výsledky streamuje zpět do aplikace jako objekt Stream .
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Metoda BlockBlobClient.openInputStream()
odešle dotaz do rozhraní API zrychlení dotazu a pak streamuje výsledky zpět do aplikace jako InputStream
objekt, který lze číst jako jakýkoli jiný inputStream objekt.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Tento příklad odešle dotaz do rozhraní API zrychlení dotazu a pak výsledky streamuje zpět. Objekt blob
předaný queryHemingway
do pomocné funkce je typu BlockBlobClient. Další informace o získání objektu BlockBlobClient najdete v tématu Rychlý start: Správa objektů blob pomocí sady JavaScript SDK v12 v Node.js.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
Výsledky můžete vymezit na podmnožinu sloupců. Tímto způsobem načtete pouze sloupce potřebné k provedení daného výpočtu. Tím se zlepší výkon aplikace a sníží se náklady, protože se přes síť přenáší méně dat.
Poznámka:
Maximální počet sloupců, na které můžete výsledky nastavit, je 49. Pokud potřebujete, aby výsledky obsahovaly více než 49 sloupců, použijte zástupný znak (*
) pro výraz SELECT (například: SELECT *
).
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Následující kód kombinuje filtrování řádků a projekce sloupců do stejného dotazu.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}