How to: Configure Change Tracking and Synchronize Peers

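This topic describes the key parts of an application that uses Sync Services for ADO.NET to synchronize three instances of SQL Server. The code in this application focuses on a small set of Sync Services classes, such as DbSyncProvider, DbSyncAdapter, and DbSyncSession, together with SyncOrchestrator and SyncOperationStatistics.

The Sync Services classes are described in "Architecture and Classes for Peer-to-Peer Synchronization". The SyncOrchestrator and SyncOperationStatistics classes are described in the Sync Framework core documentation on the Microsoft Web site.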

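Configuring synchronization involves the following steps:

  1. Creating tracking tables to store metadata

  2. Creating stored procedures to update data and metadata

  3. Initializing each database with schema and change tracking infrastructure

Executing synchronization involves the following steps:

  1. Creating a peer synchronization provider and synchronization adapters

  2. Creating a synchronization orchestrator and synchronizing the peers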

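The code examples in this topic are from an application that includes three peer databases. The databases are always synchronized in pairs: SyncSamplesDb_Peer1 and SyncSamplesDb_Peer2; SyncSamplesDb_Peer2 and SyncSamplesDb_Peer3; and SyncSamplesDb_Peer1 and SyncSamplesDb_Peer3. In a production application, a copy of the application would typically be deployed to each peer so that synchronization could be started from any of the peers.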

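Creating Tracking Tables to Store Metadata

Peer-to-peer synchronization requires a way to identify which rows should be synchronized during a particular synchronization session. Each table must have a primary key. The following code example shows the schema of the Sales.Customer table in the SyncSamplesDb_Peer1 database.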

CREATE TABLE Sales.Customer(
    CustomerId uniqueidentifier NOT NULL PRIMARY KEY DEFAULT NEWID(), 
    CustomerName nvarchar(100) NOT NULL,
    SalesPerson nvarchar(100) NOT NULL,
    CustomerType nvarchar(100) NOT NULL)

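Each table that you synchronize has an associated DbSyncAdapter object, which is described in "Creating a Peer Synchronization Provider and Synchronization Adapters" later in this topic. Add the CustomerId primary key to the RowIdColumns collection of the DbSyncAdapter object as follows: adapterCustomer.RowIdColumns.Add("CustomerId"). To see this code in context, see the complete code example at the end of this topic.

In addition to a primary key, Sync Services needs a way to track which rows have changed since the previous synchronization session between two peers. Change tracking involves two main tasks:

  • Track inserts, updates, and deletes for each table that is synchronized.

    This can be handled by using "coupled" or "decoupled" change tracking. With coupled change tracking, the change tracking metadata for inserts and updates is stored in the base table, and a tombstone table tracks deletes. With decoupled change tracking, the metadata for inserts, updates, and deletes is stored in a separate table (typically one tracking table for each base table). Set the type of change tracking by specifying a value for the ChangeTracking property. With either type of change tracking, the commands that you specify for the DbSyncAdapter object use the change tracking metadata to determine the incremental changes that were made at each peer.

    The examples in this documentation use decoupled change tracking. For an example that uses coupled tracking, see "Sample Applications for Peer-to-Peer Synchronization".

    Note

    You can use different types of change tracking for different peer providers. However, you must use the same type of change tracking for all tables that are synchronized by a particular peer provider. In addition to the tracking methods described in this topic, you can use SQL Server change tracking for peer-to-peer synchronization. However, Sync Services requires that the change tracking metadata for peer-to-peer synchronization be updatable, and the SQL Server change tracking tables are read-only. To use SQL Server change tracking, you must therefore maintain separate metadata tables that Sync Services can update. For information about using SQL Server change tracking for client and server synchronization, see "How to: Use SQL Server Change Tracking".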

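  • Track which changes each peer has received from other peers.

    This is typically handled by a single table in each peer database. The table stores synchronization "knowledge" in a binary format for each "scope". Sync Services uses knowledge to determine which changes to send to each peer during synchronization. Applications do not have to work with knowledge directly. A scope can be thought of as a logical grouping of tables. Consider a synchronization topology with three peers:

    1. Peer1 and Peer2 synchronize all changes.

    2. Peer1 and Peer3 synchronize.

    3. A user performs an update at Peer2.

    4. Peer3 and Peer2 synchronize.

    When Peer3 and Peer2 synchronize, Peer3 already has most of the changes from Peer2, because Peer3 synchronized with Peer1 first. Knowledge enables Sync Services to recognize this and to synchronize only the update that occurred at Peer2. For more information about knowledge, see the Sync Framework core documentation.

The following code example creates a table that tracks changes for the Sales.Customer table.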
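The role of knowledge in the four-step scenario above can be illustrated with a small model written in Python. This is a conceptual sketch only: Sync Services stores knowledge in an opaque binary format, and the Peer class, the per-peer "highest tick seen" representation, and the function names below are assumptions made for illustration, not part of the Sync Services API.

```python
class Peer:
    """Toy peer: a set of changes plus the knowledge of what it has seen."""

    def __init__(self, name):
        self.name = name
        self.clock = 0        # local logical clock
        self.changes = {}     # (origin peer, tick) -> payload
        self.knowledge = {}   # origin peer -> highest tick seen from that peer

    def local_change(self, payload):
        self.clock += 1
        self.changes[(self.name, self.clock)] = payload
        self.knowledge[self.name] = self.clock

    def changes_unknown_to(self, other):
        """Changes held here that the other peer's knowledge does not cover."""
        return {
            key: payload
            for key, payload in self.changes.items()
            if key[1] > other.knowledge.get(key[0], 0)
        }

    def receive(self, changes):
        for (origin, tick), payload in changes.items():
            self.changes[(origin, tick)] = payload
            self.knowledge[origin] = max(self.knowledge.get(origin, 0), tick)


def synchronize(a, b):
    """Two-way session; returns (changes sent a->b, changes sent b->a)."""
    to_b = a.changes_unknown_to(b)
    to_a = b.changes_unknown_to(a)
    b.receive(to_b)
    a.receive(to_a)
    return len(to_b), len(to_a)


peer1, peer2, peer3 = Peer("Peer1"), Peer("Peer2"), Peer("Peer3")
peer2.local_change("row A")
peer2.local_change("row B")

step1 = synchronize(peer1, peer2)     # 1. Peer1 and Peer2 synchronize all changes
step2 = synchronize(peer1, peer3)     # 2. Peer1 and Peer3 synchronize
peer2.local_change("row A, edited")   # 3. A user performs an update at Peer2
step4 = synchronize(peer3, peer2)     # 4. Peer3 and Peer2 synchronize
print(step1, step2, step4)            # prints (0, 2) (2, 0) (0, 1)
```

In the last session only one change travels from Peer2 to Peer3, because the knowledge held by Peer3 already covers the two rows that it received through Peer1.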

CREATE TABLE Sales.Customer_Tracking(
    CustomerId uniqueidentifier NOT NULL PRIMARY KEY,          
    sync_row_is_tombstone int DEFAULT 0,
    sync_row_timestamp timestamp, 
    sync_update_peer_key int DEFAULT 0,
    sync_update_peer_timestamp bigint,        
    sync_create_peer_key int DEFAULT 0,
    sync_create_peer_timestamp bigint,
    last_change_datetime datetime DEFAULT GETDATE())

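The base table and the tracking table must both exist at each peer. The primary key of the tracking table is the same as that of the base table, and the tracking table requires several additional columns, which are described in the following table. The names of the additional columns do not have to match the names listed here, but they must match the queries or procedures that access the tracking table.

Additional column    Usage

sync_row_is_tombstone
    A value of 1 indicates that the metadata entry is for a delete in the base table.

sync_row_timestamp
    The logical time at which the metadata entry was inserted.

sync_update_peer_key
    The identity of the peer that performed the update. A value of 0 indicates that the update was performed at the local peer.

sync_update_peer_timestamp
    The logical time at which the update was performed.

sync_create_peer_key
    The identity of the peer that performed the insert. A value of 0 indicates that the insert was performed at the local peer.

sync_create_peer_timestamp
    The logical time at which the insert was performed.

last_change_datetime
    The last time that the metadata entry was updated.

The following code example creates a trigger that updates the change tracking metadata in the Sales.Customer_Tracking table when an update is made to the Sales.Customer table. For examples of insert and delete triggers, see "Setup Scripts for Sync Services How-to Topics".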

CREATE TRIGGER Customer_UpdateTrigger ON Sales.Customer FOR UPDATE
AS    
    UPDATE t    
    SET sync_update_peer_key = 0, 
        sync_update_peer_timestamp = @@DBTS + 1,        
        last_change_datetime = GETDATE()
    FROM Sales.Customer_Tracking t JOIN inserted i ON t.CustomerId = i.CustomerId       
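To see how a tracking row evolves across an insert, an update, and a delete, the following Python sketch emulates decoupled change tracking in an in-memory SQLite database. Several things here are assumptions rather than the sample's actual scripts: SQLite has no rowversion type or @@DBTS, so a one-row sync_clock table stands in for the logical clock; the tables are reduced to a few columns and are not schema-qualified; and the insert and delete triggers are illustrative guesses at the pattern, not the triggers from the setup scripts.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Customer (CustomerId TEXT PRIMARY KEY, CustomerName TEXT NOT NULL);
CREATE TABLE Customer_Tracking (
    CustomerId TEXT PRIMARY KEY,
    sync_row_is_tombstone INTEGER DEFAULT 0,
    sync_row_timestamp INTEGER,
    sync_update_peer_key INTEGER DEFAULT 0,
    sync_update_peer_timestamp INTEGER,
    sync_create_peer_key INTEGER DEFAULT 0,
    sync_create_peer_timestamp INTEGER);
-- Assumed stand-in for the database-wide rowversion counter (@@DBTS).
CREATE TABLE sync_clock (tick INTEGER NOT NULL);
INSERT INTO sync_clock VALUES (0);

CREATE TRIGGER Customer_InsertTrigger AFTER INSERT ON Customer
BEGIN
    UPDATE sync_clock SET tick = tick + 1;
    INSERT INTO Customer_Tracking (CustomerId, sync_row_timestamp,
        sync_create_peer_timestamp, sync_update_peer_timestamp)
    SELECT NEW.CustomerId, tick, tick, tick FROM sync_clock;
END;

CREATE TRIGGER Customer_UpdateTrigger AFTER UPDATE ON Customer
BEGIN
    UPDATE sync_clock SET tick = tick + 1;
    UPDATE Customer_Tracking
    SET sync_update_peer_key = 0,
        sync_update_peer_timestamp = (SELECT tick FROM sync_clock),
        sync_row_timestamp = (SELECT tick FROM sync_clock)
    WHERE CustomerId = NEW.CustomerId;
END;

CREATE TRIGGER Customer_DeleteTrigger AFTER DELETE ON Customer
BEGIN
    UPDATE sync_clock SET tick = tick + 1;
    UPDATE Customer_Tracking
    SET sync_row_is_tombstone = 1,
        sync_update_peer_key = 0,
        sync_update_peer_timestamp = (SELECT tick FROM sync_clock),
        sync_row_timestamp = (SELECT tick FROM sync_clock)
    WHERE CustomerId = OLD.CustomerId;
END;
""")

# One local insert, one local update, and one local delete of the same row.
conn.execute("INSERT INTO Customer VALUES ('c1', 'Sample Store')")
conn.execute("UPDATE Customer SET CustomerName = 'Sample Shop' WHERE CustomerId = 'c1'")
conn.execute("DELETE FROM Customer WHERE CustomerId = 'c1'")

tracking_row = conn.execute(
    "SELECT sync_row_is_tombstone, sync_row_timestamp, "
    "sync_create_peer_timestamp, sync_update_peer_timestamp "
    "FROM Customer_Tracking WHERE CustomerId = 'c1'").fetchone()
print(tracking_row)  # prints (1, 3, 1, 3)
```

The base row is gone after the delete, but the tracking row survives as a tombstone, which is what allows the delete to be sent to other peers later.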

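The following code example creates the Sales.ScopeInfo table and inserts a scope named Sales. The Sales.ScopeInfo table stores synchronization knowledge for each scope that is defined. The scope does not have to correspond to the Sales database schema.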

CREATE TABLE Sales.ScopeInfo(      
    scope_id uniqueidentifier DEFAULT NEWID(),  
    scope_name nvarchar(100) NULL,
    scope_sync_knowledge varbinary(max) NULL,
    scope_tombstone_cleanup_knowledge varbinary(max) NULL,
    scope_timestamp timestamp)

SET NOCOUNT ON
INSERT INTO Sales.ScopeInfo(scope_name) VALUES ('Sales')
SET NOCOUNT OFF

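Creating Stored Procedures to Update Data and Metadata

After you create the metadata tables at each peer, create Transact-SQL queries or stored procedures (recommended) to apply changes to the base tables and metadata tables at each peer. You specify these queries or procedures for the command properties of the DbSyncAdapter object, which are described in "Creating a Peer Synchronization Provider and Synchronization Adapters".

The following code examples create a set of stored procedures that handle data and metadata changes for the Sales.Customer table. For brevity, the procedures that select data and handle updates are included, but the procedures for inserts and deletes are not. For examples of insert and delete procedures, see "Setup Scripts for Sync Services How-to Topics".

In the complete code example at the end of this topic, many of the values that are passed to these procedures come from "session variables". These are built-in variables that enable Sync Services to pass values to commands during a synchronization session. For more information about session variables, see "How to: Use Session Variables for Peer-to-Peer Synchronization".

Procedure for SelectIncrementalChangesCommand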

CREATE PROCEDURE Sales.sp_Customer_SelectChanges (             
        @sync_min_timestamp bigint,     
        @sync_metadata_only int,
        @sync_initialize int)
AS    

    --IF @sync_initialize = 0
    --BEGIN
        -- Perform additional logic if required.
    --END
    
    SELECT  t.CustomerId, 
            c.CustomerName,
            c.SalesPerson,
            c.CustomerType,                 
            t.sync_row_is_tombstone, 
            t.sync_row_timestamp,                          
            t.sync_update_peer_key, 
            t.sync_update_peer_timestamp, 
            t.sync_create_peer_key, 
            t.sync_create_peer_timestamp 
    FROM Customer c RIGHT JOIN Customer_Tracking t ON c.CustomerId = t.CustomerId
    WHERE t.sync_row_timestamp > @sync_min_timestamp     
    ORDER BY t.CustomerId ASC
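The procedure above joins from the tracking table rather than from the base table because a deleted row no longer exists in the base table; an inner join would silently drop tombstones. The following Python sketch shows the effect with an in-memory SQLite database. It is an illustration under stated assumptions: the tables are cut down to a few columns, plain integers stand in for timestamp values, the sample data is made up, and a LEFT JOIN that starts from the tracking table is used as the equivalent of the RIGHT JOIN in the procedure.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Customer (CustomerId TEXT PRIMARY KEY, CustomerName TEXT);
CREATE TABLE Customer_Tracking (
    CustomerId TEXT PRIMARY KEY,
    sync_row_is_tombstone INTEGER,
    sync_row_timestamp INTEGER);
INSERT INTO Customer VALUES ('a', 'Store A'), ('b', 'Store B');
INSERT INTO Customer_Tracking VALUES
    ('a', 0, 5),    -- unchanged since the previous session
    ('b', 0, 12),   -- updated after the previous session
    ('c', 1, 14);   -- deleted after the previous session (tombstone)
""")


def select_changes(sync_min_timestamp):
    """Return tracking rows that are newer than the given minimum timestamp."""
    return conn.execute(
        """
        SELECT t.CustomerId, c.CustomerName,
               t.sync_row_is_tombstone, t.sync_row_timestamp
        FROM Customer_Tracking t
        LEFT JOIN Customer c ON c.CustomerId = t.CustomerId
        WHERE t.sync_row_timestamp > ?
        ORDER BY t.CustomerId ASC
        """,
        (sync_min_timestamp,),
    ).fetchall()


changes = select_changes(10)
print(changes)  # prints [('b', 'Store B', 0, 12), ('c', None, 1, 14)]
```

The tombstone for row c is returned with NULL data columns, so the receiving peer learns about the delete even though the base row is gone.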

Procedure for UpdateCommand

CREATE PROCEDURE Sales.sp_Customer_ApplyUpdate (                                   
        @CustomerId uniqueidentifier,
        @CustomerName nvarchar(100),
        @SalesPerson nvarchar(100),
        @CustomerType nvarchar(100),
        @sync_min_timestamp bigint ,                                
        @sync_row_count int OUT,
        @sync_force_write int)        
AS      
    UPDATE c
    SET c.CustomerName = @CustomerName, c.SalesPerson = @SalesPerson, c.CustomerType = @CustomerType      
    FROM Customer c JOIN Customer_Tracking t ON c.CustomerId = t.CustomerId
    WHERE ((t.sync_row_timestamp <= @sync_min_timestamp) OR @sync_force_write = 1)
        AND t.CustomerId = @CustomerId  
    SET @sync_row_count = @@rowcount
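The WHERE clause in the update procedure acts as a conflict check: the change is written only if the local row has not changed since the minimum timestamp, or if the caller forces the write, and the row count tells the caller whether anything was applied. The following Python sketch mirrors that decision with plain dictionaries. The function name, the dictionary layout, and the sample values are assumptions made for illustration.

```python
def apply_update(base, tracking, customer_id, new_values,
                 sync_min_timestamp, sync_force_write=0):
    """Apply a remote update the way the procedure's WHERE clause does.

    Returns the number of rows affected (the role of @sync_row_count).
    """
    meta = tracking.get(customer_id)
    if meta is None or customer_id not in base:
        return 0  # the join between base and tracking table finds no row
    unchanged_locally = meta["sync_row_timestamp"] <= sync_min_timestamp
    if unchanged_locally or sync_force_write == 1:
        base[customer_id].update(new_values)
        return 1
    return 0  # the local row changed after the minimum timestamp


base = {"c1": {"CustomerName": "Store A", "SalesPerson": "Seller 1"}}
tracking = {"c1": {"sync_row_timestamp": 12}}

# The local row was modified at logical time 12, after the minimum of 10.
skipped = apply_update(base, tracking, "c1", {"SalesPerson": "Seller 2"}, 10)

# Forcing the write applies the change regardless of the timestamp check.
forced = apply_update(base, tracking, "c1", {"SalesPerson": "Seller 2"}, 10,
                      sync_force_write=1)
print(skipped, forced, base["c1"]["SalesPerson"])  # prints 0 1 Seller 2
```

A returned count of 0 is the signal that the update was not applied, which matches the comment in the complete code example that a count of 0 indicates that an operation failed.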

Procedure for UpdateMetadataCommand

CREATE PROCEDURE Sales.sp_Customer_UpdateMetadata (
        @CustomerId uniqueidentifier,
        @sync_create_peer_key int,
        @sync_create_peer_timestamp bigint,                 
        @sync_update_peer_key int,
        @sync_update_peer_timestamp timestamp,                      
        @sync_row_timestamp timestamp,
        @sync_check_concurrency int,
        @sync_row_count int OUT)        
AS          
    UPDATE Customer_Tracking SET
        sync_create_peer_key = @sync_create_peer_key, 
        sync_create_peer_timestamp =  @sync_create_peer_timestamp,
        sync_update_peer_key = @sync_update_peer_key, 
        sync_update_peer_timestamp =  @sync_update_peer_timestamp 
    WHERE CustomerId = @CustomerId AND 
        (@sync_check_concurrency = 0 OR sync_row_timestamp = @sync_row_timestamp)
    SET @sync_row_count = @@rowcount

Procedure for SelectRowCommand

CREATE PROCEDURE Sales.sp_Customer_SelectRow
        @CustomerId uniqueidentifier
AS
   SELECT  t.CustomerId, 
           c.CustomerName,
           c.SalesPerson,
           c.CustomerType,                 
           t.sync_row_timestamp, 
           t.sync_row_is_tombstone,
           t.sync_update_peer_key, 
           t.sync_update_peer_timestamp, 
           t.sync_create_peer_key, 
           t.sync_create_peer_timestamp 
    FROM Customer c RIGHT JOIN Customer_Tracking t ON c.CustomerId = t.CustomerId 
    WHERE t.CustomerId = @CustomerId 

Procedure for SelectMetadataForCleanupCommand

CREATE PROCEDURE Sales.sp_Customer_SelectMetadata     
    @metadata_aging_in_hours int
AS
    IF @metadata_aging_in_hours = -1
        BEGIN
            SELECT CustomerId,
                   sync_row_timestamp,         
                   sync_update_peer_key, 
                   sync_update_peer_timestamp, 
                   sync_create_peer_key, 
                   sync_create_peer_timestamp 
            FROM Customer_Tracking
            WHERE sync_row_is_tombstone = 1
        END
    
    ELSE
        BEGIN
            SELECT CustomerId,
                   sync_row_timestamp,         
                   sync_update_peer_key, 
                   sync_update_peer_timestamp, 
                   sync_create_peer_key, 
                   sync_create_peer_timestamp 
            FROM Customer_Tracking
            WHERE sync_row_is_tombstone = 1 AND
            DATEDIFF(hh, last_change_datetime, GETDATE()) > @metadata_aging_in_hours
        END
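The selection logic of the cleanup procedure can be restated in a few lines of Python. This is a sketch, not the library's behavior: the function names and sample rows are assumptions, and the helper imitates DATEDIFF(hh, ...) by counting the hour boundaries crossed between two points in time, which is how DATEDIFF counts datepart boundaries.

```python
from datetime import datetime


def hour_boundaries_crossed(start, end):
    """Count hour boundaries between two datetimes, like DATEDIFF(hh, start, end)."""
    def truncate(moment):
        return moment.replace(minute=0, second=0, microsecond=0)
    return int((truncate(end) - truncate(start)).total_seconds() // 3600)


def select_metadata_for_cleanup(tracking_rows, metadata_aging_in_hours, now):
    """Return the IDs of tombstone rows that are eligible for cleanup."""
    selected = []
    for row in tracking_rows:
        if row["sync_row_is_tombstone"] != 1:
            continue  # metadata for live rows is never selected
        if (metadata_aging_in_hours == -1 or
                hour_boundaries_crossed(row["last_change_datetime"], now)
                > metadata_aging_in_hours):
            selected.append(row["CustomerId"])
    return selected


now = datetime(2024, 1, 10, 12, 0)
tracking_rows = [
    {"CustomerId": "live", "sync_row_is_tombstone": 0,
     "last_change_datetime": datetime(2023, 12, 1, 9, 0)},
    {"CustomerId": "old-tombstone", "sync_row_is_tombstone": 1,
     "last_change_datetime": datetime(2024, 1, 1, 8, 30)},
    {"CustomerId": "recent-tombstone", "sync_row_is_tombstone": 1,
     "last_change_datetime": datetime(2024, 1, 10, 10, 15)},
]

aged = select_metadata_for_cleanup(tracking_rows, 100, now)
everything = select_metadata_for_cleanup(tracking_rows, -1, now)
print(aged)        # prints ['old-tombstone']
print(everything)  # prints ['old-tombstone', 'recent-tombstone']
```

A value of -1 selects every tombstone regardless of age, while any other value selects only tombstones whose last change is older than the retention period.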

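Initializing Each Database with Schema and Change Tracking Infrastructure

Initializing a peer database involves copying the table schema and change tracking infrastructure, and any required initial data, to each peer. For peer-to-peer synchronization, Sync Services does not automatically create the table schema or tracking infrastructure in the peer databases. An application must ensure that these objects exist before it tries to synchronize peers. You can use backup and restore or another technology to copy the objects to each peer, but do so only if no changes occur in the database from which the backup is taken. If the first peer database is in use and is being updated, we strongly recommend that you copy only the schema and change tracking infrastructure to each peer, and use Sync Services to copy the data. If you copy the data by using another method, a peer might miss changes that have been committed at the first peer. Sync Services can initialize the data at every peer as long as at least one peer has the data. The example code in this topic takes this approach: each peer database contains two tables, but only the tables in SyncSamplesDb_Peer1 contain data. The data is copied to the other peers during the first synchronization session.

Creating a Peer Synchronization Provider and Synchronization Adapters

Now that the change tracking infrastructure is in place, you can focus on the code that configures synchronization. This code mainly involves two types of objects: a DbSyncAdapter object for each table, and a DbSyncProvider object for each peer that will be synchronized during a session.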

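Synchronization Adapter Commands

For each table, the following commands must be set on the DbSyncAdapter object.

Synchronization adapter property    Usage

SelectIncrementalChangesCommand
    Selects all changes since the previous synchronization by joining the base table with its change tracking table. This is the maximum set of changes that could be synchronized with a peer. As described earlier, this set can be further restricted when knowledge is taken into account.

InsertCommand, UpdateCommand, and DeleteCommand
    Apply to a peer the inserts, updates, and deletes that were selected from the other peer. These changes were selected by using the query or procedure that you specified for the SelectIncrementalChangesCommand property.

InsertMetadataCommand, UpdateMetadataCommand, and DeleteMetadataCommand
    Update the change tracking tables at each peer to reflect the changes that were selected from one peer and applied to the other peer. These updates enable Sync Services to track where and when changes occurred.

SelectRowCommand
    Selects rows that are in conflict during synchronization. For more information, see "How to: Handle Data Conflicts and Errors for Peer-to-Peer Synchronization".

SelectMetadataForCleanupCommand
    Selects metadata that can be cleaned up at a peer. Cleanup is typically retention-based: metadata is kept for a specific amount of time. However, an application can use other logic to determine when to clean up metadata. For more information, see "How to: Clean Up Metadata for Peer-to-Peer Synchronization".

The following code examples create commands that call the stored procedures described earlier in this topic.

Application code for SelectIncrementalChangesCommand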

SqlCommand chgsCustomerCmd = new SqlCommand();
chgsCustomerCmd.CommandType = CommandType.StoredProcedure;
chgsCustomerCmd.CommandText = "Sales.sp_Customer_SelectChanges";
chgsCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncMetadataOnly, SqlDbType.Int);
chgsCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt);
chgsCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncInitialize, SqlDbType.Int);            

adapterCustomer.SelectIncrementalChangesCommand = chgsCustomerCmd;

Dim chgsCustomerCmd As New SqlCommand()

With chgsCustomerCmd
    .CommandType = CommandType.StoredProcedure
    .CommandText = "Sales.sp_Customer_SelectChanges"
    .Parameters.Add("@" + DbSyncSession.SyncMetadataOnly, SqlDbType.Int)
    .Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt)
    .Parameters.Add("@" + DbSyncSession.SyncInitialize, SqlDbType.Int)
End With

adapterCustomer.SelectIncrementalChangesCommand = chgsCustomerCmd

Application code for UpdateCommand

SqlCommand updCustomerCmd = new SqlCommand();
updCustomerCmd.CommandType = CommandType.StoredProcedure;
updCustomerCmd.CommandText = "Sales.sp_Customer_ApplyUpdate";
updCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
updCustomerCmd.Parameters.Add("@CustomerName", SqlDbType.NVarChar);
updCustomerCmd.Parameters.Add("@SalesPerson", SqlDbType.NVarChar);
updCustomerCmd.Parameters.Add("@CustomerType", SqlDbType.NVarChar);
updCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt);
updCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;
updCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncForceWrite, SqlDbType.Int);

adapterCustomer.UpdateCommand = updCustomerCmd;

Dim updCustomerCmd As New SqlCommand()

With updCustomerCmd
    .CommandType = CommandType.StoredProcedure
    .CommandText = "Sales.sp_Customer_ApplyUpdate"
    .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
    .Parameters.Add("@CustomerName", SqlDbType.NVarChar)
    .Parameters.Add("@SalesPerson", SqlDbType.NVarChar)
    .Parameters.Add("@CustomerType", SqlDbType.NVarChar)
    .Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt)
    .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
    .Parameters.Add("@" + DbSyncSession.SyncForceWrite, SqlDbType.Int)
End With

adapterCustomer.UpdateCommand = updCustomerCmd

Application code for UpdateMetadataCommand

SqlCommand updMetadataCustomerCmd = new SqlCommand();
updMetadataCustomerCmd.CommandType = CommandType.StoredProcedure;
updMetadataCustomerCmd.CommandText = "Sales.sp_Customer_UpdateMetadata";
updMetadataCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCreatePeerKey, SqlDbType.Int);
updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCreatePeerTimestamp, SqlDbType.BigInt);
updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncUpdatePeerKey, SqlDbType.Int);
updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncUpdatePeerTimestamp, SqlDbType.BigInt);
updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int);
updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowTimestamp, SqlDbType.BigInt);
updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;

adapterCustomer.UpdateMetadataCommand = updMetadataCustomerCmd;

Dim updMetadataCustomerCmd As New SqlCommand()

With updMetadataCustomerCmd
    .CommandType = CommandType.StoredProcedure
    .CommandText = "Sales.sp_Customer_UpdateMetadata"
    .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
    .Parameters.Add("@" + DbSyncSession.SyncCreatePeerKey, SqlDbType.Int)
    .Parameters.Add("@" + DbSyncSession.SyncCreatePeerTimestamp, SqlDbType.BigInt)
    .Parameters.Add("@" + DbSyncSession.SyncUpdatePeerKey, SqlDbType.Int)
    .Parameters.Add("@" + DbSyncSession.SyncUpdatePeerTimestamp, SqlDbType.BigInt)
    .Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int)
    .Parameters.Add("@" + DbSyncSession.SyncRowTimestamp, SqlDbType.BigInt)
    .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
End With

adapterCustomer.UpdateMetadataCommand = updMetadataCustomerCmd

Application code for SelectRowCommand

SqlCommand selRowCustomerCmd = new SqlCommand();
selRowCustomerCmd.CommandType = CommandType.StoredProcedure;
selRowCustomerCmd.CommandText = "Sales.sp_Customer_SelectRow";
selRowCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);

adapterCustomer.SelectRowCommand = selRowCustomerCmd;

Dim selRowCustomerCmd As New SqlCommand()

With selRowCustomerCmd
    .CommandType = CommandType.StoredProcedure
    .CommandText = "Sales.sp_Customer_SelectRow"
    .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
End With

adapterCustomer.SelectRowCommand = selRowCustomerCmd

Application code for SelectMetadataForCleanupCommand

SqlCommand selMetadataCustomerCmd = new SqlCommand();
selMetadataCustomerCmd.CommandType = CommandType.StoredProcedure;
selMetadataCustomerCmd.CommandText = "Sales.sp_Customer_SelectMetadata";
selMetadataCustomerCmd.Parameters.Add("@metadata_aging_in_hours", SqlDbType.Int).Value = MetadataAgingInHours;

adapterCustomer.SelectMetadataForCleanupCommand = selMetadataCustomerCmd;

Dim selMetadataCustomerCmd As New SqlCommand()

With selMetadataCustomerCmd
    .CommandType = CommandType.StoredProcedure
    .CommandText = "Sales.sp_Customer_SelectMetadata"
    .Parameters.Add("@metadata_aging_in_hours", SqlDbType.Int).Value = MetadataAgingInHours
End With

adapterCustomer.SelectMetadataForCleanupCommand = selMetadataCustomerCmd

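Synchronization Provider Commands

For each peer, the following commands must be set on the DbSyncProvider object.

Synchronization provider property    Usage

SelectNewTimestampCommand
    Returns a timestamp value that is used to select and apply sets of changes at each peer. During the current synchronization session, the command provides a new timestamp value. Changes that were made after the timestamp value from the previous synchronization session and before the new timestamp value are synchronized. The new value is then stored and used as the starting point for the next session.

SelectScopeInfoCommand
    Returns information from the scope metadata table, such as the synchronization knowledge and cleanup knowledge that Sync Services requires.

UpdateScopeInfoCommand
    Updates information in the scope metadata table.

The following code example creates a command for the SelectNewTimestampCommand property. The commands for the SelectScopeInfoCommand and UpdateScopeInfoCommand properties are included in the complete code example at the end of this topic.

Application code for SelectNewTimestampCommand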

SqlCommand selectNewTimestampCommand = new SqlCommand();
string newTimestampVariable = "@" + DbSyncSession.SyncNewTimestamp;
selectNewTimestampCommand.CommandText = "SELECT " + newTimestampVariable + " = min_active_rowversion() - 1";
selectNewTimestampCommand.Parameters.Add(newTimestampVariable, SqlDbType.Timestamp);
selectNewTimestampCommand.Parameters[newTimestampVariable].Direction = ParameterDirection.Output;

peerProvider.SelectNewTimestampCommand = selectNewTimestampCommand;

Dim newTimestampVariable As String = "@" + DbSyncSession.SyncNewTimestamp

Dim selectNewTimestampCommand As New SqlCommand()

With selectNewTimestampCommand
    .CommandText = "SELECT " + newTimestampVariable + " = min_active_rowversion() - 1"
    .Parameters.Add(newTimestampVariable, SqlDbType.Timestamp)
    .Parameters(newTimestampVariable).Direction = ParameterDirection.Output
End With

peerProvider.SelectNewTimestampCommand = selectNewTimestampCommand
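The way the new timestamp brackets each session can be sketched with a toy model in Python. The command above uses min_active_rowversion() - 1, which keeps the upper bound below any rowversion value that belongs to a transaction that has not yet committed. The model below has no in-flight transactions, so the current clock value plays that role. The class name, the function name, and the inclusive upper bound are assumptions made for illustration, not Sync Services behavior.

```python
class PeerDatabase:
    """Toy database whose logical clock advances with every write."""

    def __init__(self):
        self.clock = 0
        self.tracking = []  # (sync_row_timestamp, description)

    def write(self, description):
        self.clock += 1
        self.tracking.append((self.clock, description))

    def select_new_timestamp(self):
        # No transactions are in flight in this model, so the current clock
        # value is a safe upper bound for the session.
        return self.clock


def run_session(database, last_timestamp):
    """Return the changes for one session and the starting point for the next."""
    new_timestamp = database.select_new_timestamp()
    batch = [description for timestamp, description in database.tracking
             if last_timestamp < timestamp <= new_timestamp]
    return batch, new_timestamp


database = PeerDatabase()
database.write("insert c1")
database.write("update c1")
first_batch, anchor = run_session(database, last_timestamp=0)

database.write("insert c2")
second_batch, anchor = run_session(database, last_timestamp=anchor)
print(first_batch)   # prints ['insert c1', 'update c1']
print(second_batch)  # prints ['insert c2']
```

Because each session starts where the previous one ended, a change is selected once and is not picked up again in later sessions.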

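Creating a Synchronization Orchestrator and Synchronizing the Peers

The code described in the previous sections of this topic configures synchronization for each peer. Now it is time to synchronize the peers. The following code example instantiates two DbSyncProvider objects and calls the SetupSyncProvider method that was created for the sample application. This method demonstrates how to configure a peer provider and its synchronization adapters. The code then specifies which provider is the local provider and which is the remote provider. Finally, the code specifies that changes are first uploaded from the local database to the remote database and then downloaded in the other direction.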

public class SampleSyncAgent : SyncOrchestrator
{
    public SampleSyncAgent(string localProviderConnString, string remoteProviderConnString)
    {

        //Instantiate the sample provider that allows us to create a provider
        //for both of the peers that are being synchronized.
        SampleSyncProvider sampleSyncProvider = new SampleSyncProvider();

        //Instantiate a DbSyncProvider for the local peer and the remote peer.
        //For example, if this code is running at peer1 and is
        //synchronizing with peer2, peer1 would be the local provider
        //and peer2 the remote provider.
        DbSyncProvider localProvider = new DbSyncProvider();
        DbSyncProvider remoteProvider = new DbSyncProvider();

        //Create a provider by using the SetupSyncProvider on the sample class.             
        sampleSyncProvider.SetupSyncProvider(localProviderConnString, localProvider);
        localProvider.SyncProviderPosition = SyncProviderPosition.Local;
        
        sampleSyncProvider.SetupSyncProvider(remoteProviderConnString, remoteProvider);
        remoteProvider.SyncProviderPosition = SyncProviderPosition.Remote;

        //Specify the local and remote providers that should be synchronized,
        //and the direction and order of changes. In this case, changes are first
        //uploaded from local to remote and then downloaded in the other direction.
        this.LocalProvider = localProvider;
        this.RemoteProvider = remoteProvider;
        this.Direction = SyncDirectionOrder.UploadAndDownload;
    }
}

Public Class SampleSyncAgent
    Inherits SyncOrchestrator

    Public Sub New(ByVal localProviderConnString As String, ByVal remoteProviderConnString As String)

        'Instantiate the sample provider that allows us to create a provider
        'for both of the peers that are being synchronized.
        Dim sampleSyncProvider As New SampleSyncProvider()

        'Instantiate a DbSyncProvider for the local peer and the remote peer.
        'For example, if this code is running at peer1 and is
        'synchronizing with peer2, peer1 would be the local provider
        'and peer2 the remote provider.  
        Dim localProvider As New DbSyncProvider()
        Dim remoteProvider As New DbSyncProvider()

        'Create a provider by using the SetupSyncProvider on the sample class.
        sampleSyncProvider.SetupSyncProvider(localProviderConnString, localProvider)
        localProvider.SyncProviderPosition = SyncProviderPosition.Local

        sampleSyncProvider.SetupSyncProvider(remoteProviderConnString, remoteProvider)
        remoteProvider.SyncProviderPosition = SyncProviderPosition.Remote

        'Specify the local and remote providers that should be synchronized,
        'and the direction and order of changes. In this case, changes are first
        'uploaded from local to remote and then downloaded in the other direction.
        Me.LocalProvider = localProvider
        Me.RemoteProvider = remoteProvider
        Me.Direction = SyncDirectionOrder.UploadAndDownload

    End Sub 'New
End Class 'SampleSyncAgent

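The following code sets up three synchronization sessions by instantiating the SampleSyncAgent class (which derives from SyncOrchestrator) and then calling the Synchronize method to synchronize the three pairs of peer databases.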

//The SampleStats class handles information from the SyncStatistics
//object that the Synchronize method returns.
SampleStats sampleStats = new SampleStats();

try
{
    //Initial synchronization. Instantiate the SyncOrchestrator
    //and call Synchronize. Note that data is not synchronized during the
    //session between peer 1 and peer 3, because all rows have already
    //been delivered to peer 3 during its synchronization session with peer 2.     
    SyncOrchestrator sampleSyncAgent;
    SyncOperationStatistics syncStatistics;

    sampleSyncAgent = new SampleSyncAgent(util.Peer1ConnString, util.Peer2ConnString);
    syncStatistics = sampleSyncAgent.Synchronize();
    sampleStats.DisplayStats(syncStatistics, "initial");

    sampleSyncAgent = new SampleSyncAgent(util.Peer2ConnString, util.Peer3ConnString);
    syncStatistics = sampleSyncAgent.Synchronize();
    sampleStats.DisplayStats(syncStatistics, "initial");

    sampleSyncAgent = new SampleSyncAgent(util.Peer1ConnString, util.Peer3ConnString);
    syncStatistics = sampleSyncAgent.Synchronize();
    sampleStats.DisplayStats(syncStatistics, "initial");
}


catch (DbOutdatedSyncException ex)
{
    Console.WriteLine("Outdated Knowledge: " + ex.OutdatedPeerSyncKnowledge.ToString() +
                      " Clean up knowledge: " + ex.MissingCleanupKnowledge.ToString());
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}

'The SampleStats class handles information from the SyncStatistics
'object that the Synchronize method returns.
Dim sampleStats As New SampleStats()

Try
    'Initial synchronization. Instantiate the SyncOrchestrator
    'and call Synchronize. Note that data is not synchronized during the
    'session between peer 1 and peer 3, because all rows have already
    'been delivered to peer 3 during its synchronization session with peer 2.              
    Dim sampleSyncAgent As SyncOrchestrator
    Dim syncStatistics As SyncOperationStatistics

    sampleSyncAgent = New SampleSyncAgent(util.Peer1ConnString, util.Peer2ConnString)
    syncStatistics = sampleSyncAgent.Synchronize()
    sampleStats.DisplayStats(syncStatistics, "initial")

    sampleSyncAgent = New SampleSyncAgent(util.Peer2ConnString, util.Peer3ConnString)
    syncStatistics = sampleSyncAgent.Synchronize()
    sampleStats.DisplayStats(syncStatistics, "initial")

    sampleSyncAgent = New SampleSyncAgent(util.Peer1ConnString, util.Peer3ConnString)
    syncStatistics = sampleSyncAgent.Synchronize()
    sampleStats.DisplayStats(syncStatistics, "initial")


Catch ex As DbOutdatedSyncException
    Console.WriteLine("Outdated Knowledge: " & ex.OutdatedPeerSyncKnowledge.ToString() _
                    & " Clean up knowledge: " + ex.MissingCleanupKnowledge.ToString())
Catch ex As Exception
    Console.WriteLine(ex.Message)
End Try
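The comment in the code above notes that no data is synchronized during the session between peer 1 and peer 3. A small Python model of the three sessions makes the reason visible. It treats each peer as a set of rows and each session as an upload from local to remote followed by a download from remote to local; the function name and the row names are assumptions made for illustration, and real sessions exchange changes based on knowledge rather than set differences.

```python
def synchronize(local, remote):
    """Upload then download; returns (rows uploaded, rows downloaded)."""
    uploaded = local - remote      # rows that the remote peer is missing
    remote |= uploaded             # upload: local -> remote
    downloaded = remote - local    # rows that the local peer is missing
    local |= downloaded            # download: remote -> local
    return len(uploaded), len(downloaded)


# Only the first peer starts with data, as in the sample application.
peer1 = {"row1", "row2", "row3"}
peer2 = set()
peer3 = set()

session_1_2 = synchronize(peer1, peer2)
session_2_3 = synchronize(peer2, peer3)
session_1_3 = synchronize(peer1, peer3)
print(session_1_2, session_2_3, session_1_3)  # prints (3, 0) (3, 0) (0, 0)
```

By the time peer 1 and peer 3 synchronize, peer 3 has already received every row through peer 2, so the third session has nothing to transfer in either direction.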

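Complete Code Example

The following complete code example includes the code examples described earlier, the code that is required for the DbSyncAdapter and DbSyncProvider objects, and additional code that displays synchronization statistics.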

using System;
using System.IO;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using Microsoft.Synchronization;
using Microsoft.Synchronization.Data;

namespace Microsoft.Samples.Synchronization
{
    class Program
    {
        static void Main(string[] args)
        {

            //The Utility class handles all functionality that is not
            //directly related to synchronization, such as holding peerConnection 
            //string information and making changes to the server database.
            Utility util = new Utility();

            //The SampleStats class handles information from the SyncStatistics
            //object that the Synchronize method returns.
            SampleStats sampleStats = new SampleStats();

            try
            {
                //Initial synchronization. Instantiate the SyncOrchestrator
                //and call Synchronize. Note that data is not synchronized during the
                //session between peer 1 and peer 3, because all rows have already
                //been delivered to peer 3 during its synchronization session with peer 2.     
                SyncOrchestrator sampleSyncAgent;
                SyncOperationStatistics syncStatistics;

                sampleSyncAgent = new SampleSyncAgent(util.Peer1ConnString, util.Peer2ConnString);
                syncStatistics = sampleSyncAgent.Synchronize();
                sampleStats.DisplayStats(syncStatistics, "initial");

                sampleSyncAgent = new SampleSyncAgent(util.Peer2ConnString, util.Peer3ConnString);
                syncStatistics = sampleSyncAgent.Synchronize();
                sampleStats.DisplayStats(syncStatistics, "initial");

                sampleSyncAgent = new SampleSyncAgent(util.Peer1ConnString, util.Peer3ConnString);
                syncStatistics = sampleSyncAgent.Synchronize();
                sampleStats.DisplayStats(syncStatistics, "initial");
            }


            catch (DbOutdatedSyncException ex)
            {
                Console.WriteLine("Outdated Knowledge: " + ex.OutdatedPeerSyncKnowledge.ToString() +
                                  " Clean up knowledge: " + ex.MissingCleanupKnowledge.ToString());
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            //Make changes in each peer database.
            util.MakeDataChangesOnPeer(util.Peer1ConnString, "Customer");
            util.MakeDataChangesOnPeer(util.Peer2ConnString, "Customer");
            util.MakeDataChangesOnPeer(util.Peer3ConnString, "Customer");

            try
            {
                //Subsequent synchronization. Changes are now synchronized between all
                //peers. 
                //reports two changes at each peer because of the changes made to
                //the change tracking tables.
                SyncOrchestrator sampleSyncAgent;
                SyncOperationStatistics syncStatistics;

                sampleSyncAgent = new SampleSyncAgent(util.Peer1ConnString, util.Peer2ConnString);
                syncStatistics = sampleSyncAgent.Synchronize();
                sampleStats.DisplayStats(syncStatistics, "subsequent");

                sampleSyncAgent = new SampleSyncAgent(util.Peer2ConnString, util.Peer3ConnString);
                syncStatistics = sampleSyncAgent.Synchronize();
                sampleStats.DisplayStats(syncStatistics, "subsequent");

                sampleSyncAgent = new SampleSyncAgent(util.Peer1ConnString, util.Peer3ConnString);
                syncStatistics = sampleSyncAgent.Synchronize();
                sampleStats.DisplayStats(syncStatistics, "subsequent");
            }


            catch (DbOutdatedSyncException ex)
            {
                Console.WriteLine("Outdated Knowledge: " + ex.OutdatedPeerSyncKnowledge.ToString() +
                                  " Clean up knowledge: " + ex.MissingCleanupKnowledge.ToString());
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            //Return peer data back to its original state.
            util.CleanUpPeer(util.Peer1ConnString);
            util.CleanUpPeer(util.Peer2ConnString);
            util.CleanUpPeer(util.Peer3ConnString);

            //Exit.
            Console.Write("\nPress Enter to close the window.");
            Console.ReadLine();
        }

        //Create a class that is derived from 
        //Microsoft.Synchronization.SyncOrchestrator.
        public class SampleSyncAgent : SyncOrchestrator
        {
            public SampleSyncAgent(string localProviderConnString, string remoteProviderConnString)
            {

                //Instantiate the sample provider that allows us to create a provider
                //for both of the peers that are being synchronized.
                SampleSyncProvider sampleSyncProvider = new SampleSyncProvider();

                //Instantiate a DbSyncProvider for the local peer and the remote peer.
                //For example, if this code is running at peer1 and is
                //synchronizing with peer2, peer1 would be the local provider
                //and peer2 the remote provider.
                DbSyncProvider localProvider = new DbSyncProvider();
                DbSyncProvider remoteProvider = new DbSyncProvider();

                //Create a provider by using the SetupSyncProvider on the sample class.             
                sampleSyncProvider.SetupSyncProvider(localProviderConnString, localProvider);
                localProvider.SyncProviderPosition = SyncProviderPosition.Local;
                
                sampleSyncProvider.SetupSyncProvider(remoteProviderConnString, remoteProvider);
                remoteProvider.SyncProviderPosition = SyncProviderPosition.Remote;

                //Specify the local and remote providers that should be synchronized,
                //and the direction and order of changes. In this case, changes are first
                //uploaded from local to remote and then downloaded in the other direction.
                this.LocalProvider = localProvider;
                this.RemoteProvider = remoteProvider;
                this.Direction = SyncDirectionOrder.UploadAndDownload;
            }

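            //Create a sample class whose SetupSyncProvider method configures
            //a DbSyncProvider and its DbSyncAdapter objects for one peer.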
            public class SampleSyncProvider
            {
                public DbSyncProvider SetupSyncProvider(string peerConnString, DbSyncProvider peerProvider)
                {

                    //Set the amount of time to retain metadata.
                    const int MetadataAgingInHours = 100;

                    SqlConnection peerConnection = new SqlConnection(peerConnString);
                    peerProvider.Connection = peerConnection;
                    peerProvider.ScopeName = "Sales";

                    //Create a DbSyncAdapter object for the Customer table and associate it 
                    //with the DbSyncProvider. Following the DataAdapter style in ADO.NET, 
                    //DbSyncAdapter is the equivalent for synchronization. The commands that 
                    //are specified for the DbSyncAdapter object call stored procedures
                    //that are created in each peer database.
                    DbSyncAdapter adapterCustomer = new DbSyncAdapter("Customer");


                    //Specify the primary key, which Sync Services uses
                    //to identify each row during synchronization.
                    adapterCustomer.RowIdColumns.Add("CustomerId");


                    //Specify the command to select incremental changes.
                    //In this command and other commands, session variables are
                    //used to pass information at runtime. DbSyncSession.SyncMetadataOnly 
                    //and SyncMinTimestamp are two of the string constants that
                    //the DbSyncSession class exposes. You could also include 
                    //@sync_metadata_only and @sync_min_timestamp directly in your 
                    //queries:
                    //*  sync_metadata_only is used by Sync Services as an optimization
                    //   in some queries.
                    //* The value of the sync_min_timestamp session variable is compared to
                    //   values in the sync_row_timestamp column in the tracking table to 
                    //   determine which rows to select.
                    SqlCommand chgsCustomerCmd = new SqlCommand();
                    chgsCustomerCmd.CommandType = CommandType.StoredProcedure;
                    chgsCustomerCmd.CommandText = "Sales.sp_Customer_SelectChanges";
                    chgsCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncMetadataOnly, SqlDbType.Int);
                    chgsCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt);
                    chgsCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncInitialize, SqlDbType.Int);            

                    adapterCustomer.SelectIncrementalChangesCommand = chgsCustomerCmd;

                    //Specify the command to insert rows.
                    //The sync_row_count session variable is used in this command 
                    //and other commands to return a count of the rows affected by an operation. 
                    //A count of 0 indicates that an operation failed.
                    SqlCommand insCustomerCmd = new SqlCommand();
                    insCustomerCmd.CommandType = CommandType.StoredProcedure;
                    insCustomerCmd.CommandText = "Sales.sp_Customer_ApplyInsert";
                    insCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
                    insCustomerCmd.Parameters.Add("@CustomerName", SqlDbType.NVarChar);
                    insCustomerCmd.Parameters.Add("@SalesPerson", SqlDbType.NVarChar);
                    insCustomerCmd.Parameters.Add("@CustomerType", SqlDbType.NVarChar);
                    insCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;

                    adapterCustomer.InsertCommand = insCustomerCmd;


                    //Specify the command to update rows.
                    //The value of the sync_min_timestamp session variable is compared to
                    //values in the sync_row_timestamp column in the tracking table to 
                    //determine which rows to update.
                    SqlCommand updCustomerCmd = new SqlCommand();
                    updCustomerCmd.CommandType = CommandType.StoredProcedure;
                    updCustomerCmd.CommandText = "Sales.sp_Customer_ApplyUpdate";
                    updCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
                    updCustomerCmd.Parameters.Add("@CustomerName", SqlDbType.NVarChar);
                    updCustomerCmd.Parameters.Add("@SalesPerson", SqlDbType.NVarChar);
                    updCustomerCmd.Parameters.Add("@CustomerType", SqlDbType.NVarChar);
                    updCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt);
                    updCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;
                    updCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncForceWrite, SqlDbType.Int);
                    
                    adapterCustomer.UpdateCommand = updCustomerCmd;


                    //Specify the command to delete rows.
                    //The value of the sync_min_timestamp session variable is compared to
                    //values in the sync_row_timestamp column in the tracking table to 
                    //determine which rows to delete.
                    SqlCommand delCustomerCmd = new SqlCommand();
                    delCustomerCmd.CommandType = CommandType.StoredProcedure;
                    delCustomerCmd.CommandText = "Sales.sp_Customer_ApplyDelete";
                    delCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
                    delCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt);
                    delCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;

                    adapterCustomer.DeleteCommand = delCustomerCmd;

                    //Specify the command to select any conflicting rows.
                    SqlCommand selRowCustomerCmd = new SqlCommand();
                    selRowCustomerCmd.CommandType = CommandType.StoredProcedure;
                    selRowCustomerCmd.CommandText = "Sales.sp_Customer_SelectRow";
                    selRowCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);

                    adapterCustomer.SelectRowCommand = selRowCustomerCmd;


                    //Specify the command to insert metadata rows.
                    //The session variables in this command relate to columns in
                    //the tracking table.
                    SqlCommand insMetadataCustomerCmd = new SqlCommand();
                    insMetadataCustomerCmd.CommandType = CommandType.StoredProcedure;
                    insMetadataCustomerCmd.CommandText = "Sales.sp_Customer_InsertMetadata";
                    insMetadataCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
                    insMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCreatePeerKey, SqlDbType.Int);
                    insMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCreatePeerTimestamp, SqlDbType.BigInt);
                    insMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncUpdatePeerKey, SqlDbType.Int);
                    insMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncUpdatePeerTimestamp, SqlDbType.BigInt);
                    insMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowIsTombstone, SqlDbType.Int);
                    insMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;

                    adapterCustomer.InsertMetadataCommand = insMetadataCustomerCmd;


                    //Specify the command to update metadata rows.
                    SqlCommand updMetadataCustomerCmd = new SqlCommand();
                    updMetadataCustomerCmd.CommandType = CommandType.StoredProcedure;
                    updMetadataCustomerCmd.CommandText = "Sales.sp_Customer_UpdateMetadata";
                    updMetadataCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
                    updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCreatePeerKey, SqlDbType.Int);
                    updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCreatePeerTimestamp, SqlDbType.BigInt);
                    updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncUpdatePeerKey, SqlDbType.Int);
                    updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncUpdatePeerTimestamp, SqlDbType.BigInt);
                    updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int);
                    updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowTimestamp, SqlDbType.BigInt);
                    updMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;

                    adapterCustomer.UpdateMetadataCommand = updMetadataCustomerCmd;

                    //Specify the command to delete metadata rows.
                    SqlCommand delMetadataCustomerCmd = new SqlCommand();
                    delMetadataCustomerCmd.CommandType = CommandType.StoredProcedure;
                    delMetadataCustomerCmd.CommandText = "Sales.sp_Customer_DeleteMetadata";
                    delMetadataCustomerCmd.Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier);
                    delMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int);
                    delMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowTimestamp, SqlDbType.BigInt);
                    delMetadataCustomerCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;

                    adapterCustomer.DeleteMetadataCommand = delMetadataCustomerCmd;


                    //Specify the command to select metadata rows for cleanup.
                    SqlCommand selMetadataCustomerCmd = new SqlCommand();
                    selMetadataCustomerCmd.CommandType = CommandType.StoredProcedure;
                    selMetadataCustomerCmd.CommandText = "Sales.sp_Customer_SelectMetadata";
                    selMetadataCustomerCmd.Parameters.Add("@metadata_aging_in_hours", SqlDbType.Int).Value = MetadataAgingInHours;

                    adapterCustomer.SelectMetadataForCleanupCommand = selMetadataCustomerCmd;

                    peerProvider.SyncAdapters.Add(adapterCustomer);

                    // Configure commands that relate to the provider itself rather 
                    // than the DbSyncAdapter object for each table:
                    // * SelectNewTimestampCommand: Returns the new high watermark for 
                    //   the current synchronization session.
                    // * SelectScopeInfoCommand: Returns sync knowledge, cleanup knowledge, 
                    //   and a scope version (timestamp).
                    // * UpdateScopeInfoCommand: Sets new values for sync knowledge and cleanup knowledge.            

                    //Select a new timestamp.
                    //During each synchronization, the new value and
                    //the last value from the previous synchronization
                    //are used: the set of changes between these upper and
                    //lower bounds is synchronized.
                    SqlCommand selectNewTimestampCommand = new SqlCommand();
                    string newTimestampVariable = "@" + DbSyncSession.SyncNewTimestamp;
                    selectNewTimestampCommand.CommandText = "SELECT " + newTimestampVariable + " = min_active_rowversion() - 1";
                    selectNewTimestampCommand.Parameters.Add(newTimestampVariable, SqlDbType.Timestamp);
                    selectNewTimestampCommand.Parameters[newTimestampVariable].Direction = ParameterDirection.Output;
                    
                    peerProvider.SelectNewTimestampCommand = selectNewTimestampCommand;

                    //Specify the command to select local replica metadata. 
                    //Set session variables with values from the Sales.ScopeInfo
                    //metadata table.
                    SqlCommand selReplicaInfoCmd = new SqlCommand();
                    selReplicaInfoCmd.CommandType = CommandType.Text;
                    selReplicaInfoCmd.CommandText = "SELECT " +
                                                    "@" + DbSyncSession.SyncScopeId + " = scope_id, " +
                                                    "@" + DbSyncSession.SyncScopeKnowledge + " = scope_sync_knowledge, " +
                                                    "@" + DbSyncSession.SyncScopeCleanupKnowledge + " = scope_tombstone_cleanup_knowledge, " +
                                                    "@" + DbSyncSession.SyncScopeTimestamp + " = scope_timestamp " +
                                                    "FROM Sales.ScopeInfo " +
                                                    "WHERE scope_name = @" + DbSyncSession.SyncScopeName;
                    selReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeName, SqlDbType.NVarChar, 100);
                    selReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeId, SqlDbType.UniqueIdentifier).Direction = ParameterDirection.Output;
                    selReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeKnowledge, SqlDbType.VarBinary, 10000).Direction = ParameterDirection.Output;
                    selReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeCleanupKnowledge, SqlDbType.VarBinary, 10000).Direction = ParameterDirection.Output;
                    selReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeTimestamp, SqlDbType.BigInt).Direction = ParameterDirection.Output;
                    
                    peerProvider.SelectScopeInfoCommand = selReplicaInfoCmd;


                    //Specify the command to update local replica metadata. 
                    //Update the Sales.ScopeInfo metadata table with values
                    //from session variables.
                    SqlCommand updReplicaInfoCmd = new SqlCommand();
                    updReplicaInfoCmd.CommandType = CommandType.Text;
                    updReplicaInfoCmd.CommandText = "UPDATE  Sales.ScopeInfo SET " +
                                                    "scope_sync_knowledge = @" + DbSyncSession.SyncScopeKnowledge + ", " +
                                                    "scope_tombstone_cleanup_knowledge = @" + DbSyncSession.SyncScopeCleanupKnowledge + " " +
                                                    "WHERE scope_name = @" + DbSyncSession.SyncScopeName + " AND " +
                                                    " ( @" + DbSyncSession.SyncCheckConcurrency + " = 0 or scope_timestamp = @" + DbSyncSession.SyncScopeTimestamp + "); " +
                                                    "SET @" + DbSyncSession.SyncRowCount + " = @@rowcount";
                    updReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeKnowledge, SqlDbType.VarBinary, 10000);
                    updReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeCleanupKnowledge, SqlDbType.VarBinary, 10000);
                    updReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeName, SqlDbType.NVarChar, 100);
                    updReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int);
                    updReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncScopeTimestamp, SqlDbType.BigInt);
                    updReplicaInfoCmd.Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output;
                    
                    peerProvider.UpdateScopeInfoCommand = updReplicaInfoCmd;

                    return peerProvider;
                }
            }
        }

        //Handle the statistics that are returned by the SyncOrchestrator.
        public class SampleStats
        {
            public void DisplayStats(SyncOperationStatistics syncStatistics, string syncType)
            {
                Console.WriteLine(String.Empty);
                if (syncType == "initial")
                {
                    Console.WriteLine("****** Initial Synchronization ******");
                }
                else if (syncType == "subsequent")
                {
                    Console.WriteLine("***** Subsequent Synchronization ****");
                }

                Console.WriteLine("Start Time: " + syncStatistics.SyncStartTime);
                Console.WriteLine("Total Changes Uploaded: " + syncStatistics.UploadChangesTotal);
                Console.WriteLine("Total Changes Downloaded: " + syncStatistics.DownloadChangesTotal);
                Console.WriteLine("Complete Time: " + syncStatistics.SyncEndTime);
                Console.WriteLine(String.Empty);
            }
        }
    }
}
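The comments on SelectNewTimestampCommand in the listing above describe each session as synchronizing the set of changes between a lower bound (the last timestamp from the previous session) and an upper bound (the new timestamp). As a rough, in-memory illustration of that idea only, the following sketch filters hypothetical tracking rows by such a window. The `TrackingRow` and `ChangeWindowSketch` types, the sample values, and the choice of an exclusive lower bound with an inclusive upper bound are assumptions made for illustration. They are not part of Sync Services; in the sample, the actual filtering is done by the stored procedures in each peer database.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for one row of a change tracking table.
public sealed class TrackingRow
{
    public Guid CustomerId { get; }
    public long SyncRowTimestamp { get; }

    public TrackingRow(Guid customerId, long syncRowTimestamp)
    {
        CustomerId = customerId;
        SyncRowTimestamp = syncRowTimestamp;
    }
}

public static class ChangeWindowSketch
{
    // Returns the rows whose timestamp falls in (minTimestamp, newTimestamp].
    // Assumption: exclusive lower bound, inclusive upper bound.
    public static List<TrackingRow> SelectChanges(
        IEnumerable<TrackingRow> rows, long minTimestamp, long newTimestamp)
    {
        return rows
            .Where(r => r.SyncRowTimestamp > minTimestamp
                     && r.SyncRowTimestamp <= newTimestamp)
            .ToList();
    }

    public static void Main()
    {
        var rows = new List<TrackingRow>
        {
            new TrackingRow(Guid.NewGuid(), 100), // already sent in an earlier session
            new TrackingRow(Guid.NewGuid(), 105), // changed since the last session
            new TrackingRow(Guid.NewGuid(), 110), // changed since the last session
            new TrackingRow(Guid.NewGuid(), 120)  // above the new high watermark
        };

        // The previous session ended at 100; this session's high watermark is 110.
        List<TrackingRow> changes = SelectChanges(rows, 100, 110);
        Console.WriteLine(changes.Count); // prints 2
    }
}
```

In this sketch the row stamped 120 is left for a later session, which mirrors why the sample derives the upper bound from `min_active_rowversion() - 1` rather than from the newest value in the database: rows above the watermark may belong to transactions that have not yet committed.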
Imports System
Imports System.IO
Imports System.Text
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.Synchronization
Imports Microsoft.Synchronization.Data

Class Program

    Shared Sub Main(ByVal args() As String)

        'The Utility class handles all functionality that is not
        'directly related to synchronization, such as holding connection 
        'string information and making changes to the peer databases.
        Dim util As New Utility()

        'The SampleStats class handles information from the SyncOperationStatistics
        'object that the Synchronize method returns.
        Dim sampleStats As New SampleStats()

        Try
            'Initial synchronization. Instantiate the SyncOrchestrator
            'and call Synchronize. Note that data is not synchronized during the
            'session between peer 1 and peer 3, because all rows have already
            'been delivered to peer 3 during its synchronization session with peer 2.              
            Dim sampleSyncAgent As SyncOrchestrator
            Dim syncStatistics As SyncOperationStatistics

            sampleSyncAgent = New SampleSyncAgent(util.Peer1ConnString, util.Peer2ConnString)
            syncStatistics = sampleSyncAgent.Synchronize()
            sampleStats.DisplayStats(syncStatistics, "initial")

            sampleSyncAgent = New SampleSyncAgent(util.Peer2ConnString, util.Peer3ConnString)
            syncStatistics = sampleSyncAgent.Synchronize()
            sampleStats.DisplayStats(syncStatistics, "initial")

            sampleSyncAgent = New SampleSyncAgent(util.Peer1ConnString, util.Peer3ConnString)
            syncStatistics = sampleSyncAgent.Synchronize()
            sampleStats.DisplayStats(syncStatistics, "initial")


        Catch ex As DbOutdatedSyncException
            Console.WriteLine("Outdated Knowledge: " & ex.OutdatedPeerSyncKnowledge.ToString() _
                            & " Clean up knowledge: " & ex.MissingCleanupKnowledge.ToString())
        Catch ex As Exception
            Console.WriteLine(ex.Message)
        End Try
        'Make changes in each peer database.
        util.MakeDataChangesOnPeer(util.Peer1ConnString, "Customer")
        util.MakeDataChangesOnPeer(util.Peer2ConnString, "Customer")
        util.MakeDataChangesOnPeer(util.Peer3ConnString, "Customer")

        Try
            'Subsequent synchronization. Changes are now synchronized between all
            'peers. The statistics report two changes at each peer because of
            'the changes made to the change tracking tables.
            Dim sampleSyncAgent As SyncOrchestrator
            Dim syncStatistics As SyncOperationStatistics

            sampleSyncAgent = New SampleSyncAgent(util.Peer1ConnString, util.Peer2ConnString)
            syncStatistics = sampleSyncAgent.Synchronize()
            sampleStats.DisplayStats(syncStatistics, "subsequent")

            sampleSyncAgent = New SampleSyncAgent(util.Peer2ConnString, util.Peer3ConnString)
            syncStatistics = sampleSyncAgent.Synchronize()
            sampleStats.DisplayStats(syncStatistics, "subsequent")

            sampleSyncAgent = New SampleSyncAgent(util.Peer1ConnString, util.Peer3ConnString)
            syncStatistics = sampleSyncAgent.Synchronize()
            sampleStats.DisplayStats(syncStatistics, "subsequent")


        Catch ex As DbOutdatedSyncException
            Console.WriteLine("Outdated Knowledge: " & ex.OutdatedPeerSyncKnowledge.ToString() _
                            & " Clean up knowledge: " & ex.MissingCleanupKnowledge.ToString())
        Catch ex As Exception
            Console.WriteLine(ex.Message)
        End Try

        'Return peer data back to its original state.
        util.CleanUpPeer(util.Peer1ConnString)
        util.CleanUpPeer(util.Peer2ConnString)
        util.CleanUpPeer(util.Peer3ConnString)

        'Exit.
        Console.Write(vbLf + "Press Enter to close the window.")
        Console.ReadLine()

    End Sub 'Main

End Class 'Program

'Create a class that is derived from 
'Microsoft.Synchronization.SyncOrchestrator.
Public Class SampleSyncAgent
    Inherits SyncOrchestrator

    Public Sub New(ByVal localProviderConnString As String, ByVal remoteProviderConnString As String)

        'Instantiate the sample provider that allows us to create a provider
        'for both of the peers that are being synchronized.
        Dim sampleSyncProvider As New SampleSyncProvider()

        'Instantiate a DbSyncProvider for the local peer and the remote peer.
        'For example, if this code is running at peer1 and is
        'synchronizing with peer2, peer1 would be the local provider
        'and peer2 the remote provider.  
        Dim localProvider As New DbSyncProvider()
        Dim remoteProvider As New DbSyncProvider()

        'Create a provider by using the SetupSyncProvider on the sample class.
        sampleSyncProvider.SetupSyncProvider(localProviderConnString, localProvider)
        localProvider.SyncProviderPosition = SyncProviderPosition.Local

        sampleSyncProvider.SetupSyncProvider(remoteProviderConnString, remoteProvider)
        remoteProvider.SyncProviderPosition = SyncProviderPosition.Remote

        'Specify the local and remote providers that should be synchronized,
        'and the direction and order of changes. In this case, changes are first
        'uploaded from local to remote and then downloaded in the other direction.
        Me.LocalProvider = localProvider
        Me.RemoteProvider = remoteProvider
        Me.Direction = SyncDirectionOrder.UploadAndDownload

    End Sub 'New
End Class 'SampleSyncAgent

Public Class SampleSyncProvider

    Public Function SetupSyncProvider(ByVal peerConnString As String, ByVal peerProvider As DbSyncProvider) As DbSyncProvider

        'Set the amount of time to retain metadata.
        Const MetadataAgingInHours As Integer = 100

        Dim peerConnection As New SqlConnection(peerConnString)
        peerProvider.Connection = peerConnection
        peerProvider.ScopeName = "Sales"

        'Create a DbSyncAdapter object for the Customer table and associate it 
        'with the DbSyncProvider. Following the DataAdapter style in ADO.NET, 
        'DbSyncAdapter is the equivalent for synchronization. The commands that 
        'are specified for the DbSyncAdapter object call stored procedures
        'that are created in each peer database.
        Dim adapterCustomer As New DbSyncAdapter("Customer")

        'Specify the primary key, which Sync Services uses
        'to identify each row during synchronization.
        adapterCustomer.RowIdColumns.Add("CustomerId")

        'Specify the command to select incremental changes.
        'In this command and other commands, session variables are
        'used to pass information at runtime. DbSyncSession.SyncMetadataOnly 
        'and SyncMinTimestamp are two of the string constants that
        'the DbSyncSession class exposes. You could also include 
        '@sync_metadata_only and @sync_min_timestamp directly in your 
        'queries:
        '*  sync_metadata_only is used by Sync Services as an optimization
        '   in some queries.
        '* The value of the sync_min_timestamp session variable is compared to
        '   values in the sync_row_timestamp column in the tracking table to 
        '   determine which rows to select.
        Dim chgsCustomerCmd As New SqlCommand()

        With chgsCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_SelectChanges"
            .Parameters.Add("@" + DbSyncSession.SyncMetadataOnly, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncInitialize, SqlDbType.Int)
        End With

        adapterCustomer.SelectIncrementalChangesCommand = chgsCustomerCmd

        'Specify the command to insert rows.
        'The sync_row_count session variable is used in this command 
        'and other commands to return a count of the rows affected by an operation. 
        'A count of 0 indicates that an operation failed.
        Dim insCustomerCmd As New SqlCommand()

        With insCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_ApplyInsert"
            .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
            .Parameters.Add("@CustomerName", SqlDbType.NVarChar)
            .Parameters.Add("@SalesPerson", SqlDbType.NVarChar)
            .Parameters.Add("@CustomerType", SqlDbType.NVarChar)
            .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
        End With

        adapterCustomer.InsertCommand = insCustomerCmd


        'Specify the command to update rows.
        'The value of the sync_min_timestamp session variable is compared to
        'values in the sync_row_timestamp column in the tracking table to 
        'determine which rows to update.
        Dim updCustomerCmd As New SqlCommand()

        With updCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_ApplyUpdate"
            .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
            .Parameters.Add("@CustomerName", SqlDbType.NVarChar)
            .Parameters.Add("@SalesPerson", SqlDbType.NVarChar)
            .Parameters.Add("@CustomerType", SqlDbType.NVarChar)
            .Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
            .Parameters.Add("@" + DbSyncSession.SyncForceWrite, SqlDbType.Int)
        End With

        adapterCustomer.UpdateCommand = updCustomerCmd


        'Specify the command to delete rows.
        'The value of the sync_min_timestamp session variable is compared to
        'values in the sync_row_timestamp column in the tracking table to 
        'determine which rows to delete.
        Dim delCustomerCmd As New SqlCommand()

        With delCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_ApplyDelete"
            .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
            .Parameters.Add("@" + DbSyncSession.SyncMinTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
        End With

        adapterCustomer.DeleteCommand = delCustomerCmd

        'Specify the command to select any conflicting rows.
        Dim selRowCustomerCmd As New SqlCommand()

        With selRowCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_SelectRow"
            .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
        End With

        adapterCustomer.SelectRowCommand = selRowCustomerCmd


        'Specify the command to insert metadata rows.
        'The session variables in this command relate to columns in
        'the tracking table.
        Dim insMetadataCustomerCmd As New SqlCommand()

        With insMetadataCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_InsertMetadata"
            .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
            .Parameters.Add("@" + DbSyncSession.SyncCreatePeerKey, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncCreatePeerTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncUpdatePeerKey, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncUpdatePeerTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncRowIsTombstone, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
        End With

        adapterCustomer.InsertMetadataCommand = insMetadataCustomerCmd


        'Specify the command to update metadata rows.
        Dim updMetadataCustomerCmd As New SqlCommand()

        With updMetadataCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_UpdateMetadata"
            .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
            .Parameters.Add("@" + DbSyncSession.SyncCreatePeerKey, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncCreatePeerTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncUpdatePeerKey, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncUpdatePeerTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncRowTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
        End With

        adapterCustomer.UpdateMetadataCommand = updMetadataCustomerCmd

        'Specify the command to delete metadata rows.
        Dim delMetadataCustomerCmd As New SqlCommand()

        With delMetadataCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_DeleteMetadata"
            .Parameters.Add("@CustomerId", SqlDbType.UniqueIdentifier)
            .Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncRowTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
        End With

        adapterCustomer.DeleteMetadataCommand = delMetadataCustomerCmd


        'Specify the command to select metadata rows for cleanup.
        Dim selMetadataCustomerCmd As New SqlCommand()

        With selMetadataCustomerCmd
            .CommandType = CommandType.StoredProcedure
            .CommandText = "Sales.sp_Customer_SelectMetadata"
            .Parameters.Add("@metadata_aging_in_hours", SqlDbType.Int).Value = MetadataAgingInHours
        End With

        adapterCustomer.SelectMetadataForCleanupCommand = selMetadataCustomerCmd

        peerProvider.SyncAdapters.Add(adapterCustomer)

        ' Configure commands that relate to the provider itself rather 
        ' than the DbSyncAdapter object for each table:
        ' * SelectNewTimestampCommand: Returns the new high watermark for 
        '   the current synchronization session.
        ' * SelectScopeInfoCommand: Returns sync knowledge, cleanup knowledge, 
        '   and a scope version (timestamp).
        ' * UpdateScopeInfoCommand: Sets new values for sync knowledge and cleanup knowledge.

        'Select a new timestamp.
        'During each synchronization, the new value and
        'the last value from the previous synchronization
        'are used: the set of changes between these upper and
        'lower bounds is synchronized.
        Dim newTimestampVariable As String = "@" + DbSyncSession.SyncNewTimestamp

        Dim selectNewTimestampCommand As New SqlCommand()

        With selectNewTimestampCommand
            .CommandText = "SELECT " + newTimestampVariable + " = min_active_rowversion() - 1"
            .Parameters.Add(newTimestampVariable, SqlDbType.Timestamp)
            .Parameters(newTimestampVariable).Direction = ParameterDirection.Output
        End With

        peerProvider.SelectNewTimestampCommand = selectNewTimestampCommand

        'Specify the command to select local replica metadata. 
        'Set session variables with values from the Sales.ScopeInfo
        'metadata table.
        Dim selReplicaInfoCmd As New SqlCommand()

        With selReplicaInfoCmd
            .CommandType = CommandType.Text
            .CommandText = "SELECT " _
                         & "@" + DbSyncSession.SyncScopeId + " = scope_id, " _
                         & "@" + DbSyncSession.SyncScopeKnowledge + " = scope_sync_knowledge, " _
                         & "@" + DbSyncSession.SyncScopeCleanupKnowledge + " = scope_tombstone_cleanup_knowledge, " _
                         & "@" + DbSyncSession.SyncScopeTimestamp + " = scope_timestamp " _
                         & "FROM Sales.ScopeInfo " _
                         & "WHERE scope_name = @" + DbSyncSession.SyncScopeName
            .Parameters.Add("@" + DbSyncSession.SyncScopeName, SqlDbType.NVarChar, 100)
            .Parameters.Add("@" + DbSyncSession.SyncScopeId, SqlDbType.UniqueIdentifier).Direction = ParameterDirection.Output
            .Parameters.Add("@" + DbSyncSession.SyncScopeKnowledge, SqlDbType.VarBinary, 10000).Direction = ParameterDirection.Output
            .Parameters.Add("@" + DbSyncSession.SyncScopeCleanupKnowledge, SqlDbType.VarBinary, 10000).Direction = ParameterDirection.Output
            .Parameters.Add("@" + DbSyncSession.SyncScopeTimestamp, SqlDbType.BigInt).Direction = ParameterDirection.Output
        End With

        peerProvider.SelectScopeInfoCommand = selReplicaInfoCmd


        'Specify the command to update local replica metadata. 
        'Update the Sales.ScopeInfo metadata table with values
        'from session variables.
        Dim updReplicaInfoCmd As New SqlCommand()

        With updReplicaInfoCmd
            .CommandType = CommandType.Text
            .CommandText = "UPDATE  Sales.ScopeInfo SET " _
                         & "scope_sync_knowledge = @" + DbSyncSession.SyncScopeKnowledge + ", " _
                         & "scope_tombstone_cleanup_knowledge = @" + DbSyncSession.SyncScopeCleanupKnowledge + " " _
                         & "WHERE scope_name = @" + DbSyncSession.SyncScopeName + " AND " _
                         & " ( @" + DbSyncSession.SyncCheckConcurrency + " = 0 or scope_timestamp = @" + DbSyncSession.SyncScopeTimestamp + "); " _
                         & "SET @" + DbSyncSession.SyncRowCount + " = @@rowcount"
            .Parameters.Add("@" + DbSyncSession.SyncScopeKnowledge, SqlDbType.VarBinary, 10000)
            .Parameters.Add("@" + DbSyncSession.SyncScopeCleanupKnowledge, SqlDbType.VarBinary, 10000)
            .Parameters.Add("@" + DbSyncSession.SyncScopeName, SqlDbType.NVarChar, 100)
            .Parameters.Add("@" + DbSyncSession.SyncCheckConcurrency, SqlDbType.Int)
            .Parameters.Add("@" + DbSyncSession.SyncScopeTimestamp, SqlDbType.BigInt)
            .Parameters.Add("@" + DbSyncSession.SyncRowCount, SqlDbType.Int).Direction = ParameterDirection.Output
        End With

        peerProvider.UpdateScopeInfoCommand = updReplicaInfoCmd

        Return peerProvider

    End Function 'SetupSyncProvider
End Class 'SampleSyncProvider


'Handle the statistics that are returned by the SyncOrchestrator.
Public Class SampleStats

    Public Sub DisplayStats(ByVal syncStatistics As SyncOperationStatistics, ByVal syncType As String)
        Console.WriteLine(String.Empty)
        If syncType = "initial" Then
            Console.WriteLine("****** Initial Synchronization ******")
        ElseIf syncType = "subsequent" Then
            Console.WriteLine("***** Subsequent Synchronization ****")
        End If

        Console.WriteLine("Start Time: " & syncStatistics.SyncStartTime)
        Console.WriteLine("Total Changes Uploaded: " & syncStatistics.UploadChangesTotal)
        Console.WriteLine("Total Changes Downloaded: " & syncStatistics.DownloadChangesTotal)
        Console.WriteLine("Complete Time: " & syncStatistics.SyncEndTime)
        Console.WriteLine(String.Empty)

    End Sub 'DisplayStats
End Class 'SampleStats

See Also

Concepts

Programming Common Peer-to-Peer Synchronization Tasks