Configurare l'inserimento in streaming nel cluster Esplora dati di Azure
Articolo
L'inserimento in streaming è utile per caricare i dati quando è necessaria una bassa latenza tra inserimento e query. È consigliabile usare l'inserimento in streaming negli scenari seguenti:
È necessaria una latenza inferiore a un secondo.
Per ottimizzare l'elaborazione operativa di molte tabelle in cui il flusso di dati in ogni tabella è relativamente piccolo (alcuni record al secondo), ma il volume di inserimento dati complessivo è elevato (migliaia di record al secondo).
Se il flusso di dati in ogni tabella è elevato (oltre 4 GB all'ora), è consigliabile usare l'inserimento batch.
Scegliere il tipo di inserimento di streaming appropriato
Sono supportati due tipi di inserimento in streaming:
Tipo di inserimento
Descrizione
Connessione dati
Hub eventi, hub IoT e connessioni dati di Griglia di eventi possono usare l'inserimento in streaming, purché sia abilitato a livello di cluster. La decisione di usare l'inserimento in streaming viene eseguita in base ai criteri di inserimento di streaming configurati nella tabella di destinazione. Per informazioni sulla gestione delle connessioni dati, vedere Hub eventi, hub IoT e Griglia di eventi.
Inserimento personalizzato
L'inserimento personalizzato richiede di scrivere un'applicazione che usa una delle librerie client di Azure Esplora dati. Usare le informazioni contenute in questo argomento per configurare l'inserimento personalizzato. È anche possibile trovare utile l'applicazione di esempio di inserimento in streaming C# .
Usare la tabella seguente per scegliere il tipo di inserimento appropriato per l'ambiente:
Criterio
Connessione dati
Inserimento personalizzato
Ritardo dei dati tra l'avvio dell'inserimento e i dati disponibili per la query
Ritardo più lungo
Ritardo più breve
Sovraccarico di sviluppo
Configurazione rapida e semplice, senza sovraccarico di sviluppo
Sovraccarico di sviluppo elevato per creare un'applicazione che inserisce i dati, gestire gli errori e garantire la coerenza dei dati
Nota
È possibile gestire il processo per abilitare e disabilitare l'inserimento in streaming nel cluster usando il portale di Azure o a livello di codice in C#. Se si usa C# per l'applicazione personalizzata, è possibile che sia più conveniente usare l'approccio programmatico.
Considerazioni sulle prestazioni e sulle operazioni
I principali collaboratori che possono influire sull'inserimento in streaming sono:
Dimensioni della macchina virtuale e del cluster: le prestazioni e la capacità di inserimento dei flussi aumentano le dimensioni della macchina virtuale e del cluster. Il numero di richieste di inserimento simultanee è limitato a sei per core. Ad esempio, per 16 SKU core, ad esempio D14 e L16, il carico massimo supportato è 96 richieste di inserimento simultanee. Per due SKU principali, ad esempio D11, il carico massimo supportato è 12 richieste di inserimento simultanee.
Limite di dimensioni dei dati: il limite delle dimensioni dei dati per una richiesta di inserimento in streaming è 4 MB. Sono inclusi i dati creati per i criteri di aggiornamento durante l'inserimento.
Aggiornamenti dello schema: gli aggiornamenti dello schema, ad esempio la creazione e la modifica di tabelle e mapping di inserimento, possono richiedere fino a cinque minuti per il servizio di inserimento in streaming. Per altre informazioni, vedere Inserimento di streaming e modifiche dello schema.
Capacità SSD: l'abilitazione dell'inserimento in streaming in un cluster, anche quando i dati non vengono inseriti tramite streaming, usa parte del disco SSD locale dei computer cluster per lo streaming dei dati di inserimento e riduce lo spazio di archiviazione disponibile per la cache ad accesso frequente.
Nel portale di Azure passare a cluster di Esplora dati di Azure.
In Impostazioni selezionare Configurazioni.
Nel riquadro Configurazioni selezionare Sì per abilitare l'inserimento in streaming.
Selezionare Salva.
È possibile abilitare l'inserimento in streaming durante la creazione di un nuovo cluster di Azure Esplora dati.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Creare una tabella di destinazione e definire i criteri
Creare una tabella per ricevere i dati di inserimento in streaming e definirne i criteri correlati usando il portale di Azure o a livello di codice in C#.
Copiare uno dei comandi seguenti nel riquadro Query e selezionare Esegui. In questo modo vengono definiti i criteri di inserimento di streaming nella tabella creata o nel database che contiene la tabella.
Suggerimento
Un criterio definito a livello di database si applica a tutte le tabelle esistenti e future nel database. Quando si abilitano i criteri a livello di database, non è necessario abilitarlo per ogni tabella.
Per definire i criteri nella tabella creata, usare:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Creare un'applicazione di inserimento in streaming per inserire dati nel cluster
Creare l'applicazione per l'inserimento di dati nel cluster usando il linguaggio preferito.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Disabilitare l'inserimento in streaming nel cluster
Avviso
La disabilitazione dell'inserimento in streaming può richiedere alcune ore.
Prima di disabilitare l'inserimento di streaming nel cluster Esplora dati di Azure, eliminare i criteri di inserimento di streaming da tutte le tabelle e i database pertinenti. La rimozione dei criteri di inserimento dei flussi attiva la ridisistribuzione dei dati all'interno del cluster di Azure Esplora dati. I dati di inserimento in streaming verranno spostati dalla risorsa di archiviazione iniziale alla risorsa di archiviazione permanente nell'archivio colonne (extent o partizioni). Questo processo può richiedere da qualche secondo a poche ore, a seconda della quantità di dati nell'archiviazione iniziale.
Eliminare i criteri di inserimento di streaming
È possibile eliminare i criteri di inserimento di streaming usando il portale di Azure o a livello di codice in C#.
Nella portale di Azure passare al cluster di Azure Esplora dati e selezionare Query.
Per eliminare i criteri di inserimento di streaming dalla tabella, copiare il comando seguente nel riquadro Query e selezionare Esegui.
.delete table TestTable policy streamingingestion
In Impostazioni selezionare Configurazioni.
Nel riquadro Configurazioni selezionare No per disabilitare l'inserimento in streaming.
Selezionare Salva.
Per eliminare i criteri di inserimento di streaming dalla tabella, eseguire il codice seguente:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Per disabilitare l'inserimento in streaming nel cluster, eseguire il codice seguente:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Limitazioni
I mapping dei dati devono essere creati in modo preliminare per l'uso nell'inserimento in streaming. Le singole richieste di inserimento in streaming non supportano i mapping dei dati inline.
I tag extent non possono essere impostati sui dati di inserimento in streaming.
Aggiornare i criteri. I criteri di aggiornamento possono fare riferimento solo ai dati appena inseriti nella tabella di origine e non ad altre tabelle o dati nel database.
Se l'inserimento in streaming è abilitato in un cluster usato come leader per i database follower, l'inserimento in streaming deve essere abilitato nei cluster seguenti per seguire i dati di inserimento in streaming. Lo stesso vale se i dati del cluster vengono condivisi tramite Condivisione dati.