Репликация средствами Change Tracking. Те же и сервис-брокер.
Предыдущие посты по этой теме:
· Репликация таблиц средствами Change Tracking
· Краткое введение в сервис-брокер
· Репликация средствами Change Tracking. Небольшое упражнение на FOR XML PATH и XQuery.
В посте Репликация таблиц средствами Change Tracking мы рассмотрели вариант синхронизации таблиц tbl_1 и tbl_2 при помощи появившегося в SQL Server 2008 механизма отслеживания изменений Change Tracking. В данном посте мы разовьем этот сценарий на случай, когда таблицы находятся на разных серверах. В качестве транспорта будет использоваться появившийся в SQL Server 2005 механизм асинхронного взаимодействия сервис-брокер. Таблица tbl_1 будет находиться на сервере Маша. Change Tracking будет отслеживать происходящие над ней изменения (delete, insert, update). Эти изменения будут превращаться в XML-сообщение и доставляться сервис-брокером на сервер Дубровский, где XML превратится обратно в DML-команды, которые будут применены к таблице tbl_2 на этом сервере. Вместо сервис-брокера можно задействовать свой транспорт по доставке XML, тогда этот сценарий может применяться, когда Маша и Дубровский оба SQL Expressы. Сервис-брокер входит в состав SQL Express, однако два SQL Expressа через него общаться не могут. Необходимо, чтобы хотя бы одна сторона имела взрослую редакцию.
В упрощенном примере обе таблицы будут находиться на одном сервере в одной базе ChangeTracking_Test.
use tempdb
if exists(select 1 from sys.databases where name = 'ChangeTracking_Test') begin
alter database ChangeTracking_Test set single_user with rollback immediate
drop database ChangeTracking_Test
end
create database ChangeTracking_Test
use ChangeTracking_Test
Скрипт 1
В базе со стороны Маши будут работать два процесса: имитация пользовательской активности, вносящая в tbl_1 случайные изменения, и периодическая синхронизация. Чтобы заморозить tbl_1 на момент синхронизации, используется уровень изоляции snapshot. Его нужно включить на базе со стороны Маши:
alter database ChangeTracking_Test set single_user with rollback immediate
alter database ChangeTracking_Test set read_committed_snapshot on
alter database ChangeTracking_Test set multi_user
alter database ChangeTracking_Test set allow_snapshot_isolation on
Скрипт 2
На базе со стороны Маши должен быть поднят Change Tracking:
if not exists (select 1 from sys.change_tracking_databases where database_id = db_id('ChangeTracking_Test'))
alter database ChangeTracking_Test set change_tracking = on (change_retention = 10 minutes, auto_cleanup = on)
Скрипт 3
Со стороны Маши и со стороны Дубровского должен быть задействован сервис-брокер. Практикой хорошего тона при использовании сервис-брокера является иметь мастер-ключ на базе, чтобы потом не вылезла ошибка Краткое введение в сервис-брокер\Скрипт 3.
create master key encryption by password = 'AbraCadabra'
if (select is_broker_enabled from sys.databases where name = 'ChangeTracking_Test') = 0
alter database ChangeTracking_Test set enable_broker with rollback immediate
Скрипт 4
На стороне Маши имеется таблица tbl_1
if object_id('dbo.tbl_1', 'U') is not null drop table tbl_1
create table tbl_1 (
id1 int identity,
id2 int default (datepart(ns, sysdatetime()) / 100),
fld1 varchar(10),
fld2 sql_variant,
primary key(id1, id2)
)
Скрипт 5
с которой будет синхронизироваться таблица tbl_2 на стороне Дубровского:
if object_id('dbo.tbl_2', 'U') is not null drop table tbl_2
create table tbl_2 (
id1 int,
id2 int,
fld1 varchar(10),
fld2 sql_variant,
primary key(id1, id2)
)
Скрипт 6
Включаем отслеживание изменений по tbl_1:
if not exists (select 1 from sys.change_tracking_tables where object_id = object_id('tbl_1'))
alter table tbl_1 enable change_tracking
Скрипт 7
На стороне Маши создаем вспомогательную таблицу для хранения предыдущей версии синхронизации и журналирования результатов
use ChangeTracking_Test
if object_id('dbo.Sync_Log', 'U') is not null drop table dbo.Sync_Log
create table dbo.Sync_Log (dt datetime default sysdatetime(), version bigint default change_tracking_current_version(),
source sysname, destination sysname, status nvarchar(200),
deleted bigint, inserted bigint, updated bigint)
insert dbo.Sync_Log (version, source, destination)
values (change_tracking_min_valid_version(object_id('dbo.tbl_1')), 'tbl_1', 'tbl_2')
Скрипт 8
Работающий на периодической основе со стороны Маши скрипт синхронизации будет отлавливать при помощи функции ChangeTable изменения в tbl_1 и превращать их в XML вида Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 2. Я для него даже создал схему:
if exists (select 1 from sys.xml_schema_collections where name = 'CT_Changes_tbl_1_xsd') drop xml schema collection CT_Changes_tbl_1_xsd
create xml schema collection CT_Changes_tbl_1_xsd as
N'<?xml version="1.0" encoding="utf-16"?>
<xs:schema xmlns:xs="https://www.w3.org/2001/XMLSchema">
<xs:element name="CT_Changes">
<xs:complexType>
<xs:sequence>
<xs:element ref="Record" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="table_name" type="xs:string" use="required" />
<xs:attribute name="version_since" type="xs:long" use="required" />
<xs:attribute name="version_upto" type="xs:long" use="required" />
</xs:complexType>
</xs:element>
<xs:element name="Record">
<xs:complexType>
<xs:sequence>
<xs:element ref="PK" minOccurs ="1" maxOccurs ="1"/>
<xs:element name="fld1" type="xs:string" minOccurs="0" />
<xs:element name="fld2" type="xs:string" minOccurs="0" />
</xs:sequence>
<xs:attribute name="operation" type="xs:string" use="required" />
<xs:attribute name="change_no" type="xs:long" use="required" />
<xs:attribute name="commit_time" type="xs:dateTime" use="required" />
</xs:complexType>
</xs:element>
<xs:element name="PK">
<xs:complexType>
<xs:sequence>
<xs:element name="id1" type="xs:int" />
<xs:element name="id2" type="xs:int" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>'
Скрипт 9
Конфигурируем сервис-брокер, создавая тип сообщения для передачи изменений (он будет валидироваться схемой CT_Changes_tbl_1_xsd), контракт, по которому будут передаваться сообщения этого типа (имена чувствительны к регистру невзирая на коллацию), очереди для сообщений, сервисы как конечные точки и открываем диалог, в рамках которого сервис Маша будет передавать сервису Дубровский сообщения по только что определенному контракту, короче, Краткое введение в сервис-брокер\Скрипты 4 - 9.
if exists(select 1 from sys.services where name = 'Masha') drop service Masha
if exists(select 1 from sys.services where name = 'Dubrovsky') drop service Dubrovsky
if exists(select 1 from sys.service_contracts where name = 'CT_Changes_tbl_1_Contract')
drop contract CT_Changes_tbl_1_Contract
if exists(select 1 from sys.service_message_types where name = 'CT_Changes_tbl_1_MessageType')
drop message type CT_Changes_tbl_1_MessageType
create message type CT_Changes_tbl_1_MessageType validation = valid_xml with schema collection CT_Changes_tbl_1_xsd
create contract CT_Changes_tbl_1_Contract (CT_Changes_tbl_1_MessageType sent by initiator)
if exists(select 1 from sys.service_queues where name = 'QueueMashi') drop queue QueueMashi
create queue QueueMashi
if exists(select 1 from sys.service_queues where name = 'QueueDubrovskogo') drop queue QueueDubrovskogo
create queue QueueDubrovskogo
create service Masha on queue QueueMashi (CT_Changes_tbl_1_Contract)
create service Dubrovsky on queue QueueDubrovskogo (CT_Changes_tbl_1_Contract)
declare @ch uniqueidentifier
begin dialog conversation @ch from service Masha to service 'Dubrovsky' on contract CT_Changes_tbl_1_Contract
Скрипт 10
С отдельного коннекта в SSMS на стороне Маши запускаем имитацию пользовательской активности Репликация таблиц средствами Change Tracking\Скрипт 5.
Модифицируем скрипт синхронизации Репликация таблиц средствами Change Tracking\Скрипт 7. Скрипт состоит из 3-х частей, разделенных комментарными линиями. Первая часть осталась без изменений. Вторая изменена с тем, чтобы он не напрямую применял изменения к tbl_2, а превращал их в XML (Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 3) и кидал в очередь брокеру. Третья, как и первая, осталась без изменений. Она опциональна. В ней я пользуюсь тем, что таблицы на самом деле находятся в одной базе, и сравниваю их, чтобы убедиться, что синхронизация работает.
while 1 = 1 begin
-----------------------------------------------------------------------------------------------------
waitfor delay '00:01:00'
set transaction isolation level snapshot
begin tran
declare @lastVersion bigint --здесь будет храниться последняя версия, которой синхронизирована tbl_2
declare @curVersion table (curVersion bigint); delete from @curVersion --здесь будет храниться текущая версия изменений
select @lastVersion = isnull(max(version), 0) from dbo.Sync_Log where source = 'tbl_1' and destination = 'tbl_2' --берем последнюю версию из нашего журнала
insert dbo.Sync_Log (source, destination) output inserted.Version into @curVersion values ('tbl_1', 'tbl_2') --отмечаем в журнале текущий факт синхронизации
--Если autocleanup успел почистить изменения tbl_1, которые еще не были доставлены на tbl_2, поднимаем аварийную ситуацию.
if @lastVersion < change_tracking_min_valid_version(object_id('dbo.tbl_1')) begin
declare @msg nvarchar(200) = 'Часть изменений потеряна! Требуется ручная синхронизация!'
update dbo.Sync_Log set status = @msg where version = (select curVersion from @curVersion) --фиксируем ее в журнале
raiserror (@msg, 21, 1) with log --и вызываем строгую ошибку, которая прерывает выполнение скрипта
end
--Если за период с прошлой синхронизации ничего нового не произошло, можно не париться.
if @lastVersion = change_tracking_current_version() goto konec
-----------------------------------------------------------------------------------------------------
--Превращаем результат changetable в xml и передаем его в очередь.
declare @x xml = (
select 'tbl_1' as [@table_name], @lastVersion as [@version_since], change_tracking_current_version() as [@version_upto],
(
select ct.SYS_CHANGE_OPERATION as [@operation], ct.SYS_CHANGE_VERSION as [@change_no], sct.commit_time as [@commit_time],
ct.id1 as [PK/id1], ct.id2 as [PK/id2], t.fld1 as fld1, t.fld2 as fld2
from changetable(changes tbl_1, @lastVersion) ct
join sys.dm_tran_commit_table sct on ct.sys_change_version = sct.commit_ts
left join tbl_1 t on t.id1 = ct.id1 and t.id2 = ct.id2
for xml path('Record'), type
)
for xml path('CT_Changes')
)
declare @ch uniqueidentifier =
(
select top 1 ce.conversation_handle from sys.conversation_endpoints ce join
sys.services s on ce.service_id = s.service_id
join sys.service_queues sq on s.service_queue_id = sq.object_id
where s.name = 'Masha' and ce.far_service = 'Dubrovsky' and ce.is_initiator = 1 and ce.state <> 'ER'
) --всегда открыт только один диалог, инициатором которого является Маша
;send on conversation @ch message type CT_Changes_tbl_1_MessageType (@x) --в него и зафигачиваем этот XML
-----------------------------------------------------------------------------------------------------
konec:
--Сравнение копии с оригиналом.
declare @n1 bigint, @n2 bigint
select @n1 = count(1) from (select * from tbl_1 except select * from tbl_2) t --сколько записей в оригинале не хватает в копии
select @n2 = count(1) from (select * from tbl_2 except select * from tbl_1) t --и наоборот
update dbo.Sync_Log set status = case when @n1 <> 0 or @n2 <> 0 then 'Обнаружено ' + cast(@n1 as varchar(20)) + ' записей в tbl_1, не совпадающих с tbl_2, и ' + cast(@n2 as varchar(20)) + ' записей в tbl_2, не совпадающих с tbl_1.' else 'OK' end where version = (select curVersion from @curVersion) --отражаем несовпадения в журнале, а если их нет, то ОК
commit
set transaction isolation level read committed
end
Скрипт 11
Запускаем этот скрипт с нового коннекта в SSMS. Каждую минуту в очередь Дубровского будет капать сообщение от Маши. Можно их посмотреть select *, cast(message_body as xml) from QueueDubrovskogo и убедиться, что в message_body приходит нечто по образу Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 2. На несовпадения между tbl_1 и tbl_2, о которых сообщается в таблице Sync_Log, пока не обращаем внимания. Мы убедились, что Change Tracking исправно отслеживает изменения в tbl_1, а сервис-брокер исправно доставляет их на сервер с tbl_2. Теперь на Дубровском напишем процедуру очереди, которая будет разгребать валящиеся сообщения, превращать XML обратно в операторы DML и применять их к tbl_2, чтобы синхронизировать ее с tbl_1. Джойним tbl_2 по полям РК c записями, полученными из XML. Те, у которых operation="D", удаляются, "I" - вставляются, "U" - обновляются. Апдейты dbo.Sync_Log, которые идут после каждой операции, предназначены для контрольных целей. По-хорошему, Sync_Log нужно было разделить, сделав журнал на стороне отправки и журнал на стороне приема. Я не стал этим заморачиваться, беззастенчиво воспользовавшись тем, что в данном примере стороны физически совпадают. Процедура очереди будет написана на основе Краткое введение в сервис-брокер\Скрипты 21-22 и Небольшое упражнение на FOR XML PATH и XQuery\Скрипты 4-5.
if object_id('ProcessSyncMessages', 'P') is not null drop proc ProcessSyncMessages
go
--Процедура производит синхронизацию tbl_2 с tbl_1
create proc ProcessSyncMessages as begin
declare @ch uniqueidentifier, @msgtype sysname, @body varbinary(max)
--Читаем из очереди сообщение с изменениями
while 1 = 1 begin
waitfor (receive top(1) @ch = conversation_handle, @msgtype = message_type_name, @body = message_body from QueueDubrovskogo)
if @@rowcount = 0 return
if @msgtype <> 'CT_Changes_tbl_1_MessageType' return
select @@rowcount, @msgtype
declare @x xml = @body
declare @curVersion bigint = (select x.value('@version_upto[1]', 'bigint') from @x.nodes('CT_Changes') d(x))
--Удаляем удаленные записи
delete t from tbl_2 t join (select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2 from @x.nodes('CT_Changes/Record[@operation="D"]') d(x)) ct on t.id1 = ct.id1 and t.id2 = ct.id2
update dbo.Sync_Log set deleted = @@rowcount where version = @curVersion --их количество вносим в журнал
--Добавляем новые
insert tbl_2 select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2, x.value('fld1[1]', 'nvarchar(10)') fld1, x.value('fld2[1]', 'nvarchar(10)') fld2 from @x.nodes('CT_Changes/Record[@operation="I"]') d(x)
update dbo.Sync_Log set inserted = @@rowcount where version = @curVersion --их количество вносим в журнал
--Обновляем модифицированные
;with cte as (select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2, x.value('fld1[1]', 'nvarchar(10)') fld1, x.value('fld2[1]', 'nvarchar(10)') fld2 from @x.nodes('CT_Changes/Record[@operation="U"]') d(x))
update t2 set t2.fld1 = cte.fld1, t2.fld2 = cte.fld2 from tbl_2 t2 join cte on t2.id1 = cte.id1 and t2.id2 = cte.id2
update dbo.Sync_Log set updated = @@rowcount where version = @curVersion --их количество вносим в журнал
end
end
go
alter queue QueueDubrovskogo with activation
(
status = on,
procedure_name = ProcessSyncMessages,
max_queue_readers = 1,
execute as self
)
Скрипт 12
Пускаем по-новой, выжидаем, смотрим, что получилось.
Рис.1
Не, ну это просто праздник какой-то. Все ж работает. Change Tracking, Service Broker, все рулит. А что в журнале?
Рис.2
В журнале тоже все правильно. Черт, единственно, я промахнулся с полем status. Третью часть из Скрипта 11 (Сравнение копии с оригиналом) надо было перетащить в процедуру очереди (Скрипт 12). Не сообразил, осталось исторически с поста Репликация таблиц средствами Change Tracking, где все было синхронно. А сейчас получается, что она отдала изменения в очередь и сразу полезла проверять таблицы на совпадение, а изменения еще не успели дойти и примениться. Поэтому вместо ОК в поле status везде несовпадения. Принципиально это ни на что не влияет, но выглядит неаккуратно. Ладно, предоставляется читателям в качестве самостоятельного упражнения.
По окончании демонстрации стопятся Скрипты 10, 11. Чтобы погасить процедуру обработки очереди, нужно выполнить скрипт
alter queue QueueDubrovskogo with activation
(
status = off
)
select * from sys.dm_broker_activated_tasks
kill 35
где вместо 35 нужно поставить спид, на котором она болтается в конкретном случае.