Поделиться через


Построение эластичных приложений StreamInsight

В данном разделе описаны шаги создания эластичного приложения StreamInsight.

Поддержка эластичности доступна только в выпуске Premium продукта StreamInsight. Дополнительные сведения см. в разделе Выбор выпуска StreamInsight.

Для ознакомления с полным образцом кода эластичного приложения, который включает воспроизведение и удаление дубликатов, см. «Образец назначения контрольных точек» на странице «Образцы StreamInsight» сайта Codeplex.

В этом разделе

  1. Шаг 1. Настройка эластичного сервера

  2. Шаг 2. Определение эластичного запроса

  3. Шаг 3. Перехват контрольных точек

  4. Шаг 4. Воспроизведение событий во входном адаптере

  5. Шаг 5. Устранение дубликатов в выходном адаптере

  6. Шаг 6. Восстановление после сбоя

  7. Останов без отключения восстановления

  8. Примеры

Шаг 1. Настройка эластичного сервера

Обязательные параметры

Чтобы настроить эластичный сервер, предоставьте значения для следующих параметров конфигурации при создании сервера:

  • Хранилище метаданных. В целях хранения метаданных для сервера необходимо использовать SQL Server Compact. Метаданные не могут храниться в памяти.

  • Путь к журналу. Этот параметр определяет, где хранятся данные контрольных точек для эластичных запросов. Значением пути по умолчанию является рабочий каталог процесса StreamInsight. Связанный с этим параметр CreateLogPathIfMissing определяет, должен ли быть создан указанный каталог, если он не существует.

Настройка сервера для поддержки эластичности обеспечивает возможность перехвата контрольных точек, но не вызывает перехват контрольных точек. Сведения о вызове контрольных точек см. в шаге 3. Перехват контрольных точек.

Управление путем к журналу контрольных точек

  • Чтобы предотвратить несанкционированное чтение или внесение изменений в файлы контрольных точек, проследите за тем, чтобы разрешения на содержащую их папку были заданы в целях предоставления к ним доступа только сущностям, заслуживающим доверия.

  • Каждый экземпляр StreamInsight должен иметь свой собственный путь к журналу.

  • Проследите за тем, чтобы процесс, размещающий StreamInsight, имел доступ для чтения и записи к указанной папке.

  • Не изменяйте содержимое этой папки. StreamInsight удаляет файлы контрольных точек после того, как в них отпадает необходимость.

Внепроцессные серверы

Что касается внепроцессного сервера, к которому клиент подключается путем вызова Server.Connect, то настройка поддержки эластичности обеспечивается лицом, подготавливающим сервер. Если внепроцессный сервер имеет настройку поддержки эластичности, то клиент может его использовать в соответствии с настройкой. Если же сервер не имеет настройки поддержки эластичности, то клиент не может использовать функции поддержки эластичности.

Методы задания параметров поддержки эластичности

Предусмотрена возможность указывать параметры поддержки эластичности с применением одного из следующих методов:

  • Задание параметров программным путем с помощью предоставления конфигурации поддержки эластичности при вызове Server.Create.

  • Задание параметров в декларативной форме в файле конфигурации приложения.

    • Для внутрипроцессного сервера таковым является файл app.config.

    • Для внепроцессного сервера это файл StreamInsightHost.exe.config, который может находиться в папке Host под папкой установки StreamInsight.

Если используются оба метода, то параметры, указанные при вызове API, могут переопределять параметры в файле конфигурации.

Создание эластичного сервера программным путем

В следующем примере показано, как создать эластичный внутрипроцессный сервер программным путем. Более подробные примеры см. в разделе Примеры. Стремитесь к тому, чтобы перехватывались все исключения, которые могут вызвать неудачное завершение назначения контрольных точек при вызове метода Server.Create.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server = 
    Server.Create("Default", metadataConfig, recoveryConfig))

Декларативное создание эластичного сервера

В следующем примере показано декларативное создание эластичного сервера с использованием файла конфигурации.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
            <add key="InstanceName" value="Default"/>
            <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
            <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
            <add key="CheckpointEnabled" value="true"/>
            <add key="CheckpointLogPath" value="CepLogPath"/>
            <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

В начало

Шаг 2. Определение эластичного запроса

Чтобы создать эластичный запрос, включите в свой код следующие шаги.

  1. Перед созданием нового запроса проверьте наличие этого запроса в метаданных. Если запрос уже существует, это указывает, что приложение было восстановлено после сбоя. В коде необходимо предусмотреть перезапуск этого запроса, а не его повторное создание.

  2. Если запрос не существует в метаданных, создайте его и определите как эластичный, задавая значение true для параметра IsResilient метода ToQuery. Можно также вызвать метод Application.CreateQuery с параметром IsResilient.

Настройка запроса для поддержки эластичности дает возможность перехватывать контрольные точки, но не вызывает перехват контрольных точек. Сведения о вызове контрольных точек см. в шаге 3. Перехват контрольных точек.

Пример определения эластичного запроса

Более подробные примеры см. в разделе Примеры.

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

В начало

Шаг 3. Перехват контрольных точек

После запуска одного или нескольких запросов следует периодически перехватывать контрольные точки в целях записи состояния запросов.

Методы API, поддерживающие назначение контрольных точек, соответствуют типичному шаблону для асинхронных операций.

  1. Чтобы вызвать контрольную точку, следует вызвать метод BeginCheckpoint. Если предусмотрен необязательный метод AsyncCallback, то он будет вызван после завершения создания контрольной точки. Значение IAsyncResult, возвращенное после вызова BeginCheckpoint, идентифицирует данный запрос контрольной точки и может использоваться позднее в вызовах EndCheckpoint или CancelCheckpoint.

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns></returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. Метод EndCheckpoint блокируется до завершения операции создания контрольной точки. Если операция создания контрольной точки выполняется успешно, этот вызов возвращает значение true. Если возникают ошибки, вызов активирует исключение.

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. Можно также вызвать CancelCheckpoint, чтобы отменить процесс назначения контрольных точек. Если вызов CancelCheckpoint выполняется успешно, то последующий вызов EndCheckpoint возвращает значение false.

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

Этот асинхронный шаблон может использоваться тремя различными способами:

  • За вызовом BeginCheckpoint может следовать вызов EndCheckpoint. В таком случае EndCheckpoint блокируется до завершения операции создания контрольной точки, а затем возвращает результат (или исключение). В этом шаблоне asyncCallback и asyncState, как правило, не используются.

  • Можно вызвать BeginCheckpoint, а затем пользователь может опрашивать свойство IsCompleted возвращенного объекта IAsyncResult. Если IsCompleted представляет собой true, то можно вызвать EndCheckpoint для получения результата. В этом шаблоне asyncCallback и asyncState, как правило, не используются.

  • BeginCheckpoint можно вызывать с помощью метода обратного вызова. В этом случае asyncState может использоваться для идентификации вызова и возврата всех необходимых сведений в метод обратного вызова. Обратный вызов при запуске вызывает EndCheckpoint для получения результата.

Должен быть вызван метод EndCheckpoint независимо от используемого шаблона, и даже если контрольная точка отменена. Этот метод предоставляет для пользователя единственный способ получения значения из вызова, а также единственный способ, позволяющий StreamInsight определить, что вызов завершен. До вызова EndCheckpoint нельзя приступать к созданию еще одной контрольной точки.

Ошибки, которые возникают в процессе назначения контрольных точек, не останавливают и не затрагивают связанные с этим запросы. Если запрос будет остановлен в ходе выполнения операции создания контрольной точки, контрольная точка отменяется.

В начало

Шаг 4. Воспроизведение событий во входном адаптере

Чтобы обеспечить поддержку воспроизведения событий в ходе восстановления, в фабрике входного адаптера необходимо реализовать интерфейс IHighWaterMarkInputAdapterFactory или IHighWaterMarkTypedInputAdapterFactory. После этого вызов метода Create фабрики адаптера предоставляет верхнюю конечную точку, которая позволяет адаптеру идентифицировать события, подлежащие воспроизведению.

Чтобы вывод был гарантированно полным, все входные адаптеры должны воспроизвести все события в физическом потоке, которые происходили в позиции, обозначенной верхней конечной точкой, или после нее.

В начало

Шаг 5. Устранение дубликатов в выходном адаптере

Чтобы обеспечить поддержку устранения дубликатов в ходе восстановления, в фабрике выходного адаптера необходимо реализовать интерфейс IHighWaterMarkOutputAdapterFactory или IHighWaterMarkTypedOutputAdapterFactory. После этого вызов метода Create фабрики адаптера предоставляет верхнюю конечную точку и величину смещения, которые позволяют адаптеру идентифицировать повторяющиеся значения. Это смещение необходимо, поскольку местоположение в выходном потоке, соответствующее контрольной точке, может оказаться в любой точке в потоке.

Если запрос запускается впервые, метод Create фабрики адаптера вызывается без верхней конечной точки и смещения. Если сервер еще не перехватывал ни одной контрольной точки для запроса, то метод Create фабрики адаптера вызывается с верхней конечной точкой DateTime.MinValue и смещением 0 (нуль).

Если запрос воспроизведен должным образом, то любые события, произошедшие после перехвата последней контрольной точки, но до начала простоя, будут снова воспроизведены после перезапуска. Это — дубликаты, которые должны быть удалены выходным адаптером. Применяемый способ их удаления зависит от выходного адаптера: он может отбрасывать исходные копии или пропускать дубликаты.

Чтобы выход был гарантированно эквивалентным, все входные адаптеры должны правильно воспроизводить события ввода, а все выходные адаптеры должны удалять все повторяющиеся события в физическом потоке, которые произошли до начала простоя, а затем происходят в позиции, обозначенной смещением верхней конечной точки, или после нее.

В начало

Шаг 6. Восстановление после сбоя

Сервер автоматически выполняет восстановление после запуска и переводит все запросы в согласованное состояние. Это асинхронная операция. В результате в вызове метода Server.Create происходит возврат перед завершением восстановления.

  • Неэластичные запросы переводятся в состояние «Остановлено». Это поведение не изменилось.

  • Эластичные запросы переводятся в состояние «Инициализация». Затем сервер загружает сохраненные сведения контрольной точки.

В этот момент можно вызвать Start для перезапуска запросов. Эластичные запросы будут перезапущены сразу после завершения инициализации.

В коде запуска должны быть выполнены следующие шаги для восстановления после сбоя:

  1. Получение списка запросов приложения из метаданных.

  2. Для каждого запроса выполните проверку наличия запроса в метаданных.

    1. Если запрос уже существует, его необходимо перезапустить.

    2. Если запрос не существует в метаданных, его необходимо создать и определить как эластичный в соответствии с приведенным выше описанием в шаге 2. Определение эластичного запроса.

Если проблема возникает в ходе самого восстановления, можно перезапустить сервер без поддержки эластичности.

В начало

Останов без отключения восстановления

Можно остановить сервер, не отключая восстановление, путем вызова метода Dispose, который относится к Server.

  • Неэластичные запросы останавливаются.

  • Эластичные запросы приостанавливаются. При перезагрузке сервера этот сервер предпринимает попытки восстановить состояние приостановленных запросов. Чтобы предотвратить такое поведение, остановите запросы перед остановом сервера.

При останове сервера таким образом сохраняются метаданные и для неэластичных, и для эластичных запросов.

В начало

Примеры

Для ознакомления с полным образцом кода эластичного приложения, который включает воспроизведение и удаление дубликатов, см. «Образец назначения контрольных точек» на странице «Образцы StreamInsight» сайта Codeplex.

В начало

Определение эластичного запроса с применением явной модели разработки

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

назначение контрольных точек — модель встречи обратного вызова

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
                        // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

См. также

Основные понятия

Эластичность StreamInsight

Построение эластичных приложений StreamInsight

Отслеживание эластичных приложений StreamInsight

Устранение неисправностей эластичных приложений StreamInsight