Konfigurera direktuppspelningsinmatning i Azure Data Explorer-klustret
Artikel
Direktuppspelningsinmatning är användbart för att läsa in data när du behöver låg svarstid mellan inmatning och fråga. Överväg att använda strömmande inmatning i följande scenarier:
Svarstid på mindre än en sekund krävs.
För att optimera driftbearbetningen av många tabeller där dataströmmen till varje tabell är relativt liten (några poster per sekund), men den totala datainmatningsvolymen är hög (tusentals poster per sekund).
Om dataströmmen i varje tabell är hög (över 4 GB per timme) bör du överväga att använda batchinmatning.
Event Hub-, IoT Hub- och Event Grid-dataanslutningar kan använda strömmande inmatning, förutsatt att den är aktiverad på klusternivå. Beslutet att använda strömmande inmatning görs enligt den princip för inmatning av direktuppspelning som konfigurerats i måltabellen. Information om hur du hanterar dataanslutningar finns i Event Hub, IoT Hub och Event Grid.
Anpassad inmatning
Anpassad inmatning kräver att du skriver ett program som använder ett av Azure Data Explorer klientbibliotek. Använd informationen i det här avsnittet för att konfigurera anpassad inmatning. Du kanske också tycker att exempelprogrammet för C#-strömmande inmatning är användbart.
Använd följande tabell för att välja den inmatningstyp som är lämplig för din miljö:
Kriterium
Dataanslutning
Anpassad inmatning
Datafördröjning mellan inmatningsinitiering och tillgängliga data för fråga
Längre fördröjning
Kortare fördröjning
Omkostnader för utveckling
Snabb och enkel installation, inga utvecklingskostnader
Höga utvecklingskostnader för att skapa ett program som matar in data, hanterar fel och säkerställer datakonsekvens
Anmärkning
Du kan hantera processen för att aktivera och inaktivera strömmande inmatning i klustret via Azure-portalen eller programmatiskt i C#. Om du använder C# för din anpassade programkan det vara enklare att använda den programmatiska metoden.
De viktigaste bidragsgivarna som kan påverka strömmande inmatning är:
VM- och klusterstorlek: Prestanda för strömmande inmatning och kapacitet skalas med ökade storlekar på virtuella maskiner och kluster. Antalet samtidiga inmatningsbegäranden är begränsat till sex per kärna. För SKU:er med 16 kärnor, till exempel D14 och L16, är den maximala belastningen som stöds 96 samtidiga inmatningsbegäranden. För två kärn-SKU:er, till exempel D11, är den maximala belastningen som stöds 12 samtidiga inmatningsbegäranden.
Datastorleksgräns: Datastorleksgränsen för en begäran om strömmande inmatning är 4 MB. Detta inkluderar alla data som skapats genom uppdateringspolicy under inmatningen.
Schemauppdateringar: Schemauppdateringar, till exempel skapande och ändring av tabeller och inmatningsmappningar, kan ta upp till fem minuter för tjänsten för strömmande inmatning. För mer information, se strömningsinmatning och schemaändringar.
SSD-kapacitet: Aktivera strömmande inmatning på ett kluster, även om data inte matas in via strömning, använder en del av den lokala SSD-disken på klusterdatorerna för strömmande inmatningsdata och minskar lagringen som är tillgänglig för frekvent cache.
I Azure-portalen går du till ditt Azure Data Explorer-kluster.
I Inställningarväljer du Konfigurationer.
I fönstret Konfigurationer väljer du På för att aktivera direktuppspelningsinmatning.
Välj Spara.
Du kan aktivera strömmande inmatning när du skapar ett nytt Azure Data Explorer-kluster.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Skapa en måltabell och definiera principen
Skapa en tabell för att ta emot data för strömmande inmatning och definiera dess relaterade princip med hjälp av Azure-portalen eller programmatiskt i C#.
Kopiera något av följande kommandon i Query-fönstret och klicka på Kör. Detta definierar princip för strömmande inmatning i tabellen som du skapade eller i databasen som innehåller tabellen.
Tips
En princip som definieras på databasnivå gäller för alla befintliga och framtida tabeller i databasen. När du aktiverar principen på databasnivå behöver du inte aktivera den per tabell.
Om du vill definiera principen i tabellen du skapade använder du:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Skapa ett program för direktuppspelningsinmatning för att mata in data till klustret
Skapa ditt program för att mata in data till klustret med det språk du föredrar.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Inaktivera inmatning av direktuppspelning på ditt kluster
Varning
Det kan ta några timmar att inaktivera strömmande inmatning.
Innan du inaktiverar direktuppspelningsinmatning i Azure Data Explorer-klustret släpper du policy för strömmande inmatning från alla relevanta tabeller och databaser. Borttagningen av principen för strömmande inmatning utlöser omorganisering av data i Azure Data Explorer-klustret. Strömmande inmatningsdata flyttas från den ursprungliga lagringen till permanent lagring i kolumnlagret (omfattningar eller segment). Den här processen kan ta mellan några sekunder och några timmar, beroende på mängden data i den första lagringen.
Ta bort inmatningsprincipen för direktuppspelning
Du kan ta bort principen för strömmande inmatning med hjälp av Azure-portalen eller programmatiskt i C#.
I Azure-portalen går du till ditt Azure Data Explorer-kluster och väljer Query.
Om du vill släppa principen för strömmande inmatning från tabellen kopierar du följande kommando till frågefönstret och väljer Kör.
.delete table TestTable policy streamingingestion
I Inställningarväljer du Konfigurationer.
I fönstret Konfigurationer väljer du Av för att inaktivera direktuppspelningsinmatning.
Välj Spara.
Kör följande kod för att släppa principen för strömmande inmatning från tabellen:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Om du vill inaktivera strömmande inmatning i klustret kör du följande kod:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Begränsningar
Datamappningar måste vara förskapade för användning i strömningsinmatning. Enskilda begäranden om streaminginmatning stödjer inte inbyggda datamappningar.
Omfångstaggar kan inte ställas in på data för strömningsinmatning.
Uppdatera princip. Uppdateringsprincipen kan bara referera till nyligen inmatade data i källtabellen och inte andra data eller tabeller i databasen.
Om direktuppspelningsinmatning är aktiverat på ett kluster som används som ledare för följardatabasermåste strömmande inmatning aktiveras på följande kluster samt för att följa data för strömmande inmatning. Samma sak gäller oavsett om klusterdata delas via datadelning.