Az モジュール バージョン 4.6.0 以降をインストールします。
Install-Module -Name Az -Repository PSGallery -Force
古いバージョンの Az から更新するには、次のコマンドを実行します。
Update-Module -Name Az
コマンド プロンプトを開き、ディレクトリ (cd
) をプロジェクト フォルダーに変更します。次に例を示します。
cd myProject
dotnet add package
コマンドを使用して、.NET パッケージ用 Azure Blob Storage クライアント ライブラリの 12.5.0-preview.6
バージョン以降をインストールします。
dotnet add package Azure.Storage.Blobs -v 12.8.0
この記事に示されている例では、CsvHelper ライブラリを使用して CSV ファイルが解析されています。 そのライブラリを使用するには、次のコマンドを使用します。
dotnet add package CsvHelper
テキスト エディターでプロジェクトの pom.xml ファイルを開きます。 依存関係のグループに、次の dependency 要素を追加します。
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Python 向けの Azure Data Lake Storage クライアント ライブラリを、pip を使用してインストールします。
pip install azure-storage-blob==12.4.0
ターミナル ウィンドウを開いてから次のコマンドを入力して、JavaScript 用の Data Lake クライアント ライブラリをインストールします。
npm install @azure/storage-blob
npm install @fast-csv/parse
次の using
ステートメントをコード ファイルの先頭に追加します。
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
クエリ アクセラレーションでは、CSV および JSON 形式のデータが取得されます。 そのため、使用する CSV または JSON 解析ライブラリに using ステートメントを必ず追加してください。 この記事に記載されている例では、NuGet で利用できる CsvHelper ライブラリを使用して CSV ファイルを解析します。 そのため、以下の using
ステートメントをコード ファイルの先頭に追加します。
using CsvHelper;
using CsvHelper.Configuration;
この記事に記載されている例をコンパイルするには、以下の using
ステートメントも追加する必要があります。
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
次の import
ステートメントをコード ファイルの先頭に追加します。
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
次の import ステートメントを、コード ファイルの先頭に追加します。
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
このステートメントをコード ファイルの先頭に配置して、storage-blob
モジュールを追加します。
const { BlobServiceClient } = require("@azure/storage-blob");
クエリ アクセラレーションでは、CSV および JSON 形式のデータが取得されます。 そのため、使用する CSV または JSON 解析モジュールにステートメントを必ず追加してください。 この記事に示されている例では、fast-csv モジュールを使用して CSV ファイルが解析されています。 そのため、このステートメントをコード ファイルの先頭に追加しました。
const csv = require('@fast-csv/parse');
SQL を使用して、クエリ アクセラレーション要求で行フィルター述語と列のプロジェクションを指定できます。 次のコードでは、ストレージ内の CSV ファイルに対してクエリを行い、3番目の列が Hemingway, Ernest
値と一致するすべてのデータの行を返します。
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
非同期メソッド BlockBlobClient.QueryAsync
は、クエリをクエリ アクセラレーション API に送信し、その結果を Stream オブジェクトとしてアプリケーションにストリーミングで戻します。
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
メソッド BlockBlobClient.openInputStream()
は、クエリをクエリ アクセラレーション API に送信し、その結果を、他の InputStream オブジェクトと同様に読み取ることができる InputStream
オブジェクトとしてアプリケーションにストリーミングで戻します。
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
この例では、クエリがクエリ アクセラレーション API に送信され、結果が戻されています。 queryHemingway
ヘルパー関数に渡される blob
オブジェクトは BlockBlobClient 型です。 BlockBlobClient オブジェクトを取得する方法の詳細については、「クイック スタート:Node.js の JavaScript v12 SDK を使用して BLOB を管理する」を参照してください。
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
結果の範囲を列のサブセットに限定することができます。 このようにして、特定の計算を実行するために必要な列のみを取得します。 これにより、ネットワーク経由で転送されるデータが少なくなるため、アプリケーションのパフォーマンスが向上し、コストが削減されます。
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}