Reusing dialogs with a dialog pool
As noted in various Service Broker sources, it is often advantageous to minimize the overhead of creating dialogs to send messages on. This blog shows how to create a shared pool of dialogs to be able to reuse dialogs instead of creating new ones. The dialog pool is a variation of Remus Rusanu's reusing and recycling conversations as shown in his blog. One of the main differences is that the dialog pool is keyed only on services and contract, not SPID. This allows the same SPID to obtain multiple dialogs from the pool should the need arise. As importantly, different SPIDs can reuse the same dialog sequentially instead of creating two of them. Measurements show equivalent performance using the dialog pool compared to the SPID-based reuse scheme.
The following code shows how to get, free and delete dialogs from a dialog pool table. Initially empty, a new dialog is created in the pool when a request for an existing free dialog cannot be met. Thus the pool will grow during bursts of high demand.
The dialog pool entries also contain creation time and send count fields that ease the auditing and "recycling" of dialogs in the pool based on application requirements. Recycling consists of gracefully ending an existing dialog between services and beginning a new one. If done prudently, this technique can ease the handling of dialog errors by limiting the number of messages affected. For example, the application may choose to contrain a dialog to a certain number of messages before it is recycled. This might also be done according to the age of a dialog. See the end of the usp_send procedure for an example of recycling.
An example application that exercises the dialog pool is also included.
--------------------------------------------------------------------------
-- Dialog Pool Sample.
-- This sample shows how to create and use a shared pool of reuseable dialogs.
-- The purpose of reusing dialogs is to reduce the overhead of creating them.
-- The sample also shows how dialogs in the pool can be "recycled" by deleting
-- dialogs based on application criteria, such as number of messages sent.
-- This sample is largely based on Remus Rusanu's tutorials on reusing and
-- recycling conversations (rusanu.com/blog).
-- Contents: dialog pool and application using the pool.
----------------------------------------------------
USE master
GO
--------------------------------------------------------------------------
-- Create demo database section
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.databases WHERE name = 'SsbDemoDb')
DROP DATABASE [SsbDemoDb];
CREATE DATABASE [SsbDemoDb]
GO
USE [SsbDemoDb];
GO
-- Create master key
IF NOT EXISTS(SELECT name FROM sys.symmetric_keys WHERE name = '##MS_DatabaseMasterKey##')
CREATE MASTER KEY ENCRYPTION BY PASSWORD='Password#123'
GO
--------------------------------------------------------------------------
-- Dialog pool section
--------------------------------------------------------------------------
--------------------------------------------------------------------------
-- The dialog pool table.
-- Obtain a conversation handle using from service, to service, and contract.
-- Also indicates age and usage of dialog for auditing purposes.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'DialogPool')
DROP TABLE [DialogPool]
GO
CREATE TABLE [DialogPool] (
FromService SYSNAME NOT NULL,
ToService SYSNAME NOT NULL,
OnContract SYSNAME NOT NULL,
Handle UNIQUEIDENTIFIER NOT NULL,
OwnerSPID INT NOT NULL,
CreationTime DATETIME NOT NULL,
SendCount BIGINT NOT NULL,
UNIQUE (Handle));
GO
--------------------------------------------------------------------------
-- Get dialog procedure.
-- Reuse a free dialog in the pool or create a new one in case
-- no free dialogs exist.
-- Input is from service, to service, and contract.
-- Output is dialog handle and count of message previously sent on dialog.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_get_dialog')
DROP PROC usp_get_dialog
GO
CREATE PROCEDURE [usp_get_dialog] (
@fromService SYSNAME,
@toService SYSNAME,
@onContract SYSNAME,
@dialogHandle UNIQUEIDENTIFIER OUTPUT,
@sendCount BIGINT OUTPUT)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @dialog TABLE
(
FromService SYSNAME NOT NULL,
ToService SYSNAME NOT NULL,
OnContract SYSNAME NOT NULL,
Handle UNIQUEIDENTIFIER NOT NULL,
OwnerSPID INT NOT NULL,
CreationTime DATETIME NOT NULL,
SendCount BIGINT NOT NULL
);
-- Try to claim an unused dialog in [DialogPool]
-- READPAST option avoids blocking on locked dialogs.
BEGIN TRANSACTION;
DELETE @dialog;
UPDATE TOP(1) [DialogPool] WITH(READPAST)
SET OwnerSPID = @@SPID
OUTPUT INSERTED.* INTO @dialog
WHERE FromService = @fromService
AND ToService = @toService
AND OnContract = @OnContract
AND OwnerSPID = -1;
IF @@ROWCOUNT > 0
BEGIN
SET @dialogHandle = (SELECT Handle FROM @dialog);
SET @sendCount = (SELECT SendCount FROM @dialog);
END
ELSE
BEGIN
-- No free dialogs: need to create a new one
BEGIN DIALOG CONVERSATION @dialogHandle
FROM SERVICE @fromService
TO SERVICE @toService
ON CONTRACT @onContract
WITH ENCRYPTION = OFF;
INSERT INTO [DialogPool]
(FromService, ToService, OnContract, Handle, OwnerSPID,
CreationTime, SendCount)
VALUES
(@fromService, @toService, @onContract, @dialogHandle, @@SPID,
GETDATE(), 0);
SET @sendCount = 0;
END
COMMIT
END;
GO
--------------------------------------------------------------------------
-- Free dialog procedure.
-- Return the dialog to the pool.
-- Inputs are dialog handle and updated send count.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_free_dialog')
DROP PROC usp_free_dialog
GO
CREATE PROCEDURE [usp_free_dialog] (
@dialogHandle UNIQUEIDENTIFIER,
@sendCount BIGINT)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @rowcount INT;
DECLARE @string VARCHAR(50);
BEGIN TRANSACTION;
-- Release dialog by setting OwnerSPID to -1.
UPDATE [DialogPool] SET OwnerSPID = -1, SendCount = @sendCount WHERE Handle = @dialogHandle;
SELECT @rowcount = @@ROWCOUNT;
IF @rowcount = 0
BEGIN
SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));
RAISERROR('usp_free_dialog: dialog %s not found in dialog pool', 16, 1, @string) WITH LOG;
END
ELSE IF @rowcount > 1
BEGIN
SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));
RAISERROR('usp_free_dialog: duplicate dialog %s found in dialog pool', 16, 1, @string) WITH LOG;
END
COMMIT
END;
GO
--------------------------------------------------------------------------
-- Delete dialog procedure.
-- Delete the dialog from the pool. This does not end the dialog.
-- Input is dialog handle.
--------------------------------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_delete_dialog')
DROP PROC usp_delete_dialog
GO
CREATE PROCEDURE [usp_delete_dialog] (
@dialogHandle UNIQUEIDENTIFIER)
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRANSACTION;
DELETE [DialogPool] WHERE Handle = @dialogHandle;
COMMIT
END;
GO
--------------------------------------------------------------------------
-- Application setup section.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
-- Send messages from initiator to target.
-- Initiator uses dialogs from the dialog pool.
-- Initiator also retires dialogs based on application criteria,
-- which results in recycling dialogs in the pool.
--------------------------------------------------------------------------
-- This table stores the messages on the target side
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'MsgTable')
DROP TABLE MsgTable
GO
CREATE TABLE MsgTable ( message_type SYSNAME, message_body NVARCHAR(4000))
GO
-- Activated store proc for the initiator to receive messages.
CREATE PROCEDURE initiator_queue_activated_procedure
AS
BEGIN
DECLARE @handle UNIQUEIDENTIFIER;
DECLARE @message_type SYSNAME;
BEGIN TRANSACTION;
WAITFOR (
RECEIVE TOP(1) @handle = [conversation_handle],
@message_type = [message_type_name]
FROM [SsbInitiatorQueue]), TIMEOUT 5000;
IF @@ROWCOUNT = 1
BEGIN
-- Expect target response to EndOfStream message.
IF @message_type = 'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @handle;
END
END
COMMIT
END;
GO
-- Activated store proc for the target to receive messages.
CREATE PROCEDURE target_queue_activated_procedure
AS
BEGIN
-- Variable table for received messages.
DECLARE @receive_table TABLE(
queuing_order BIGINT,
conversation_handle UNIQUEIDENTIFIER,
message_type_name SYSNAME,
message_body VARCHAR(MAX));
-- Cursor for received message table.
DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY
FOR SELECT
conversation_handle,
message_type_name,
message_body
FROM @receive_table ORDER BY queuing_order;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
DECLARE @message_type SYSNAME;
DECLARE @message_body VARCHAR(4000);
-- Error variables.
DECLARE @error_number INT;
DECLARE @error_message VARCHAR(4000);
DECLARE @error_severity INT;
DECLARE @error_state INT;
DECLARE @error_procedure SYSNAME;
DECLARE @error_line INT;
DECLARE @error_dialog VARCHAR(50);
BEGIN TRY
WHILE (1 = 1)
BEGIN
BEGIN TRANSACTION;
-- Receive all available messages into the table.
-- Wait 5 seconds for messages.
WAITFOR (
RECEIVE
[queuing_order],
[conversation_handle],
[message_type_name],
CAST([message_body] AS VARCHAR(4000))
FROM [SsbTargetQueue]
INTO @receive_table
), TIMEOUT 5000;
IF @@ROWCOUNT = 0
BEGIN
COMMIT;
BREAK;
END
ELSE
BEGIN
OPEN message_cursor;
WHILE (1=1)
BEGIN
FETCH NEXT FROM message_cursor
INTO @conversation_handle,
@message_type,
@message_body;
IF (@@FETCH_STATUS != 0) BREAK;
-- Process a message.
-- If an exception occurs, catch and attempt to recover.
BEGIN TRY
IF @message_type = 'SsbMsgType'
BEGIN
-- process the msg. Here we will just insert it into a table
INSERT INTO MsgTable values(@message_type, @message_body);
END
ELSE IF @message_type = 'EndOfStream'
BEGIN
-- initiator is signaling end of message stream: end the dialog
END CONVERSATION @conversation_handle;
END
ELSE IF @message_type = 'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- If the message_type indicates that the message is an error,
-- raise the error and end the conversation.
WITH XMLNAMESPACES ('https://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
SELECT
@error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
@error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));
RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);
END CONVERSATION @conversation_handle;
END
END TRY
BEGIN CATCH
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
SET @error_severity = ERROR_SEVERITY();
SET @error_state = ERROR_STATE();
SET @error_procedure = ERROR_PROCEDURE();
SET @error_line = ERROR_LINE();
IF XACT_STATE() = -1
BEGIN
-- The transaction is doomed. Only rollback possible.
-- This could disable the queue if done 5 times consecutively!
ROLLBACK TRANSACTION;
-- Record the error.
BEGIN TRANSACTION;
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 1);
COMMIT;
-- For this level of error, it is best to exit the proc
-- and give the queue monitor control.
-- Breaking to the outer catch will accomplish this.
RAISERROR ('Message processing error', 16, 1);
END
ELSE IF XACT_STATE() = 1
BEGIN
-- Record error and continue processing messages.
-- Failing message could also be put aside for later processing here.
-- Otherwise it will be discarded.
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 0);
END
END CATCH
END
CLOSE message_cursor;
DELETE @receive_table;
END
COMMIT;
END
END TRY
BEGIN CATCH
-- Process the error and exit the proc to give the queue monitor control
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
SET @error_severity = ERROR_SEVERITY();
SET @error_state = ERROR_STATE();
SET @error_procedure = ERROR_PROCEDURE();
SET @error_line = ERROR_LINE();
IF XACT_STATE() = -1
BEGIN
-- The transaction is doomed. Only rollback possible.
-- This could disable the queue if done 5 times consecutively!
ROLLBACK TRANSACTION;
-- Record the error.
BEGIN TRANSACTION;
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 1);
COMMIT;
END
ELSE IF XACT_STATE() = 1
BEGIN
-- Record error and commit transaction.
-- Here you could also save anything else you want before exiting.
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 0);
COMMIT;
END
END CATCH
END;
GO
-- Table to store processing errors.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')
DROP TABLE target_processing_errors;
GO
CREATE TABLE target_processing_errors (error_conversation UNIQUEIDENTIFIER, error_number INT,
error_message VARCHAR(4000), error_severity INT, error_state INT, error_procedure SYSNAME NULL,
error_line INT, doomed_transaction TINYINT)
GO
-- Create Initiator and Target side SSB entities
CREATE MESSAGE TYPE SsbMsgType VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE EndOfStream;
CREATE CONTRACT SsbContract
(
SsbMsgType SENT BY INITIATOR,
EndOfStream SENT BY INITIATOR
);
CREATE QUEUE SsbInitiatorQueue
WITH ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = 1,
PROCEDURE_NAME = [initiator_queue_activated_procedure],
EXECUTE AS OWNER);
CREATE QUEUE SsbTargetQueue
WITH ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = 1,
PROCEDURE_NAME = [target_queue_activated_procedure],
EXECUTE AS OWNER);
CREATE SERVICE SsbInitiatorService ON QUEUE SsbInitiatorQueue;
CREATE SERVICE SsbTargetService ON QUEUE SsbTargetQueue (SsbContract);
GO
-- SEND procedure. Uses a dialog from the dialog pool.
--
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_send')
DROP PROC usp_send
GO
CREATE PROCEDURE [usp_send] (
@fromService SYSNAME,
@toService SYSNAME,
@onContract SYSNAME,
@messageType SYSNAME,
@messageBody NVARCHAR(MAX))
AS
BEGIN
SET NOCOUNT ON;
DECLARE @dialogHandle UNIQUEIDENTIFIER;
DECLARE @sendCount BIGINT;
DECLARE @counter INT;
DECLARE @error INT;
SELECT @counter = 1;
BEGIN TRANSACTION;
-- Will need a loop to retry in case the dialog is
-- in a state that does not allow transmission
--
WHILE (1=1)
BEGIN
-- Claim a dialog from the dialog pool.
-- A new one will be created if none are available.
--
EXEC usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT;
-- Attempt to SEND on the dialog
--
IF (@messageBody IS NOT NULL)
BEGIN
-- If the @messageBody is not null it must be sent explicitly
SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);
END
ELSE
BEGIN
-- Messages with no body must *not* specify the body,
-- cannot send a NULL value argument
SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;
END
SELECT @error = @@ERROR;
IF @error = 0
BEGIN
-- Successful send, increment count and exit the loop
--
SET @sendCount = @sendCount + 1;
BREAK;
END
SELECT @counter = @counter+1;
IF @counter > 10
BEGIN
-- We failed 10 times in a row, something must be broken
--
RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
BREAK;
END
-- Delete the associated dialog from the table and try again
--
EXEC usp_delete_dialog @dialogHandle;
SELECT @dialogHandle = NULL;
END
-- "Criterion" for dialog pool removal is send count > 1000.
-- Modify to suit application.
-- When deleting also inform the target to end the dialog.
IF @sendCount > 1000
BEGIN
EXEC usp_delete_dialog @dialogHandle ;
SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [EndOfStream];
END
ELSE
BEGIN
-- Free the dialog.
EXEC usp_free_dialog @dialogHandle, @sendCount;
END
COMMIT
END;
GO
------------------------------------------------------------------------------------
-- Run application section
------------------------------------------------------------------------------------
-- Send some messages
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message1.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message2.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message3.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message4.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message5.</xml>'
GO
-- Show the dialog pool
SELECT * FROM [DialogPool]
GO
-- Show the dialogs used.
SELECT * FROM sys.conversation_endpoints;
GO
-- Check whether the TARGET side has processed the messages
SELECT * FROM MsgTable
GO
TRUNCATE TABLE MsgTable
GO
Comments
- Anonymous
December 17, 2008
About This Document SQL Server Service Broker(SSB) is a message queueing infrastructure that is integrated