It's just Emanuele showing you what you can do in the Microsoft Data Platform

Create an asynchronous multithreaded workflow on your SQL Server using the Service Broker

C

As any other *Server, SQL Server is born with concurrency in mind, a lot of users/applications can connect concurrently and do their job, as designed, perfect, but, what if you’re the only user?
What if you’re in some kind of data warehouse configuration where you have to ETL, or you are in a maintenance window and have to do a bunch of stuff in that time, or simply you need to launch a task but you don’t want/need it to be blocking for your operations?

Usually, you would create more clients/sessions and launch each task from a different client, and call it a day, which is a perfectly functional solution, by the way; But what if you don’t have access to any other external tool other than SQL Server itself? You know, customers can have all kinds of restrictions on their servers and can be picky.

Let’s start with the basics, to understand how this works, then the concept could be applied do more complex scenarios:

Let’s say that I have a workflow composed of many small tasks that need to be completed but they are not blocking for my application (e.g. maintenance tasks, cleanup, background processing.etc..)

The idea is roughly the following:

  • We’ll have a SP that can be launched at any time, to which I pass the SQL code that I want to execute.
  • The SQL Commands will enter a queue, which will be handled by n (with n configuratale) background threads that actually execute the code
  • A status manager task will take care of updating the execution log table for each completed tasks and to signal if there are still running tasks in the execution queue

Let’s start by creating a demo DB to work on:

USE master;
GO

DROP DATABASE IF EXISTS BROKER_TEST;
GO

CREATE DATABASE BROKER_TEST;
GO

ALTER DATABASE BROKER_TEST SET ENABLE_BROKER;

USE BROKER_TEST;
GO

The Service Broker is usually disabled (except for MSDB, but that’s another story), so you’ll have to enable it after creating the demo database.

Let’s create the Logging table that will have all the execution results of each thread:

CREATE TABLE [AsynchLog]
(
 [ID] INT IDENTITY PRIMARY KEY CLUSTERED ,
 [Command] NVARCHAR(4000),
 [CompletedAt] DATETIME DEFAULT GETDATE(),
 [Duration(s)] SMALLINT,
 [Error] BIT,
 [ErrorMessage] VARCHAR(2000)
);
GO

Then, we’re finally ready to actually tackle the Service Broker, you’ll need to define the following elements:

  • Messages
  • Contract(s)
  • Queue(s)
  • Service(s)
  • Procedures that handle all of the above

The Service Broker can use different types of validation to send messages between the INITIATOR (fancy word for saying who’s sending the message) and the TARGET (and viceversa)
Messages are defined as part of a contract, that binds which message is sent by which actor (INITIATOR or TARGET)
Each message is sent to a Queue , which is managed by a service.

CREATE MESSAGE TYPE [RequestMessage]
     VALIDATION = NONE;
 
CREATE MESSAGE TYPE [ReplyMessage]
     VALIDATION = WELL_FORMED_XML;
 
CREATE CONTRACT [ParallelContract]
     ([RequestMessage] SENT BY INITIATOR, [ReplyMessage] SENT BY ANY);

CREATE QUEUE TargetQueue;
 
CREATE SERVICE [TargetService]
     ON QUEUE TargetQueue
     ([ParallelContract]);
 
CREATE QUEUE InitiatorQueue;
 
CREATE SERVICE [InitiatorService]
     ON QUEUE InitiatorQueue
     ([ParallelContract]);

GO

In my example I’m creating two type of messages, which need only to be valid XML, nothing else, a contract that states that RequestMessage is sent by the INITIATOR and ReplyMessage is sent by TARGET (duh) and two Queues with two different Services to handle them; The Request Message doesn’t use any kid of validation, I’ll use it as a blob to send the SQL code to run in the background, while the Reply Message is a valid XML that will hold the outcome of the execution

Until now we’ve played around, let’s take out the procedures!

CREATE OR ALTER PROCEDURE CommandLauncherProc
AS
BEGIN

SET NOCOUNT ON;

DECLARE @RecvReqDlgHandle AS UNIQUEIDENTIFIER;
DECLARE @Group AS UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg AS nvarchar(max);
DECLARE @RecvReqMsgName AS sysname;
DECLARE @listOfCallIds varchar(max);
WHILE (1 = 1)
     BEGIN
         BEGIN TRANSACTION;
         WAITFOR(
         RECEIVE TOP (1) @RecvReqDlgHandle = conversation_handle,
                         @RecvReqMsg = message_body,
                         @RecvReqMsgName = message_type_name,
                         @Group = conversation_group_id FROM TargetQueue), TIMEOUT 5000;

         IF (@@ROWCOUNT = 0)
             BEGIN
                 ROLLBACK;
                 BREAK;
             END
         IF @RecvReqMsgName = N'RequestMessage'
             BEGIN
                 DECLARE @XMLReply xml

                 BEGIN TRY
                     --Send OK Reply, starting to execute code
                     END CONVERSATION @RecvReqDlgHandle; 

                     --Open Target Queue Conversation
                     DECLARE @ConversationHandle UNIQUEIDENTIFIER;

                     BEGIN DIALOG @ConversationHandle 
                     FROM SERVICE [TargetService] 
                     TO SERVICE N'InitiatorService' 
                     ON CONTRACT [ParallelContract] 
                     WITH RELATED_CONVERSATION_GROUP = @Group, 
                     ENCRYPTION = OFF, LIFETIME = 10000;

                     DECLARE @t1 datetime= GETDATE();

                     EXEC sp_executesql @RecvReqMsg; --Code to run

                     DECLARE @t2 datetime = GETDATE();
                     DECLARE @Duration smallint = DATEDIFF(SECOND,@t1,@t2);
                     SELECT 
                     @XMLReply = (SELECT 'OK' as Result, 
                                  @Duration as Duration, @RecvReqMsg as Command
                                  FOR XML PATH(''), ELEMENTS);

                     SEND ON CONVERSATION (@ConversationHandle) 
                     MESSAGE TYPE [ReplyMessage] (@XMLReply);


                 END TRY
                 BEGIN CATCH
                 --SEND KO Reply
                 SELECT 
                 @XMLReply = (SELECT 'FAIL' as Result, 
                              ERROR_MESSAGE() as ErrorMessage, @RecvReqMsg as Command
                              FOR XML PATH(''), ELEMENTS);

                 SEND ON CONVERSATION (@ConversationHandle) 
                 MESSAGE TYPE [ReplyMessage] (@XMLReply);

                 BREAK;
                 END CATCH
                
             END
         ELSE
             IF @RecvReqMsgName = N'<a class="linkclass" href="http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog">http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog</a>'
                 BEGIN
                     END CONVERSATION @RecvReqDlgHandle;
                 END
             ELSE
                 IF @RecvReqMsgName = N'<a class="linkclass" href="http://schemas.microsoft.com/SQL/ServiceBroker/Error">http://schemas.microsoft.com/SQL/ServiceBroker/Error</a>'
                     BEGIN
                         END CONVERSATION @RecvReqDlgHandle;
                     END
         COMMIT TRANSACTION;
     END
END

GO

The first procedure that we need is the one that actually receives and processes the messages in the queue. It does the following:

  1. It waits to receive a message from TargetQueue, on a loop
  2. When it receives a message:
    1. It checks if the message name is the same as the message that it’s expecting (in our case, RequestMessage)
    2. It closes the original conversation handle (using it like an acknowledgment as “ok, I’ve started to process this”)
    3. Open up another conversation to the InitiatorQueue as a signaling path for execution start and channel where to send the outcome of the execution (we’ll track each conversation group with the @Group variable, more on that later)
    4. Sends to sp_executesql the code to run
    5. If everything is ok, a XML message that states the successful execution and the time it took it sent to the conversation just opened
    6. If for some reason something went wrong during the execution, the failed notification alongside with the error message is sent back to the initiator queue.

Of course, we won’t have a bunch of these procedure run a 1=1 while loop in the background burning CPU, but it will be activated on demand by the queue itself, to do so we set this procedure as the activation procedure of the queue:

ALTER QUEUE TargetQueue WITH ACTIVATION (STATUS = ON, PROCEDURE_NAME = CommandLauncherProc, MAX_QUEUE_READERS = 5, EXECUTE AS SELF);

The procedure name is the name of our CommandLauncherProc, which will listen on the queue.
The MAX_QUEUE_READERS , on the other hand, is similar to a MAXDOP parameter, meaning that in the code above a maximum of 5 concurrent messages can be elaborated at the same time. In our case, this means that if we queue 10 messages, they’ll get processed in 10 seconds (5 seconds per procedure, 5 running at the time) as opposed to 50 seconds of a serial run (of course I’m deliberately ignoring any concurrency, lock wait, etc that may happen, only happy theory).
Be aware that I’m knowingly ignoring the Queue activation time and hence the fact that all of 5 tasks won’t be running from the arrival of at least 5 messages

Now, let’s handle the other part of the background processing, as I need a queue-elaborating procedure to process the commands to run, I need the same to process the replies and statuses.

CREATE OR ALTER PROCEDURE ResultsManagerProc
AS
BEGIN

SET NOCOUNT ON;

DECLARE @RecvReqDlgHandle AS UNIQUEIDENTIFIER;
DECLARE @Group AS UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg AS XML;
DECLARE @RecvReqMsgName AS sysname;
DECLARE @LogRecordsToKeep smallint = 10000;

--Reads from initiatorQueue and spools the results to a resultstable
WHILE (1 = 1)
     BEGIN
         BEGIN TRANSACTION;
         WAITFOR(
         RECEIVE TOP (1) @RecvReqDlgHandle = conversation_handle,
                         @RecvReqMsg = message_body,
                         @RecvReqMsgName = message_type_name,
                         @Group = conversation_group_id FROM InitiatorQueue), TIMEOUT 5000;

         IF (@@ROWCOUNT = 0)
             BEGIN
                 ROLLBACK;
                 BREAK;
             END
         IF @RecvReqMsgName = N'ReplyMessage'
         BEGIN
            
            END CONVERSATION @RecvReqDlgHandle;

            DECLARE @Result varchar(4);

            SELECT @Result=r.x.value('.','varchar(4)')
            FROM @RecvReqMsg.nodes('/Result') r(x)

            --Insert in results table
            IF(@Result = 'OK')
            BEGIN

                INSERT INTO AsynchLog (Command,[Duration(s)],Error)
                SELECT C.x.value('.','nvarchar(max)'), D.x.value('.','smallint'), 0
                FROM @RecvReqMsg.nodes('/Command') C(x)
                CROSS APPLY @RecvReqMsg.nodes('/Duration') D(x)

            END
            ELSE
            BEGIN

                INSERT INTO AsynchLog (Command,ErrorMessage,Error)
                SELECT C.x.value('.','nvarchar(max)'), E.x.value('.','varchar(2000)'), 0
                FROM @RecvReqMsg.nodes('/Command') C(x)
                CROSS APPLY @RecvReqMsg.nodes('/ErrorMessage') E(x);

            END

            ;WITH C AS
            (SELECT MAX(ID)-@LogRecordsToKeep KeepID FROM AsynchLog)
            DELETE FROM AsynchLog
            WHERE ID < (SELECT KeepID FROM C)

        END
        ELSE
             IF @RecvReqMsgName = N'<a class="linkclass" href="http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog">http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog</a>'
                 BEGIN
                     END CONVERSATION @RecvReqDlgHandle;
                 END
             ELSE
                 IF @RecvReqMsgName = N'<a class="linkclass" href="http://schemas.microsoft.com/SQL/ServiceBroker/Error">http://schemas.microsoft.com/SQL/ServiceBroker/Error</a>'
                     BEGIN
                         END CONVERSATION @RecvReqDlgHandle;
                     END
         COMMIT TRANSACTION;
     END
END
GO

ALTER QUEUE InitiatorQueue WITH ACTIVATION (STATUS = ON, PROCEDURE_NAME = ResultsManagerProc, MAX_QUEUE_READERS = 5, EXECUTE AS SELF);
GO

Similarly, here we’re monitoring the InitiatorQueue for new ReplyMessages and inserting the results in the logging table that we created at the beginning according to the message contents. The procedure is then set as the activation procedure of the queue.
I’ve implemented a basic and arbitrary way to keep the log table exceed a certain number of rows, of course this can be accomplished in various ways, depending on your requirements, this is just a proof of concept, it can’t be perfect, you have to earn your salary!

Then, let’s finally create the procedure that we’ll call for Asynchronously executing our tasks:

CREATE OR ALTER PROCEDURE Async_exec
(
 @SQL                NVARCHAR(MAX),
 @ConversationHandle UNIQUEIDENTIFIER = NULL OUTPUT,
 @Group              UNIQUEIDENTIFIER = NULL OUTPUT,
 @Lifetime           INT = 10000
)
AS

SET NOCOUNT ON;

         BEGIN
             BEGIN TRANSACTION;
             IF @Group IS NULL
                 BEGIN
                     BEGIN DIALOG @ConversationHandle 
                     FROM SERVICE [InitiatorService] 
                     TO SERVICE N'TargetService' 
                     ON CONTRACT [ParallelContract] 
                     WITH ENCRYPTION = OFF,
                     LIFETIME = @Lifetime;

                     SELECT @Group = conversation_group_id
                     FROM   sys.conversation_endpoints
                     WHERE  conversation_handle = @ConversationHandle;
                 END;
                 ELSE
                 BEGIN
                     BEGIN DIALOG @ConversationHandle 
                     FROM SERVICE [InitiatorService] 
                     TO SERVICE N'TargetService' 
                     ON CONTRACT [ParallelContract] 
                     WITH RELATED_CONVERSATION_GROUP = @Group, 
                     ENCRYPTION = OFF, LIFETIME = @Lifetime;
                 END;


           -- Send a message on the conversation           
             SEND ON CONVERSATION(@ConversationHandle) MESSAGE TYPE [RequestMessage](@SQL);
             --END CONVERSATION @ConversationHandle;
             COMMIT TRANSACTION;
         END;
GO

Here we’re simply opening up a conversation to the TargetQueue sending the SQL code as message payload.
A more important aspect implemented here, is the @Group parameter, already mentioned in the procedures above, that we’re going to use to keep track of the currently running tasks; A bunch of background tasks can be launched using a UNIQUEIDENTIFIER to keep track of this particular group exploiting the conversation group feature already available in Service Broker (i.e. all the task in queue will have the same conversation_group_id, which we can track)

CREATE OR ALTER FUNCTION AsynchIsStillRunning (@Group UNIQUEIDENTIFIER) 
RETURNS bit
AS
BEGIN

    IF (EXISTS (SELECT conversation_handle FROM InitiatorQueue WITH (NOLOCK) WHERE conversation_group_id = @Group))
        RETURN 1
    
    RETURN 0
END

You may have noticed that we’re not perfectly ending a conversation on both parties on both queues, this is intentional and for the following reasons:

  1. I want the threads to be fully decoupled and not having anybody waiting around only to do cleanups
  2. I want/need a way to track what has actually been processed without actually close the conversation, because the EndConversation message isn’t sent as a part of the original conversation group and hence I’ll lose any information regarding my running conversation groups
  3. This post is getting too long

As you may have noticed, to avoid endpoint leaking, we’re giving a limited Lifetime to each conversation, in our case 10K seconds by default because I’m lazy, this will kill off any open endpoint automatically after the lifetime span ensuring that no trash is left around.

Just for the sake of it, we can create a procedure to change the “DOP” dynamically in case of increased/decreased workload/weight of the single request:

CREATE OR ALTER PROCEDURE ChangeAsyncDOP (@DOP int)
AS
BEGIN

    DECLARE @SQL nvarchar(max) = N'ALTER QUEUE TargetQueue WITH ACTIVATION (MAX_QUEUE_READERS = ' + CONVERT(varchar(2),@DOP) + ');'
    EXEC sp_executesql @SQL

END
GO

Just an idea, but maybe we could use this to spin up more maintenance/background tasks during a maintenance window o when user activity is low? You never know!
Finally, let’s see if everything works as it should!

ChangeAsyncDOP @DOP=20
GO

DECLARE @SQL nvarchar(MAX)

SET @SQL = N'DECLARE @RandomNumber int = 0
DECLARE @MaxRandomNumber int =30
 
SET @RandomNumber = @MaxRandomNumber*RAND()

WAITFOR DELAY @RandomNumber'

DECLARE @Group UNIQUEIDENTIFIER = '0F0F0F0F-0E0E-0D0D-0C0C-0B0B0B0B0B0B'

EXEC Async_exec @SQL=@SQL, @Group=@Group
GO 10000

WITH D AS (
SELECT 
SUM([Duration(s)]) as SerialDuration, 
DATEDIFF(SECOND,MIN(CompletedAt),MAX(CompletedAt)) as ParallelDuration 
FROM AsynchLog)

SELECT SerialDuration, ParallelDuration, 
CAST(SerialDuration as decimal) /ParallelDuration as SpeedUp
FROM D

We’re just simulating a random execution duration with WAITFOR and a random number, launching 10K queries in the queue (set to DOP=20) and waiting for it to complete.

For this particularly computational-empy workflow, we have a 0.3% performance loss vs the ideal case, probably due to the time that the queue threads need to spin up and the exchange of messages, however, not bad.
The main thing would be to precisely evaluate the concurrency and resource utilization of such background tasks in order to avoid resource starving, locking, deadlocking and all that fun stuff.

Of course, this is a pretty simple case, using the Service Broker functionality this can be expanded in various ways, for example:

  • Cross Database Queries
  • Cross Instance Queries (to split the load between multiple servers)
  • Return the output of the Queries (e.g. send some data to process in background and let it slowly populate automatically in a table ready to use)
  • Set “DOP” = 1 and use it to accomplish a background serial workflow (as Service Broker guarantees the execution order of the queue)

About the author

Emanuele Meazzo

2 comments

  • Hi there!
    This is a great job! Thank you
    Let me ask you please. What is the point to close and open new conversation? It is my understanding that we can use a single conversation to send multiple messages before we close it.

    • You are right, but at this point I don’t even remember 😅
      It was probably just to keep everything clean an neat, as I didn’t want objects to start piling up

It's just Emanuele showing you what you can do in the Microsoft Data Platform

Emanuele Meazzo

My Social Profiles

If you're interested in following me around in the social world, here are some URLs for you