How to ingest data with the REST API
Applies to: ✅ Microsoft Fabric ✅ Azure Data Explorer
The Kusto.Ingest library is the preferred way to ingest data into your database. However, you can achieve almost the same functionality without depending on the Kusto.Ingest package. This article shows you how, using queued ingestion to your database for production-grade pipelines.
Note
The code below is written in C# and uses the Azure Storage SDK, the Microsoft Authentication Library (MSAL), and the Newtonsoft.Json package to simplify the sample code. If needed, the corresponding code can be replaced with appropriate Azure Storage REST API calls, a non-.NET MSAL package, and any available JSON handling package.
This article deals with the recommended mode of ingestion. For the Kusto.Ingest library, its corresponding entity is the IKustoQueuedIngestClient interface. Here, the client code interacts with your database by posting ingestion notification messages to an Azure queue. References to the messages are obtained from the Kusto Data Management (also known as the Ingestion) service. Interaction with the service must be authenticated with Microsoft Entra ID.
The following code shows how to perform queued data ingestion through the Kusto Data Management service without using the Kusto.Ingest library. This example may be useful if full .NET is inaccessible or unavailable because of the environment or other restrictions.
The code includes the steps to create an Azure Storage client and upload the data to a blob. Each step is described in greater detail, after the sample code.
1. Obtain an authentication token for accessing the ingestion service
2. Query the ingestion service to obtain:
   - Ingestion resources (queues and blob containers)
   - A Kusto identity token that will be added to every ingestion message
3. Upload data to a blob on one of the blob containers obtained from Kusto in (2)
4. Compose an ingestion message that identifies the target database and table and that points to the blob from (3)
5. Post the ingestion message we composed in (4) to an ingestion queue obtained in (2)
6. Retrieve any error found by the service during ingestion
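using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Azure.Storage.Blobs;
using Azure.Storage.Queues;
using Microsoft.Identity.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
// A container class for ingestion resources we are going to obtain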
internal class IngestionResourcesSnapshot
{
public IList<string> IngestionQueues { get; set; } = new List<string>();
public IList<string> TempStorageContainers { get; set; } = new List<string>();
public string FailureNotificationsQueue { get; set; } = string.Empty;
public string SuccessNotificationsQueue { get; set; } = string.Empty;
}
public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
// Your ingestion service URI
var dmServiceBaseUri = @"{serviceURI}";
// 1. Authenticate the interactive user (or application) to access Kusto ingestion service
var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
// 2a. Retrieve ingestion resources
var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
// 2b. Retrieve Kusto identity token
var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
// 3. Upload file to one of the blob containers.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the containers in order to prevent throttling
var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
var blobUriWithSas = UploadFileToBlobContainer(
file, ingestionResources.TempStorageContainers.First(), blobName,
out var blobSizeBytes
);
// 4. Compose ingestion command
var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
// 5. Post ingestion command to one of the previously obtained ingestion queues.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the queues in order to prevent throttling
PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);
// Give the service time to process the message before checking for notifications
Thread.Sleep(20000);
// 6a. Read success notifications
var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
foreach (var sm in successes)
{
Console.WriteLine($"Ingestion completed: {sm}");
}
// 6b. Read failure notifications
var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
foreach (var em in errors)
{
Console.WriteLine($"Ingestion error: {em}");
}
}
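The comments in steps 3 and 5 recommend round-robining the containers and queues when working with multiple blobs. The following is a minimal sketch of one way to do that. `RoundRobin<T>` is a hypothetical helper written for this illustration, not part of the Kusto or Azure SDKs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not part of any Kusto or Azure SDK): hands out the
// items of a fixed list in rotation and is safe to call from multiple threads.
public sealed class RoundRobin<T>
{
    private readonly IReadOnlyList<T> _items;
    private int _counter = -1;

    public RoundRobin(IReadOnlyList<T> items)
    {
        if (items == null || items.Count == 0)
            throw new ArgumentException("At least one item is required.", nameof(items));
        _items = items;
    }

    public T Next()
    {
        // Interlocked.Increment wraps around on overflow; the uint cast keeps the index non-negative.
        uint n = unchecked((uint)Interlocked.Increment(ref _counter));
        return _items[(int)(n % (uint)_items.Count)];
    }
}
```

With this in place, the sample could build one selector from `ingestionResources.TempStorageContainers.ToList()` and another from `ingestionResources.IngestionQueues.ToList()`, then call `Next()` for each blob instead of `First()`.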
Using queued ingestion for production-grade pipelines
Obtain authentication evidence from Microsoft Entra ID
Here we use Microsoft Authentication Library (MSAL) to obtain a Microsoft Entra token to access the Kusto Data Management service and ask for its input queues. MSAL is available on multiple platforms.
// Authenticates the interactive user and retrieves Microsoft Entra Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
// Create an authentication client for Microsoft Entra ID:
var authClient = PublicClientApplicationBuilder.Create("<appId>")
.WithAuthority("https://login.microsoftonline.com/<appTenant>")
.WithRedirectUri("<appRedirectUri>")
.Build();
// Acquire user token for the interactive user:
var result = authClient.AcquireTokenInteractive(
new[] { $"{resource}/.default" } // Define scopes
).ExecuteAsync().Result;
return result.AccessToken;
}
Retrieve ingestion resources
Manually construct an HTTP POST request to the Data Management service, requesting the return of the ingestion resources. These resources include queues that the DM service is listening on, and blob containers for data uploading. The Data Management service will process any messages containing ingestion requests that arrive on one of those queues.
// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get ingestion resources\" }";
var ingestionResources = new IngestionResourcesSnapshot();
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
// Input queues
var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
foreach (var token in tokens)
{
ingestionResources.IngestionQueues.Add((string)token[1]);
}
// Temp storage containers
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
foreach (var token in tokens)
{
ingestionResources.TempStorageContainers.Add((string)token[1]);
}
// Failure notifications queue
var singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.FailureNotificationsQueue = (string)singleToken;
// Success notifications queue
singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.SuccessNotificationsQueue = (string)singleToken;
return ingestionResources;
}
// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
var request = WebRequest.Create(uriString);
request.Method = "POST";
request.ContentType = "application/json";
// Content length is measured in bytes, not characters
request.ContentLength = System.Text.Encoding.UTF8.GetByteCount(body);
request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
using var bodyStream = request.GetRequestStream();
using (var sw = new StreamWriter(bodyStream))
{
sw.Write(body);
sw.Flush();
}
bodyStream.Close();
return request.GetResponse();
}
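The JSONPath queries above imply that each row of the response is a pair of resource type name and storage URI. As an alternative sketch, assuming that row shape and using System.Text.Json instead of Newtonsoft.Json, the rows can be grouped by type in one pass. `IngestionResourceParser` is a hypothetical name introduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class IngestionResourceParser
{
    // Groups the storage URIs of a ".get ingestion resources" response by resource type name.
    // Assumes each row is [resourceTypeName, storageUriWithSas], as the JSONPath queries above imply.
    public static Dictionary<string, List<string>> GroupByResourceType(string responseJson)
    {
        var result = new Dictionary<string, List<string>>();
        using var doc = JsonDocument.Parse(responseJson);
        var rows = doc.RootElement.GetProperty("Tables")[0].GetProperty("Rows");
        foreach (var row in rows.EnumerateArray())
        {
            var type = row[0].GetString() ?? string.Empty;
            var uri = row[1].GetString() ?? string.Empty;
            if (!result.TryGetValue(type, out var uris))
            {
                uris = new List<string>();
                result[type] = uris;
            }
            uris.Add(uri);
        }
        return result;
    }
}
```

The resulting dictionary can then be read with the keys used above, such as `SecuredReadyForAggregationQueue` and `TempStorage`.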
Obtain a Kusto identity token
Ingest messages are handed off to your cluster via a non-direct channel (Azure queue), making it impossible to do in-band authorization validation for accessing the ingestion service. The solution is to attach an identity token to every ingest message. The token enables in-band authorization validation. This signed token can then be validated by the ingestion service when it receives the ingestion message.
// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get kusto identity token\" }";
var jsonPath = "Tables[0].Rows[*].[0]";
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
return (string)identityToken;
}
Upload data to the Azure Blob container
This step uploads a local file to an Azure Blob that will be handed off for ingestion. This code uses the Azure Storage SDK. If taking that dependency isn't possible, the same result can be achieved with the Azure Blob Service REST API.
// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
var blobUri = new Uri(blobContainerUri);
var blobContainer = new BlobContainerClient(blobUri);
var blob = blobContainer.GetBlobClient(blobName);
using (var stream = File.OpenRead(filePath))
{
// Upload synchronously so the blob exists before its properties are read
blob.Upload(stream);
blobSize = blob.GetProperties().Value.ContentLength;
}
return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}
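The return value of the upload function combines the blob's address with the SAS query string of the container URI. The following sketch shows that composition on its own, without the Azure Storage SDK. `BlobUriHelper` is a hypothetical helper, and the URIs in the usage note are placeholders.

```csharp
using System;

public static class BlobUriHelper
{
    // Builds "<container URI>/<blob name>?<SAS>" from a container URI that carries a SAS query string.
    public static string AppendBlobName(string containerUriWithSas, string blobName)
    {
        var containerUri = new Uri(containerUriWithSas);
        // Scheme, host, and path, without the query string
        var basePart = containerUri.GetLeftPart(UriPartial.Path).TrimEnd('/');
        // Uri.Query includes the leading '?' when a query string is present
        return $"{basePart}/{Uri.EscapeDataString(blobName)}{containerUri.Query}";
    }
}
```

For example, `AppendBlobName("https://example.invalid/container?sv=x&sig=y", "TestData_1.json")` yields `https://example.invalid/container/TestData_1.json?sv=x&sig=y`.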
Compose the ingestion message
Here, the Newtonsoft.Json package is used again, this time to compose a valid ingestion request that identifies the target database and table and points to the blob. The message will be posted to the Azure Queue that the relevant Kusto Data Management service is listening on.
Here are some points to consider.
- This request is the bare minimum for the ingestion message.
Note
The identity token is mandatory and must be part of the AdditionalProperties JSON object.
- Whenever necessary, CsvMapping or JsonMapping properties must be provided as well.
- For more information, see the article on ingestion mapping pre-creation.
- The section Ingestion message internal structure explains the structure of the ingestion message.
internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
var message = new JObject
{
{ "Id", Guid.NewGuid().ToString() },
{ "BlobPath", dataUri },
{ "RawDataSize", blobSizeBytes },
{ "DatabaseName", db },
{ "TableName", table },
{ "RetainBlobOnSuccess", true }, // Do not delete the blob on success
{ "FlushImmediately", true }, // Do not aggregate
{ "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
{ "ReportMethod", 0 }, // Failures are reported to an Azure Queue
{
"AdditionalProperties", new JObject(
new JProperty("authorizationContext", identityToken),
new JProperty("mappingReference", mappingRef),
// Data is in JSON format
new JProperty("format", "multijson")
)
}
};
return message.ToString();
}
Post the ingestion message to the ingestion queue
Finally, post the message that you constructed, to the selected ingestion queue that you previously obtained.
Note
.NET storage client versions below v12 encode the message to base64 by default. For more information, see storage docs. If you're using .NET storage client version 12 or later, you must properly encode the message content yourself.
internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
queue.SendMessage(message);
}
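The note above says that newer storage clients leave message encoding to the caller. A minimal sketch of doing the encoding by hand, assuming the message should be sent as Base64-encoded UTF-8 text, could look like this. `QueueMessageEncoder` is a hypothetical helper name.

```csharp
using System;
using System.Text;

public static class QueueMessageEncoder
{
    // Encodes message text as Base64 over its UTF-8 bytes.
    public static string ToBase64(string message) =>
        Convert.ToBase64String(Encoding.UTF8.GetBytes(message));

    // Reverses ToBase64; useful when reading notifications that were encoded the same way.
    public static string FromBase64(string encoded) =>
        Encoding.UTF8.GetString(Convert.FromBase64String(encoded));
}
```

Under that assumption, the call in `PostMessageToQueue` would become `queue.SendMessage(QueueMessageEncoder.ToBase64(message))`. Check the storage docs referenced in the note for the encoding options your client version offers.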
Check for error messages from the Azure queue
After ingestion, we check for failure messages in the relevant queue that the Data Management service writes to. For more information on the failure message structure, see Ingestion failure message structure.
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
var messages = messagesFromQueue.Select(m => m.MessageText);
return messages;
}
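The sample waits a fixed 20 seconds before reading notifications. One alternative, sketched here under the assumption that polling a few times with a short delay is acceptable for your pipeline, is to retry the read until something arrives. `NotificationPoller` is a hypothetical helper that takes the receive call as a delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class NotificationPoller
{
    // Calls `receive` until it returns at least one message or the attempts run out.
    public static IReadOnlyList<string> PollUntilAny(
        Func<IReadOnlyList<string>> receive, int maxAttempts, TimeSpan delay)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var batch = receive();
            if (batch.Count > 0) return batch;
            // Don't sleep after the final attempt
            if (attempt < maxAttempts) Thread.Sleep(delay);
        }
        return Array.Empty<string>();
    }
}
```

A call might look like `PollUntilAny(() => PopTopMessagesFromQueue(queueUri, 32).ToList(), 10, TimeSpan.FromSeconds(5))`. Note that receiving a message from an Azure queue only hides it for the visibility timeout, so a pipeline that must not see the same notification twice would also delete each message after processing it.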
Ingestion messages - JSON document formats
Ingestion message internal structure
The message that the Kusto Data Management service expects to read from the input Azure Queue is a JSON document in the following format.
{
"Id" : "<ID>",
"BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
"RawDataSize" : "<RawDataSizeInBytes>",
"DatabaseName": "<DatabaseName>",
"TableName" : "<TableName>",
"RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
"FlushImmediately": "<true|false>",
"ReportLevel" : <0-Failures, 1-None, 2-All>,
"ReportMethod" : <0-Queue, 1-Table>,
"AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Property | Description |
---|---|
Id | Message identifier (GUID) |
BlobPath | Path (URI) to the blob, including the SAS key granting permissions to read/write/delete it. Permissions are required so that the ingestion service can delete the blob once it has completed ingesting the data. |
RawDataSize | Size of the uncompressed data in bytes. Providing this value allows the ingestion service to optimize ingestion by potentially aggregating multiple blobs. This property is optional, but if not given, the service will access the blob just to retrieve the size. |
DatabaseName | Target database name |
TableName | Target table name |
RetainBlobOnSuccess | If set to true, the blob won't be deleted once ingestion is successfully completed. Default is false. |
FlushImmediately | If set to true, any aggregation will be skipped. Default is false. |
ReportLevel | Success/Error reporting level: 0-Failures, 1-None, 2-All |
ReportMethod | Reporting mechanism: 0-Queue, 1-Table |
AdditionalProperties | Other properties such as format, tags, and creationTime. For more information, see data ingestion properties. |
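The numeric values of ReportLevel and ReportMethod are easy to mix up when written as bare literals. As an illustrative sketch, the values from the table can be given names with enums and serialized back to the integers the message expects. The enum types and the `ReportSettings` class are assumptions made for this example, not types from the Kusto libraries.

```csharp
using System;
using System.Text.Json;

// Values mirror the table above; the enum types themselves are illustrative.
public enum ReportLevel { Failures = 0, None = 1, All = 2 }
public enum ReportMethod { Queue = 0, Table = 1 }

public static class ReportSettings
{
    // Serializes the two settings as the integer values used in the ingestion message.
    public static string ToJson(ReportLevel level, ReportMethod method) =>
        JsonSerializer.Serialize(new { ReportLevel = (int)level, ReportMethod = (int)method });
}
```

`ReportSettings.ToJson(ReportLevel.All, ReportMethod.Queue)` returns `{"ReportLevel":2,"ReportMethod":0}`, which matches the literals used in `PrepareIngestionMessage`.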
Ingestion failure message structure
The failure message that the Data Management service writes to the failure notifications queue is a JSON document with the following properties.
Property | Description |
---|---|
OperationId | Operation identifier (GUID) that can be used to track the operation on the service side |
Database | Target database name |
Table | Target table name |
FailedOn | Failure timestamp |
IngestionSourceId | GUID identifying the data chunk that failed to ingest |
IngestionSourcePath | Path (URI) to the data chunk that failed to ingest |
Details | Failure message |
ErrorCode | The error code. For all the error codes, see Ingestion error codes. |
FailureStatus | Indicates whether the failure is permanent or transient |
RootActivityId | The correlation identifier (GUID) that can be used to track the operation on the service side |
OriginatesFromUpdatePolicy | Indicates whether the failure was caused by an erroneous transactional update policy |
ShouldRetry | Indicates whether the ingestion could succeed if retried as is |
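To act on a failure notification, the message text can be deserialized into a small type. The following sketch uses System.Text.Json and covers a subset of the properties above. The property types are assumptions (for example, that ShouldRetry is a JSON boolean and ErrorCode is a string), so verify them against a real message before relying on this.

```csharp
using System;
using System.Text.Json;

// Illustrative type covering a subset of the failure message properties.
// Property types are assumptions; properties not listed here are ignored.
public sealed class IngestionFailure
{
    public string OperationId { get; set; } = "";
    public string Database { get; set; } = "";
    public string Table { get; set; } = "";
    public string Details { get; set; } = "";
    public string ErrorCode { get; set; } = "";
    public string FailureStatus { get; set; } = "";
    public bool ShouldRetry { get; set; }
}

public static class FailureParser
{
    // Deserializes one failure notification; property name matching is case-sensitive.
    public static IngestionFailure Parse(string json) =>
        JsonSerializer.Deserialize<IngestionFailure>(json)
        ?? throw new FormatException("Empty failure message.");
}
```

A caller could then log `Details` and `ErrorCode`, and resubmit the blob only when `ShouldRetry` is true.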