Konfigurace příjmu streamovaných dat v clusteru Azure Data Exploreru
Článek
Streamování příjmu dat je užitečné pro načítání dat, když potřebujete nízkou latenci mezi příjmem dat a dotazem. Použití příjmu dat streamování zvažte v následujících scénářích:
Vyžaduje se latence menší než sekunda.
Optimalizace provozního zpracování mnoha tabulek, kde je stream dat do každé tabulky relativně malý (několik záznamů za sekundu), ale celkový objem příjmu dat je vysoký (tisíce záznamů za sekundu).
Pokud je datový proud dat do každé tabulky vysoký (více než 4 GB za hodinu), zvažte použití dávkového příjmu dat.
Datová připojení centra událostí, IoT Hub a Event Gridu můžou využívat příjem dat streamování za předpokladu, že je povolený na úrovni clusteru. Rozhodnutí o použití příjmu dat streamování se provádí podle zásad příjmu dat streamování nakonfigurovaných v cílové tabulce. Informace o správě datových připojení najdete v tématu Centrum událostí, IoT Hub a Event Grid.
Následující tabulka vám pomůže vybrat typ příjmu dat, který je vhodný pro vaše prostředí:
Kritérium
Datové připojení
Vlastní příjem dat
Zpoždění dat mezi zahájením příjmu dat a daty dostupnými pro dotaz
Delší zpoždění
Kratší zpoždění
Režie na vývoj
Rychlé a snadné nastavení, bez režie na vývoj
Vysoké režijní náklady na vývoj při vytváření ingestování dat aplikace, zpracování chyb a zajištění konzistence dat
Poznámka
Proces povolení a zakázání příjmu streamovaných dat v clusteru můžete spravovat pomocí Azure Portal nebo programově v jazyce C#. Pokud pro vlastní aplikaci používáte jazyk C#, může být vhodnější použít programový přístup.
Mezi hlavní přispěvatele, kteří můžou mít vliv na příjem dat streamování, patří:
Velikost virtuálního počítače a clusteru: Výkon a kapacita streamování se škálují se zvýšenými velikostmi virtuálních počítačů a clusterů. Počet souběžných žádostí o příjem dat je omezený na šest na jádro. Například u skladových položek s 16 jádry, jako jsou D14 a L16, je maximální podporované zatížení 96 souběžných požadavků na příjem dat. U dvou základních skladových položek, jako je D11, je maximální podporovaná zátěž 12 souběžných požadavků na příjem dat.
Limit velikosti dat: Limit velikosti dat pro požadavek na ingestování streamování je 4 MB. To zahrnuje všechna data vytvořená pro zásady aktualizace během příjmu dat.
Aktualizace schématu: Aktualizace schématu, jako je vytváření a úpravy tabulek a mapování příjmu dat, můžou u služby příjmu dat streamování trvat až pět minut. Další informace najdete v tématu Příjem dat streamování a změny schématu.
Kapacita SSD: Povolení příjmu streamovaných dat v clusteru i v případě, že data nejsou ingestovaná prostřednictvím streamování, využívá část místního disku SSD počítačů clusteru pro streamování dat a snižuje velikost dostupného úložiště pro horkou mezipaměť.
Pokud chcete vytvořit tabulku, která bude přijímat data prostřednictvím příjmu dat streamování, zkopírujte následující příkaz do podokna Dotaz a vyberte Spustit.
Zkopírujte jeden z následujících příkazů do podokna Dotaz a vyberte Spustit. Tím se definují zásady příjmu dat streamování v tabulce, kterou jste vytvořili, nebo v databázi, která tabulku obsahuje.
Tip
Zásada definovaná na úrovni databáze se vztahuje na všechny existující a budoucí tabulky v databázi. Když povolíte zásadu na úrovni databáze, není nutné ji povolovat pro jednotlivé tabulky.
Pokud chcete definovat zásady pro tabulku, kterou jste vytvořili, použijte:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Vytvoření aplikace pro příjem streamovaných dat do clusteru
Vytvořte aplikaci pro příjem dat do clusteru pomocí preferovaného jazyka.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Zákaz příjmu dat streamování v clusteru
Upozornění
Zakázání příjmu dat streamování může trvat několik hodin.
Před zakázáním příjmu dat streamování v clusteru Azure Data Explorer odstraňte zásady příjmu dat ze všech relevantních tabulek a databází. Odebráním zásad příjmu dat streamování se aktivuje změna uspořádání dat v clusteru Azure Data Explorer. Streamovaná data příjmu dat se přesunou z počátečního úložiště do trvalého úložiště ve sloupcovém úložišti (rozsahy nebo horizontální oddíly). Tento proces může trvat několik sekund až několik hodin v závislosti na množství dat v počátečním úložišti.
Zrušení zásad příjmu streamování
Zásady příjmu streamování můžete zrušit pomocí Azure Portal nebo programově v jazyce C#.
V Azure Portal přejděte do clusteru Azure Data Explorer a vyberte Dotaz.
Pokud chcete zásady příjmu dat streamování z tabulky odstranit, zkopírujte následující příkaz do podokna Dotaz a vyberte Spustit.
.delete table TestTable policy streamingingestion
V Nastavení vyberte Konfigurace.
V podokně Konfigurace vyberte Vypnuto a zakažte příjem dat streamování.
Vyberte Uložit.
Pokud chcete z tabulky odstranit zásady příjmu streamování, spusťte následující kód:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Pokud chcete zakázat ingestování streamování v clusteru, spusťte následující kód:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Omezení
Mapování dat musí být předem vytvořené pro použití při příjmu streamů. Jednotlivé požadavky na příjem streamovaných dat nepoužádá vložená mapování dat.
U streamovaných dat příjmu dat není možné nastavit značky rozsahu.
Aktualizujte zásady. Zásady aktualizace mohou odkazovat pouze na nově přijatá data ve zdrojové tabulce, a ne na jiná data nebo tabulky v databázi.
Pokud je v clusteru, který se používá jako vedoucí pro databáze sledujících, povolený ingestování streamování, musí být ingestování streamování povolené i v následujících clusterech, aby bylo možné sledovat streamovaná data příjmu dat. Totéž platí pro sdílení dat clusteru prostřednictvím Data Share.