Vzory úloh replikace událostí
Přehled federace a funkce replikátoru vysvětlují odůvodnění a základní prvky úloh replikace a doporučuje se, abyste se s nimi seznámili, než budete pokračovat v tomto článku.
V tomto článku podrobně popisujeme pokyny k implementaci pro několik vzorů zvýrazněných v části Přehled.
Replikace
Vzor replikace kopíruje události z jednoho centra událostí do dalšího nebo z centra událostí do jiného cíle, jako je fronta služby Service Bus. Události se přeposílají bez jakýchkoli úprav datové části události.
Implementace tohoto modelu se zabývá replikací událostí mezi Event Hubs a replikací událostí mezi službami Event Hubs a ukázkami služby Service Bus a kurzEm Použití Apache Kafka MirrorMakeru se službou Event Hubs pro konkrétní případ replikace dat z zprostředkovatele Apache Kafka do služby Event Hubs.
Toky a zachování pořadí
Replikace prostřednictvím Azure Functions nebo Azure Stream Analytics nemá za cíl zajistit vytvoření přesných 1:1 klonů zdrojového centra událostí do cílového centra událostí, ale zaměřuje se na zachování relativního pořadí událostí, ve kterých ji aplikace vyžaduje. Aplikace to komunikuje seskupením souvisejících událostí se stejným klíčem oddílu a službou Event Hubs uspořádá zprávy se stejným klíčem oddílu postupně ve stejném oddílu.
Důležité
Informace o posunu jsou jedinečné pro každé centrum událostí a posuny pro stejné události se budou lišit napříč instancemi centra událostí. Pokud chcete vyhledat pozici ve zkopírovaných datových proudech událostí, použijte posuny založené na čase a odkazujte na rozšířená metadata přiřazená službou.
Posuny založené na čase začínají příjemce v určitém časovém okamžiku:
- EventPosition.FromStart() – znovu načtěte všechna zachovaná data.
- EventPosition.FromEnd() – čte všechna nová data z doby připojení.
- EventPosition.FromEnqueuedTime(dateTime) – všechna data od daného data a času.
V EventProcessoru nastavíte pozici prostřednictvím InitialOffsetProvider v EventProcessorOptions. S ostatními rozhraními API přijímače se pozice předává konstruktorem.
Předem sestavené pomocné rutiny replikační funkce poskytované jako ukázky , které se používají v doprovodných materiálech založených na Azure Functions, zajišťují, aby se do cílového centra událostí odeslaly streamy událostí se stejným klíčem oddílu načteným ze zdrojového oddílu jako dávka v původním datovém proudu a se stejným klíčem oddílu.
Pokud je počet oddílů zdrojového a cílového centra událostí stejný, všechny datové proudy v cíli se mapují na stejné oddíly jako ve zdroji. Pokud se počet oddílů liší, což záleží na některých dalších vzorech popsaných v následujícím příkladu, mapování se bude lišit, ale datové proudy se vždy uchovávají společně a v uvedeném pořadí.
Relativní pořadí událostí patřících do různých datových proudů nebo nezávislých událostí bez klíče oddílu v cílovém oddílu se může vždy lišit od zdrojového oddílu.
Metadata přiřazená službou
Metadata události přiřazená službou získaná ze zdrojového centra událostí, původní fronta času, pořadového čísla a posunu se nahradí novými hodnotami přiřazenými službou v cílovém centru událostí, ale s pomocnými funkcemi replikace, které jsou k dispozici v našich ukázkách, původní hodnoty se zachovají ve vlastnostech uživatele: repl-enqueue-time
(ISO8601 řetězec), repl-sequence
, repl-offset
.
Tyto vlastnosti jsou typu řetězec a obsahují řetězcovou hodnotu odpovídajících původních vlastností. Pokud se událost předá vícekrát, metadata přiřazená službou bezprostředního zdroje se připojí k již existujícím vlastnostem s hodnotami oddělenými středníky.
Převzetí služeb při selhání
Pokud používáte replikaci pro účely zotavení po havárii, k ochraně před regionálními událostmi dostupnosti ve službě Event Hubs nebo před přerušením sítě, bude každý takový scénář selhání vyžadovat převzetí služeb při selhání z jednoho centra událostí na další, aby producenti nebo příjemci použili sekundární koncový bod.
U všech scénářů převzetí služeb při selhání se předpokládá, že požadované prvky oborů názvů jsou strukturálně identické, což znamená, že centra událostí a skupiny příjemců jsou identické a že pravidla sdíleného přístupového podpisu a/nebo pravidla řízení přístupu na základě role jsou nastavena stejným způsobem. Sekundární obor názvů můžete vytvořit (a aktualizovat) podle pokynů pro přesunutí oborů názvů a vynechání kroku vyčištění.
Pokud chcete výrobcům a příjemcům vynutit přepnutí, musíte nastavit informace o tom, který obor názvů má být dostupný pro vyhledávání v umístění, které je snadno dostupné a aktualizovat. Pokud producenti nebo spotřebitelé narazí na časté nebo trvalé chyby, měli by se poradit s tímto umístěním a upravit jejich konfiguraci. Konfigurace se dá sdílet mnoha způsoby, ale v následujících oblastech ukazujeme na dva: DNS a sdílené složky.
Konfigurace převzetí služeb při selhání založená na DNS
Jedním z kandidátských přístupů je uchovávat informace v záznamech DNS SRV v DNS, které řídíte, a odkazovat na příslušné koncové body centra událostí.
Důležité
Mějte na paměti, že služba Event Hubs neumožňuje přímé aliasování koncových bodů pomocí záznamů CNAME, což znamená, že dns použijete jako odolný vyhledávací mechanismus pro adresy koncových bodů a nebudete přímo překládat informace o IP adresách.
Předpokládejme, že doménu vlastníte example.com
a pro vaši aplikaci zónu test.example.com
. Pro dvě alternativní služby Event Hubs teď vytvoříte dvě další vnořené zóny a v každé z nich záznam SRV.
Záznamy SRV jsou následující běžné konvence s předponou _azure_eventhubs._amqp
a obsahují dva záznamy koncového bodu: jeden pro protokol AMQP-over-TLS na portu 5671 a jeden pro AMQP-over-WebSockets na portu 443, oba odkazují na koncový bod služby Event Hubs v oboru názvů odpovídající zóně.
Zóna | Záznam SRV |
---|---|
eh1.test.example.com |
_azure_servicebus._amqp.eh1.test.example.com 1 1 5671 eh1-test-example-com.servicebus.windows.net 2 2 443 eh1-test-example-com.servicebus.windows.net |
eh2.test.example.com |
_azure_servicebus._amqp.eh2.test.example.com 1 1 5671 eh2-test-example-com.servicebus.windows.net 2 2 443 eh2-test-example-com.servicebus.windows.net |
V zóně vaší aplikace pak vytvoříte položku CNAME, která odkazuje na podřízenou zónu odpovídající vašemu primárnímu centru událostí:
Záznam CNAME | Alias |
---|---|
eventhub.test.example.com |
eh1.test.example.com |
Pomocí klienta DNS, který umožňuje explicitně dotazovat záznamy CNAME a SRV (integrované klienty Javy a .NET umožňují pouze jednoduché překlady názvů na IP adresy), pak můžete přeložit požadovaný koncový bod. U DnsClient.NET je například vyhledávací funkce:
static string GetEventHubName(string aliasName)
{
const string SrvRecordPrefix = "_azure_eventhub._amqp.";
LookupClient lookup = new LookupClient();
return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
where srv.Port == 5671
select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}
Funkce vrátí cílový název hostitele zaregistrovaný pro port 5671 zóny, který je aktuálně aliasovaný pomocí CNAME, jak je znázorněno výše.
Provedení převzetí služeb při selhání vyžaduje úpravu záznamu CNAME a jeho nasměrování na alternativní zónu.
Výhodou použití DNS a konkrétně Azure DNS je, že se informace Azure DNS globálně replikují, a proto jsou odolné proti výpadkům v jedné oblasti.
Tento postup je podobný tomu, jak funguje geografická zotavení po havárii služby Event Hubs, ale plně pod vaší vlastní kontrolou a funguje také s aktivními a aktivními scénáři.
Konfigurace převzetí služeb při selhání na základě sdílené složky
Nejjednodušší alternativou k použití DNS pro sdílení informací o koncovém bodu je vložení názvu primárního koncového bodu do souboru prostého textu a poskytování souboru z infrastruktury, která je robustní proti výpadkům a stále umožňuje aktualizace.
Pokud už spouštíte vysoce dostupnou infrastrukturu webu s globální dostupností a replikací obsahu, přidejte do ní takový soubor a znovu ho publikujte, pokud je potřeba přepnout.
Upozornění
Tímto způsobem byste měli publikovat jenom název koncového bodu, nikoli úplný připojovací řetězec včetně tajných kódů.
Další důležité informace pro převzetí služeb při selhání spotřebitelů
Pro uživatele centra událostí závisí další aspekty strategie převzetí služeb při selhání na základě potřeb procesoru událostí.
Pokud dojde k havárii, která vyžaduje opětovné sestavení systému, včetně databází, ze zálohovaných dat, a databáze se předávají přímo nebo prostřednictvím zprostředkujícího zpracování z událostí uložených v centru událostí, obnovíte zálohu a pak budete chtít začít znovu přehrávat události do systému od okamžiku, kdy byla vytvořena záloha databáze, a ne od okamžiku, kdy byl původní systém zničen.
Pokud selhání ovlivní pouze řez systému nebo pouze jedno centrum událostí, které je nedostupné, budete pravděpodobně chtít pokračovat ve zpracování událostí z přibližně stejné pozice, kde bylo zpracování přerušeno.
Pokud chcete realizovat některý scénář a použít procesor událostí příslušné sady Azure SDK, vytvoříte nové úložiště kontrolních bodů a poskytnete počáteční pozici oddílu na základě časového razítka , ze kterého chcete pokračovat ve zpracování.
Pokud máte stále přístup k úložišti kontrolních bodů centra událostí, ze kterého přecházíte, můžou vám výše uvedená rozšířená metadata pomoct přeskočit události, které už byly zpracovány a pokračovat přesně od místa, kde jste naposledy skončili.
Sloučení
Vzor sloučení má jeden nebo více úloh replikace odkazující na jeden cíl, případně současně s pravidelnými producenty, kteří také odesílají události do stejného cíle.
Varianty těchto patter jsou:
- Dvě nebo více replikačních funkcí současně získává události z oddělených zdrojů a odesílá je do stejného cíle.
- Jedna další funkce replikace, která získává události ze zdroje, zatímco cíl je také používán přímo producenty.
- Předchozí vzor, ale zrcadlený mezi dvěma nebo více službami Event Hubs, výsledkem jsou tyto služby Event Hubs obsahující stejné streamy bez ohledu na to, kde se události vytvářejí.
První dvě varianty vzorů jsou triviální a neliší se od úloh prosté replikace.
Poslední scénář vyžaduje, aby se již replikované události znovu nereplikovaly. Tato technika se demonstruje a vysvětluje v ukázce EventHubToEventHubMerge .
Editor
Vzor editoru vychází ze vzoru replikace , ale zprávy se před jejich přeposílání upraví.
Mezi příklady takových úprav patří:
- Překódování – Pokud obsah události (označovaný také jako "tělo" nebo "datová část") dorazí ze zdroje kódovaného pomocí formátu Apache Avro nebo nějakého proprietárního serializačního formátu, ale očekává se, že systém, který vlastní cíl, je obsah kódovaný ve formátu JSON, úloha transkódování replikace nejprve deserializuje datovou část z Apache Avro do grafu objektu v paměti a potom tento graf serializuje do json. formát události, která se předává. Překódování zahrnuje také kompresi obsahu a dekompresi úloh.
- Transformace – Události, které obsahují strukturovaná data, můžou vyžadovat změnu tvaru těchto dat, aby je mohli příjemní příjemci usnadnit. To může zahrnovat práci, jako je zploštění vnořených struktur, vyřazení nadbytečných datových prvků nebo změna tvaru datové části tak, aby přesně odpovídala danému schématu.
- Dávkování – Události mohou být přijaty v dávkách (více událostí v jednom přenosu) ze zdroje, ale musí se předávat ingálně do cíle nebo naopak. Úkol proto může předávat více událostí na základě jednoho vstupního přenosu událostí nebo agregovat sadu událostí, které se pak přenesou dohromady.
- Ověření – Data událostí z externích zdrojů je často potřeba zkontrolovat, jestli jsou v souladu se sadou pravidel, než je bude možné předat. Pravidla mohou být vyjádřena pomocí schémat nebo kódu. Události, které nejsou v souladu s předpisy, mohou být vyřazeny, s problémem v protokolech nebo mohou být předány do speciálního cílového cíle pro jejich další zpracování.
- Rozšiřování – Data událostí pocházející z některých zdrojů mohou vyžadovat rozšiřování s dalším kontextem, aby je bylo možné použít v cílových systémech. To může zahrnovat vyhledání referenčních dat a vložení těchto dat s událostí nebo přidání informací o zdroji, který je známý pro úlohu replikace, ale neobsahuje v událostech.
- Filtrování – Některé události přicházející ze zdroje můžou být z cíle na základě některého pravidla odmítnuty. Filtr testuje událost proti pravidlu a událost zahodí, pokud se událost neshoduje s pravidlem. Filtrování duplicitních událostí pomocí sledování určitých kritérií a vyřazení následných událostí se stejnými hodnotami je forma filtrování.
- Kryptografie – Úloha replikace může obsahovat dešifrování obsahu přicházejícího ze zdroje nebo šifrování obsahu předávaného směrem k cíli nebo k ověření integrity obsahu a metadat vzhledem k podpisu přenášeného v události nebo k připojení takového podpisu.
- Ověření identity – Úloha replikace může připojit metadata, potenciálně chráněná digitálním podpisem, k události, která potvrzuje, že událost byla přijata prostřednictvím konkrétního kanálu nebo v konkrétní době.
- Řetězení – Úloha replikace může použít podpisy na datové proudy událostí, aby byla chráněna integrita datového proudu a můžou být zjištěny chybějící události.
Vzory transformace, dávkování a rozšiřování jsou obecně nejlíp implementované s úlohami Azure Stream Analytics .
Všechny tyto vzory je možné implementovat pomocí azure Functions, pomocí triggeru služby Event Hubs pro získávání událostí a výstupní vazby centra událostí pro jejich doručování.
Směrování
Vzor směrování vychází ze vzoru replikace , ale místo toho, aby měl jeden zdroj a jeden cíl, má úloha replikace více cílů, jak je znázorněno tady v jazyce C#:
[FunctionName("EH2EH")]
public static async Task Run(
[EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
[EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
[EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
ILogger log)
{
foreach (EventData eventData in events)
{
// send to output1 and/or output2 based on criteria
EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
});
EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
});
}
}
Funkce směrování bude brát v úvahu metadata zpráv nebo datovou část zprávy a pak vybere jeden z dostupných cílů pro odeslání.
V Azure Stream Analytics můžete stejného dosáhnout definováním více výstupů a následným spuštěním dotazu na výstup.
select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2
Projekce protokolu
Vzor projekce protokolu zploštěluje datový proud událostí do indexované databáze a události se stávají záznamy v databázi. Události se obvykle přidají do stejné kolekce nebo tabulky a klíč oddílu centra událostí se stane součástí primárního klíče, který hledá jedinečný záznam.
Projekce protokolu může vytvořit historii časových řad dat události nebo komprimované zobrazení, přičemž pro každý klíč oddílu se zachová pouze nejnovější událost. Tvar cílové databáze je v konečném důsledku na vás a potřebách vaší aplikace. Tento model se také označuje jako "event sourcing".
Tip
V Azure Stream Analytics můžete snadno vytvářet projekce protokolů do Azure SQL Database a Azure Cosmos DB a tuto možnost byste měli preferovat.
Následující funkce Azure Functions projektuje obsah zkomprimovaného centra událostí do kolekce Azure Cosmos DB.
[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
[EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
[CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
ILogger log)
{
foreach (var ev in input)
{
if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
{
var record = new
{
id = ev.SystemProperties.PartitionKey,
data = JsonDocument.Parse(ev.Body),
properties = ev.Properties
};
await output.AddAsync(record);
}
}
}