Konfigurace příjmu streamovaných dat v clusteru Azure Data Exploreru
Článek
Příjem dat streamování je užitečný pro načítání dat, když potřebujete nízkou latenci mezi příjmem dat a dotazem. Zvažte použití příjmu dat streamování v následujících scénářích:
Je vyžadována latence menší než sekunda.
Optimalizace provozního zpracování mnoha tabulek, kde je datový proud do každé tabulky relativně malý (několik záznamů za sekundu), ale celkový objem příjmu dat je vysoký (tisíce záznamů za sekundu).
Pokud je datový proud do každé tabulky vysoký (přes 4 GB za hodinu), zvažte použití příjmu dat ve frontě.
Další informace o různých metodách příjmu dat najdete v přehledu příjmu dat.
Ukázky kódu založené na předchozích verzích sady SDK najdete v archivovaného článku.
Zvolte odpovídající typ příjmu dat streamování.
Podporují se dva typy příjmu dat streamování:
Typ příjmu dat
Popis
Datové připojení
Služba Event Hubs, IoT Hub a datová připojení Event Gridu můžou používat příjem dat streamování za předpokladu, že je povolená na úrovni clusteru. Rozhodnutí o použití příjmu streamovaných dat se provádí podle zásad příjmu streamování nakonfigurovaných v cílové tabulce. Informace o správě datových připojení najdete v tématu Event Hub, IoT Hub a Event Grid.
Vlastní příjem dat
Vlastní příjem dat vyžaduje, abyste napsali aplikaci, která používá některou z klientských knihoven Azure Data Exploreru. Informace v tomto tématu slouží ke konfiguraci vlastního příjmu dat. Můžete také najít ukázkovou aplikaci pro příjem dat C?view=azure-data-explorer&preserve-view=true# pro příjem dat.
Následující tabulka vám pomůže zvolit typ příjmu dat, který je vhodný pro vaše prostředí:
Kritérium
Datové připojení
Vlastní příjem dat
Zpoždění dat mezi zahájením příjmu dat a daty dostupnými pro dotaz
Delší zpoždění
Kratší zpoždění
Režijní náklady na vývoj
Rychlé a snadné nastavení bez režijních nákladů na vývoj
Vysoká režie při vývoji při vytváření ingestování dat, zpracování chyb a zajištění konzistence dat
Poznámka:
Proces můžete spravovat, abyste v clusteru povolili a zakázali příjem dat streamování pomocí webu Azure Portal nebo programově v jazyce C#. Pokud používáte jazyk C# pro vlastní aplikaci, může být vhodnější použít programový přístup.
Mezi hlavní přispěvatele, kteří můžou mít vliv na příjem dat streamování, patří:
Virtuální počítač a velikost clusteru: Výkon streamování příjmu dat a škálování kapacity se zvýšenými velikostmi virtuálních počítačů a clusterů Počet souběžných žádostí o příjem dat je omezený na šest na jádro. Například u 16 jader SKU, například D14 a L16, je maximální podporovaná zátěž 96 souběžných požadavků na příjem dat. U dvou základních skladových položek, jako je D11, je maximální podporovaná zátěž 12 souběžných požadavků na příjem dat.
Omezení velikosti dat: Limit velikosti dat pro požadavek na příjem dat je 4 MB. To zahrnuje všechna data vytvořená pro zásady aktualizace během příjmu dat.
Aktualizace schématu: Aktualizace schématu, jako jsou vytváření a úpravy tabulek a mapování příjmu dat, můžou trvat až pět minut pro službu příjmu dat streamování. Další informace najdete v tématu Streamování změn příjmu dat a schématu.
Kapacita SSD: Povolení příjmu streamovaných dat v clusteru, i když se data neingestují prostřednictvím streamování, využívá část místního disku SSD počítačů clusteru pro streamování dat příjmu dat a snižuje úložiště dostupné pro horkou mezipaměť.
Při vytváření clusteru pomocí kroků v části Vytvoření clusteru a databáze Azure Data Exploreru vyberte na kartě Konfigurace příjem dat> streamování.
Pokud chcete povolit příjem dat streamování při vytváření nového clusteru Azure Data Exploreru, spusťte následující kód:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Povolení příjmu streamovaných dat v existujícím clusteru
Pokud máte existující cluster, můžete ingestování streamování povolit pomocí webu Azure Portal nebo programově v jazyce C#.
Na webu Azure Portal přejděte do clusteru Azure Data Exploreru.
V Nastavení vyberte Konfigurace.
V podokně Konfigurace vyberte Zapnuto a povolte příjem dat streamování.
Zvolte Uložit.
Při aktualizaci existujícího clusteru Azure Data Exploreru můžete povolit příjem dat streamování.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Vytvoření cílové tabulky a definování zásad
Vytvořte tabulku pro příjem streamovaných dat příjmu dat a definujte související zásady pomocí webu Azure Portal nebo programově v jazyce C#.
Pokud chcete vytvořit tabulku, která bude přijímat data prostřednictvím příjmu streamovaných dat, zkopírujte do podokna Dotaz následující příkaz a vyberte Spustit.
Zkopírujte jeden z následujících příkazů do podokna Dotaz a vyberte Spustit. Tím se definují zásady příjmu dat streamování v tabulce, kterou jste vytvořili, nebo v databázi, která tuto tabulku obsahuje.
Tip
Zásada definovaná na úrovni databáze se vztahuje na všechny existující a budoucí tabulky v databázi. Když zásadu povolíte na úrovni databáze, není nutné ji povolit pro každou tabulku.
K definování zásad v tabulce, kterou jste vytvořili, použijte:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.gzip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Zakázání příjmu streamovaných dat v clusteru
Upozorňující
Zakázání příjmu dat streamování může trvat několik hodin.
Před zakázáním příjmu dat streamování v clusteru Azure Data Exploreru zakažte zásady příjmu streamování ze všech relevantních tabulek a databází. Při odebrání zásad příjmu dat streamování se v clusteru Azure Data Exploreru mění uspořádání dat. Streamovaná data příjmu dat se přesunou z počátečního úložiště do trvalého úložiště v úložišti sloupců (rozsahy nebo horizontální oddíly). V závislosti na množství dat v počátečním úložišti může tento proces trvat několik sekund až několik hodin.
Vyřazení zásad příjmu dat streamování
Zásady příjmu streamování můžete odstranit pomocí webu Azure Portal nebo programově v jazyce C#.
Na webu Azure Portal přejděte do clusteru Azure Data Exploreru a vyberte Dotaz.
Pokud chcete z tabulky odstranit zásady příjmu streamovaných dat, zkopírujte do podokna Dotaz následující příkaz a vyberte Spustit.
.delete table TestTable policy streamingingestion
V Nastavení vyberte Konfigurace.
V podokně Konfigurace vyberte Vypnuto a zakažte příjem dat streamování.
Zvolte Uložit.
Pokud chcete z tabulky odstranit zásady příjmu dat streamování, spusťte následující kód:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Pokud chcete v clusteru zakázat příjem streamu, spusťte následující kód:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Omezení
Mapování dat musí být předem vytvořené pro použití při příjmu dat streamování. Jednotlivé požadavky na příjem dat streamování neposílaly vložené mapování dat.
Aktualizujte zásady. Zásady aktualizace mohou odkazovat pouze na nově ingestované data ve zdrojové tabulce, a ne na žádná jiná data nebo tabulky v databázi.
Když se zásada aktualizace s transakčními zásadami nezdaří, opakování se vrátí do dávkového příjmu dat.
Pokud je v clusteru povolené příjem dat streamování, který se používá jako vedoucí pro sledující databáze, musí být příjem dat streamování povolený na následujících clusterech a sledovat data příjmu streamovaných dat. Totéž platí i pro sdílení dat clusteru prostřednictvím sdílené datové složky.