Tutorial: Route data using table update policies
Applies to: ✅ Microsoft Fabric ✅ Azure Data Explorer
When your source data involves simple and quick transformations, it's best to perform them upstream in the pipeline by using an event stream. However, this approach might not work well for other transformations that are complex or require specialized functionality to operate.
In this tutorial, you learn how to:
- Create tables and update policies
- Ingest sample data
- Verify the results
The example in this tutorial demonstrates how to use update policies for data routing to perform complex transformations to enrich, cleanse, and transform data at ingestion time. For a list of other common use cases, see Common use cases for table update policies.
Prerequisites
- A Microsoft account or a Microsoft Entra user identity. An Azure subscription isn't required.
- An Azure Data Explorer cluster and database. To create them, see Create a cluster and database.
1 - Create tables and update policies
The following steps guide you through creating a source table, transformation functions, destination tables, and update policies. The tutorial demonstrates how to use table update policies to perform complex transformations and save the results to one or more destination tables. The example uses a single source table named Raw_Table and three destination tables named Device_Telemetry, Device_Alarms, and Error_Log.
Run the following command to create a table named Raw_Table.
.create table Raw_Table (RawData: dynamic)
The source table is where the ingested data is saved. The table has a single column named RawData of type dynamic. The dynamic type is used to store the raw data as is, without any schema. For more information, see .create table command.
Run the following command to create the Get_Telemetry, Get_Alarms, and Log_Error functions.
.execute database script <|
// Routes records whose MessageType is 'Telemetry' and flattens them into typed columns
.create-or-alter function Get_Telemetry() {
    Raw_Table
    | where RawData.MessageType == 'Telemetry'
    | extend
        Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
        DeviceId = tostring(RawData.DeviceId),
        DeviceType = tostring(RawData.DeviceType),
        SensorName = tostring(RawData.SensorName),
        SensorValue = toreal(RawData.SensorValue),
        SensorUnit = tostring(RawData.SensorUnit)
    | project-away RawData
}
// Routes records whose MessageType is 'Alarms'
.create-or-alter function Get_Alarms() {
    Raw_Table
    | where RawData.MessageType == 'Alarms'
    | extend
        Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
        DeviceId = tostring(RawData.DeviceId),
        DeviceType = tostring(RawData.DeviceType),
        AlarmType = tostring(RawData.AlarmType)
    | project-away RawData
}
// Captures every record with an unrecognized MessageType, keeping the raw payload
.create-or-alter function Log_Error() {
    Raw_Table
    | where RawData.MessageType !in ('Telemetry', 'Alarms')
    | extend
        TimeStamp = datetime(now),
        ErrorType = 'Unknown MessageType'
    | project TimeStamp, RawData, ErrorType
}
When creating an update policy, you can specify an in-line script for execution. However, we recommend encapsulating the transformation logic into a function. Using a function improves code maintenance. When new data arrives, the function is executed to transform the data. The function can be reused across multiple update policies. For more information, see .create function command.
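Before attaching a function to an update policy, it can help to dry-run the same transformation on an inline record. The following is a minimal sketch, not part of the tutorial's required steps: it applies the Telemetry logic to a hypothetical one-row datatable, so it doesn't read or modify any table. The sample values are illustrative only.

```kusto
// Dry run: apply the Telemetry transformation to a single inline record.
// Property names inside a dynamic value are case-sensitive, so the key
// "Timestamp" must match the name the transformation reads (RawData.Timestamp).
datatable(RawData: dynamic)
[
    dynamic({"Timestamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celsius"})
]
| where tostring(RawData.MessageType) == 'Telemetry'
| extend
    Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
    DeviceId = tostring(RawData.DeviceId),
    DeviceType = tostring(RawData.DeviceType),
    SensorName = tostring(RawData.SensorName),
    SensorValue = toreal(RawData.SensorValue),
    SensorUnit = tostring(RawData.SensorUnit)
| project-away RawData
```

If a column comes back empty, the usual cause is a property name that doesn't match the incoming payload exactly.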
Run the following command to create the destination tables.
.execute database script <|
.create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
.set-or-append Device_Alarms <| Get_Alarms | take 0
.set-or-append Error_Log <| Log_Error | take 0
The destination table must have the same schema as the output of the transformation function. You can create destination tables in the following ways:
- Using the .create table command and manually specifying the schema, as demonstrated with the creation of the Device_Telemetry table. However, this approach can be error-prone and time-consuming.
- Using the .set-or-append command if you've already created a function to transform the data. This method creates a new table with the same schema as the output of the function, using take 0 to make sure the function only returns the schema. For more information, see .set-or-append command.
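One way to check that a destination table matches its function's output is to compare the two schemas with the getschema operator. This is a sketch that assumes the Get_Alarms function and the Device_Alarms table from the previous steps already exist; run each query separately and compare the column names and types.

```kusto
// Schema produced by the transformation function
Get_Alarms
| getschema
| project ColumnName, ColumnType

// Schema of the destination table; it should list the same columns and types
Device_Alarms
| getschema
| project ColumnName, ColumnType
```

The same comparison applies to the other function and table pairs, which matters most for a table such as Device_Telemetry whose schema was typed by hand.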
Run the following command to create the update policies for the destination tables.
.execute database script <|
.alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
.alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
.alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
The .alter table policy update command links the source table, the transformation function, and the destination table. The update policy is created on the destination table and specifies the source table and the transformation function. For more information, see .alter table policy update command.
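To confirm that each policy was stored as intended, you can read it back with the .show table policy update command. The following sketch assumes the three destination tables from this tutorial; run each command separately.

```kusto
// Each command returns the update policy currently set on the table,
// including its Source, Query, and IsTransactional settings.
.show table Device_Telemetry policy update

.show table Device_Alarms policy update

.show table Error_Log policy update
```

Checking that Source is Raw_Table and that Query names the expected function is a quick way to catch a policy attached to the wrong table.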
2 - Ingest sample data
To test the update policies, you can ingest sample data into the source table using the .set-or-append command. For more information, see Ingest data from a query.
.set-or-append Raw_Table <|
let Raw_Stream = datatable(RawData: dynamic)
[
dynamic({"Timestamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celsius"}),
dynamic({"Timestamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
dynamic({"Timestamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
];
Raw_Stream
3 - Verify the results
To validate the results, you can run a query to verify that the data was transformed and routed to the destination tables. In the following example, the union operator is used to combine the source and the results from the destination tables into a single result set.
Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc
Output
You should see the following output where the Raw_Table has three rows and the destination tables have one row each.
TableName | Rows |
---|---|
Raw_Table | 3 |
Error_Log | 1 |
Device_Alarms | 1 |
Device_Telemetry | 1 |
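Row counts confirm that routing happened, but not that the columns were populated correctly. As a sketch, you could inspect the routed rows directly. Because the policies in this tutorial set IsTransactional to false, a failing policy doesn't fail ingestion into Raw_Table, so a destination table can be short of rows without any visible error. The last command is one way to look for such failures; it assumes the FailedOn and OriginatesFromUpdatePolicy columns are present in the output of .show ingestion failures in your environment. Run each statement separately.

```kusto
// Inspect the rows that each update policy produced
Device_Telemetry
| take 10

Device_Alarms
| take 10

Error_Log
| take 10

// Assumption: these column names exist in your environment's output.
// Lists recent ingestion failures that originated from an update policy.
.show ingestion failures
| where FailedOn > ago(1h) and OriginatesFromUpdatePolicy == true
```

An empty Timestamp or DeviceType column in these results usually points to a property name in the function that doesn't match the incoming payload.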
Clean up resources
Run the following command in your database to clean up the tables and functions created in this tutorial.
.execute database script <|
.drop table Raw_Table
.drop table Device_Telemetry
.drop table Device_Alarms
.drop table Error_Log
.drop function Get_Telemetry
.drop function Get_Alarms
.drop function Log_Error