Konfigurowanie pozyskiwania danych za pośrednictwem przesyłania strumieniowego w klastrze usługi Azure Data Explorer
Artykuł
Pozyskiwanie przesyłania strumieniowego jest przydatne do ładowania danych, gdy potrzebujesz małego opóźnienia między pozyskiwaniem i wykonywaniem zapytań. Rozważ użycie pozyskiwania przesyłania strumieniowego w następujących scenariuszach:
Wymagane jest opóźnienie mniejsze niż sekundę.
Aby zoptymalizować przetwarzanie operacyjne wielu tabel, w których strumień danych do każdej tabeli jest stosunkowo mały (kilka rekordów na sekundę), ale ogólny wolumin pozyskiwania danych jest wysoki (tysiące rekordów na sekundę).
Jeśli strumień danych do każdej tabeli jest wysoki (ponad 4 GB na godzinę), rozważ użycie pozyskiwania wsadowego.
Wybierz odpowiedni typ pozyskiwania przesyłania strumieniowego
Obsługiwane są dwa typy pozyskiwania przesyłania strumieniowego:
Typ pozyskiwania
Opis
Połączenie danych
Połączenia danych centrum zdarzeń, IoT Hub i usługi Event Grid mogą korzystać z pozyskiwania strumieniowego, pod warunkiem że jest ona włączona na poziomie klastra. Decyzja o korzystaniu z pozyskiwania przesyłania strumieniowego jest wykonywana zgodnie z zasadami pozyskiwania przesyłania strumieniowego skonfigurowanymi w tabeli docelowej. Aby uzyskać informacje na temat zarządzania połączeniami danych, zobacz Event Hub, IoT Hub i Event Grid.
Poniższa tabela ułatwia wybór typu pozyskiwania, który jest odpowiedni dla danego środowiska:
Kryterium
Połączenie danych
Niestandardowe pozyskiwanie
Opóźnienie danych między inicjowaniem pozyskiwania a danymi dostępnymi dla zapytania
Dłuższe opóźnienie
Krótsze opóźnienie
Nakład pracy nad programowaniem
Szybka i łatwa konfiguracja, bez obciążeń programistycznych
Wysokie obciążenie programistyczne w celu utworzenia aplikacji pozyskiwania danych, obsługi błędów i zapewnienia spójności danych
Uwaga
Możesz zarządzać procesem włączania i wyłączania pozyskiwania przesyłania strumieniowego w klastrze przy użyciu Azure Portal lub programowo w języku C#. Jeśli używasz języka C# dla aplikacji niestandardowej, możesz znaleźć go bardziej wygodne przy użyciu podejścia programowego.
Głównymi współautorami, którzy mogą mieć wpływ na pozyskiwanie przesyłania strumieniowego, są:
Rozmiar maszyny wirtualnej i klastra: wydajność pozyskiwania przesyłania strumieniowego i wydajność są skalowane wraz ze zwiększonymi rozmiarami maszyn wirtualnych i klastrów. Liczba współbieżnych żądań pozyskiwania jest ograniczona do sześciu na rdzeń. Na przykład w przypadku 16 podstawowych jednostek SKU, takich jak D14 i L16, maksymalne obsługiwane obciążenie wynosi 96 równoczesnych żądań pozyskiwania. W przypadku dwóch podstawowych jednostek SKU, takich jak D11, maksymalne obsługiwane obciążenie to 12 współbieżnych żądań pozyskiwania.
Limit rozmiaru danych: limit rozmiaru danych dla żądania pozyskiwania przesyłania strumieniowego wynosi 4 MB. Obejmuje to wszystkie dane utworzone na potrzeby zasad aktualizacji podczas pozyskiwania.
Aktualizacje schematu: aktualizacje schematu, takie jak tworzenie i modyfikowanie tabel i mapowań pozyskiwania, może potrwać do pięciu minut w przypadku usługi pozyskiwania przesyłania strumieniowego. Aby uzyskać więcej informacji, zobacz Przesyłanie strumieniowe pozyskiwania i zmiany schematu.
Pojemność ssd: włączenie pozyskiwania przesyłania strumieniowego w klastrze, nawet jeśli dane nie są pozyskiwane za pośrednictwem przesyłania strumieniowego, używa części lokalnego dysku SSD maszyn klastra do przesyłania strumieniowego danych pozyskiwania i zmniejsza ilość miejsca dostępnego dla gorącej pamięci podręcznej.
Włączanie pozyskiwania przesyłania strumieniowego w klastrze
W Azure Portal przejdź do klastra usługi Azure Data Explorer.
W obszarze Ustawienia wybierz pozycję Konfiguracje.
W okienku Konfiguracje wybierz pozycję Włączone , aby włączyć pozyskiwanie przesyłania strumieniowego.
Wybierz pozycję Zapisz.
Pozyskiwanie przesyłania strumieniowego można włączyć podczas tworzenia nowego klastra usługi Azure Data Explorer.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Tworzenie tabeli docelowej i definiowanie zasad
Utwórz tabelę, aby odbierać dane pozyskiwania przesyłania strumieniowego i definiować powiązane z nimi zasady przy użyciu Azure Portal lub programowo w języku C#.
Aby utworzyć tabelę, która będzie odbierać dane za pośrednictwem pozyskiwania przesyłania strumieniowego, skopiuj następujące polecenie do okienka Zapytanie i wybierz pozycję Uruchom.
Skopiuj jedno z następujących poleceń do okienka Zapytanie i wybierz pozycję Uruchom. Definiuje zasady pozyskiwania przesyłania strumieniowego w utworzonej tabeli lub w bazie danych zawierającej tabelę.
Porada
Zasady zdefiniowane na poziomie bazy danych mają zastosowanie do wszystkich istniejących i przyszłych tabel w bazie danych. Po włączeniu zasad na poziomie bazy danych nie ma potrzeby włączania jej dla każdej tabeli.
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Tworzenie aplikacji pozyskiwania przesyłania strumieniowego w celu pozyskiwania danych do klastra
Utwórz aplikację na potrzeby pozyskiwania danych do klastra przy użyciu preferowanego języka.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Wyłączanie pozyskiwania przesyłania strumieniowego w klastrze
Ostrzeżenie
Wyłączenie pozyskiwania przesyłania strumieniowego może potrwać kilka godzin.
Przed wyłączeniem pozyskiwania przesyłania strumieniowego w klastrze usługi Azure Data Explorer usuń zasady pozyskiwania przesyłania strumieniowego ze wszystkich odpowiednich tabel i baz danych. Usunięcie zasad pozyskiwania przesyłania strumieniowego powoduje ponowne rozmieszczenie danych w klastrze usługi Azure Data Explorer. Dane pozyskiwania przesyłania strumieniowego są przenoszone z początkowego magazynu do magazynu trwałego w magazynie kolumn (zakresy lub fragmenty). Ten proces może potrwać od kilku sekund do kilku godzin, w zależności od ilości danych w początkowym magazynie.
Usuwanie zasad pozyskiwania przesyłania strumieniowego
Zasady pozyskiwania przesyłania strumieniowego można usunąć przy użyciu Azure Portal lub programowo w języku C#.
W Azure Portal przejdź do klastra usługi Azure Data Explorer i wybierz pozycję Zapytanie.
Aby usunąć zasady pozyskiwania przesyłania strumieniowego z tabeli, skopiuj następujące polecenie do okienka Zapytanie i wybierz pozycję Uruchom.
.delete table TestTable policy streamingingestion
W obszarze Ustawienia wybierz pozycję Konfiguracje.
W okienku Konfiguracje wybierz pozycję Wyłączone , aby wyłączyć pozyskiwanie przesyłania strumieniowego.
Wybierz pozycję Zapisz.
Aby usunąć zasady pozyskiwania przesyłania strumieniowego z tabeli, uruchom następujący kod:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Aby wyłączyć pozyskiwanie przesyłania strumieniowego w klastrze, uruchom następujący kod:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Ograniczenia
Mapowania danych muszą być wstępnie utworzone do użycia w pozyskiwaniu przesyłania strumieniowego. Pojedyncze żądania pozyskiwania przesyłania strumieniowego nie pasują do wbudowanych mapowań danych.
Nie można ustawić tagów zakresu na danych pozyskiwania przesyłania strumieniowego.
Zaktualizuj zasady. Zasady aktualizacji mogą odwoływać się tylko do nowo pozyskanych danych w tabeli źródłowej, a nie do żadnych innych danych lub tabel w bazie danych.
Jeśli pozyskiwanie przesyłania strumieniowego jest włączone w klastrze używanym jako lider w przypadku baz danych obserwowanych, pozyskiwanie przesyłania strumieniowego musi być włączone w następujących klastrach, a także śledzić dane pozyskiwania przesyłania strumieniowego. To samo dotyczy tego, czy dane klastra są udostępniane za pośrednictwem Data Share.