Reusing dialogs with a dialog pool


As various Service Broker sources note, it is often advantageous to minimize the overhead of creating the dialogs that messages are sent on. This post shows how to create a shared pool of dialogs so that existing dialogs can be reused instead of new ones being created. The dialog pool is a variation of Remus Rusanu's technique for reusing and recycling conversations, as described on his blog. One of the main differences is that the dialog pool is keyed only on services and contract, not on SPID. This allows the same SPID to obtain multiple dialogs from the pool should the need arise. Just as importantly, different SPIDs can reuse the same dialog sequentially instead of each creating its own. Measurements show equivalent performance using the dialog pool compared to the SPID-based reuse scheme.


 


The following code shows how to get, free and delete dialogs from a dialog pool table. Initially empty, a new dialog is created in the pool when a request for an existing free dialog cannot be met. Thus the pool will grow during bursts of high demand.
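To watch the pool grow under load, a query along the following lines may help. It is only a sketch: it assumes the DialogPool table from this sample and that free dialogs are marked with OwnerSPID = -1, and the column aliases are made up for illustration. The NOLOCK hint is an assumption about how you want to observe the pool: usp_send claims and frees a dialog inside a single transaction, so claimed rows are only visible to dirty reads.

```sql
-- Sketch: summarize pool occupancy per (from service, to service, contract).
-- Assumes OwnerSPID = -1 marks a free dialog, as in the sample below.
-- Alias names (FreeDialogs, ClaimedDialogs, ...) are illustrative only.
SELECT FromService,
       ToService,
       OnContract,
       SUM(CASE WHEN OwnerSPID = -1 THEN 1 ELSE 0 END) AS FreeDialogs,
       SUM(CASE WHEN OwnerSPID <> -1 THEN 1 ELSE 0 END) AS ClaimedDialogs,
       MIN(CreationTime) AS OldestDialog,
       MAX(SendCount) AS MaxSendCount
FROM [DialogPool] WITH (NOLOCK)
GROUP BY FromService, ToService, OnContract;
```

A steadily rising total for one key suggests that demand bursts are creating dialogs faster than they are being recycled.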


 


The dialog pool entries also contain creation time and send count fields that ease the auditing and “recycling” of dialogs in the pool based on application requirements. Recycling consists of gracefully ending an existing dialog between services and beginning a new one. If done prudently, this technique can ease the handling of dialog errors by limiting the number of messages affected. For example, the application may choose to constrain a dialog to a certain number of messages before it is recycled. Recycling might also be based on the age of a dialog. See the end of the usp_send procedure for an example of recycling.
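Age-based recycling is not implemented in the sample, but it could be sketched roughly as follows. The procedure name usp_recycle_old_dialogs and the @maxAgeMinutes parameter are hypothetical and not part of the sample. The sketch also assumes that free dialogs are marked with OwnerSPID = -1 and that every pooled dialog uses a contract that allows the EndOfStream message type, as SsbContract does here.

```sql
-- Sketch only: retire free dialogs older than a given age.
-- usp_recycle_old_dialogs and @maxAgeMinutes are hypothetical names.
CREATE PROCEDURE [usp_recycle_old_dialogs] (
      @maxAgeMinutes INT = 60)
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @retired TABLE (Handle UNIQUEIDENTIFIER NOT NULL);
      DECLARE @handle UNIQUEIDENTIFIER;

      BEGIN TRANSACTION;

      -- Remove free dialogs (OwnerSPID = -1) created before the cutoff.
      -- READPAST skips rows that concurrent senders have locked.
      DELETE FROM [DialogPool] WITH (READPAST)
             OUTPUT DELETED.Handle INTO @retired
             WHERE OwnerSPID = -1
                   AND CreationTime < DATEADD(MINUTE, -@maxAgeMinutes, GETDATE());

      -- Ask the target to end each retired dialog.
      DECLARE retired_cursor CURSOR LOCAL FAST_FORWARD
              FOR SELECT Handle FROM @retired;
      OPEN retired_cursor;
      FETCH NEXT FROM retired_cursor INTO @handle;
      WHILE @@FETCH_STATUS = 0
      BEGIN
            SEND ON CONVERSATION @handle MESSAGE TYPE [EndOfStream];
            FETCH NEXT FROM retired_cursor INTO @handle;
      END
      CLOSE retired_cursor;
      DEALLOCATE retired_cursor;

      COMMIT;
END;
GO
```

Such a procedure could be run periodically, for example from a scheduled job. Because it removes only free dialogs, it does not interfere with a send that is in progress.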


 


An example application that exercises the dialog pool is also included.


 


--------------------------------------------------------------------------
-- Dialog Pool Sample.
-- This sample shows how to create and use a shared pool of reusable dialogs.
-- The purpose of reusing dialogs is to reduce the overhead of creating them.
-- The sample also shows how dialogs in the pool can be "recycled" by deleting
-- dialogs based on application criteria, such as number of messages sent.
-- This sample is largely based on Remus Rusanu's tutorials on reusing and
-- recycling conversations (rusanu.com/blog).
-- Contents: dialog pool and application using the pool.
--------------------------------------------------------------------------


 


USE master



GO


 


--------------------------------------------------------------------------
-- Create demo database section
--------------------------------------------------------------------------

IF EXISTS (SELECT name FROM sys.databases WHERE name = 'SsbDemoDb')
      DROP DATABASE [SsbDemoDb];


 


CREATE DATABASE [SsbDemoDb]



GO


 


USE [SsbDemoDb];


GO


 


-- Create master key
IF NOT EXISTS(SELECT name FROM sys.symmetric_keys WHERE name = '##MS_DatabaseMasterKey##')
      CREATE MASTER KEY ENCRYPTION BY PASSWORD='Password#123'



GO


 


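--------------------------------------------------------------------------
-- Dialog pool section
--------------------------------------------------------------------------

--------------------------------------------------------------------------
-- The dialog pool table.
-- Obtain a conversation handle using from service, to service, and contract.
-- Also indicates age and usage of dialog for auditing purposes.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'DialogPool')
      DROP TABLE [DialogPool]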


GO


CREATE TABLE [DialogPool] (


      FromService SYSNAME NOT NULL,


      ToService SYSNAME NOT NULL,


      OnContract SYSNAME NOT NULL,


      Handle UNIQUEIDENTIFIER NOT NULL,


      OwnerSPID INT NOT NULL,


      CreationTime DATETIME NOT NULL,


      SendCount BIGINT NOT NULL,


      UNIQUE (Handle));


GO


 


--------------------------------------------------------------------------
-- Get dialog procedure.
-- Reuse a free dialog in the pool or create a new one in case
-- no free dialogs exist.
-- Input is from service, to service, and contract.
-- Output is dialog handle and count of messages previously sent on dialog.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_get_dialog')
      DROP PROC usp_get_dialog


GO


CREATE PROCEDURE [usp_get_dialog] (


      @fromService SYSNAME,


      @toService SYSNAME,


      @onContract SYSNAME,


      @dialogHandle UNIQUEIDENTIFIER OUTPUT,


      @sendCount BIGINT OUTPUT)


AS


BEGIN


      SET NOCOUNT ON;


      DECLARE @dialog TABLE


      (


          FromService SYSNAME NOT NULL,


          ToService SYSNAME NOT NULL,


          OnContract SYSNAME NOT NULL,


          Handle UNIQUEIDENTIFIER NOT NULL,


          OwnerSPID INT NOT NULL,


          CreationTime DATETIME NOT NULL,


          SendCount BIGINT NOT NULL


      );


 


      -- Try to claim an unused dialog in [DialogPool].
      -- A free dialog is marked by OwnerSPID = -1.
      -- READPAST option avoids blocking on locked dialogs.
      BEGIN TRANSACTION;
      DELETE @dialog;
      UPDATE TOP(1) [DialogPool] WITH(READPAST)
             SET OwnerSPID = @@SPID
             OUTPUT INSERTED.* INTO @dialog
             WHERE FromService = @fromService
                   AND ToService = @toService
                   AND OnContract = @onContract
                   AND OwnerSPID = -1;


      IF @@ROWCOUNT > 0


      BEGIN


           SET @dialogHandle = (SELECT Handle FROM @dialog);


           SET @sendCount = (SELECT SendCount FROM @dialog);          


      END


      ELSE


      BEGIN


           -- No free dialogs: need to create a new one


           BEGIN DIALOG CONVERSATION @dialogHandle


                 FROM SERVICE @fromService


                 TO SERVICE @toService


                 ON CONTRACT @onContract


                 WITH ENCRYPTION = OFF;


           INSERT INTO [DialogPool]


                  (FromService, ToService, OnContract, Handle, OwnerSPID,


                      CreationTime, SendCount)


                  VALUES


                  (@fromService, @toService, @onContract, @dialogHandle, @@SPID,


                      GETDATE(), 0);


          SET @sendCount = 0;


      END


      COMMIT


END;


GO


 


--------------------------------------------------------------------------
-- Free dialog procedure.
-- Return the dialog to the pool.
-- Inputs are dialog handle and updated send count.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_free_dialog')
      DROP PROC usp_free_dialog


GO


CREATE PROCEDURE [usp_free_dialog] (


      @dialogHandle UNIQUEIDENTIFIER,


      @sendCount BIGINT)


AS


BEGIN


      SET NOCOUNT ON;


      DECLARE @rowcount INT;


      DECLARE @string VARCHAR(50);


 


      BEGIN TRANSACTION;


 


      -- Release dialog by setting OwnerSPID to -1.
      UPDATE [DialogPool] SET OwnerSPID = -1, SendCount = @sendCount WHERE Handle = @dialogHandle;


      SELECT @rowcount = @@ROWCOUNT;


      IF @rowcount = 0


      BEGIN


           SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));


           RAISERROR('usp_free_dialog: dialog %s not found in dialog pool', 16, 1, @string) WITH LOG;


      END


      ELSE IF @rowcount > 1


      BEGIN


           SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));


           RAISERROR('usp_free_dialog: duplicate dialog %s found in dialog pool', 16, 1, @string) WITH LOG;


      END


 


      COMMIT


END;


GO


 


--------------------------------------------------------------------------
-- Delete dialog procedure.
-- Delete the dialog from the pool. This does not end the dialog.
-- Input is dialog handle.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_delete_dialog')
      DROP PROC usp_delete_dialog


GO


CREATE PROCEDURE [usp_delete_dialog] (


      @dialogHandle UNIQUEIDENTIFIER)


AS


BEGIN


      SET NOCOUNT ON;


 


      BEGIN TRANSACTION;


      DELETE [DialogPool] WHERE Handle = @dialogHandle;


      COMMIT


END;


GO


 


--------------------------------------------------------------------------
-- Application setup section.
--------------------------------------------------------------------------

--------------------------------------------------------------------------
-- Send messages from initiator to target.
-- Initiator uses dialogs from the dialog pool.
-- Initiator also retires dialogs based on application criteria,
-- which results in recycling dialogs in the pool.
--------------------------------------------------------------------------

-- This table stores the messages on the target side
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'MsgTable')
      DROP TABLE MsgTable


GO


CREATE TABLE MsgTable ( message_type SYSNAME, message_body NVARCHAR(4000))


GO


 


-- Activated stored proc for the initiator to receive messages.


CREATE PROCEDURE initiator_queue_activated_procedure


AS


BEGIN


     DECLARE @handle UNIQUEIDENTIFIER;


     DECLARE @message_type SYSNAME;


 


     BEGIN TRANSACTION;


     WAITFOR (


          RECEIVE TOP(1) @handle = [conversation_handle],


            @message_type = [message_type_name]


          FROM [SsbInitiatorQueue]), TIMEOUT 5000;


 


     IF @@ROWCOUNT = 1


     BEGIN


          -- Expect target response to EndOfStream message.
          IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'


          BEGIN


               END CONVERSATION @handle;


          END


     END


     COMMIT


END;


GO


 


-- Activated stored proc for the target to receive messages.


CREATE PROCEDURE target_queue_activated_procedure


AS


BEGIN


    -- Variable table for received messages.


    DECLARE @receive_table TABLE(


            queuing_order BIGINT,


            conversation_handle UNIQUEIDENTIFIER,


            message_type_name SYSNAME,


            message_body VARCHAR(MAX));


   


    -- Cursor for received message table.


    DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY


            FOR SELECT


            conversation_handle,


            message_type_name,


            message_body


            FROM @receive_table ORDER BY queuing_order;


 


     DECLARE @conversation_handle UNIQUEIDENTIFIER;


     DECLARE @message_type SYSNAME;


     DECLARE @message_body VARCHAR(4000);


 


     -- Error variables.


     DECLARE @error_number INT;


     DECLARE @error_message VARCHAR(4000);


     DECLARE @error_severity INT;


     DECLARE @error_state INT;


     DECLARE @error_procedure SYSNAME;


     DECLARE @error_line INT;


     DECLARE @error_dialog VARCHAR(50);


 


     BEGIN TRY


       WHILE (1 = 1)


       BEGIN


         BEGIN TRANSACTION;


   


         -- Receive all available messages into the table.
         -- Wait 5 seconds for messages.


         WAITFOR (


            RECEIVE


               [queuing_order],


               [conversation_handle],


               [message_type_name],


               CAST([message_body] AS VARCHAR(4000))


            FROM [SsbTargetQueue]


            INTO @receive_table


         ), TIMEOUT 5000;


   


         IF @@ROWCOUNT = 0


         BEGIN


              COMMIT;


              BREAK;


         END


         ELSE


         BEGIN


              OPEN message_cursor;


              WHILE (1=1)


              BEGIN


                  FETCH NEXT FROM message_cursor


                            INTO @conversation_handle,


                                 @message_type,


                                 @message_body;


       


                  IF (@@FETCH_STATUS != 0) BREAK;


 


                  -- Process a message.
                  -- If an exception occurs, catch and attempt to recover.
                  BEGIN TRY

                      IF @message_type = 'SsbMsgType'
                      BEGIN
                          -- process the msg. Here we will just insert it into a table
                          INSERT INTO MsgTable values(@message_type, @message_body);
                      END
                      ELSE IF @message_type = 'EndOfStream'
                      BEGIN
                          -- initiator is signaling end of message stream: end the dialog
                          END CONVERSATION @conversation_handle;
                      END
                      ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
                      BEGIN
                           -- If the message_type indicates that the message is an error,
                           -- raise the error and end the conversation.
                           WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
                           SELECT
                           @error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
                           @error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
                           SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));
                           RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);


                           END CONVERSATION @conversation_handle;


                      END


                  END TRY


                  BEGIN CATCH


                     SET @error_number = ERROR_NUMBER();


                     SET @error_message = ERROR_MESSAGE();


                     SET @error_severity = ERROR_SEVERITY();


                     SET @error_state = ERROR_STATE();


                     SET @error_procedure = ERROR_PROCEDURE();


                     SET @error_line = ERROR_LINE();


             


                     IF XACT_STATE() = -1
                     BEGIN
                          -- The transaction is doomed. Only rollback possible.
                          -- This could disable the queue if done 5 times consecutively!
                          ROLLBACK TRANSACTION;

                          -- Record the error.
                          BEGIN TRANSACTION;
                          INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
                                 @error_severity, @error_state, @error_procedure, @error_line, 1);
                          COMMIT;

                          -- For this level of error, it is best to exit the proc
                          -- and give the queue monitor control.
                          -- Breaking to the outer catch will accomplish this.
                          RAISERROR ('Message processing error', 16, 1);


                     END


                     ELSE IF XACT_STATE() = 1


                     BEGIN


                          -- Record error and continue processing messages.
                          -- Failing message could also be put aside for later processing here.
                          -- Otherwise it will be discarded.


                          INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,


                                 @error_severity, @error_state, @error_procedure, @error_line, 0);


                     END


                  END CATCH


              END


              CLOSE message_cursor;


              DELETE @receive_table;


         END


         COMMIT;


       END


     END TRY


     BEGIN CATCH


         -- Process the error and exit the proc to give the queue monitor control


         SET @error_number = ERROR_NUMBER();


         SET @error_message = ERROR_MESSAGE();


         SET @error_severity = ERROR_SEVERITY();


         SET @error_state = ERROR_STATE();


         SET @error_procedure = ERROR_PROCEDURE();


         SET @error_line = ERROR_LINE();


 


         IF XACT_STATE() = -1
         BEGIN
              -- The transaction is doomed. Only rollback possible.
              -- This could disable the queue if done 5 times consecutively!
              ROLLBACK TRANSACTION;

              -- Record the error.


              BEGIN TRANSACTION;


              INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,


                     @error_severity, @error_state, @error_procedure, @error_line, 1);


              COMMIT;


         END


         ELSE IF XACT_STATE() = 1


         BEGIN


              -- Record error and commit transaction.
              -- Here you could also save anything else you want before exiting.


              INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,


                     @error_severity, @error_state, @error_procedure, @error_line, 0);


              COMMIT;


         END


     END CATCH


END;


GO


 


-- Table to store processing errors.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')
      DROP TABLE target_processing_errors;


GO


 


CREATE TABLE target_processing_errors (error_conversation UNIQUEIDENTIFIER, error_number INT,


       error_message VARCHAR(4000), error_severity INT, error_state INT, error_procedure SYSNAME NULL,


       error_line INT, doomed_transaction TINYINT)


GO


 


-- Create Initiator and Target side SSB entities


CREATE MESSAGE TYPE SsbMsgType VALIDATION = WELL_FORMED_XML;


CREATE MESSAGE TYPE EndOfStream;


CREATE CONTRACT SsbContract


       (


        SsbMsgType SENT BY INITIATOR,


        EndOfStream SENT BY INITIATOR


       );


CREATE QUEUE SsbInitiatorQueue


      WITH ACTIVATION (


            STATUS = ON,


            MAX_QUEUE_READERS = 1,


            PROCEDURE_NAME = [initiator_queue_activated_procedure],


            EXECUTE AS OWNER);


CREATE QUEUE SsbTargetQueue


      WITH ACTIVATION (


            STATUS = ON,


            MAX_QUEUE_READERS = 1,


            PROCEDURE_NAME = [target_queue_activated_procedure],


            EXECUTE AS OWNER);


 


CREATE SERVICE SsbInitiatorService ON QUEUE SsbInitiatorQueue;


CREATE SERVICE SsbTargetService ON QUEUE SsbTargetQueue (SsbContract);


GO


 


-- SEND procedure. Uses a dialog from the dialog pool.

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_send')
      DROP PROC usp_send


GO


CREATE PROCEDURE [usp_send] (


      @fromService SYSNAME,


      @toService SYSNAME,


      @onContract SYSNAME,


      @messageType SYSNAME,


      @messageBody NVARCHAR(MAX))


AS


BEGIN


      SET NOCOUNT ON;


      DECLARE @dialogHandle UNIQUEIDENTIFIER;


      DECLARE @sendCount BIGINT;     


      DECLARE @counter INT;


      DECLARE @error INT;


 


      SELECT @counter = 1;


      BEGIN TRANSACTION;


      -- Will need a loop to retry in case the dialog is
      -- in a state that does not allow transmission

      WHILE (1=1)
      BEGIN
            -- Claim a dialog from the dialog pool.
            -- A new one will be created if none are available.


           


            EXEC usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT;


 


            -- Attempt to SEND on the dialog


           


            IF (@messageBody IS NOT NULL)


            BEGIN


                  -- If the @messageBody is not null it must be sent explicitly


                  SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);


            END


            ELSE


            BEGIN


                  -- Messages with no body must *not* specify the body,
                  -- cannot send a NULL value argument


                  SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;


            END


                 


            SELECT @error = @@ERROR;


            IF @error = 0


            BEGIN


                  -- Successful send, increment count and exit the loop


                 


                  SET @sendCount = @sendCount + 1;


                  BREAK;


            END


           


            SELECT @counter = @counter+1;


            IF @counter > 10


            BEGIN


                  -- We failed 10 times in a row, something must be broken

                  RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
                  BREAK;


            END


 


            -- Delete the associated dialog from the table and try again


           


            EXEC usp_delete_dialog @dialogHandle;


            SELECT @dialogHandle = NULL;


      END


 


      -- "Criterion" for dialog pool removal is send count > 1000.
      -- Modify to suit application.
      -- When deleting also inform the target to end the dialog.


      IF @sendCount > 1000


      BEGIN


         EXEC usp_delete_dialog @dialogHandle ;


         SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [EndOfStream];


      END


      ELSE


      BEGIN


         — Free the dialog.


         EXEC usp_free_dialog @dialogHandle, @sendCount;


      END


      COMMIT


END;


GO


 


--------------------------------------------------------------------------
-- Run application section
--------------------------------------------------------------------------

-- Send some messages
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message1.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message2.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message3.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message4.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message5.</xml>'


GO


 


-- Show the dialog pool


SELECT * FROM [DialogPool]


GO


 


-- Show the dialogs used.


SELECT * FROM sys.conversation_endpoints;


GO


 


-- Check whether the TARGET side has processed the messages


SELECT * FROM MsgTable


GO


TRUNCATE TABLE MsgTable


GO
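When checking the results, it can help to see the pool entries side by side with the state of the underlying conversations. The query below is a minimal sketch of that idea. It assumes the DialogPool table from this sample and joins it to the sys.conversation_endpoints catalog view on the conversation handle. The choice of columns is illustrative.

```sql
-- Sketch: show each pooled dialog with the state of its conversation endpoint.
-- A NULL state_desc means the pool holds a handle with no matching endpoint.
SELECT p.FromService,
       p.ToService,
       p.OnContract,
       p.SendCount,
       p.CreationTime,
       e.state_desc,
       e.far_service
FROM [DialogPool] AS p
LEFT JOIN sys.conversation_endpoints AS e
       ON e.conversation_handle = p.Handle
ORDER BY p.CreationTime;
```

After the five sends above, run one after another, the pool would be expected to hold a single reused dialog with a send count of 5.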


 


 
