如何分批传递变更 (SQL Server)

本主题说明如何在使用 SqlSyncProviderSqlCeSyncProviderDbSyncProvider 的 Sync Framework 中为数据库同步分批传递变更。本主题中的代码主要涉及以下 Sync Framework 类:

了解批处理

默认情况下,Sync Framework 在单个 DataSet 对象中将变更传送给每个节点。变更被应用到节点时,将此对象保留在内存中。如果应用变更的计算机上有足够的内存并且与该计算机的连接可靠,则默认行为很适用。但是,将变更划分为批次可能对某些应用程序更有利。请考虑以下针对同步应用程序的方案:

  • 使用 SqlCeSyncProvider 的大量客户端定期与使用 SqlSyncProvider 的服务器同步。

  • 每个客户端都具有有限的内存量和磁盘空间。

  • 服务器和客户端之间的连接为低带宽连接并且时断时续,常常导致很长的同步时间和连接掉线。

  • 典型同步会话的变更大小(以 KB 为单位)较大。

批处理变更十分适合于此类情况,因为它提供以下功能:

  • 使开发人员能够控制用于在客户端存储变更的内存量(内存数据缓存大小)。这可以消除客户端上的内存不足错误。

  • 使 Sync Framework 能够从当前批次开始时(而非整个变更集合开始时)重新启动失败的同步操作。

  • 可以减少或消除由于失败的操作而在服务器上重新下载变更或重新枚举变更的需要。

为双层和 N 层应用程序配置批处理十分简单,并且批处理可用于初始同步会话和后续会话。

配置和使用批处理

Sync Framework 中批处理的工作方式如下:

  1. 应用程序为参与同步会话的每个提供程序指定内存数据缓存大小。

    如果两个提供程序都指定一个缓存大小,则 Sync Framework 使用其中的较小值。实际缓存大小将不超过最小指定大小的 110%。在同步会话期间,如果单行大于该大小的 110%,则该会话将终止并显示异常。

    值为 0(默认值)将禁用批处理。如果一个提供程序启用批处理,而另一个提供程序未启用,则为上载和下载均启用批处理。

  2. 应用程序为每个提供程序指定假脱机文件的位置。默认情况下,假脱机文件写入同步进程运行所基于的帐户的临时目录中。

  3. 应用程序调用 Synchronize

  4. Sync Framework 一次一行枚举变更。如果达到了源提供程序的内存数据缓存大小,则变更将持久保留在本地假脱机文件中,并且将刷新内存中数据。此过程在枚举了所有变更前将继续执行。

  5. 对于 N 层方案,应用程序中的服务和代理代码将假脱机文件传输到目标中。有关更多信息,请参见本主题中的特定于 N 层的代码。对于双层方案,本地文件已位于目标中,因为在此情况下,所有同步代码都在目标运行。

  6. Sync Framework 从假脱机文件反序列化变更并应用这些变更。此过程将继续执行直到所有变更都应用于目标。

    所有批都在一个事务中应用。直到目标提供程序接收到最后一个批后,才会创建该事务。

  7. 对于双层方案,Sync Framework 清除假脱机文件。对于 N 层方案,Sync Framework 清除发起同步的计算机上的假脱机文件,但中间层上的文件应由代理清除(在本主题后面的 Cleanup() 方法示例中予以说明)。为了处理会话中止的情况,中间层还应使用一个进程来清除早于某个日期的文件。

备注

将应用于节点的数据变更可以从 DbChangesSelectedEventArgs 对象的 Context 属性获得。当对数据进行批处理时,ChangesSelected 事件只激发一次,并且所有变更都从 Context 属性中提供。当对数据进行批处理时,ChangesSelected 将针对每个批触发,此时只提供该批中的变更。如果您要求所有批中的变更,则响应每个 ChangesSelected 事件,并存储返回的数据。

下表说明了与批处理有关的类型和成员。批处理所需的唯一属性是 MemoryDataCacheSize,但是也建议设置 BatchingDirectory

类型或成员 说明

BatchingDirectory

获取或设置批文件假脱机保存到磁盘中的目录。指定的路径必须是正执行的提供程序或代理的本地目录。不支持 UNC 文件路径和非文件 URI 路径。

Note重要事项
假脱机文件包含原始数据库数据。文件写入的目录必须用适当的访问控制加以保护。

CleanupBatchingDirectory

获取或设置在文件中的变更已应用于目标后是否清除批处理文件。默认设置是清除这些文件。

MemoryDataCacheSize

获取或设置在将这些变更假脱机保存到磁盘前 Sync Framework 用于缓存变更的最大内存量,以 KB 为单位。

Note注意
此设置仅影响在内存中为发送到目标的变更保存的数据和元数据的大小。它并不限制其他 Sync Framework 组件或用户应用程序组件使用的内存。

BatchApplied

在每个变更批已应用于目标后发生的事件。

BatchSpooled

在每个变更批已写入磁盘后发生的事件。

DbBatchAppliedEventArgs

BatchApplied 事件提供数据,包括当前批号以及要应用的批的总数。

DbBatchSpooledEventArgs

BatchSpooled 事件提供数据,包括当前批号以及批大小。

BatchFileName

获取或设置假脱机的变更要写入的文件的名称。

IsDataBatched

获取或设置数据是在多个批中发送还是在单个 DataSet 对象中发送。

IsLastBatch

获取或设置当前批是否是变更的最后的批。

BatchedDeletesRetried

获取或设置对变更执行批处理的同步会话期间重试的删除操作的次数。

由于主键和外键删除的顺序而为批重试删除。如果某一外键删除在当前批或更早的批中不存在,则相应的主键删除将失败。一旦应用了所有的批后,将重试失败的删除。

SelectIncrementalChangesCommand(仅适用于 DbSyncProvider

获取或设置用于从本地数据库选择增量变更的查询或存储过程。

Note注意
建议指定的查询包括子句 ORDER BY [sync_row_timestamp]。按时间戳值为行排序将确保如果重新启动某一同步会话,提供程序将开始从最高的时间戳水印进行枚举(各个表水印将与每个批一起保存)并且不会丢失任何变更。

DataTable

获取或设置包含要同步的变更的 DataTable 对象。如果启用批处理,访问此属性将反序列化来自磁盘的假脱机文件。然后,对表进行的任何变更将保存回该假脱机文件。

DataSet

获取或设置 DataSet 对象,其中包含对等数据库中的选定行。如果 IsDataBatchedtrue,则返回 Null。

用于双层和 N 层的常见代码

本节中的代码示例说明如何在双层和 N 层方案中处理批处理。此代码摘自 Sync Framework SDK 中包括的两个示例:SharingAppDemo-CEProviderEndToEndWebSharingAppDemo-CEProviderEndToEnd。每个示例都使用代码位置引入,例如 SharingAppDemo/CESharingForm。就批处理而言,两个应用程序之间的主要差异在于在 N 层情况中要求一些附加的代码,以便上载和下载假脱机文件并为每个枚举变更的节点创建目录。

下面的代码示例来自 SharingAppDemo/CESharingForm 中的 synchronizeBtn_Click 事件处理程序,它设置内存数据缓存大小以及假脱机文件应写入的目录。为 BatchingDirectory 指定的路径必须是正执行的提供程序或代理的本地目录。不支持 UNC 文件路径和非文件 URI 路径。为 BatchingDirectory 指定的路径是根目录。对于每个同步会话,Sync Framework 都创建一个唯一的子目录,用于存储该会话的假脱机文件。该目录对于当前的源-目标组合是唯一的,以便隔离不同会话的文件。

下面的代码示例来自 WebSharingAppDemo/CESharingForm 中的 synchronizeBtn_Click 事件处理程序,它设置相同的属性,但目标的批处理目录是为代理设置的而非直接为提供程序设置,因为它位于双层方案中:

//Set memory data cache size property. 0 represents non batched mode.
//No need to set memory cache size for Proxy, because the source is 
//enabled for batching: both upload and download will be batched.
srcProvider.MemoryDataCacheSize = this._batchSize;
 

//Set batch spool location. Default value if not set is %Temp% directory.
if (!string.IsNullOrEmpty(this.batchSpoolLocation.Text))
{
    srcProvider.BatchingDirectory = this.batchSpoolLocation.Text;
    destinationProxy.BatchingDirectory = this.batchSpoolLocation.Text;
}

下面的代码示例来自这两个应用程序中的 SynchronizationHelper 文件,它创建方法以便处理在枚举变更和应用变更期间由提供程序引发的 BatchSpooledBatchAppliedEvents

void provider_BatchSpooled(object sender, DbBatchSpooledEventArgs e)
{
    this.progressForm.listSyncProgress.Items.Add("BatchSpooled event fired: Details");
    this.progressForm.listSyncProgress.Items.Add("\tSource Database :" + ((RelationalSyncProvider)sender).Connection.Database);
    this.progressForm.listSyncProgress.Items.Add("\tBatch Name      :" + e.BatchFileName);
    this.progressForm.listSyncProgress.Items.Add("\tBatch Size      :" + e.DataCacheSize);
    this.progressForm.listSyncProgress.Items.Add("\tBatch Number    :" + e.CurrentBatchNumber);
    this.progressForm.listSyncProgress.Items.Add("\tTotal Batches   :" + e.TotalBatchesSpooled);
    this.progressForm.listSyncProgress.Items.Add("\tBatch Watermark :" + ReadTableWatermarks(e.CurrentBatchTableWatermarks));
}
void provider_BatchApplied(object sender, DbBatchAppliedEventArgs e)
{
    this.progressForm.listSyncProgress.Items.Add("BatchApplied event fired: Details");
    this.progressForm.listSyncProgress.Items.Add("\tDestination Database   :" + ((RelationalSyncProvider)sender).Connection.Database);
    this.progressForm.listSyncProgress.Items.Add("\tBatch Number           :" + e.CurrentBatchNumber);
    this.progressForm.listSyncProgress.Items.Add("\tTotal Batches To Apply :" + e.TotalBatchesToApply);
}
//Reads the watermarks for each table from the batch spooled event. //The watermark denotes the max tickcount for each table in each batch.
private string ReadTableWatermarks(Dictionary<string, ulong> dictionary)
{
    StringBuilder builder = new StringBuilder();
    Dictionary<string, ulong> dictionaryClone = new Dictionary<string, ulong>(dictionary);
    foreach (KeyValuePair<string, ulong> kvp in dictionaryClone)
    {
        builder.Append(kvp.Key).Append(":").Append(kvp.Value).Append(",");
    }
    return builder.ToString();
}

特定于 N 层的代码

其余的代码示例仅应用于 WebSharingAppDemo 中的 N 层方案。相关的 N 层代码包含在三个文件中:

  • 服务约定:IRelationalSyncContract

  • Web 服务:RelationalWebSyncService

  • 代理:RelationalProviderProxy

SqlSyncProviderSqlCeSyncProvider 这两个提供程序均继承自 RelationalSyncProvider,因此,该代码应用于这两个提供程序。其他的特定于存储区的功能为每种提供程序划分为代理文件和服务文件。

为了理解批处理在 N 层方案中的工作方式,请考虑服务器是源、客户端是目标的同步会话。在变更已写入服务器上的本地目录后,对于下载的变更将发生以下过程:

  1. 对客户端代理调用 GetChangeBatch 方法。如示例代码中的后面所示,此方法应包括用于处理批处理的特定代码。

  2. 该服务从 SqlSyncProvider 获取一个批文件。该服务删除整个路径信息并且通过网络只发送文件名。这将防止向客户端公开服务器的目录结构。

  3. GetChangeBatch 的代理调用将返回。

    1. 该代理将检测到对变更已执行了批处理,因此它通过将该批文件名称作为参数传递,调用 DownloadBatchFile

    2. 该代理将在 RelationalProviderProxy.BatchingDirectory 下创建一个唯一目录(如果对于该会话不存在这样的目录),以便在本地存放这些批文件。目录名称是正在对变更进行枚举的对等方的副本 ID。这确保代理和服务对于每个枚举对等方具有一个唯一的目录。

  4. 该代理下载文件并在本地存储它。该代理用指向本地磁盘上的批文件的新完整路径替代上下文中的文件名。

  5. 该代理将上下文返回到控制器。

  6. 重复步骤 1 到步骤 6,直到该代理接收了最后一个批。

对于上载的变更将发生以下过程

  1. 控制器对代理调用 ProcessChangeBatch

  2. 该代理将确定它是一个批文件,这样它执行以下步骤:

    1. 删除整个路径信息并且通过网络只发送文件名。

    2. 调用 HasUploadedBatchFile 以确定该文件是否已上载。如果已上载,则无需执行步骤 C。

    3. 如果 HasUploadedBatchFile 返回 false,则对该服务调用 UploadBatchFile,并且上载批文件内容。

      该服务将接收对 UploadBatchFile 的调用并且将在本地存储该批。目录创建类似于上面的步骤 4。

    4. 对该服务调用 ApplyChanges

  3. 服务器将接收 ApplyChanges 调用并且确定它是一个批文件。它用指向本地磁盘上的批文件的新完整路径替代上下文中的文件名。

  4. 服务器将 DbSyncContext 传递给本地 SqlSyncProvider

  5. 重复步骤 1 到步骤 6,直到发送了最后一个批。

下面的代码示例来自 IRelationalSyncContract,它指定用于向中间层传输假脱机文件和从中间层传输假脱机文件的上载和下载方法:

[OperationContract(IsOneWay = true)]
void UploadBatchFile(string batchFileid, byte[] batchFile);

[OperationContract]
byte[] DownloadBatchFile(string batchFileId);

下面的代码示例来自 RelationalWebSyncService,它公开在约定中定义的 UploadBatchFileDownloadBatchFile 方法,并且在以下方法中包括附加的批处理相关的逻辑:

  • Cleanup:从指定的目录或临时目录(如果未指定目录)清除所有假脱机文件。

  • GetChanges:检查是否对数据执行了批处理;如果执行了批处理,它将删除假脱机文件的目录路径,以便不通过网络发送该路径。在 N 层方案中,通过网络连接发送完整目录路径存在安全风险。该文件名是 GUID。

  • HasUploadedBatchFile:返回特定批文件是否已上载到服务。

  • ApplyChanges:检查是否对数据执行了批处理;如果执行了批处理,它将检查是否已上载预期的批文件。如果尚未上载该文件,将引发异常。客户端应该已在调用 ApplyChanges 前上载了假脱机文件。

public abstract class RelationalWebSyncService: IRelationalSyncContract
{
    protected bool isProxyToCompactDatabase;
    protected RelationalSyncProvider peerProvider;
    protected DirectoryInfo sessionBatchingDirectory = null;
    protected Dictionary<string, string> batchIdToFileMapper;
    int batchCount = 0;

    public void Initialize(string scopeName, string hostName)
    {
        this.peerProvider = this.ConfigureProvider(scopeName, hostName);
        this.batchIdToFileMapper = new Dictionary<string, string>();
    }

    public void Cleanup()
    {
        this.peerProvider = null;
        //Delete all file in the temp session directory
        if (sessionBatchingDirectory != null && sessionBatchingDirectory.Exists)
        {
            try
            {
                sessionBatchingDirectory.Delete(true);
            }
            catch 
            { 
                //Ignore 
            }
        }
    }

    public void BeginSession(SyncProviderPosition position)
    {
        Log("*****************************************************************");
        Log("******************** New Sync Session ***************************");
        Log("*****************************************************************");
        Log("BeginSession: ScopeName: {0}, Position: {1}", this.peerProvider.ScopeName, position);
        //Clean the mapper for each session.
        this.batchIdToFileMapper = new Dictionary<string, string>();

        this.peerProvider.BeginSession(position, null/*SyncSessionContext*/);
        this.batchCount = 0;
    }

    public SyncBatchParameters GetKnowledge()
    {
        Log("GetSyncBatchParameters: {0}", this.peerProvider.Connection.ConnectionString);
        SyncBatchParameters destParameters = new SyncBatchParameters();
        this.peerProvider.GetSyncBatchParameters(out destParameters.BatchSize, out destParameters.DestinationKnowledge);
        return destParameters;
    }

    public GetChangesParameters GetChanges(uint batchSize, SyncKnowledge destinationKnowledge)
    {
        Log("GetChangeBatch: {0}", this.peerProvider.Connection.ConnectionString);
        GetChangesParameters changesWrapper = new GetChangesParameters();
        changesWrapper.ChangeBatch  = this.peerProvider.GetChangeBatch(batchSize, destinationKnowledge, out changesWrapper.DataRetriever);

        DbSyncContext context = changesWrapper.DataRetriever as DbSyncContext;
        //Check to see if data is batched
        if (context != null && context.IsDataBatched)
        {
            Log("GetChangeBatch: Data Batched. Current Batch #:{0}", ++this.batchCount);
            //Dont send the file location info. Just send the file name
            string fileName = new FileInfo(context.BatchFileName).Name;
            this.batchIdToFileMapper[fileName] = context.BatchFileName;
            context.BatchFileName = fileName;
        }
        return changesWrapper;
    }

    public SyncSessionStatistics ApplyChanges(ConflictResolutionPolicy resolutionPolicy, ChangeBatch sourceChanges, object changeData)
    {
        Log("ProcessChangeBatch: {0}", this.peerProvider.Connection.ConnectionString);

        DbSyncContext dataRetriever = changeData as DbSyncContext;

        if (dataRetriever != null && dataRetriever.IsDataBatched)
        {
            string remotePeerId = dataRetriever.MadeWithKnowledge.ReplicaId.ToString();
            //Data is batched. The client should have uploaded this file to us prior to calling ApplyChanges.
            //So look for it.
            //The Id would be the DbSyncContext.BatchFileName which is just the batch file name without the complete path
            string localBatchFileName = null;
            if (!this.batchIdToFileMapper.TryGetValue(dataRetriever.BatchFileName, out localBatchFileName))
            {
                //Service has not received this file. Throw exception
                throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("No batch file uploaded for id " + dataRetriever.BatchFileName, null));
            }
            dataRetriever.BatchFileName = localBatchFileName;
        }

        SyncSessionStatistics sessionStatistics = new SyncSessionStatistics();
        this.peerProvider.ProcessChangeBatch(resolutionPolicy, sourceChanges, changeData, new SyncCallbacks(), sessionStatistics);
        return sessionStatistics;
    }

    public void EndSession()
    {
        Log("EndSession: {0}", this.peerProvider.Connection.ConnectionString);
        Log("*****************************************************************");
        Log("******************** End Sync Session ***************************");
        Log("*****************************************************************");
        this.peerProvider.EndSession(null);
        Log("");
    }

    /// <summary>
    /// Used by proxy to see if the batch file has already been uploaded. Optimizes by not resending batch files.
    /// NOTE: This method takes in a file name as an input parameter and hence is suseptible for name canonicalization
    /// attacks. This sample is meant to be a starting point in demonstrating how to transfer sync batch files and is
    /// not intended to be a secure way of doing the same. This SHOULD NOT be used as such in production environment
    /// without doing proper security analysis.
    /// 
    /// Please refer to the following two MSDN whitepapers for more information on guidelines for securing Web servies.
    /// 
    /// Design Guidelines for Secure Web Applications - https://msdn.microsoft.com/en-us/library/aa302420.aspx (Refer InputValidation section)
    /// Architecture and Design Review for Security - https://msdn.microsoft.com/en-us/library/aa302421.aspx (Refer InputValidation section)
    /// </summary>
    /// <param name="batchFileId"></param>
    /// <returns>bool</returns>
    public bool HasUploadedBatchFile(String batchFileId, string remotePeerId)
    {
        this.CheckAndCreateBatchingDirectory(remotePeerId);

        //The batchFileId is the fileName without the path information in it.
        FileInfo fileInfo = new FileInfo(Path.Combine(this.sessionBatchingDirectory.FullName, batchFileId));
        if (fileInfo.Exists && !this.batchIdToFileMapper.ContainsKey(batchFileId))
        {
            //If file exists but is not in the memory id to location mapper then add it to the mapping
            this.batchIdToFileMapper.Add(batchFileId, fileInfo.FullName);
        }
        //Check to see if the proxy has already uploaded this file to the service
        return fileInfo.Exists;
    }

    /// <summary>
    /// NOTE: This method takes in a file name as an input parameter and hence is suseptible for name canonicalization
    /// attacks. This sample is meant to be a starting point in demonstrating how to transfer sync batch files and is
    /// not intended to be a secure way of doing the same. This SHOULD NOT be used as such in production environment
    /// without doing proper security analysis.
    /// 
    /// Please refer to the following two MSDN whitepapers for more information on guidelines for securing Web servies.
    /// 
    /// Design Guidelines for Secure Web Applications - https://msdn.microsoft.com/en-us/library/aa302420.aspx (Refer InputValidation section)
    /// Architecture and Design Review for Security - https://msdn.microsoft.com/en-us/library/aa302421.aspx (Refer InputValidation section)
    /// </summary>
    /// <param name="batchFileId"></param>
    /// <param name="batchContents"></param>
    /// <param name="remotePeerId"></param>
    public void UploadBatchFile(string batchFileId, byte[] batchContents, string remotePeerId)
    {
        Log("UploadBatchFile: {0}", this.peerProvider.Connection.ConnectionString);
        try
        {
            if (HasUploadedBatchFile(batchFileId, remotePeerId))
            {
                //Service has already received this file. So dont save it again.
                return;
            }
            
            //Service hasnt seen the file yet so save it.
            String localFileLocation = Path.Combine(sessionBatchingDirectory.FullName, batchFileId);
            FileStream fs = new FileStream(localFileLocation, FileMode.Create, FileAccess.Write);
            using (fs)
            {
                    fs.Write(batchContents, 0, batchContents.Length);
            }
            //Save this Id to file location mapping in the mapper object
            this.batchIdToFileMapper[batchFileId] = localFileLocation;
        }
        catch (Exception e)
        {
            throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("Unable to save batch file.", e));
        }
    }

    /// <summary>
    /// NOTE: This method takes in a file name as an input parameter and hence is suseptible for name canonicalization
    /// attacks. This sample is meant to be a starting point in demonstrating how to transfer sync batch files and is
    /// not intended to be a secure way of doing the same. This SHOULD NOT be used as such in production environment
    /// without doing proper security analysis.
    /// 
    /// Please refer to the following two MSDN whitepapers for more information on guidelines for securing Web servies.
    /// 
    /// Design Guidelines for Secure Web Applications - https://msdn.microsoft.com/en-us/library/aa302420.aspx (Refer InputValidation section)
    /// Architecture and Design Review for Security - https://msdn.microsoft.com/en-us/library/aa302421.aspx (Refer InputValidation section)
    /// </summary>
    /// <param name="batchFileId"></param>
    /// <returns></returns>
    public byte[] DownloadBatchFile(string batchFileId)
    {
        try
        {
            Log("DownloadBatchFile: {0}", this.peerProvider.Connection.ConnectionString);
            Stream localFileStream = null;

            string localBatchFileName = null;

            if (!this.batchIdToFileMapper.TryGetValue(batchFileId, out localBatchFileName))
            {
                throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("Unable to retrieve batch file for id." + batchFileId, null));
            }

            localFileStream = new FileStream(localBatchFileName, FileMode.Open, FileAccess.Read);
            byte[] contents = new byte[localFileStream.Length];
            localFileStream.Read(contents, 0, contents.Length);
            return contents;
        }
        catch (Exception e)
        {
            throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("Unable to read batch file for id " + batchFileId, e));
        }
    }

    protected void Log(string p, params object[] paramArgs)
    {
        Console.WriteLine(p, paramArgs);
    }

    //Utility functions that the sub classes need to implement.
    protected abstract RelationalSyncProvider ConfigureProvider(string scopeName, string hostName);


    private void CheckAndCreateBatchingDirectory(string remotePeerId)
    {
        //Check to see if we have temp directory for this session.
        if (sessionBatchingDirectory == null)
        {
            //Generate a unique Id for the directory
            //We use the peer id of the store enumerating the changes so that the local temp directory is same for a given source
            //across sync sessions. This enables us to restart a failed sync by not downloading already received files.
            string sessionDir = Path.Combine(this.peerProvider.BatchingDirectory, "WebSync_" + remotePeerId);
            sessionBatchingDirectory = new DirectoryInfo(sessionDir);
            //Create the directory if it doesnt exist.
            if (!sessionBatchingDirectory.Exists)
            {
                sessionBatchingDirectory.Create();
            }
        }
    }
}

下面的代码示例来自 RelationalProviderProxy,它设置属性并对 Web 服务调用方法:

  • BatchingDirectory:使应用程序能够为中间层设置批处理目录。

  • EndSession:从指定的目录中清除所有假脱机文件。

  • GetChangeBatch:通过调用 DownloadBatchFile 方法下载变更批。

  • ProcessChangeBatch:通过调用 UploadBatchFile 方法上载变更批。

public abstract class RelationalProviderProxy : KnowledgeSyncProvider, IDisposable
{
    protected IRelationalSyncContract proxy;
    protected SyncIdFormatGroup idFormatGroup;
    protected string scopeName;
    protected DirectoryInfo localBatchingDirectory;

    //Represents either the SQL server host name or the CE database file name. Sql database name
    //is always peer1
    //For this sample scopeName is always Sales
    protected string hostName;

    private string batchingDirectory = Environment.ExpandEnvironmentVariables("%TEMP%");

    public string BatchingDirectory
    {
        get { return batchingDirectory; }
        set 
        {
            if (string.IsNullOrEmpty(value))
            {
                throw new ArgumentException("value cannot be null or empty");
            }
            try
            {
                Uri uri = new Uri(value);
                if (!uri.IsFile || uri.IsUnc)
                {
                    throw new ArgumentException("value must be a local directory");
                }
                batchingDirectory = value;
            }
            catch (Exception e)
            {
                throw new ArgumentException("Invalid batching directory.", e);
            }
        }
    }

    public RelationalProviderProxy(string scopeName, string hostName)
    {
        this.scopeName = scopeName;
        this.hostName = hostName;
        this.CreateProxy();            
        this.proxy.Initialize(scopeName, hostName);
    }

    public override void BeginSession(SyncProviderPosition position, SyncSessionContext syncSessionContext)
    {
        this.proxy.BeginSession(position);
    }

    public override void EndSession(SyncSessionContext syncSessionContext)
    {
        proxy.EndSession();
        if (this.localBatchingDirectory != null && this.localBatchingDirectory.Exists)
        {
            //Cleanup batching releated files from this session
            this.localBatchingDirectory.Delete(true);
        }
    }

    public override ChangeBatch GetChangeBatch(uint batchSize, SyncKnowledge destinationKnowledge, out object changeDataRetriever)
    {
        GetChangesParameters changesWrapper = proxy.GetChanges(batchSize, destinationKnowledge);
        //Retrieve the ChangeDataRetriever and the ChangeBatch
        changeDataRetriever = changesWrapper.DataRetriever;

        DbSyncContext context = changeDataRetriever as DbSyncContext;
        //Check to see if the data is batched.
        if (context != null && context.IsDataBatched)
        {
            if (this.localBatchingDirectory == null)
            {
                //Retrieve the remote peer id from the MadeWithKnowledge.ReplicaId. MadeWithKnowledge is the local knowledge of the peer 
                //that is enumerating the changes.
                string remotePeerId = context.MadeWithKnowledge.ReplicaId.ToString();

                //Generate a unique Id for the directory.
                //We use the peer id of the store enumerating the changes so that the local temp directory is same for a given source
                //across sync sessions. This enables us to restart a failed sync by not downloading already received files.
                string sessionDir = Path.Combine(this.batchingDirectory, "WebSync_" + remotePeerId);
                this.localBatchingDirectory = new DirectoryInfo(sessionDir);
                //Create the directory if it doesnt exist.
                if (!this.localBatchingDirectory.Exists)
                {
                    this.localBatchingDirectory.Create();
                }
            }

            string localFileName = Path.Combine(this.localBatchingDirectory.FullName, context.BatchFileName);
            FileInfo localFileInfo = new FileInfo(localFileName);
            
            //Download the file only if doesnt exist
            FileStream localFileStream = new FileStream(localFileName, FileMode.Create, FileAccess.Write);
            if (!localFileInfo.Exists)
            {
                byte[] remoteFileContents = this.proxy.DownloadBatchFile(context.BatchFileName);
                using (localFileStream)
                {
                    localFileStream.Write(remoteFileContents, 0, remoteFileContents.Length);
                }
            }
            //Set DbSyncContext.Batchfile name to the new local file name
            context.BatchFileName = localFileName;
        }

        return changesWrapper.ChangeBatch;
    }

    public override FullEnumerationChangeBatch GetFullEnumerationChangeBatch(uint batchSize, SyncId lowerEnumerationBound, SyncKnowledge knowledgeForDataRetrieval, out object changeDataRetriever)
    {
        throw new NotImplementedException();
    }

    public override void GetSyncBatchParameters(out uint batchSize, out SyncKnowledge knowledge)
    {
        SyncBatchParameters wrapper = proxy.GetKnowledge();
        batchSize = wrapper.BatchSize;
        knowledge = wrapper.DestinationKnowledge;
    }

    public override SyncIdFormatGroup IdFormats
    {
        get
        {
            if (idFormatGroup == null)
            {
                idFormatGroup = new SyncIdFormatGroup();

                //
                // 1 byte change unit id (Harmonica default before flexible ids)
                //
                idFormatGroup.ChangeUnitIdFormat.IsVariableLength = false;
                idFormatGroup.ChangeUnitIdFormat.Length = 1;

                //
                // Guid replica id
                //
                idFormatGroup.ReplicaIdFormat.IsVariableLength = false;
                idFormatGroup.ReplicaIdFormat.Length = 16;


                //
                // Sync global id for item ids
                //
                idFormatGroup.ItemIdFormat.IsVariableLength = true;
                idFormatGroup.ItemIdFormat.Length = 10 * 1024;
            }

            return idFormatGroup;
        }
    }

    public override void ProcessChangeBatch(ConflictResolutionPolicy resolutionPolicy, ChangeBatch sourceChanges, object changeDataRetriever, SyncCallbacks syncCallbacks, SyncSessionStatistics sessionStatistics)
    {
        DbSyncContext context = changeDataRetriever as DbSyncContext;
        if (context != null && context.IsDataBatched)
        {
            string fileName = new FileInfo(context.BatchFileName).Name;

            //Retrieve the remote peer id from the MadeWithKnowledge.ReplicaId. MadeWithKnowledge is the local knowledge of the peer 
            //that is enumerating the changes.
            string peerId = context.MadeWithKnowledge.ReplicaId.ToString();

            //Check to see if service already has this file
            if (!this.proxy.HasUploadedBatchFile(fileName, peerId))
            {
                //Upload this file to remote service
                FileStream stream = new FileStream(context.BatchFileName, FileMode.Open, FileAccess.Read);
                byte[] contents = new byte[stream.Length];
                using (stream)
                {
                    stream.Read(contents, 0, contents.Length);
                }
                this.proxy.UploadBatchFile(fileName, contents, peerId);
            }

            context.BatchFileName = fileName;
        }
        this.proxy.ApplyChanges(resolutionPolicy, sourceChanges, changeDataRetriever);
    }

    public override void ProcessFullEnumerationChangeBatch(ConflictResolutionPolicy resolutionPolicy, FullEnumerationChangeBatch sourceChanges, object changeDataRetriever, SyncCallbacks syncCallbacks, SyncSessionStatistics sessionStatistics)
    {
        throw new NotImplementedException();
    }

    protected abstract void CreateProxy();

    #region IDisposable Members

    public void Dispose()
    {
        this.proxy.Cleanup();
        this.proxy = null;
        GC.SuppressFinalize(this);
    }

    #endregion
}

请参阅

概念

同步 SQL Server 和 SQL Server Compact