生成弹性 StreamInsight 应用程序
本主题介绍创建弹性 StreamInsight 应用程序所需的步骤。
仅在 StreamInsight 的 Premium 版本中提供弹性。有关详细信息,请参阅选择 StreamInsight 版本。
有关包括重播和取消复制的弹性应用程序的端到端代码示例,请参阅 Codeplex 上的 StreamInsight 示例页中的“检查点示例”。
主题内容
步骤 1.配置弹性服务器
步骤 2.定义弹性查询
步骤 3.捕获检查点
步骤 4.在输入适配器中重播事件
步骤 5.删除输出适配器中的重复项
步骤 6.从失败中恢复
在不禁用恢复的情况下关闭
示例
步骤 1. 配置弹性服务器
所需设置
若要配置弹性服务器,请在创建服务器时为以下配置设置提供值:
元数据存储区。您必须使用 SQL Server Compact 存储服务器元数据;元数据不能存储于内存中。
日志路径。此设置确定为弹性查询存储检查点数据的位置。该路径的默认值是 StreamInsight 进程的工作目录。相关设置 CreateLogPathIfMissing 确定是否创建指定目录(如果该目录不存在)。
通过为弹性配置服务器,可以支持捕获检查点,但不导致捕获检查点。有关调用检查点的信息,请参阅步骤 3.捕获检查点。
管理检查点日志路径
为了避免未授权地读取或篡改检查点文件,请确保设置包含文件夹的权限,以便只有可信实体才具有访问权限。
StreamInsight 的每个实例都应该具有自己的日志路径。
请确保承载 StreamInsight 的进程对于指定文件夹具有读写权限。
不要删除该文件夹的内容。StreamInsight 将在不再需要检查点文件时删除它们。
进程外服务器
在客户端通过调用 Server.Connect 连接到的进程外服务器中,弹性配置由设置服务器的人士提供。如果进程外服务器具有弹性配置,则客户端可按照配置使用弹性功能;如果服务器不具有弹性配置,则客户端将无法使用弹性功能。
用于指定弹性选项的方法
可使用下列方法之一指定弹性设置:
通过在调用 Server.Create 时提供弹性配置,以编程方式指定这些设置。
在应用程序配置文件中以声明方式指定这些设置。
对于进程内服务器,此为 app.config 文件。
对于进程外服务器,此为 StreamInsightHost.exe.config 文件,该文件位于 StreamInsight 安装文件夹下的 Host 文件夹中。
如果您使用这两种方法,则在 API 调用中指定的设置将覆盖配置文件中的设置。
以编程方式创建弹性服务器
以下示例显示了如何以编程方式创建弹性的进程内服务器。有关详细示例,请参阅示例。尝试捕获在调用 Server.Create 时将导致检查点失败的任何异常。
SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";
CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";
using (EmbeddedServer server =
Server.Create("Default", metadataConfig, recoveryConfig))
以声明方式创建弹性服务器
以下示例说明如何通过使用配置文件以声明方式创建弹性服务器。
<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
<appSettings>
<add key="InstanceName" value="Default"/>
<add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
<add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
<add key="CheckpointEnabled" value="true"/>
<add key="CheckpointLogPath" value="CepLogPath"/>
<add key="CreateCheckpointLogPathIfMissing" value="true"/>
</appSettings>
<runtime>
<gcServer enabled="true"/>
</runtime>
</configuration>
页首
步骤 2. 定义弹性查询
若要创建弹性查询,请在代码中包含以下步骤。
在创建新查询之前,请查看该查询是否已在元数据中存在。如果该查询已存在,则表示应用程序已从失败中恢复。您的代码必须重新开始该查询,而非重新创建它。
如果该查询在元数据中不存在,则通过为 ToQuery 方法的 IsResilient 参数指定 true,创建该查询并将其定义为弹性查询。您也可以使用 IsResilient 参数调用 Application.CreateQuery 方法。
通过为弹性配置查询,可以支持捕获检查点,但不导致捕获检查点。有关调用检查点的信息,请参阅步骤 3.捕获检查点。
定义弹性查询的示例
有关详细示例,请参阅示例。
Query query = application.CreateQuery(
"TrafficSensorQuery",
"Minute average count, filtered by location threshold",
queryBinder,
true);
页首
步骤 3. 捕获检查点
在运行一个或多个查询后,定期捕获检查点以便记录查询的状态。
支持检查点的 API 方法将按照用于异步操作的典型模式执行。
若要调用检查点,请调用 BeginCheckpoint 方法。如果您提供可选的 AsyncCallback,则在完成检查点后调用它。从调用 BeginCheckpoint 返回的 IAsyncResult 将标识此检查点请求并且可在以后调用 EndCheckpoint 或 CancelCheckpoint 时使用。
/// <summary> /// Take an asynchronous checkpoint for the query. /// </summary> /// <param name="query">The query to checkpoint.</param> /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param> /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param> /// <returns></returns> IAsyncResult BeginCheckpoint( Query query, AsyncCallback asyncCallback, Object asyncState);
在完成检查点操作前,EndCheckpoint 方法将进行阻止。如果检查点操作成功,则该调用将返回 true;如果发生错误,则该调用将引发异常。
/// <summary> /// Waits for the pending asynchronous checkpoint request to complete. /// </summary> /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param> /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns> bool EndCheckpoint( IAsyncResult asyncResult);
您还可以调用 CancelCheckpoint 以便取消检查点进程。在对 CancelCheckpoint 的调用成功后,随后对 EndCheckpoint 的调用将返回 false。
/// <summary> /// Cancels the pending asynchronous checkpoint request. /// </summary> /// <param name="asyncResult">The asyncResult handle identifying the call.</param> void CancelCheckpoint( IAsyncResult asyncResult);
可通过三种不同的方式使用此异步模式:
可在调用 EndCheckpoint 后调用 BeginCheckpoint。然后,在完成检查点操作前 EndCheckpoint 将进行阻止,之后返回结果(或异常)。在此模式中,通常不使用 asyncCallback 和 asyncState。
可以调用 BeginCheckpoint,然后,用户可以轮询返回的 IAsyncResult 的 IsCompleted 属性。在 IsCompleted 为 true 时,可调用 EndCheckpoint 以便检索结果。在此模式中,通常不使用 asyncCallback 和 asyncState。
可以使用回调方法调用 BeginCheckpoint。在此情况下,可使用 asyncState 标识调用并且向回调方法返回任何所需信息。在回调运行时,它将调用 EndCheckpoint 以便检索结果。
必须调用 EndCheckpoint 方法,而与所使用的模式无关,甚至在取消该检查点时也是如此。此方法是用户从调用中获取返回值的唯一方法,也是 StreamInsight 知道调用完成的唯一方法。在调用 EndCheckpoint 之前,无法开始其他检查点。
在检查点进程中发生的错误不会停止或影响关联查询。如果您在某一检查点操作正在进行中时停止某一查询,则该检查点将被取消。
页首
步骤 4. 在输入适配器中重播事件
为了支持事件重播作为恢复的一部分,输入适配器工厂必须实现 IHighWaterMarkInputAdapterFactory 或 IHighWaterMarkTypedInputAdapterFactory 接口。然后,对适配器工厂的 Create 方法的调用将提供有助于适配器标识要重播的事件的高水印。
为确保输出是“完整的”,所有输入适配器都必须重播物理流中在高水印指示的位置处或之后发生的所有事件。
页首
步骤 5. 删除输出适配器中的重复项
为了支持删除重复项作为恢复的一部分,输出适配器工厂必须实现 IHighWaterMarkOutputAdapterFactory 或 IHighWaterMarkTypedOutputAdapterFactory 接口。然后,对适配器工厂的 Create 方法的调用将提供有助于适配器标识重复值的高水印和偏移值。此偏移值是必需的,因为与检查点相对应的输出流中的位置可处于该流中的任何点。
在首次启动该查询时,将在没有高水印和偏移值的情况下调用该适配器工厂的 Create 方法。如果服务器尚未捕获查询的任何检查点,则使用高水印 DateTime.MinValue 和偏移值 0(零)调用该适配器工厂的 Create 方法。
如果某一查询正确重播,则在捕获最后一个检查点之后、但在中断之前生成的所有事件都将在重新启动时再次生成。它们是输出适配器必须删除的重复项。如何删除这些重复项由输出适配器确定:放弃原始副本,或者忽略重复的副本。
为确保输出是“等效的”,所有输入适配器都必须正确重播输入事件,并且所有输出适配器都必须删除物理流中在中断之前、但在高水印偏移值指示的位置处或之后发生的所有重复的事件。
页首
步骤 6. 从失败中恢复
服务器将在启动时自动执行恢复并且使所有查询都处于一致的状态。这是一个异步操作;因此,对 Server.Create 的调用将在完成恢复之前返回。
非弹性查询将处于“已停止”状态。此行为尚未更改。
弹性查询将处于“正在初始化”状态。然后,服务器将加载已保存的检查点信息。
您可以在此时调用 Start 以便重新启动查询。一旦初始化完成后,弹性查询将立即重新启动。
为从失败中恢复,启动代码必须执行以下步骤:
从元数据中检索应用程序的查询的列表。
对于每个查询,请查看该查询是否已在元数据中存在。
如果该查询已存在,则重新启动它。
如果该查询在元数据中不存在,则创建该查询并将其定义为弹性查询,如前面的步骤 2.定义弹性查询中所述。
如果在恢复自身的过程中出现问题,您可以在不具有弹性功能的情况下重新启动服务器。
页首
在不禁用恢复的情况下关闭
您可以通过调用 Server 的 Dispose 方法,在不禁用恢复的情况下关闭服务器。
非弹性查询将停止。
弹性查询将挂起。在您重新启动服务器时,服务器将尝试恢复已挂起的查询的状态。若要禁止此行为,请在关闭前停止查询。
当您用此方法关闭服务器时,用于非弹性查询和弹性查询的元数据都将保存。
页首
示例
有关包括重播和取消复制的弹性应用程序的端到端代码示例,请参阅 Codeplex 上的 StreamInsight 示例页中的“检查点示例”。
页首
使用显式的开发模型定义弹性查询
namespace StreamInsight.Samples.TrafficJoinQuery
{
using...
internal class EmbeddedCepServer
{
internal static void Main()
{
// SQL CE was available as an optional metadata provider in v1.1
// For the server to support recovery, this becomes mandatory
// A log path is also a mandatory requirement.
SqlCeMetadataProviderConfiguration metadataConfig = new
SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";
ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";
using (EmbeddedServer server = Server.Create(
"Default", metadataConfig, recoveryConfig))
{
try
{
Application application = server.CreateApplication("TrafficJoinSample");
QueryTemplate queryTemplate = CreateQueryTemplate(application);
InputAdapter csvInputAdapter =
application.CreateInputAdapter<TextFileReaderFactory>(
"CSV Input", "Reading tuples from a CSV file");
OutputAdapter csvOutputAdapter =
application.CreateOutputAdapter<TextFileWriterFactory>(
"CSV Output", "Writing result events to a CSV file");
// bind query to event producers and consumers
QueryBinder queryBinder = BindQuery(
csvInputAdapter, csvOutputAdapter, queryTemplate);
// Create bound query that can be run
Console.WriteLine("Registering bound query");
Query query = application.CreateQuery(
"TrafficSensorQuery",
"Minute average count, filtered by location threshold",
queryBinder,
true); // v1.2 addition - Specify the query as resilient
// Start the query
// v1.2 has additional semantics during recovery
query.Start();
// submit a checkpoint request
// query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e);
Console.ReadLine();
}
}
Console.WriteLine("\npress enter to exit application");
Console.ReadLine();
}
检查点 - 回调会合模型
namespace StreamInsight.Samples.TrafficJoinQuery
{
using...
internal class EmbeddedCepServer
{
internal static void Main()
{
// Same code through query start …
{
try
{
// Start the query
query.Start();
// submit a checkpoint request
IAsyncResult result = server.BeginCheckpoint(query,
r => {
if (server.EndCheckpoint(r))
{
// the checkpoint succeeded
}
else
{
// the checkpoint was canceled
}
},
null);
}
catch (Exception e)
{
Console.WriteLine(e);
Console.ReadLine();
}
}
Console.WriteLine("\npress enter to exit application");
Console.ReadLine();
}