Streamingopname configureren in uw Azure Data Explorer-cluster
Artikel
Streamingopname is handig voor het laden van gegevens wanneer u een lage latentie tussen opname en query nodig hebt. Overweeg het gebruik van streaming-opname in de volgende scenario's:
Latentie van minder dan een seconde is vereist.
Voor het optimaliseren van de operationele verwerking van veel tabellen waarbij de gegevensstroom in elke tabel relatief klein is (een paar records per seconde), maar het totale gegevensopnamevolume hoog is (duizenden records per seconde).
Als de gegevensstroom in elke tabel hoog is (meer dan 4 GB per uur), kunt u overwegen opname in de wachtrij te gebruiken.
Zie Overzicht van gegevensopname voor meer informatie over verschillende opnamemethoden.
Zie het gearchiveerde artikel voor codevoorbeelden op basis van eerdere SDK-versies.
Kies het juiste type streamingopname
Er worden twee typen streamingopname ondersteund:
Opnametype
Description
Gegevensverbinding
Event Hubs-, IoT Hub- en Event Grid-gegevensverbindingen kunnen streamingopname gebruiken, mits deze is ingeschakeld op clusterniveau. De beslissing om streamingopname te gebruiken, wordt uitgevoerd volgens het streamingopnamebeleid dat is geconfigureerd voor de doeltabel. Zie Event Hub, IoT Hub en Event Grid voor meer informatie over het beheren van gegevensverbindingen.
Gebruik de volgende tabel om het opnametype te kiezen dat geschikt is voor uw omgeving:
Criterium
Gegevensverbinding
Aangepaste opname
Gegevensvertraging tussen het initiëren van de opname en de gegevens die beschikbaar zijn voor query's
Langere vertraging
Kortere vertraging
Overhead voor ontwikkeling
Snelle en eenvoudige installatie, geen ontwikkelingsoverhead
Hoge overhead voor ontwikkeling om een toepassing te maken die de gegevens opneemt, fouten afhandelt en gegevensconsistentie garandeert
Notitie
U kunt het proces voor het in- en uitschakelen van streamingopname op uw cluster beheren met behulp van de Azure Portal of programmatisch in C#. Als u C# gebruikt voor uw aangepaste toepassing, vindt u het misschien handiger om de programmatische benadering te gebruiken.
De belangrijkste bijdragers die van invloed kunnen zijn op streamingopname zijn:
VM- en clustergrootte: streaming-opnameprestaties en capaciteitsschalen met verhoogde VM- en clustergrootten. Het aantal gelijktijdige opnameaanvragen is beperkt tot zes per kern. Voor 16 kern-SKU's, zoals D14 en L16, is de maximaal ondersteunde belasting bijvoorbeeld 96 gelijktijdige opnameaanvragen. Voor twee kern-SKU's, zoals D11, is de maximaal ondersteunde belasting 12 gelijktijdige opnameaanvragen.
Limiet voor gegevensgrootte: de gegevensgroottelimiet voor een aanvraag voor streamingopname is 4 MB. Dit omvat alle gegevens die zijn gemaakt voor updatebeleid tijdens de opname.
Schema-updates: Schema-updates, zoals het maken en wijzigen van tabellen en opnametoewijzingen, kunnen maximaal vijf minuten duren voordat de streaming-opnameservice wordt uitgevoerd. Zie Streamingopname en schemawijzigingen voor meer informatie.
SSD-capaciteit: het inschakelen van streamingopname op een cluster, zelfs wanneer gegevens niet worden opgenomen via streaming, gebruikt een deel van de lokale SSD-schijf van de clustercomputers voor het streamen van opnamegegevens en vermindert de opslag die beschikbaar is voor hot cache.
Voer de volgende code uit om streamingopname in te schakelen tijdens het maken van een nieuw Azure Data Explorer-cluster:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Streamingopname inschakelen op een bestaand cluster
Als u een bestaand cluster hebt, kunt u streamingopname inschakelen met behulp van de Azure Portal of programmatisch in C#.
Ga in de Azure Portal naar uw Azure Data Explorer-cluster.
Selecteer in Instellingende optie Configuraties.
Selecteer in het deelvenster Configuratiesde optie Aan om streamingopname in te schakelen.
Selecteer Opslaan.
U kunt streamingopname inschakelen tijdens het bijwerken van een bestaand Azure Data Explorer-cluster.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Een doeltabel maken en het beleid definiëren
Maak een tabel om de streamingopnamegegevens te ontvangen en definieer het bijbehorende beleid met behulp van de Azure Portal of programmatisch in C#.
Als u de tabel wilt maken die de gegevens ontvangt via streamingopname, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
Kopieer een van de volgende opdrachten naar het deelvenster Query en selecteer Uitvoeren. Hiermee definieert u het streaming-opnamebeleid voor de tabel die u hebt gemaakt of in de database die de tabel bevat.
Tip
Een beleid dat is gedefinieerd op databaseniveau, is van toepassing op alle bestaande en toekomstige tabellen in de database. Wanneer u het beleid op databaseniveau inschakelt, hoeft u het niet per tabel in te schakelen.
Als u het beleid wilt definiëren voor de tabel die u hebt gemaakt, gebruikt u:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Streamingopname op uw cluster uitschakelen
Waarschuwing
Het uitschakelen van streamingopname kan enkele uren duren.
Voordat u streamingopname uitschakelt in uw Azure Data Explorer-cluster, moet u het beleid voor streamingopname uit alle relevante tabellen en databases verwijderen. Het verwijderen van het streaming-opnamebeleid activeert een herindeling van gegevens in uw Azure Data Explorer-cluster. De streamingopnamegegevens worden verplaatst van de eerste opslag naar de permanente opslag in de kolomopslag (gebieden of shards). Dit proces kan enkele seconden tot een paar uur duren, afhankelijk van de hoeveelheid gegevens in de eerste opslag.
Het beleid voor streamingopname verwijderen
U kunt het beleid voor streamingopname verwijderen met behulp van de Azure Portal of programmatisch in C#.
Ga in de Azure Portal naar uw Azure Data Explorer-cluster en selecteer Query.
Als u het beleid voor streamingopname uit de tabel wilt verwijderen, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
.delete table TestTable policy streamingingestion
Selecteer in Instellingende optie Configuraties.
Selecteer in het deelvenster Configuratiesde optie Uit om opname van streaming uit te schakelen.
Selecteer Opslaan.
Voer de volgende code uit om het beleid voor streamingopname uit de tabel te verwijderen:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Voer de volgende code uit om streamingopname op uw cluster uit te schakelen:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Beperkingen
Gegevenstoewijzingen moeten vooraf worden gemaakt voor gebruik in streamingopname. Afzonderlijke aanvragen voor streamingopname zijn niet geschikt voor inlinegegevenstoewijzingen.
Bereiktags kunnen niet worden ingesteld voor de streamingopnamegegevens.
Beleid bijwerken. Het updatebeleid kan alleen verwijzen naar de nieuw opgenomen gegevens in de brontabel en niet naar andere gegevens of tabellen in de database.
Als streamingopname is ingeschakeld op een cluster dat wordt gebruikt als leider voor volgdatabases, moet streamingopname ook worden ingeschakeld op de volgende clusters om streamingopnamegegevens te volgen. Hetzelfde geldt voor het feit of de clustergegevens worden gedeeld via Data Share.