Azure Logic Apps Debatching: Splitting Messages and Processing Them Separately

Introduction

In this article I explain the debatching technique in Logic Apps by comparing two approaches. Debatching splits a message on a repeated element and processes each resulting item separately.

  1. Create a single Logic App that receives customer records from SQL and processes them one by one inside a foreach loop. If one record takes 2 seconds, 10 records take 20 seconds. Only one instance of the Logic App is created in this case.
  2. Create two Logic Apps: the first receives the customer records from SQL, and the second splits and processes them.

Scenario

We receive customer records from an on-premises SQL database by executing the "GetCustomers" stored procedure, and post them one by one to Salesforce (the ordering is not important). Before posting a new customer, we check whether the customer is already present in Salesforce.

Using Singleton Technique

Adding "operationOptions": "SingleInstance" to the trigger in code view makes the Logic App run as a singleton: a single instance is created to process all requests.
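
As a minimal sketch, the option sits on the trigger in code view. The trigger name and the 3-minute recurrence here are assumptions for illustration; only the "operationOptions" line is the setting described above.

```json
{
  "triggers": {
    "Recurrence": {
      "type": "Recurrence",
      "recurrence": {
        "frequency": "Minute",
        "interval": 3
      },
      "operationOptions": "SingleInstance"
    }
  }
}
```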

Here is a detailed description of this Logic App:

1. The Logic App gets an array of customer records from SQL as a JSON message.
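
The original screenshot of the message is not reproduced here. As a rough sketch, the stored procedure output has the following shape. The "ResultSets" and "Table1" names and the field names are taken from the Logic App definition at the end of this article, while the values are made-up placeholders.

```json
{
  "ResultSets": {
    "Table1": [
      {
        "CustomerName": "Example Customer A",
        "BillingStreet": "1 Example Street",
        "BillingCity": "Example City",
        "BillingState": "EX",
        "BillingPostalCode": "00000"
      },
      {
        "CustomerName": "Example Customer B",
        "BillingStreet": "2 Example Street",
        "BillingCity": "Example City",
        "BillingState": "EX",
        "BillingPostalCode": "00001"
      }
    ]
  }
}
```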

Using a foreach loop, we process one record at a time in this Logic App. This works, but since we do not need to maintain any ordering, we could process the records in parallel. So we choose the second option.
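
A foreach loop of this kind could look roughly like the fragment below. This is a sketch, not the author's exact loop: the loop name "For_each_customer" is hypothetical, the action names and the Salesforce lookup are borrowed from the definition at the end of this article, and it assumes the loop is set to run sequentially through "operationOptions": "Sequential" so that records are handled one at a time. Inside the loop, item() refers to the current record.

```json
{
  "For_each_customer": {
    "type": "Foreach",
    "foreach": "@body('Exec_OnPrem_StoreProcedure')?['ResultSets']?['Table1']",
    "operationOptions": "Sequential",
    "runAfter": {
      "Exec_OnPrem_StoreProcedure": [
        "Succeeded"
      ]
    },
    "actions": {
      "GetCusFromSalesforce": {
        "runAfter": {},
        "type": "ApiConnection",
        "inputs": {
          "host": {
            "api": {
              "runtimeUrl": "https://logic-apis-northcentralus.azure-apim.net/apim/salesforce"
            },
            "connection": {
              "name": "@parameters('$connections')['salesforce']['connectionId']"
            }
          },
          "method": "get",
          "path": "/datasets/default/tables/@{encodeURIComponent(encodeURIComponent('Account'))}/items",
          "queries": {
            "$filter": "Name eq '@{item()?['CustomerName']}'"
          }
        }
      }
    }
  }
}
```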

Using Debatching Technique

To split the records with the debatching technique, we need two Logic Apps: the first receives the customer records from SQL, and the second splits and processes them.

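The first Logic App polls a stored procedure named "GetCustomers" every 3 minutes (the exported definition at the end of this article uses "frequency": "Hour" with "interval": 3, so adjust the recurrence to suit), and the procedure returns a list of customers. This Logic App posts the result set of "GetCustomers" to a second Logic App, which works as follows.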

Once we get the result set, we check that it is not empty, then check whether the customer is already present in Salesforce. If not, we create the customer and send an email confirmation.

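To debatch the message, we set the splitOn property on the trigger of the second Logic App and point it at the array to split (the full definition is listed at the end of this article).

This is set in the code view of the second Logic App, which I opened in the JSON Editor app in Chrome (chrome://apps).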
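
For reference, the relevant part of the second Logic App's trigger, excerpted from the full definition at the end of this article, looks like this. With this setting, each element of the "Table1" array in the posted body starts its own run of the Logic App.

```json
{
  "triggers": {
    "manual": {
      "splitOn": "@triggerBody()?['Table1']",
      "type": "Request",
      "kind": "Http",
      "inputs": {
        "schema": {}
      }
    }
  }
}
```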

Output

Now I run the second sample, which uses the debatching technique. My on-premises SQL data for this run contains two customer records.

The first Logic App receives these records and posts them to the second Logic App.

Two instances of the second Logic App were created, one per record.

Each instance of the second Logic App processes a single record.
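
Because of splitOn, triggerBody() inside one instance is a single element of "Table1" rather than the whole array, which is why expressions such as triggerBody()['BillingCity'] work in the definition. A sketch of what one run receives, using the field names from the definition and placeholder values that are not real data:

```json
{
  "CustomerName": "Example Customer A",
  "BillingStreet": "1 Example Street",
  "BillingCity": "Example City",
  "BillingState": "EX",
  "BillingPostalCode": "00000"
}
```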

Conclusion

Using the debatching technique in Logic Apps, we increase the number of Logic App instances, so records are processed in parallel and the overall flow finishes sooner. Also, if a failure occurs after the records are received from SQL, we can retry just the affected record, because each record runs in its own instance of the second Logic App. When a Logic App is triggered by Salesforce records, splitOn ("splitOn": "@triggerBody()?.value") is enabled by default. If you would like to maintain the ordering in which you receive records from Salesforce, remove the splitOn property from the trigger of your Logic App.

Code (ARM template containing both Logic Apps)

{
  "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "workflows_LogicAppToSalesforce_name": {
      "defaultValue": null,
      "type": "String"
    },
    "workflows_OnPremSQLToSalesforce_name": {
      "defaultValue": null,
      "type": "String"
    },
    "workflows_LogicAppToSalesforce_connectionId": {
      "type": "String"
    },
    "workflows_LogicAppToSalesforce_connectionId_1": {
      "type": "String"
    },
    "workflows_OnPremSQLToSalesforce_connectionId": {
      "type": "String"
    },
    "workflows_OnPremSQLToSalesforce_connectionId_1": {
      "type": "String"
    },
    "workflows_OnPremSQLToSalesforce_connectionId_2": {
      "type": "String"
    }
  },
  "variables": {},
  "resources": [
    {
      "type": "Microsoft.Logic/workflows",
      "name": "[parameters('workflows_LogicAppToSalesforce_name')]",
      "apiVersion": "2016-06-01",
      "location": "northcentralus",
      "properties": {
        "state": "Enabled",
        "definition": {
          "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
          "contentVersion": "1.0.0.0",
          "parameters": {
            "$connections": {
              "defaultValue": {},
              "type": "Object"
            }
          },
          "triggers": {
            "manual": {
              "splitOn": "@triggerBody()?['Table1']",
              "type": "Request",
              "kind": "Http",
              "inputs": {
                "schema": {}
              }
            }
          },
          "actions": {
            "Condition": {
              "actions": {
                "ChkCustomerIsPresentinSalesforce": {
                  "actions": {
                    "CreateCustomerInSF": {
                      "runAfter": {},
                      "type": "ApiConnection",
                      "inputs": {
                        "body": {
                          "item": [
                            {
                              "BillingCity": "@triggerBody()['BillingCity']",
                              "BillingPostalCode": "@triggerBody()['BillingPostalCode']",
                              "BillingState": "@triggerBody()['BillingState']",
                              "BillingStreet": "@triggerBody()['BillingStreet']",
                              "Name": "@triggerBody()['CustomerName']"
                            },
                            {
                              "BillingCity": "@triggerBody()['BillingCity']",
                              "BillingPostalCode": "@triggerBody()['BillingPostalCode']",
                              "BillingState": "@triggerBody()['BillingState']",
                              "BillingStreet": "@triggerBody()['BillingStreet']",
                              "Name": "@triggerBody()['CustomerName']"
                            }
                          ]
                        },
                        "host": {
                          "api": {
                            "runtimeUrl": "https://logic-apis-northcentralus.azure-apim.net/apim/salesforce"
                          },
                          "connection": {
                            "name": "@parameters('$connections')['salesforce']['connectionId']"
                          }
                        },
                        "method": "post",
                        "path": "/datasets/default/tables/@{encodeURIComponent(encodeURIComponent('Account'))}/items"
                      }
                    },
                    "Send_an_email": {
                      "runAfter": {
                        "CreateCustomerInSF": [
                          "Succeeded"
                        ]
                      },
                      "type": "ApiConnection",
                      "inputs": {
                        "body": {
                          "Body": "Dear Integration Owner,\n\n@{body('CreateCustomerInSF')['Name']} account has been created.\n\nThank you, Have a great day...\n",
                          "Subject": "Alert: New Customer has been created successfully.",
                          "To": "baraneedharan.manoharan@mindtree.com"
                        },
                        "host": {
                          "api": {
                            "runtimeUrl": "https://logic-apis-northcentralus.azure-apim.net/apim/office365"
                          },
                          "connection": {
                            "name": "@parameters('$connections')['office365']['connectionId']"
                          }
                        },
                        "method": "post",
                        "path": "/Mail"
                      }
                    }
                  },
                  "runAfter": {
                    "GetCusFromSalesforce": [
                      "Succeeded"
                    ]
                  },
                  "expression": "@equals(empty(body('GetCusFromSalesforce')?['value']), true)",
                  "type": "If"
                },
                "GetCusFromSalesforce": {
                  "runAfter": {},
                  "type": "ApiConnection",
                  "inputs": {
                    "host": {
                      "api": {
                        "runtimeUrl": "https://logic-apis-northcentralus.azure-apim.net/apim/salesforce"
                      },
                      "connection": {
                        "name": "@parameters('$connections')['salesforce']['connectionId']"
                      }
                    },
                    "method": "get",
                    "path": "/datasets/default/tables/@{encodeURIComponent(encodeURIComponent('Account'))}/items",
                    "queries": {
                      "$filter": "Name eq '@{triggerBody()?['CustomerName']}'"
                    }
                  }
                }
              },
              "runAfter": {},
              "expression": "@not(equals(empty(triggerBody()), true))",
              "type": "If"
            }
          },
          "outputs": {}
        },
        "parameters": {
          "$connections": {
            "value": {
              "office365": {
                "connectionId": "[parameters('workflows_LogicAppToSalesforce_connectionId')]",
                "connectionName": "office365-1",
                "id": "/subscriptions/{SubcriptionID}/providers/Microsoft.Web/locations/northcentralus/managedApis/office365"
              },
              "salesforce": {
                "connectionId": "[parameters('workflows_LogicAppToSalesforce_connectionId_1')]",
                "connectionName": "salesforce",
                "id": "/subscriptions/{SubcriptionID}/providers/Microsoft.Web/locations/northcentralus/managedApis/salesforce"
              }
            }
          }
        }
      },
      "dependsOn": []
    },
    {
      "type": "Microsoft.Logic/workflows",
      "name": "[parameters('workflows_OnPremSQLToSalesforce_name')]",
      "apiVersion": "2016-06-01",
      "location": "northcentralus",
      "properties": {
        "state": "Enabled",
        "definition": {
          "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
          "contentVersion": "1.0.0.0",
          "parameters": {
            "$connections": {
              "defaultValue": {},
              "type": "Object"
            }
          },
          "triggers": {
            "Recurrence": {
              "recurrence": {
                "frequency": "Hour",
                "interval": 3
              },
              "type": "Recurrence"
            }
          },
          "actions": {
            "Exec_OnPrem_StoreProcedure": {
              "runAfter": {},
              "type": "ApiConnection",
              "inputs": {
                "host": {
                  "api": {
                    "runtimeUrl": "https://logic-apis-northcentralus.azure-apim.net/apim/sql"
                  },
                  "connection": {
                    "name": "@parameters('$connections')['sql']['connectionId']"
                  }
                },
                "method": "post",
                "path": "/datasets/default/procedures/@{encodeURIComponent(encodeURIComponent('[dbo].[GetCustomers]'))}",
                "retryPolicy": {
                  "count": 4,
                  "interval": "PT10M",
                  "type": "fixed"
                }
              }
            },
            "HTTP": {
              "runAfter": {
                "Exec_OnPrem_StoreProcedure": [
                  "Succeeded"
                ]
              },
              "type": "Http",
              "inputs": {
                "body": "@body('Exec_OnPrem_StoreProcedure')?['ResultSets']",
                "headers": {
                  "Content-Type": "application/json"
                },
                "method": "POST",
                "uri": "https://prod-20.northcentralus.logic.azure.com:443/workflows/55b6a9af0e88481eb717cf7aa79c4fe0/triggers/manual/paths/invoke?api-version=2016-06-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig="
              }
            }
          },
          "outputs": {}
        },
        "parameters": {
          "$connections": {
            "value": {
              "office365": {
                "connectionId": "[parameters('workflows_OnPremSQLToSalesforce_connectionId')]",
                "connectionName": "office365-1",
                "id": "/subscriptions/{SubcriptionID}/providers/Microsoft.Web/locations/northcentralus/managedApis/office365"
              },
              "salesforce": {
                "connectionId": "[parameters('workflows_OnPremSQLToSalesforce_connectionId_1')]",
                "connectionName": "salesforce",
                "id": "/subscriptions/{SubcriptionID}/providers/Microsoft.Web/locations/northcentralus/managedApis/salesforce"
              },
              "sql": {
                "connectionId": "[parameters('workflows_OnPremSQLToSalesforce_connectionId_2')]",
                "connectionName": "sql",
                "id": "/subscriptions/{SubcriptionID}/providers/Microsoft.Web/locations/northcentralus/managedApis/sql"
              }
            }
          }
        }
      },
      "dependsOn": []
    }
  ]
}