W tym artykule pokazano, jak używać przyspieszania zapytań do pobierania podzestawu danych z konta magazynu.
Przyspieszanie zapytań umożliwia aplikacjom i strukturom analitycznym znaczne zoptymalizowanie przetwarzania danych przez pobranie tylko danych, których potrzebują do wykonania danej operacji. Aby dowiedzieć się więcej, zobacz Przyspieszanie zapytań usługi Azure Data Lake Storage.
Zainstaluj moduł Az w wersji 4.6.0 lub nowszej.
Install-Module -Name Az -Repository PSGallery -Force
Aby przeprowadzić aktualizację ze starszej wersji modułu Az, uruchom następujące polecenie:
Update-Module -Name Az
Otwórz wiersz polecenia i zmień katalog (cd
) w folderze projektu, na przykład:
cd myProject
Zainstaluj wersję lub nowszą 12.5.0-preview.6
bibliotekę klienta usługi Azure Blob Storage dla pakietu .NET przy użyciu dotnet add package
polecenia .
dotnet add package Azure.Storage.Blobs -v 12.8.0
Przykłady wyświetlane w tym artykule analizują plik CSV przy użyciu biblioteki CsvHelper . Aby użyć tej biblioteki, użyj następującego polecenia.
dotnet add package CsvHelper
Otwórz plik pom.xml projektu w edytorze tekstów. Dodaj następujące elementy zależności do grupy zależności.
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Zainstaluj bibliotekę klienta usługi Azure Data Lake Storage dla języka Python przy użyciu narzędzia pip.
pip install azure-storage-blob==12.4.0
Zainstaluj bibliotekę klienta usługi Data Lake dla języka JavaScript, otwierając okno terminalu, a następnie wpisując następujące polecenie.
npm install @azure/storage-blob
npm install @fast-csv/parse
Dodaj te using
instrukcje na początku pliku kodu.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Przyspieszanie zapytań pobiera dane sformatowane w formacie CSV i Json. W związku z tym pamiętaj, aby dodać instrukcje using dla dowolnych bibliotek analizowania plików CSV lub Json, których chcesz użyć. Przykłady wyświetlane w tym artykule analizują plik CSV przy użyciu biblioteki CsvHelper dostępnej w narzędziu NuGet. W związku z tym dodalibyśmy te using
instrukcje na początku pliku kodu.
using CsvHelper;
using CsvHelper.Configuration;
Aby skompilować przykłady przedstawione w tym artykule, należy również dodać te using
instrukcje.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Dodaj te import
instrukcje na początku pliku kodu.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
Dodaj te instrukcje importu na początku pliku kodu.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
storage-blob
Dołącz moduł, umieszczając tę instrukcję w górnej części pliku kodu.
const { BlobServiceClient } = require("@azure/storage-blob");
Przyspieszanie zapytań pobiera dane sformatowane w formacie CSV i Json. W związku z tym upewnij się, że dodano instrukcje dla dowolnych modułów analizowania plików CSV lub Json, których chcesz użyć. Przykłady wyświetlane w tym artykule analizują plik CSV przy użyciu modułu fast-csv . W związku z tym dodalibyśmy tę instrukcję na początku pliku kodu.
const csv = require('@fast-csv/parse');
Za pomocą języka SQL można określić predykaty filtru wierszy i projekcje kolumn w żądaniu przyspieszania zapytań. Poniższy kod wysyła zapytanie do pliku CSV w magazynie i zwraca wszystkie wiersze danych, w których trzecia kolumna odpowiada wartości Hemingway, Ernest
.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
Metoda BlockBlobClient.QueryAsync
asynchronicka wysyła zapytanie do interfejsu API przyspieszania zapytań, a następnie przesyła strumieniowo wyniki z powrotem do aplikacji jako obiekt usługi Stream .
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Metoda BlockBlobClient.openInputStream()
wysyła zapytanie do interfejsu API przyspieszania zapytań, a następnie przesyła strumieniowo wyniki z powrotem do aplikacji jako InputStream
obiekt, który może być odczytywany jak każdy inny obiekt InputStream.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Ten przykład wysyła zapytanie do interfejsu API przyspieszania zapytań, a następnie przesyła strumieniowo wyniki z powrotem. Obiekt blob
przekazany do funkcji pomocniczej queryHemingway
jest typu BlockBlobClient. Aby dowiedzieć się więcej o sposobie uzyskiwania obiektu BlockBlobClient , zobacz Szybki start: zarządzanie obiektami blob przy użyciu zestawu SDK języka JavaScript w wersji 12 w Node.js.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
Wyniki można ograniczyć do podzbioru kolumn. W ten sposób pobierasz tylko kolumny potrzebne do wykonania danego obliczenia. Zwiększa to wydajność aplikacji i zmniejsza koszty, ponieważ mniej danych jest przesyłanych za pośrednictwem sieci.
Uwaga
Maksymalna liczba kolumn, do których można ograniczyć zakres wyników, wynosi 49. Jeśli wyniki mają zawierać więcej niż 49 kolumn, użyj symbolu wieloznakowego (*
) dla wyrażenia SELECT (na przykład: SELECT *
).
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Poniższy kod łączy filtrowanie wierszy i projekcje kolumn w tym samym zapytaniu.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}