다음을 통해 공유


Репликация таблиц средствами Change Tracking

Прочитал книжку Александра Юрьевича Гладченко о репликации. Чертовски сильная вещь. Практически, Шолохов. Или Горький. В общем, где-то между. Произвела такое впечатление, что весь давешний вечер я безвылазно предавался репликации, однако, решив разнообразить процесс и привнести в него немного фантазии, вместо штатных механизмов тиражирования использовал Change Tracking (см. https://blogs.msdn.com/alexejs/archive/2009/08/09/change-tracking.aspx). Получился очень простой и показательный пример, иллюстрирующий работу Change Tracking в задаче синхронизации. Постановка задачи. Имеется таблица

use ChangeTracking_Test

if object_id('dbo.tbl_1', 'U') is not null drop table tbl_1

go

create table tbl_1 (

id1 int identity,

id2 int default (datepart(ns, sysdatetime()) / 100),

fld1 varchar(10),

fld2 sql_variant,

primary key(id1, id2)

)

Скрипт 1

Я чуть модифицировал таблицу tbl из предыдущего поста. id2 не имеет никакого потаенного смысла, кроме как сделать в таблице композитный ключ, чтоб служба медом не казалась. Можно было сделать его по дефолту, например, константой, не суть. Мне захотелось сделать его наносекундами, благо точность datetime2 составляет 100 нс. Ставим условием, что операции update возможны только над fld1, fld2, т.е. поля в составе РК не апдейтятся. Все равно это был бы нечестный апдейт - как мы видели в предыдущем посте, Рис.1, апдейт над полем в составе РК – это на самом деле не update, a delete + insert.

И имеется копия этой таблицы:

if object_id('dbo.tbl_2', 'U') is not null drop table tbl_2

select * into tbl_2 from tbl_1 where 1 = 0

Скрипт 2

Констрейнты в копию не потащим, identity удалим, хватит набора полей. Чтобы не усложнять, копия будет лежать здесь же, на том же сервере, в той же базе.

Включаем Change Tracking над базой (если не был до этого включен) и над таблицей tbl_1 (См. пост Change Tracking\Скрипт 2, 3)

if not exists (select 1 from sys.change_tracking_databases where database_id = db_id('ChangeTracking_Test'))

alter database ChangeTracking_Test set change_tracking = on

(change_retention = 10 minutes, auto_cleanup = on)

if not exists (select 1 from sys.change_tracking_tables where object_id = object_id('tbl_1'))

alter table tbl_1 enable change_tracking

Скрипт 3

По сравнению с предыдущим постом, я не буду сейчас отслеживать, какие именно колонки апдейтились, чтобы не усложнять. Просто в случае обновления записи будем обновлять в ее копии разом и fld1, и fld2.

В скрипте синхронизации будет использоваться уровень изоляции snapshot, чтобы пользовательская активность не влезла некстати посередке и не подправила, допустим, текущую версию. На время синхронизации работаем с собственной картиной данных, замороженной на ее длительность . В настройках базы требуется предварительно выставить его поддержку:

alter database ChangeTracking_Test set single_user with rollback immediate

alter database ChangeTracking_Test set read_committed_snapshot on

alter database ChangeTracking_Test set allow_snapshot_isolation on

alter database ChangeTracking_Test set multi_user

Скрипт 4

Имитируем пользовательскую активность, запустив с одного из соединений процесс, вносящий изменения в таблицу tbl_1. Каждые случайное число секунд над одной случайной записью таблицы происходит случайно выбранная одна из операций удаления, вставки или обновления. В случае вставки или обновления генерируются случайные значения для полей fld1, fld2. Я не стал ветвить дальше на ситуации, когда было обновлено поле fld1, либо fld2, либо оба, т.к. with (track_columns_updated = on) при включении Change Tracking на таблицу мы в целях простоты опустили, т.е. какие именно поля в записи обновились, не отслеживаем. При синхронизации, как только будем видеть в таблице изменений tbl_1 против записи U, будем обновлять в tbl_2 чохом все неключевые поля соответствующей записи.

while 1 = 1 begin

--Пауза от 1 до 10 сек.

      declare @t varchar(8) = '00:00:' + cast(floor(rand() * 10 + 1) as char(2))

      print @t

      waitfor delay @t

     

--Какую операцию будем производить (I = 2, U = 3, D = 1)

      declare @op tinyint = floor(rand() * 3 + 1)

      print @op

      if @op = 1 --Если выпало удаление

      delete t1 from tbl_1 t1 join (select top 1 * from tbl_1 order by newid()) t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2

      else

      begin

--Генерим случайно значения для fld1 и fld2

      declare @fld1 char(3) = replicate(char(floor(rand() * 26 + ascii('a'))), 3)

      declare @fld2 char(3) = replicate(char(floor(rand() * 26 + ascii('a'))), 3)

      print @fld1 + ', ' + @fld2

      if @op = 2 --Если выпала вставка

      insert tbl_1 (fld1, fld2) values (@fld1, @fld2)

      else --Если обновление

      update t1 set fld1 = @fld1, fld2 = @fld2 from tbl_1 t1 join (select top 1 * from tbl_1 order by newid()) t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2

      end

     

end

select * from tbl_1

Скрипт 5

Для журналирования синхронизации я создал табличку, куда будут скидываться ее результаты: момент времени, текущая версия, название таблицы-оригинала, название копии, результат синхронизации и ее краткая сводка: сколько записей было удалено, добавлено и обновлено в копии в результате синхронизации.

use ChangeTracking_Test

if object_id('dbo.Sync_Log', 'U') is not null drop table dbo.Sync_Log

create table dbo.Sync_Log (dt datetime default sysdatetime(), version bigint default change_tracking_current_version(),

                           source sysname, destination sysname, status nvarchar(200),

                           deleted bigint, inserted bigint, updated bigint)

insert dbo.Sync_Log (version, source, destination)

values (change_tracking_min_valid_version(object_id('dbo.tbl_1')), 'tbl_1', 'tbl_2')

Скрипт 6

Поле version будет использоваться для определения последней версии, применной на tbl_2, от которой нужно синхронизироваться в этот раз. На начальный момент номер версии будет равен change_tracking_min_valid_version(object_id('dbo.tbl_1')). Дело в том, что если Change Tracking над таблицей прерывался (alter table ... disable/enable change_tracking), все изменения, понятно, чистятся, однако номер последней версии не сбрасывается: он имеется во внутренней таблице sys.syscommittab (она же DMV sys.dm_tran_commit_table), которую создает Change Tracking одну на базу. Старые версии чистятся из нее autocleanup'ом, но приостановка Change Tracking над таблицей не вычеркивает из нее изменений. Отслеживание изменений возобновляется для таблицы не с 0-й версии, а с той последней, что была до приостановки.

Процесс синхронизации, я думаю, построим следующим образом. Сначала сравниваем последнюю версию, примененную на tbl_2, с минимальной версией в изменениях tbl_1. Если минимальная версия больше, это аварийная ситуация. Это значит, что процесс autocleanup в Change Tracking почистил старые изменения, которые мы не успели донести на tbl_2. Синхронизацию нужно останавливать, вручную синхронизировать таблички, скорее всего, применять полный снимок, потом запускать по-новой. Если минимальная версия не больше последней версии на tbl_2, все в порядке. Проверяем текущую версию изменений с версией tbl_2, чтобы понять, а были ли вообще изменения в tbl_1 за этот промежуток. Если они равны, то изменений не было и делать вообще ничего в этот раз не требуется. Если изменения были, то применяем их на tbl_2: удаленные записи удаляем, новые добавляем, в проапдейтившихся – апдейтим поля fld1 и fld2. Все это проходит в рамках snapshot-транзакции, чтобы работа пользователей (Скрипт 3) не вносила помех на время синхронизации.

В тестовых целях я еще устроил проверку идентичности tbl_1 и tbl_2 после выполнения синхронизации, просто чтобы навскидку убедиться, что нигде не накосячил. В реальной жизни на больших таблицах подобное сравнение, понятно, не имеет смысла.

while 1 = 1 begin

waitfor delay '00:01:00'

set transaction isolation level snapshot

begin tran

declare @lastVersion bigint --здесь будет храниться последняя версия, которой синхронизирована tbl_2

declare @curVersion table (curVersion bigint); delete from @curVersion --здесь будет храниться текущая версия изменений

select @lastVersion = max(version) from dbo.Sync_Log where source = 'tbl_1' and destination = 'tbl_2' --берем последнюю версию из нашего журнала

insert dbo.Sync_Log (source, destination) output inserted.Version into @curVersion values ('tbl_1', 'tbl_2') --отмечаем в журнале текущий факт синхронизации

--Если autocleanup успел почистить изменения  tbl_1, которые еще не были доставлены на tbl_2, поднимаем аварийную ситуацию.

if @lastVersion < change_tracking_min_valid_version(object_id('dbo.tbl_1')) begin

 declare @msg nvarchar(200) = 'Часть изменений потеряна! Требуется ручная синхронизация!'

 update dbo.Sync_Log set status = @msg where version = (select curVersion from @curVersion) --фиксируем ее в журнале

 raiserror (@msg, 21, 1) with log --и вызываем строгую ошибку, которая прерывает выполнение скрипта

end

--Если за период с прошлой синхронизации ничего нового не произошло, можно не париться.

if @lastVersion = change_tracking_current_version() goto konec

--Применяем к копии изменения, произошедшие в оригинале.

delete t from tbl_2 t join (select * from changetable(changes tbl_1, @lastVersion) ct where SYS_CHANGE_OPERATION = 'D') ct on t.id1 = ct.id1 and t.id2 = ct.id2 --удаляем удалившиеся записи

update dbo.Sync_Log set deleted = @@rowcount where version = (select curVersion from @curVersion) --их количество вносим в журнал

insert tbl_2 select t1.* from changetable(changes tbl_1,@lastVersion) ct join tbl_1 t1 on t1.id1 = ct.id1 and t1.id2 = ct.id2 where ct.SYS_CHANGE_OPERATION = 'I' --вставляем новые

update dbo.Sync_Log set inserted = @@rowcount where version = (select curVersion from @curVersion) --их количество вносим в журнал

update t2 set t2.fld1 = t1.fld1, t2.fld2 = t1.fld2 from tbl_2 t2 join changetable(changes tbl_1, @lastVersion) ct on t2.id1 = ct.id1 and t2.id2 = ct.id2 join tbl_1 t1 on ct.id1 = t1.id1 and ct.id2 = t1.id2 where ct.SYS_CHANGE_OPERATION = 'U' --обновляем обновившиеся

update dbo.Sync_Log set updated = @@rowcount where version = (select curVersion from @curVersion) --их количество вносим в журнал

konec:

--Сравнение копии с оригиналом.

declare @n1 bigint, @n2 bigint

select @n1 = count(1) from (select * from tbl_1 except select * from tbl_2) t --сколько записей в оригинале не хватает в копии

select @n2 = count(1) from (select * from tbl_2 except select * from tbl_1) t --и наоборот

update dbo.Sync_Log set status = case when @n1 <> 0 or @n2 <> 0 then 'Обнаружено ' + cast(@n1 as varchar(20)) + ' записей в tbl_1, не совпадающих с tbl_2, и ' + cast(@n2 as varchar(20)) + ' записей в tbl_2, не совпадающих с tbl_1.' else 'OK' end where version = (select curVersion from @curVersion) --отражаем несовпадения в журнале, а если их нет, то ОК

commit

set transaction isolation level read committed

end

Скрипт 7

Синхронизация у нас будет происходить через фиксированные интервалы времени. Я взял интервал в 1 мин. При желании можно вместо цикла с waitfor засунуть скрипт T-SQLным шагом в какую-нибудь джобу и заскедьюлировать ее на SQL Agente. Не будем сейчас с этим напрягаться, просто выполним цикл с отдельного коннекта.

Ну что, попробуем, как оно все работает? Открываем SSMS и на первом соединении выполняем Скрипты 1-4, 6. Необходимые настройки сделаны, структуры созданы. Открываем второе соединение, запускаем на нем Скрипт 5. В tbl_1 посыпалась пользовательская активность. Открываем третье соединение и запускаем Скрипт 7 - ежеминутную синхронизацию. И уходим курить. В кофематах традиционно не хватает одной полезной кнопки - кофе с коньяком. М-да. Правда, тогда к заряжальщику придется приставить секьюрити, чтобы по дороге не перехватили. Ну ладно, останавливаем синхронизацию и пользовательскую активность, смотрим, что у нас получилось.

image001

Рис.1

Видите, красота какая. Все работает. Таблицы поддерживаются в синхронном состоянии. Кстати, давайте проверим.

image003

Рис.2

Какой пассаж! Налицо 3 записи в tbl_1, которых нет в tbl_2, 2 записи в tbl_2, коих нет в tbl_1, и имеются еще 2 несовпадения по значениям полей в записях 40 и 44. Впрочем, я уверен, что это второй коннект, эмулирующий пользовательскую активность, успел потрудиться после последней синхронизации. У него же периодичность короче. Он может навтыкать 6 - 60 изменений между двумя синхронизациями. Для целей зрелищности надо было его стопить сразу после очередной синхронизации. Ну не переповторять же из-за этого. Много курить вредно. Просто идем в 3-й коннект и вручную запускаем один раз синхронизацию.

image005

Рис.3

Вот, пожалуйста: 3 недостающие записи были добавлены в tbl_2, 2, которых уже не было в tbl_1, удалены и 2 проапдейчены. Данные в таблицах tbl_1и tbl_2 теперь полностью тождественны. Change Tracking рулит. Все просто.

Какой хитрый, скажете вы. Конечно, просто, когда копия лежит в той же базе. Ни РК, ни identity, ни прочие дефолты из оригинала в нее не перетащены. Как быть с ними? Давайте разберемся. Копия может лежать где угодно, например, на каком-нибудь прилинкованном сервере. Важно, чтобы ее источник поддерживал операции insert, update, delete в ответ на соответствующий код операции в change tracking, и все будет происходить идейно так же. Теперь об отсутствии ограничений оргинальной таблицы в копии. Primary key в копии на (id1, id2) можно поставить, это ничему не противоречит. Что касается значений по умолчанию, это вопрос двоякий. Можно утверждать, что раз tbl_2 является копией tbl_1, ей не полагаются значения по умолчанию и прочие способы самостоятельного присвоения значений, поскольку источником для нее всецело является tbl_1. С другой стороны, может понадобиться реплицировать не все поля, а только вертикальный фрагмент в виде колонок fld1, fld2. Однако, как только мы не реплицируем РК, простой однозвенный процесс репликации на этом заканчивается, поскольку теряется соответствие между записями источника и назначения. На самом деле, не такая уж редкая ситуация. Предположим, tbl_1 – некоторый справочник в Master Data, а tbl_2 – его аналог в прикладной бизнес-системе, например, SAP. Естественно, бизнес-система будет иметь свои ключи в этом справочнике, потому что они завязаны на другие ее таблицы. Следовательно, должна быть таблица перекодировки, причем вестись она, по-хорошему, должна на стороне бизнес-системы, ибо только бизнес-системе известны ее правила генерации ее собственных ключей в этом справочнике. Это означает, что процесс репликации становится двузвенным. Первое звено совпадает с рассмотренным в данном посте. Мы просто кидаем tbl1_1 в ее копию в некоторый шлюзовой отдел бизнес-системы, причем копии, понятно, не нужны ни РК, ни дефолты, только данные. Это будет, своего рода, staging. Спрашивается, зачем иметь на стороне назначения тупую копию? Во-первых, чтобы застраховаться от транспортных проблем, обрыва связи, недоступности источника и т.д. Второе – бизнес-система может иметь свои механизмы оповещения об обновлении копии, чтобы проапдейтить соответствующий ей справочник не по расписанию, а в масштабе времени, близком к реальному. Апдейт ее собственного справочника на основе копии tbl_1 – это второе звено репликации. В принципе, его также можно реализовать средствами Change Tracking, только к джойну changetable с исходной таблицей добавляется еще джойн с таблицей перекодировки ключей.