In many business scenarios, there is a need to implement long-running processes that first send a message to a second process and then pause and wait for an asynchronous response before they continue. Because the communication is asynchronous, the challenge is to correlate the response with the original request. The Correlation Identifier enterprise integration pattern targets this scenario.
Azure Logic Apps provides a stateful workflow engine that allows us to implement robust integration workflows quite easily. One of the workflow actions in Logic Apps is the webhook action, which can be used to implement the Correlation Identifier pattern. A typical scenario for this pattern is a workflow that requires an approval step backed by a custom API (with behaviour similar to the Send Approval Email connector).
In this post, I will show how to implement the Correlation Identifier enterprise integration pattern on Logic Apps leveraging the webhook action. [UPDATE] The Correlation Identifier pattern can also be implemented using Azure Service Bus and Sessions, as shown by my colleague Prasoon in this post.
Some background information
The Correlation Identifier Pattern
Adapted from Enterprise Integration Patterns
The Correlation Identifier enterprise integration pattern proposes to add a unique id to the request message on the requestor end and return it as the correlation identifier in the asynchronous response message. This way, when the requestor receives the asynchronous response, it knows which request that response corresponds to. Depending on the functional and non-functional requirements, this pattern can be implemented in a stateless or stateful manner.
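As a rough illustration of the stateful variant, the sketch below shows a requestor that tags each request with a unique id and uses the echoed id to find the original request when a response arrives. The class, function and field names (`Requestor`, `replier`, `messageId`, `correlationId`) and the second order name are assumptions made for this example only; they are not part of Logic Apps or of the solution described later.

```python
import uuid


class Requestor:
    """Keeps track of in-flight requests by correlation id (stateful variant)."""

    def __init__(self):
        self._pending = {}  # correlation id -> original request payload

    def build_request(self, payload):
        # Attach a unique id to the outgoing request and remember it.
        correlation_id = str(uuid.uuid4())
        self._pending[correlation_id] = payload
        return {"messageId": correlation_id, "body": payload}

    def handle_response(self, response):
        # The replier echoes the request id back as the correlation id.
        correlation_id = response["correlationId"]
        original = self._pending.pop(correlation_id, None)
        if original is None:
            raise KeyError(f"No pending request for {correlation_id}")
        return original, response["body"]


def replier(request):
    # Hypothetical replier: copies the request id into the response.
    return {"correlationId": request["messageId"], "body": "YES"}


requestor = Requestor()
first = requestor.build_request({"orderName": "Seasonal leafy greens and fruits"})
second = requestor.build_request({"orderName": "Weekly veggie box"})  # made-up order

# Responses can arrive in any order and still be matched to their request.
matched_request, answer = requestor.handle_response(replier(second))
```

Here the response to the second request is matched correctly even though the first request is still waiting, which is the whole point of carrying the identifier in both messages.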
A webhook is a service that is triggered by a particular event and results in an Http call to a RESTful subscriber. A much more comprehensive definition can be found here. You might be familiar with the configuration of webhooks with static subscribers. In a previous post, I showed how to trigger a Logic App by an SMS message with a Twilio webhook. This webhook sends all events to the same Http endpoint, i.e. a static subscriber.
The Correlation Identifier pattern on Logic Apps
If you have used the Send Approval Email Logic App connector, you have already seen the Correlation Identifier pattern implemented out of the box in a stateful manner. When this connector is used in a Logic App workflow, an email is sent, and the workflow instance waits for a response. Once the email recipient clicks on a button in the email, that particular workflow instance receives an asynchronous callback with a payload containing the user selection, and it continues to the next step. This approval email comes in very handy in many cases; however, other business scenarios might require a custom implementation of the pattern. The webhook action allows us to build a custom implementation of the Correlation Identifier pattern.
The Logic Apps Webhook Action
To implement the Correlation Identifier pattern, it's important that you have a basic understanding of the Logic Apps webhook action. Justin wrote some handy notes about it here. The webhook action of Logic Apps works with an instance-based, i.e. dynamic webhook subscription. Once executed, the webhook action generates an instance-based callback URL for the dynamic subscription. This URL is to be used to send a correlated response to trigger the continuation of the corresponding workflow. This applies the Return Address integration pattern.
We can implement the Correlation Identifier pattern by building a Custom API Connector for Logic Apps following the webhook subscribe and unsubscribe pattern of Logic Apps. However, it's also possible to implement this pattern without writing a Custom API Connector, as I'll show below.
To illustrate the pattern, I'll be using a fictitious company called FarmToTable. FarmToTable provides delivery of fresh produce by drone. Consumers subscribe to the delivery service by creating their personalised list of produce to be delivered on a weekly basis. FarmToTable needs to implement an SMS confirmation service so that an SMS message is sent to each consumer the day before the scheduled delivery date. After receiving the text message, the customer must confirm within 12 hours whether they want the delivery or not, so that the delivery can be arranged.
The Solution Architecture
As mentioned above, the scenario requires sending an SMS text message and waiting for an SMS response. For sending and receiving the SMS, we will be using Twilio. More details on working with Logic Apps and Twilio are in one of my previous posts. Twilio provides webhooks that are triggered when SMS messages are received. The Twilio webhooks only allow static subscriptions, i.e. calling one single Http endpoint. The webhook action of Logic Apps, however, requires the webhook subscribe and unsubscribe pattern, which works with an instance-based subscription. Thus, we need to implement a wrapper for the required subscribe/unsubscribe pattern.
The architecture of this pattern is shown in the figure below and explained afterwards.
Components of the solution:
- Long-running stateful workflow. This is the Logic App that controls the main workflow, sends a request, pauses and waits for an asynchronous response. This is implemented by using the webhook action.
- Subscribe/Unsubscribe Webhook Wrapper. In our scenario, we are working with a third-party service (Twilio) that only supports webhooks with static subscriptions; thus, we need to create this wrapper. This wrapper is composed of four different parts.
- Subscription store: A database to store the unique message Id and the instance-based callback URL provided by the webhook action. In my implementation, I'm using Azure Cosmos DB for this. Nevertheless, you can use any other suitable alternative. Because the only message id we can send to Twilio and get back is the phone number, I'm using this as my correlation identifier. We can assume that for this scenario the phone number is unique during the day.
- Subscribe and Start Request Processing API: this is a RESTful API that is in charge of starting the processing of the request and storing the subscription. I'm implementing this API with a Logic App, but you can use an Azure Function, an API App or a Custom API App connector for Logic Apps.
- Unsubscribe and Cancel Request Processing API: this is another RESTful API that is only going to be called if the webhook action on the main workflow times out. This API is in charge of cancelling the processing and deleting the subscription from the store. The unsubscribe step has a similar purpose to the CancellationToken structure used in C# async programming. In our scenario, there is nothing to cancel though. Like the previous API, I'm implementing this with a Logic App, but you can use different technologies.
- Instance-based webhook: this webhook is to be triggered by the third-party webhook with a static subscription. Once triggered, this Logic App is in charge of getting the instance-based callback URL from the store and invoking it. After making the call back to the main workflow instance, the subscription is to be deleted.
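To make the responsibilities of the four parts more concrete, here is a minimal Python sketch of the wrapper. It is not the Logic Apps implementation used in this post: the store is a plain in-memory dictionary standing in for Cosmos DB, and `send_sms` and `post_callback` are hypothetical callables standing in for the Twilio connector and the Http call back to the main workflow. All class and method names are assumptions made for illustration.

```python
class SubscriptionStore:
    """In-memory stand-in for the subscription store (Cosmos DB in this post)."""

    def __init__(self):
        self._items = {}  # message id (phone number) -> instance-based callback URL

    def save(self, message_id, callback_url):
        self._items[message_id] = callback_url

    def get(self, message_id):
        return self._items.get(message_id)

    def delete(self, message_id):
        self._items.pop(message_id, None)


class WebhookWrapper:
    def __init__(self, store, send_sms, post_callback):
        self.store = store
        self.send_sms = send_sms            # hypothetical: (phone, text) -> None
        self.post_callback = post_callback  # hypothetical: (url, body) -> None

    def subscribe(self, phone, text, callback_url):
        # Subscribe and Start Request Processing: store the correlation, then send.
        self.store.save(phone, callback_url)
        self.send_sms(phone, text)

    def unsubscribe(self, phone):
        # Unsubscribe and Cancel Request Processing: called only on timeout.
        self.store.delete(phone)

    def on_static_webhook(self, phone, reply):
        # Instance-based webhook: triggered by the third-party static webhook.
        callback_url = self.store.get(phone)
        if callback_url is None:
            return False  # no workflow instance is waiting for this number
        self.post_callback(callback_url, {"from": phone, "reply": reply})
        self.store.delete(phone)
        return True


sent, callbacks = [], []
wrapper = WebhookWrapper(
    SubscriptionStore(),
    send_sms=lambda phone, text: sent.append((phone, text)),
    post_callback=lambda url, body: callbacks.append((url, body)),
)
wrapper.subscribe("+61400000000", "Confirm delivery? YES/NO", "https://example.invalid/callback/1")
handled = wrapper.on_static_webhook("+61400000000", "YES")
late_reply = wrapper.on_static_webhook("+61400000000", "YES")  # subscription already removed
```

Because the subscription is deleted after the callback, a second reply from the same number is ignored instead of reaching a workflow instance that has already moved on.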
The actual solution
To implement this solution, I'm going to follow the steps described below:
1. Configure my Twilio account to be able to send and receive SMS messages. More details here.
2. Create a Service Bus Namespace and 2 queues. For my scenario, I'm using one inbound queue (ScheduledDeliveriesToConfirm) and one outbound queue (ConfirmedScheduledDeliveries). For your own scenarios, you can use other triggers and outbound protocols.
3. Create a Cosmos Db collection to store the instance-based webhook subscriptions. More details on how to work with Cosmos Db here.
- Create Cosmos Db account (with the Document DB API).
- Create database
- Create collection.
4. Create the "Subscribe and Start Request Processing API". I'm using a Logic App workflow to implement this API as shown below. I hope the steps with their comments are self-explanatory.
- The workflow is Http triggered. It expects, as the request body, the scheduled delivery details and the instance-based callback URL of the calling webhook action.
- The provided Http trigger URL is to be configured later in the webhook action subscribe Uri of the main Logic App.
- It stores the correlation on Cosmos Db. More information on the Cosmos Db connector here.
- It starts the request processing by calling the Twilio connector to send the SMS message.
The expected payload for this API is as the one below. This payload is to be sent by the webhook action subscribe call on the main Logic App:
"customer": "Paco de la Cruz",
"orderName": "Seasonal leafy greens and fruits",
You can have a look at the code behind here. Please use it just as a reference, as it hasn't been refactored for deployment.
5. Create the "Unsubscribe and Cancel Request Processing API". I used another Logic App workflow to implement this API. This API is only going to be called if the webhook action on the main workflow times out. The workflow is shown below.
- The workflow is Http triggered. It expects as the request body the message id so the corresponding subscription can be deleted.
- The provided Http trigger URL is to be configured later in the webhook action unsubscribe Uri of the main Logic App.
- It deletes the subscription from Cosmos Db. More information on the Cosmos Db connector here.
The expected payload for this API is quite simple, as the one shown below. This payload is to be sent by the webhook action unsubscribe call on the main Logic App:
The code behind is published here. Please use it just as a reference, as it hasn't been refactored to be deployed.
6. Create the Instance-based Webhook. I'm using another Logic App to implement the instance-based webhook as shown below.
- The workflow is Http triggered. It's to be triggered by the Twilio webhook.
- The provided Http trigger URL is to be configured later in the Twilio webhook.
- It gets the message Id (phone number) from the Twilio message.
- It then gets the instance-based subscription (callback URL) from Cosmos Db.
- Then, it posts the received message to the corresponding instance of the main Logic App workflow by using the correlated callback URL.
- After making the callback, it deletes the subscription from Cosmos Db.
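The first two steps of the list above can be sketched in Python as follows. The sketch assumes that the inbound SMS webhook delivers a form-encoded body with `From` and `Body` fields, which is how Twilio posts incoming messages; the function names, the output property names and the sample values are made up for illustration, and the dictionary stands in for the Cosmos Db lookup.

```python
from urllib.parse import parse_qs


def parse_inbound_sms(form_body):
    """Extract the sender (our correlation id) and the reply text."""
    fields = parse_qs(form_body)
    phone = fields["From"][0]
    reply = fields.get("Body", [""])[0].strip()
    return phone, reply


def find_callback(subscriptions, form_body):
    """Return (callback_url, payload) for the waiting instance, or None."""
    phone, reply = parse_inbound_sms(form_body)
    callback_url = subscriptions.get(phone)
    if callback_url is None:
        return None  # nobody is waiting for a reply from this number
    # The caller posts the payload to callback_url and then deletes the subscription.
    return callback_url, {"phoneNumber": phone, "response": reply}


subscriptions = {"+61400000000": "https://example.invalid/callback/abc"}
incoming = "From=%2B61400000000&Body=Yes+please&MessageSid=SM123"
result = find_callback(subscriptions, incoming)
unknown = find_callback(subscriptions, "From=%2B61499999999&Body=YES")
```

Returning `None` for an unknown number keeps unsolicited messages from triggering a callback.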
The code behind for this workflow is here. Please use it just as a reference, as it is not ready to be deployed.
7. Configure the Twilio static webhook. Now, we have to configure the Twilio webhook to call the Logic App created above when an SMS message is received. Detailed instructions in my previous post.
8. Create the long-running stateful workflow. Once we have implemented the subscribe/unsubscribe webhook wrapper required for the Logic App webhook action, we can start creating the long-running stateful workflow. This is shown below.
In order to trigger the Unsubscription API, the timeout property of the webhook action must be configured. This can be specified under the settings of the action. The Duration is to be configured in the ISO 8601 duration format. If you don't want to resend the request after the timeout, you should turn off the retry policy.
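For reference, a 12-hour limit in the ISO 8601 duration format is written as PT12H. If you prefer to derive the value rather than type it by hand, the small Python helper below shows one way to do it. The function name is an assumption for this example, and the sketch only covers days, hours, minutes and whole seconds.

```python
from datetime import timedelta


def to_iso8601_duration(delta):
    """Format a non-negative timedelta as an ISO 8601 duration, e.g. PT12H."""
    total = int(delta.total_seconds())
    if total < 0:
        raise ValueError("negative durations are not supported in this sketch")
    days, rest = divmod(total, 86400)
    hours, rest = divmod(rest, 3600)
    minutes, seconds = divmod(rest, 60)
    date_part = f"{days}D" if days else ""
    time_part = "".join(
        f"{value}{unit}"
        for value, unit in ((hours, "H"), (minutes, "M"), (seconds, "S"))
        if value
    )
    if not date_part and not time_part:
        return "PT0S"
    return "P" + date_part + ("T" + time_part if time_part else "")


timeout = to_iso8601_duration(timedelta(hours=12))
```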
- The workflow is triggered by messages on the ScheduledDeliveriesToConfirm Service Bus queue.
Then the webhook action:
- Sends the scheduled delivery message and the corresponding instance-based callback URL to the Subscribe and Start Request Processing Logic App.
- Waits for the callback from the Instance-based webhook. The callback delivers, as an Http post, the response sent by the customer. If a response is received before the timeout limit, the action will succeed and continue to the next action.
- If the webhook action times out, it calls the Unsubscribe and Cancel Request Processing Logic App and sends the message id (phone number); and the action fails so the workflow does not continue. However, if required, you could continue the workflow by configuring the RunAfter property of the subsequent action.
- If a response is received, the workflow continues assessing the response. If the response is 'YES', it sends the original message to the ConfirmedScheduledDeliveries queue.
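To give an idea of how the webhook action described above looks in the workflow definition, the Python snippet below builds a fragment resembling an HttpWebhook action and prints it as JSON. Treat it as a sketch based on my understanding of the workflow definition language, not as the definition used in this solution: the action name, the URIs and the body property names are placeholders I made up, while `@listCallbackUrl()` is the expression that yields the instance-based callback URL.

```python
import json

# Placeholder URIs: in practice these are the Http trigger URLs of the two wrapper Logic Apps.
SUBSCRIBE_URI = "https://example.invalid/subscribe"
UNSUBSCRIBE_URI = "https://example.invalid/unsubscribe"

webhook_action = {
    "type": "HttpWebhook",
    "inputs": {
        "subscribe": {
            "method": "POST",
            "uri": SUBSCRIBE_URI,
            "body": {
                # Hypothetical property names; use whatever your Subscribe API expects.
                "callbackUrl": "@listCallbackUrl()",
                "scheduledDelivery": "@triggerBody()",
            },
        },
        "unsubscribe": {
            "method": "POST",
            "uri": UNSUBSCRIBE_URI,
            # Hypothetical: the message id (phone number) taken from the trigger message.
            "body": {"id": "@triggerBody()?['phoneNumber']"},
        },
    },
    # Customers have 12 hours to reply before the unsubscribe call is made.
    "limit": {"timeout": "PT12H"},
}

definition_fragment = json.dumps({"Wait_for_SMS_confirmation": webhook_action}, indent=2)
print(definition_fragment)
```

The subscribe call carries both the business payload and the callback URL, so the wrapper has everything it needs to store the correlation before sending the SMS.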
The code behind this workflow is available here. Please use it just as a reference, as it hasn't been refactored for deployment.
Now, we have finished implementing the whole solution! :) You can have a look at all the Logic Apps JSON definitions in this repository.
In this post, I've shown how to implement the Correlation Identifier pattern using a stateful Logic App. To illustrate the pattern, I implemented an approval step in a Logic App workflow with a custom API. For this, I used Twilio, a third-party service, that offers a webhook with a static subscription; and created a wrapper to implement the subscribe/unsubscribe pattern, including an instance-based webhook to meet the Logic Apps webhook action requirements.
I hope you find this post useful whenever you have to add a custom approval step or correlate asynchronous messages using Logic Apps, or that I've given you an overview of how to enable the correlation of asynchronous messages in your own workflow or integration scenarios.
Feel free to add your comments or questions below, and happy clouding!
Cross-posted on Paco’s Blog.
Follow Paco on @pacodelacruz.