Streamingopname configureren in uw Azure Data Explorer-cluster
Artikel
Streamingopname is handig voor het laden van gegevens wanneer u een lage latentie tussen opname en query nodig hebt. Overweeg het gebruik van streamingopname in de volgende scenario's:
Latentie van minder dan een seconde is vereist.
Het optimaliseren van de operationele verwerking van veel tabellen waarbij de gegevensstroom in elke tabel relatief klein is (een paar records per seconde), maar het totale gegevensopnamevolume hoog is (duizenden records per seconde).
Als de gegevensstroom in elke tabel hoog is (meer dan 4 GB per uur), kunt u overwegen batchopname te gebruiken.
Zie Overzicht van gegevensopname voor meer informatie over verschillende opnamemethoden.
Kies het juiste type streamingopname
Er worden twee typen streamingopname ondersteund:
Opnametype
Beschrijving
Gegevensverbinding
Event Hub-, IoT Hub- en Event Grid-gegevensverbindingen kunnen streamingopname gebruiken, mits deze is ingeschakeld op clusterniveau. De beslissing om streamingopname te gebruiken, wordt uitgevoerd volgens het beleid voor streamingopname dat is geconfigureerd in de doeltabel. Zie Event Hub, IoT Hub en Event Grid voor informatie over het beheren van gegevensverbindingen.
Gebruik de volgende tabel om het opnametype te kiezen dat geschikt is voor uw omgeving:
Criterium
Gegevensverbinding
Aangepaste opname
Gegevensvertraging tussen het starten van de opname en de gegevens die beschikbaar zijn voor query
Langere vertraging
Kortere vertraging
Ontwikkelingsoverhead
Snelle en eenvoudige installatie, geen ontwikkelingsoverhead
Hoge overhead voor ontwikkeling om een toepassing te maken die de gegevens opneemt, fouten afhandelt en gegevensconsistentie garandeert
Notitie
U kunt het proces voor het in- en uitschakelen van streamingopname op uw cluster beheren met behulp van de Azure Portal of programmatisch in C#. Als u C# gebruikt voor uw aangepaste toepassing, vindt u het misschien handiger om de programmatische benadering te gebruiken.
De belangrijkste bijdragers die van invloed kunnen zijn op streamingopname zijn:
VM- en clustergrootte: prestaties van streamingopname en capaciteit worden geschaald met grotere VM- en clustergrootten. Het aantal gelijktijdige opnameaanvragen is beperkt tot zes per kern. Voor 16 kern-SKU's, zoals D14 en L16, is de maximale ondersteunde belasting bijvoorbeeld 96 gelijktijdige opnameaanvragen. Voor twee kern-SKU's, zoals D11, is de maximale ondersteunde belasting 12 gelijktijdige opnameaanvragen.
Limiet voor gegevensgrootte: de limiet voor de gegevensgrootte voor een aanvraag voor streamingopname is 4 MB. Dit omvat alle gegevens die zijn gemaakt voor updatebeleid tijdens de opname.
Schema-updates: Schema-updates, zoals het maken en wijzigen van tabellen en opnametoewijzingen, kunnen maximaal vijf minuten duren voor de service voor streamingopname. Zie Streaming-opname en schemawijzigingen voor meer informatie.
SSD-capaciteit: het inschakelen van streamingopname op een cluster, zelfs wanneer gegevens niet via streaming worden opgenomen, gebruikt een deel van de lokale SSD-schijf van de clustercomputers voor streaming-opnamegegevens en vermindert de opslag die beschikbaar is voor hot cache.
Als u de tabel wilt maken die de gegevens via streaming-opname ontvangt, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
Kopieer een van de volgende opdrachten naar het deelvenster Query en selecteer Uitvoeren. Hiermee definieert u het streaming-opnamebeleid voor de tabel die u hebt gemaakt of in de database die de tabel bevat.
Tip
Een beleid dat is gedefinieerd op databaseniveau, is van toepassing op alle bestaande en toekomstige tabellen in de database. Wanneer u het beleid op databaseniveau inschakelt, hoeft u het niet per tabel in te schakelen.
Als u het beleid wilt definiëren voor de tabel die u hebt gemaakt, gebruikt u:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Een toepassing voor streamingopname maken om gegevens op te nemen in uw cluster
Maak uw toepassing voor het opnemen van gegevens in uw cluster met behulp van uw voorkeurstaal.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Streamingopname uitschakelen in uw cluster
Waarschuwing
Het uitschakelen van streamingopname kan enkele uren duren.
Voordat u streamingopname uitschakelt in uw Azure Data Explorer-cluster, moet u het beleid voor streamingopname uit alle relevante tabellen en databases verwijderen. Het verwijderen van het beleid voor streamingopname activeert een herverdeling van gegevens in uw Azure Data Explorer-cluster. De streamingopnamegegevens worden verplaatst van de eerste opslag naar de permanente opslag in de kolomopslag (gebieden of shards). Dit proces kan enkele seconden tot een paar uur duren, afhankelijk van de hoeveelheid gegevens in de eerste opslag.
Het beleid voor streamingopname verwijderen
U kunt het beleid voor streamingopname verwijderen met behulp van de Azure Portal of programmatisch in C#.
Ga in de Azure Portal naar uw Azure Data Explorer-cluster en selecteer Query.
Als u het beleid voor streamingopname uit de tabel wilt verwijderen, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
.delete table TestTable policy streamingingestion
Selecteer in Instellingende optie Configuraties.
Selecteer in het deelvenster Configuratiesde optie Uit om opname van streaming uit te schakelen.
Selecteer Opslaan.
Voer de volgende code uit om het beleid voor streamingopname uit de tabel te verwijderen:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Voer de volgende code uit om streamingopname op uw cluster uit te schakelen:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Beperkingen
Gegevenstoewijzingen moeten vooraf worden gemaakt voor gebruik in streamingopname. Afzonderlijke aanvragen voor streamingopname zijn niet geschikt voor inlinegegevenstoewijzingen.
Bereiktags kunnen niet worden ingesteld voor de streamingopnamegegevens.
Beleid bijwerken. Het updatebeleid kan alleen verwijzen naar de zojuist opgenomen gegevens in de brontabel en niet naar andere gegevens of tabellen in de database.
Als streamingopname is ingeschakeld op een cluster dat wordt gebruikt als leider voor volgdatabases, moet streamingopname ook worden ingeschakeld op de volgende clusters om streamingopnamegegevens te volgen. Hetzelfde geldt voor het feit of de clustergegevens worden gedeeld via Data Share.