Compilazione di applicazioni StreamInsight resilienti
In questo argomento vengono descritti i passaggi per creare un'applicazione StreamInsight resiliente.
La resilienza è disponibile solo nell'edizione Premium di StreamInsight. Per ulteriori informazioni, vedere Scelta di un'edizione di StreamInsight.
Per un esempio di codice end-to-end di un'applicazione resiliente con riproduzione e deduplicazione, vedere l'esempio di checkpoint nella pagina di esempi relativi a StreamInsight su CodePlexhttps://go.microsoft.com/fwlink/?LinkID=180356.
In questo argomento
Passaggio 1. Configurazione di un server resiliente
Passaggio 2. Definizione di una query resiliente
Passaggio 3. Acquisizione di checkpoint
Passaggio 4. Riproduzione di eventi nell'adattatore di input
Passaggio 5. Eliminazione di duplicati nell'adattatore di output
Passaggio 6. Recupero da errori
Arresto senza disabilitazione del recupero
Esempi
Passaggio 1. Configurazione di un server resiliente
Impostazioni necessarie
Per configurare un server resiliente, fornire i valori per le impostazioni di configurazione seguenti durante la creazione del server:
Archivio dei metadati. È necessario utilizzare SQL Server Compact per l'archiviazione dei metadati per il server. Non è possibile archiviare i metadati in memoria.
Percorso del log. Questa impostazione consente di stabilire la posizione in cui vengono archiviati i dati di checkpoint per le query resilienti. Il valore predefinito del percorso è la directory di lavoro del processo di StreamInsight. Un'impostazione correlata, CreateLogPathIfMissing, consente di determinare se creare la directory specificata, qualora non disponibile.
La configurazione di un server per la resilienza può consentire l'acquisizione di checkpoint, tuttavia tale operazione non viene eseguita automaticamente. Per informazioni sul richiamo dei checkpoint, vedere Passaggio 3. Acquisizione di checkpoint.
Gestione del percorso del log di checkpoint
Per evitare letture o alterazioni non autorizzate dei file del checkpoint, assicurarsi che le autorizzazioni della cartella contenitore siano impostate in modo che l'accesso sia consentito solo a entità attendibili.
Ogni istanza di StreamInsight deve disporre del relativo percorso del log.
Assicurarsi che il processo in cui è ospitato StreamInsight disponga dell'accesso in lettura e scrittura alla cartella specificata.
Non modificare il contenuto della cartella. StreamInsight consente di eliminare i file del checkpoint quando non più necessari.
Server out-of-process
In caso di un server out-of-process, a cui viene eseguita la connessione da parte del client effettuando una chiamata a Server.Connect, la configurazione della resilienza viene fornita dalla persona che esegue il provisioning del server. Se è possibile configurare la resilienza per il server out-of-process, quest'ultimo può essere utilizzato dal client come configurato. In caso contrario, le caratteristiche della resilienza non possono essere utilizzate dal client.
Metodi per specificare le opzioni della resilienza
È possibile specificare le impostazioni relative alla resilienza in base a uno dei metodi seguenti:
Specificare le impostazioni a livello di codice fornendo la configurazione della resilienza quando si effettua una chiamata a Server.Create.
Specificare le impostazioni in modo dichiarativo nel file di configurazione dell'applicazione.
Per un server in-process, si tratta del file app.config.
Per un server out-of-process, si tratta del file StreamInsightHost.exe.config, che può essere trovato nella cartella Host sotto la cartella di installazione di StreamInsight.
Se si utilizzano entrambi i metodi, le impostazioni specificate nella chiamata API consentono di eseguire l'override delle impostazioni nel file di configurazione.
Creazione di un server resiliente a livello di codice
Nell'esempio seguente viene mostrato come creare un server in-process resiliente a livello di codice. Per esempi più dettagliati, vedere Esempi. Provare a rilevare tutte le eccezioni che impediscono il completamento del checkpoint quando si effettua una chiamata a Server.Create.
SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";
CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";
using (EmbeddedServer server =
Server.Create("Default", metadataConfig, recoveryConfig))
Creazione di un server resiliente in modo dichiarativo
Nell'esempio seguente viene mostrato come creare un server resiliente in modo dichiarativo tramite un file di configurazione.
<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
<appSettings>
<add key="InstanceName" value="Default"/>
<add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
<add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
<add key="CheckpointEnabled" value="true"/>
<add key="CheckpointLogPath" value="CepLogPath"/>
<add key="CreateCheckpointLogPathIfMissing" value="true"/>
</appSettings>
<runtime>
<gcServer enabled="true"/>
</runtime>
</configuration>
Torna all'inizio
Passaggio 2. Definizione di una query resiliente
Per creare una query resiliente, includere i passaggi seguenti nel codice.
Prima di creare una nuova query, verificare se esiste già nei metadati. In tal caso, è stato recuperato un errore dell'applicazione. La query deve essere riavviata dal codice anziché essere ricreata.
Se la query non è disponibile nei metadati, crearla e definirla come resiliente specificando true per il parametro IsResilient del metodo ToQuery. È inoltre possibile effettuare una chiamata al metodo Application.CreateQuery con il parametro IsResilient.
La configurazione di una query per la resilienza può consentire l'acquisizione di checkpoint, tuttavia tale operazione non viene eseguita automaticamente. Per informazioni sul richiamo dei checkpoint, vedere Passaggio 3. Acquisizione di checkpoint.
Esempio di definizione di una query resiliente
Per esempi più dettagliati, vedere Esempi.
Query query = application.CreateQuery(
"TrafficSensorQuery",
"Minute average count, filtered by location threshold",
queryBinder,
true);
Torna all'inizio
Passaggio 3. Acquisizione di checkpoint
Al termine dell'esecuzione delle query, acquisire i checkpoint periodicamente per registrare lo stato delle query.
I metodi API che supportano il checkpoint seguono il modello tipico per un'operazione asincrona.
Per richiamare un checkpoint, effettuare una chiamata al metodo BeginCheckpoint. Se si fornisce il metodo AsyncCallback facoltativo, la chiamata a quest'ultimo verrà effettuata al completamento del checkpoint. L'oggetto IAsyncResult restituito dalla chiamata a BeginCheckpoint consente di identificare questa richiesta del checkpoint e può essere utilizzato in un secondo momento nelle chiamate a EndCheckpoint o CancelCheckpoint.
/// <summary> /// Take an asynchronous checkpoint for the query. /// </summary> /// <param name="query">The query to checkpoint.</param> /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param> /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param> /// <returns></returns> IAsyncResult BeginCheckpoint( Query query, AsyncCallback asyncCallback, Object asyncState);
Il metodo EndCheckpoint viene bloccato fino a quando l'operazione di checkpoint non sarà completata. Se tale operazione viene completata, dalla chiamata viene restituito true. In caso di errore, viene invece generata un'eccezione.
/// <summary> /// Waits for the pending asynchronous checkpoint request to complete. /// </summary> /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param> /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns> bool EndCheckpoint( IAsyncResult asyncResult);
È inoltre possibile effettuare una chiamata a CancelCheckpoint per annullare il processo del checkpoint. Quando la chiamata a CancelCheckpoint ha esito positivo, dalla successiva chiamata a EndCheckpoint viene restituito false.
/// <summary> /// Cancels the pending asynchronous checkpoint request. /// </summary> /// <param name="asyncResult">The asyncResult handle identifying the call.</param> void CancelCheckpoint( IAsyncResult asyncResult);
Questo modello asincrono può essere utilizzato in tre modalità diverse:
Una chiamata a BeginCheckpoint può essere seguita da una chiamata a EndCheckpoint. EndCheckpoint viene bloccato fino a quando l'operazione di checkpoint non viene completata e viene quindi restituito il risultato (o eccezione). In questo modello, asyncCallback e asyncState in genere non vengono utilizzati.
È possibile effettuare una chiamata a BeginCheckpoint e l'utente può quindi eseguire il polling della proprietà IsCompleted dell'oggetto IAsyncResult restituito. Quando IsCompleted è true, è possibile effettuare una chiamata a EndCheckpoint per recuperare il risultato. In questo modello, asyncCallback e asyncState in genere non vengono utilizzati.
È possibile effettuare una chiamata a BeginCheckpoint con un metodo di callback. In questo caso, l'oggetto asyncState può essere utilizzato per identificare la chiamata e restituire tutte le informazioni necessarie al metodo di callback. Quando in esecuzione, il callback consente di effettuare una chiamata a EndCheckpoint per recuperare il risultato.
È necessario effettuare una chiamata al metodo EndCheckpoint, indipendentemente dal modello utilizzato, anche quando il checkpoint viene annullato. Questo metodo rappresenta l'unica modalità con cui l'utente ottiene un valore restituito dalla chiamata. Inoltre, è l'unico modo con cui viene riconosciuto il completamento della chiamata da StreamInsight. Non è possibile iniziare un altro checkpoint fino a quando non è stata effettuata una chiamata a EndCheckpoint.
Gli errori che si verificano nel processo del checkpoint non determinano l'arresto delle query associate né influiscono su queste ultime. Se si arresta una query mentre un'operazione di checkpoint è in corso, il checkpoint viene annullato.
Torna all'inizio
Passaggio 4. Riproduzione di eventi nell'adattatore di input
Per supportare la riproduzione di eventi nell'ambito del recupero, deve essere implementata l'interfaccia IHighWaterMarkInputAdapterFactory o IHighWaterMarkTypedInputAdapterFactory dalla factory dell'adattatore di input. La chiamata al metodo Create della factory dell'adattatore fornisce il limite massimo che consente all'adattatore di identificare gli eventi da riprodurre.
Per assicurarsi del completamento dell'output, tutti gli eventi devono essere riprodotti da ogni adattatore di input nel flusso fisico che si verifica in corrispondenza della posizione indicata dal limite massimo o dopo tale posizione.
Torna all'inizio
Passaggio 5. Eliminazione di duplicati nell'adattatore di output
Per supportare l'eliminazione di duplicati nell'ambito del recupero, deve essere implementata l'interfaccia IHighWaterMarkOutputAdapterFactory o IHighWaterMarkTypedOutputAdapterFactory dalla factory dell'adattatore di output. La chiamata al metodo Create della factory dell'adattatore fornisce il limite massimo e il valore di offset che consentono all'adattatore di identificare i valori duplicati. L'offset è necessario perché il percorso nel flusso di output che corrisponde al checkpoint può rientrare in qualsiasi punto nel flusso.
Quando viene iniziata la query per la prima volta, il metodo Create della factory dell'adattatore viene chiamato senza il limite massimo e l'offset. Se non è stato ancora acquisito alcun checkpoint per la query da parte del server, il metodo Create della factory dell'adattatore viene chiamato con un limite massimo di DateTime.MinValue e un offset di 0 (zero).
Se una query viene riprodotta correttamente, tutti gli eventi prodotti dopo l'ultimo checkpoint acquisito, ma prima dell'interruzione, verranno prodotti di nuovo durante il riavvio. Si tratta dei duplicati che devono essere rimossi dall'adattatore di output. La modalità di rimozione di questi ultimi viene scelta dall'adattatore di output, ad esempio è possibile che le copie originali vengano abbandonate o che le copie duplicate vengano ignorate.
Per assicurarsi dell'equivalenza dell'output, gli eventi di input devono essere riprodotti correttamente da tutti gli adattatori di input e, tramite ogni adattatore di output devono essere rimossi tutti gli eventi duplicati nel flusso fisico che si sono verificati prima dell'interruzione e che si verificano in corrispondenza della posizione indicata dall'offset del limite massimo o dopo tale posizione.
Torna all'inizio
Passaggio 6. Recupero da errori
Il recupero viene eseguito automaticamente dal server all'avvio e tutte le query vengono portate in uno stato coerente. Si tratta di un'operazione asincrona; di conseguenza, il risultato della chiamata a Server.Create viene restituito prima del termine del recupero.
Le query non resilienti vengono inserite nello stato Arrestato. Questo comportamento non viene modificato.
Le query resilienti vengono inserite nello stato Inizializzazione in corso. Le informazioni sul checkpoint salvate vengono caricate dal server.
È possibile effettuare una chiamata a Start in questa fase per riavviare le query. Le query resilienti verranno riavviate appena completata l'inizializzazione.
Per il recupero dell'errore, è necessario che vengano eseguiti dal codice di avvio i passaggi seguenti:
Recuperare l'elenco delle query dell'applicazione dai metadati.
Per ogni query, verificare se esiste già nei metadati.
In tal caso, riavviare la query.
Se la query non è disponibile nei metadati, crearla e definirla come resiliente, come illustrato precedentemente nel Passaggio 2. Definizione di una query resiliente.
Se si verifica un problema durante il recupero stesso, è possibile riavviare il server senza resilienza.
Torna all'inizio
Arresto senza disabilitazione del recupero
È possibile arrestare il server senza disabilitare il recupero effettuando una chiamata al metodo Dispose dell'oggetto Server.
Le query non resilienti vengono arrestate.
Le query resilienti vengono sospese. Quando si riavvia il server, tramite quest'ultimo si tenta di recuperare lo stato delle query sospese. Per impedire questo comportamento, arrestare le query prima dello spegnimento.
I metadati sia per le query non resilienti sia per quelle resilienti vengono mantenuti quando si arresta il server in questo modo.
Torna all'inizio
Esempi
Per un esempio di codice end-to-end di un'applicazione resiliente con riproduzione e deduplicazione, vedere l'esempio di checkpoint nella pagina di esempi relativi a StreamInsight su CodePlexhttps://go.microsoft.com/fwlink/?LinkID=180356.
Torna all'inizio
Definizione di una query resiliente con il modello di sviluppo esplicito
namespace StreamInsight.Samples.TrafficJoinQuery
{
using...
internal class EmbeddedCepServer
{
internal static void Main()
{
// SQL CE was available as an optional metadata provider in v1.1
// For the server to support recovery, this becomes mandatory
// A log path is also a mandatory requirement.
SqlCeMetadataProviderConfiguration metadataConfig = new
SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";
ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";
using (EmbeddedServer server = Server.Create(
"Default", metadataConfig, recoveryConfig))
{
try
{
Application application = server.CreateApplication("TrafficJoinSample");
QueryTemplate queryTemplate = CreateQueryTemplate(application);
InputAdapter csvInputAdapter =
application.CreateInputAdapter<TextFileReaderFactory>(
"CSV Input", "Reading tuples from a CSV file");
OutputAdapter csvOutputAdapter =
application.CreateOutputAdapter<TextFileWriterFactory>(
"CSV Output", "Writing result events to a CSV file");
// bind query to event producers and consumers
QueryBinder queryBinder = BindQuery(
csvInputAdapter, csvOutputAdapter, queryTemplate);
// Create bound query that can be run
Console.WriteLine("Registering bound query");
Query query = application.CreateQuery(
"TrafficSensorQuery",
"Minute average count, filtered by location threshold",
queryBinder,
true); // v1.2 addition - Specify the query as resilient
// Start the query
// v1.2 has additional semantics during recovery
query.Start();
// submit a checkpoint request
// query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e);
Console.ReadLine();
}
}
Console.WriteLine("\npress enter to exit application");
Console.ReadLine();
}
Checkpoint: modello rendezvous di callback
namespace StreamInsight.Samples.TrafficJoinQuery
{
using...
internal class EmbeddedCepServer
{
internal static void Main()
{
// Same code through query start …
{
try
{
// Start the query
query.Start();
// submit a checkpoint request
IAsyncResult result = server.BeginCheckpoint(query,
r => {
if (server.EndCheckpoint(r))
{
// the checkpoint succeeded
}
else
{
// the checkpoint was canceled
}
},
null);
}
catch (Exception e)
{
Console.WriteLine(e);
Console.ReadLine();
}
}
Console.WriteLine("\npress enter to exit application");
Console.ReadLine();
}
Vedere anche
Concetti
Compilazione di applicazioni StreamInsight resilienti
Monitoraggio di applicazioni StreamInsight resilienti
Risoluzione dei problemi relativi ad applicazioni StreamInsight resilienti