Az Azure Queue streamek áttekintése

Minden streamszolgáltató (Azure Queues, EventHub, SMS, SQS stb.) saját üzenetsor-specifikus részletekkel és konfigurációval rendelkezik. Ez a szakasz részletesen ismerteti az Azure Queue Streams használatát, konfigurációját és implementálásátOrleans. Ez a szakasz nem átfogó. További részleteket a streamelési tesztekben talál, amelyek a legtöbb konfigurációs lehetőséget tartalmazzák, kifejezetten a AQClientStreamTests és AQSubscriptionMultiplicityTests, valamint a IAzureQueueStreamConfigurator és ISiloPersistentStreamConfigurator bővítményfüggvényeket.

Orleans Az Azure Queue használatához a Microsoft.Orleans. Streaming.AzureStorage NuGet-csomag. Az implementáció mellett a csomag olyan bővítménymetóciós módszereket is tartalmaz, amelyek leegyszerűsítik a siló indításkor történő konfigurálást. A minimális konfigurációhoz meg kell adnia a kapcsolati sztringet, például:

hostBuilder
    .AddAzureQueueStreams("AzureQueueProvider", configurator =>
    {
        configurator.ConfigureAzureQueue(
            ob => ob.Configure(options =>
            {
                options.ConnectionString = "[PLACEHOLDER]";
                options.QueueNames = new List<string> { "yourprefix-azurequeueprovider-0" };
            }));
    configurator.ConfigureCacheSize(1024);
    configurator.ConfigurePullingAgent(ob => ob.Configure(options =>
    {
      options.GetQueueMsgsTimerPeriod = TimeSpan.FromMilliseconds(200);
    }));
  })
  // a PubSubStore could be needed, as example Azure Table Storage
  .AddAzureTableGrainStorage("PubSubStore", options => {
    options.ConnectionString = "[PLACEHOLDER]";
  })

A lekéréses ügynökök többször lekérnek, amíg nincs több üzenet az üzenetsoron, majd egy konfigurálható időszakra késleltetik a lekérés folytatását. Ez a folyamat minden sor esetében megtörténik. A lekéréses ügynökök belsőleg egy gyorsítótárban (üzenetsoronként egy gyorsítótárban) helyezik el az üzeneteket a fogyasztóknak való kézbesítéshez, de ha a gyorsítótár megtelik, leállítják az olvasást. Az üzenetek a felhasználók feldolgozása után törlődnek a gyorsítótárból, így az olvasási arányt nagyjából szabályozni kell a fogyasztók feldolgozási sebességével.

Az Azure Queue alapértelmezés szerint Orleans8 sor (lásd AzureQueueOptions) és 8 kapcsolódó lekérési ügynököt használ, késleltetése 100 ms (lásd StreamPullingAgentOptions.GetQueueMsgsTimerPeriod), gyorsítótármérete pedig (IQueueCache) 4096 üzenet (lásd SimpleQueueCacheOptions.CacheSize).

Konfiguráció

Az alapértelmezett konfigurációnak meg kell felelnie egy éles környezetnek, de speciális igények esetén lehetséges az alapértelmezett viselkedés konfigurálása. Egy fejlesztői gépen például csökkenthető a lekérdezési ügynökök száma úgy, hogy csak egyetlen üzenetsort használjunk. Ez segíthet csökkenteni a processzorhasználatot és az erőforrás-terhelést.

hostBuilder
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>(
        "AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options =>
            {
                options.ConnectionString = "[PLACEHOLDER]";
                options.QueueNames =
                    new List<string>
                    {
                        "yourprefix-azurequeueprovider-0"
                    };
            }))

Hangolás

Éles rendszerben előfordulhat, hogy az alapértelmezett konfigurációt finomhangolnia kell. A hangoláskor figyelembe kell venni néhány tényezőt, és ez szolgáltatásspecifikus.

  1. Először is a legtöbb beállítás üzenetsoronként van megadva, így sok stream esetén az egyes üzenetsorok terhelése csökkenthető több üzenetsor konfigurálásával.
  2. Mivel a streamek streamenként sorrendben dolgozzák fel az üzeneteket, a továbbítási tényező az egyetlen streamen küldött események száma lesz.
  3. A gyorsítótáridő () és a StreamPullingAgentOptions.GetQueueMsgsTimerPeriod (AzureQueueOptions.MessageVisibilityTimeout) ésszerű egyensúlya az, hogy a láthatóságot úgy kell konfigurálni, hogy a gyorsítótárban az üzenetek várható ideje megduplázódjon.

példa

Az alábbi jellemzőkkel rendelkező rendszer feltételezve:

  • 100 folyam
  • 10 üzenetsor,
  • Minden stream percenként 60 üzenetet dolgoz fel,
  • Minden üzenet feldolgozása körülbelül 30 m-t vesz igénybe,
  • 1 percnyi üzenet a gyorsítótárban (gyorsítótáridő).

Így kiszámíthatjuk a rendszer néhány paraméterét:

  • Az streamek üzenetsorok közötti kiegyensúlyozása ideális esetben 10 stream/üzenetsor (100 stream / 10 üzenetsor). Mivel a streamek nem mindig lesznek egyenletesen kiegyensúlyozottak az üzenetsorok között, az ideális duplázása (vagy akár triplázása) biztonságosabb, mint az ideális eloszlást várni. Ezért 20 folyam/üzenetsor (a biztonsági tényezőként 10 folyam/üzenetsor x 2) valószínűleg ésszerű.

  • Üzenetek/perc: Ez azt jelenti, hogy minden üzenetsor legfeljebb 1200 üzenetet/percet (60 üzenet x 20 streamet) fog feldolgozni.

Ezután meghatározhatjuk a láthatósági időt:

  • Láthatósági idő: A gyorsítótáridő (1 perc) 1 percnyi üzenet tárolására van konfigurálva (tehát 1200 üzenet, ahogy a fenti üzeneteket kiszámítottuk). Feltételeztük, hogy minden üzenet feldolgozása 30 ms-ot vesz igénybe, majd arra számíthatunk, hogy az üzenetek legfeljebb 36 másodpercet töltenek a gyorsítótárban (0,030 mp x 1200 msg = 36 mp), így a láthatósági idő - a biztonság érdekében megduplázva - több mint 72 másodpercnek kell lennie (36 másodperc az x 2 gyorsítótárban). Ennek megfelelően, ha nagyobb gyorsítótárat határozunk meg, az hosszabb láthatósági időt igényelne.

Végső szempontok egy valós rendszerben:

  • Mivel a sorrend csak streamenként van meghatározva, és az üzenetsorok sok streamet használnak, az üzenetek valószínűleg párhuzamosan több streamben kerülnek feldolgozásra (például van egy szemcse a stream számára, amely párhuzamosan futhat). Ez azt jelenti, hogy sokkal kevesebb idő alatt elégetjük a gyorsítótárat, de a rosszabb esetben azt terveztük: ez lehetővé teszi a rendszer számára, hogy továbbra is jól működjön, még időszakos késések és átmeneti hibák esetén is.

Az Azure Queue Streams így konfigurálható:

hostBuilder
  .AddAzureQueueStreams("AzureQueueProvider", configurator => {
    configurator.ConfigureAzureQueue(
      ob => ob.Configure(options => {
        options.ConnectionString = "[PLACEHOLDER]";
        options.QueueNames = new List<string> {
          "yourprefix-azurequeueprovider-1",
          [...]
          "yourprefix-azurequeueprovider-10",
        };
        options.MessageVisibilityTimeout = TimeSpan.FromSeconds(72);
      }));
    configurator.ConfigureCacheSize(1200);
  })