Freigeben über



März 2018

Band 33, Nummer 3

Dieser Artikel wurde maschinell übersetzt.

Azure: Integrationsmuster für Unternehmensdaten mit Azure Service Bus

Durch Stefano Tempesta

Klicken Sie in das Alter von Big Data und Machine Learning abrufen und Verwalten von Informationen ist wichtig, aber dies daher kann sind ein komplexes Daten häufig je komplexer als Sie nutzen möglicherweise. Wenn Sie erwägen, wie Sie Ihre Anwendung mit anderer IT-Systeme zu kommunizieren, ist ein effektiver Entwurf für den Datenaustausch Schlüssel zum Erfolg. Dieser Artikel bietet eine Übersicht und die Implementierung von Data Integration-Prozessen zwischen Anwendungen, die mithilfe von Azure Service Bus.

Entwurfsmuster für Data-Integration

Die Daten fließen in mehrere Richtungen über Netzwerke, Anwendungen und temporäre oder permanente Repositorys. Es kann ausgetauscht werden, von einem Datensatz oder batchweise über Systeme, die in Echtzeit oder über geplante Synchronisierungsaufträge, miteinander zu kommunizieren. Trotz der Vielzahl von Datenintegration "Journeys" ist es möglich, zu identifizieren allgemeiner Entwurfsmuster für die Vorgehensweise in einem Enterprise-Kontext begegnen, bei denen Anforderungen für hohe Verfügbarkeit, garantierte Übermittlung und Sicherheit enthalten. Entwurfsmuster in der Softwareentwicklung vorgelagert sind die logischen und am besten erprobte Sequenzen Schritte aus, um eine Aufgabe zu beheben. Die vier am häufigsten verwendete Entwurfsmuster für die Datenintegration sind Broadcast, Aggregation, bidirektionale Synchronisierung und Korrelation.

In diesem Artikel ich aller dieser Daten Integration Entwurfsmuster einzuführen und ihre Anwendung im Kontext von Azure Service Bus beschreiben. Ein Ansatz für die Datenintegration, die einen Enterprise Servicebus (ESB) nutzt vereinfacht die Implementierung dieser Muster auf eine sehr effektive Weise von Quelle und Ziel-Systeme, Häufigkeit der Kommunikation, einfach zu definieren und Formatieren der Daten für die Eingabe und die Ausgabe. Zusammen mit der Beschreibung der einzelnen Muster enthalten ich Codebeispiele zum Veranschaulichen der Kommunikation mit Azure Service Bus verwendet.

Kontext und Anforderungen

Meine Weg zu Datenintegration zunächst definieren den Enterprise-Kontext. In diesem Fall gehen wir davon aus, dass ich bin, erstellen eine e-Commerce-Plattform mit den typischen Anforderungen für einen Onlinekatalog und Warenkorb Einkaufswagen und, dass ich meine Anwendung in Azure veröffentlicht werden. Die e-Commerce-Anwendung ist Teil einer größeren Ökosystem von IT-Systeme, einige für eine öffentliche Cloud verfügbar gemacht werden, einige noch auf einem privaten Datencenter gehostet. Daher habe ich eine wirklich hybride Kontext betrieben. Die datenintegrationsanforderungen umfassen die folgenden Funktionen:

  1. Übertragen Sie sales Produktinformationen aus der e-Commerce-Anwendung, einem Fakturierungssystem und soziale Medien-Plattformen.
  2. Empfangen Sie Produktinformationen für die Verfügbarkeit von einer-Exports (ERP)-Anwendung und produktbeschreibungen von Drittanbieter-Systemen.
  3. Fügen Sie die Verfolgung Pakete ausgeliefert hinzu.
  4. Teilen Sie Kundendaten mit Partnerorganisationen für Co-marketing-Aktivitäten.

Alle integrationsanforderungen können mit den Entwurfsmustern, die zuvor erwähnten behandelt werden. Sehen wir uns das einmal näher an.

Broadcast-Muster

Für die erste Funktion muss ich zum Extrahieren von Daten, die für Produktverkäufe aus der e-Commerce-Anwendung und auf mehrere Zielsystemen übertragen – eine finanzielle System für das Ausstellen von Rechnungen und eine oder mehrere soziale Medien-Plattformen für das Heraufstufen. Die datenflussrichtung wird eindirektionale aus der Anwendung mit externen Systemen. Ich bin im Grunde die Informationen auf der ganzen Welt außerhalb senden.

Broadcast Integration-Muster beschreibt, wie zum Übertragen von Daten aus einer Anwendung auf mehrere Zielsystemen in einem fortlaufenden in Echtzeit oder in der Nähe von Echtzeit-Fluss. Dieser Prozess wird erwartet, transaktional sein: Wenn eine Transaktion erfolgreich abgeschlossen wurde, sind die Daten am Ziel ein Commit ausgeführt wurde. Wenn die Transaktion ein Fehler auftritt, wird die Datenübertragung abgebrochen. Es ist offensichtlich, dass diese broadcast-Integrationskanal hochverfügbar und zuverlässig, sein muss, um den Verlust wichtiger Daten während der Übertragung zu vermeiden. Einsetzen von ESB als Mechanismus für die Datenpakete queuing und auftragsdetailinformationen gruppiert und Übermittlung am Ziel wird entscheidend.

Implementieren des broadcast-Musters eng ähnelt die Veröffentlichen/Abonnieren-Muster implementieren, in Azure Service Bus, anhand der Themen und Abonnements (bit.ly/2oOmTtM). Themen darstellen Warteschlangen von Nachrichten, die empfangenden Anwendungen (Abonnenten) zu abonnieren, um Updates zu erhalten, wenn eine Nachricht zurückgesendet wird. Meine eCommerce-Anwendung veröffentlicht eine Nachricht in ein Thema. Das ESB fungiert als ein nachrichtenbroker und garantiert die Zustellung der Nachricht am Ziel von "Push" die Nachricht an das Ziel, die nur von abonnierten Empfängern besteht.

Senden ein Datenpaket aus der e-Commerce-Anwendung im Wesentlichen bedeutet Veröffentlichung einer Nachricht in einem Thema und müssen eine Zielanwendung für ein bestimmtes Abonnement überwacht. Broadcast Muster hinzugefügt Datenfluss mit der Möglichkeit zum Abbrechen der Transaktions bei einem Ausfall der Übermittlung eine transaktionale Attribut. Wie Transaktionen System hinweg gelten, profitieren sie von einem "Zustandsautomaten", die eine Momentaufnahme der vermittelte Nachricht übertragen werden, bevor er von allen abonnierte Anwendungen gelesen wird beibehalten. Wenn alle Abonnenten ein Fehler auftritt, die Nachricht abzurufen, wird die gesamte Transaktion abgebrochen, zur Gewährleistung der Konsistenz auf allen beteiligten Systemen.

Der folgende Code sendet eine Nachricht an ein Azure Service Bus-Topic und implementiert ein Zustandsautomat (bit.ly/29tKRT3) zum Nachverfolgen der Lieferung einer Nachricht:

public class Broadcast
{
  public async Task Execute(Entity entity)
  {
    var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
    var message = new BrokeredMessage(JsonConvert.SerializeObject(entity));
    await client.SendAsync(message);
  }

Im Fall eines Fehlers bei der Übermittlung verschiebt das Zustandsautomat die Nachricht an die Warteschlange "Dead Letter" im Azure-Servicebus. Die Nachricht wird an diesem Punkt ist nicht mehr gültig ist, für die Datenübertragung und wird nicht weiter verarbeitet werden.

Senden einer Nachricht an ein Thema in der Azure Service Bus erfordert eine TopicClient-Verbindung und einer "brokeredmessage" umschließen die ursprüngliche Entität und gesendet, asynchron mit dem Bus. Alle erforderlichen Objekte für die Verbindung mit Azure Service Bus im WindowsAzure.ServiceBus NuGet-Paket verteilt werden und im Microsoft.ServiceBus.Messaging-Namespace verfügbar sind.

Das Zustandsautomat ist ein asynchroner Singleton-Wörterbuch, Transaktion Leistungsindikatoren Thema enthält. Das Wörterbuch verfolgt die Anzahl der aktiven Transaktionen – Abonnenten – warten, die auf eine Nachricht zu einem bestimmten Thema vom Service Bus. Das Wörterbuch ist threadsicher, um gleichzeitige Anforderungen zu ermöglichen:

private static StateMachine _instance;
public static StateMachine Current => _instance ?? (_instance = new StateMachine());
protected ConcurrentDictionary<string, int> transactions =
  new ConcurrentDictionary<string, int>();

Siehe Abbildung 1, eine Anwendung Abonnenten eine Nachricht aus dem Service Bus-Thema gelesen, die eine neue Transaktion (mit der BeginTransactionAsync-Methode) für ein bestimmtes Thema für den Zustandsautomaten ab und verarbeitet dann die OnMessage-Ereignis, das eine Kopie der Entität abzurufen. Die Entität wird dann intern verarbeitet; Sie können z. B. vom empfangenden System übernommen werden. Im Fall eines Fehlers wird die Transaktion abgebrochen.

Abbildung 1 Lesen einer Nachricht aus der Servicebus-Thema

public async Task ReadMessageAsync()
{
  await StateMachine.Current.BeginTransactionAsync(topicName);
  var client = SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName,
    subscriptionName);
  client.OnMessageAsync(async message =>
  {
    var entity = JsonConvert.DeserializeObject(message.GetBody<string>());
    try
    {
      Save(entity);
      await StateMachine.Current.SuccessAsync(message , topicName);
    }
    catch
    {
      await StateMachine.Current.CancelAsync(message , topicName);
    }
  });
}

Abschließen oder Abbrechen der Transaktion wird von der Zustandsautomat mit einer dieser beiden Methoden verwaltet – SuccessAsync oder CancelAsync. SuccessAsync ruft CompleteAsync auf die vermittelte Nachricht verarbeitet und aus dem Thema schließlich gelöscht, gibt an, dass die Nachricht als markiert werden soll. Dies erfolgt nur, wenn alle gleichzeitig aktive Transaktionen abgeschlossen werden:

public async Task<bool> SuccessAsync(BrokeredMessage message, string topicName)
{
  bool done = await EndTransactionAsync(topicName);
  int count = Current.transactions[topicName];
  // All concurrent transactions are done
  if (done && count == 0)
  {
     await message.CompleteAsync();
  }
  return done;
}

CancelAsync, bricht im Gegensatz dazu die Nachricht übertragen, indem Sie das Zurücksetzen des Transaktion Zähler für ein Thema ab. Durch Aufrufen der DeadLetterAsync-Methode, ist die vermittelte Nachricht dann an die Warteschlange "Dead Letter" verschoben, auf die nicht erfolgreich verarbeitete Nachrichten gespeichert sind:

public async Task<bool> CancelAsync(BrokeredMessage message, string topicName)
{
  // Cancel the message broadcast -> Remove all concurrent transactions
  int count = Current.transactions[topicName];
  bool done = Current.transactions.TryUpdate(topicName, 0, count);
  if (done)
  {
    await message.DeadLetterAsync();
  }
  return done;
}

Das Muster "Aggregation"

Die zweite Anforderung für Meine e-Commerce-Plattform ist zum Freigeben von Informationen zu Produkten von externen Systemen und in der Web-Portal zu konsolidieren. Die Richtung des Datenflusses wird in diesem Fall, broadcast-Muster. Die Anforderung jetzt wird zum Aggregieren von Daten aus verschiedenen Quellen in einer einzelnen Stelle. Ein einfacher Ansatz wäre, Daten aus der jeweiligen Quelle in die e-Commerce-Anwendung direkt, Point-to-Point zu importieren. Dies ist jedoch eine skalierbare Lösung, wie es schon sagt, erstellen eine andere Verbindung für jeden externen System, das Senden von Daten im Ziel-Repository. Stattdessen durch Aggregieren von Daten in einem Prozess über eine ESB ich entfällt der Bedarf für mehrere unidirektionale Integrationen und Bedenken hinsichtlich der Genauigkeit und Konsistenz mindern, wie Daten in einer atomarischen Transaktion verarbeitet werden.

Was tritt auf, ist jedoch die Herausforderung, Zusammenführen von Daten in eine einzelne Entität ohne Informationen duplizieren oder noch schlimmer, beschädigen sie. Es ist häufig erforderlich, implementieren benutzerdefinierte Merge Logik als Teil der Integration werden zum Nachverfolgen von aggregierten Datensätze, die von anderen Systemen stammen und in eine oder mehrere Entitäten in der Anwendung zu speichern. Ich benötige eine Zuordnungstabelle die Entitäts-ID in der Zieldatenbank Datensatz-IDs in der unterschiedliche Quelldatenbanken zugeordnet werden soll. Diese Zuordnungstabelle in der Regel in einer Datenbank hohe Transaktion beibehalten und durch benutzerdefinierte Logik zur Zusammenführung aktualisiert werden, bei der Aggregation.

Wie bei der Übertragung Muster gibt die Implementierung dieses Prinzips Integration auch an ein Azure Service Bus-Thema mit dem Unterschied, dass in diesem Fall eCommerce-Anwendung auf dem Zielsystem empfangen von Daten (Abonnieren ist veröffentlichen/abonnieren für ein Thema) aus anderen Quellsystemen über das ESB. Der gesamtlösung muss auch einige Daten Merge Logik und die datenzuordnung zu verwenden, um die Quelldatensatz-IDs für und die Entitäts-ID am Ziel nachzuverfolgen.

Wie Sie in sehen Abbildung 2, Lesen von Nachrichten aus einem Thema besteht aus eine Abonnement-Verbindung (SubscriptionClient) erstellen und zum Behandeln von OnMessage-Ereignis, das vom abonnementclient ausgelöst wird, wenn eine neue Nachricht verfügbar ist In diesem Thema. Angenommen, die empfangene Nachricht ein Objekt, das von einer externen Anwendung gesendete enthält ein Hersteller Produktdetails zu senden. Dieses Objekt dann in meinem System mithilfe der Zuordnung Entität einer Entität zugeordnet ist, und wenn die Entität bereits in der Datenbank vorhanden ist, kann ich aktualisieren sie; Andernfalls wird eine neue zu erstellen.

Abbildung 2 Lesen von Nachrichten aus einem Thema

public class Aggregation
{
  public void Execute()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessage(message => {
      ProductEntity product =
        EntityMap.Instance.MapToEntity<ProductEntity>(message);
      // Persist the product
      var exists = Find(product.Id) != null;
      if (exists)
        Update(product);
      else
        Create(product);
      });
  }

Der Zuordnungsprozess, in der Klasse EntityMap implementierten besteht aus zwei wichtige Schritte:

  1. Erstellt eine Zuordnung zwischen dem Objekt im externen System und die Entität in der Datenbank. Diese Zuordnung identifiziert das externe Objekt durch die Kopplung der Systemname (z. B. "ERP," "" Hersteller1"," "Vendor2") mit dem Primärschlüssel. Die zugeordnete Entität wird anhand des Typs in meiner Anwendung ("Produkt", "Customer", "Order") und seine ID identifiziert.
  2. Erstellt einen Datensatz der Entität mit den Eigenschaften des externen Objekts. Dies ist der benutzerdefinierte Merge-Logik, die genauso einfach wie das Verwenden einer Bibliothek wie AutoMapper (automapper.org) zum Zuordnen von Objekten sein kann.

Siehe Abbildung 3, die Objekt-Entität-Karte ist ein Wörterbuch, das ein System Schlüsselpaar primären Namen einer Entität Typ Entity Id Gegenstück zuordnet. Ein Objekt in einem externen System wird durch die Kombination aus Namen und den Primärschlüssel, eindeutig identifiziert werden, während eine Entität in der Anwendung durch die Kombination Entitäts-Typ und die Entität-Id identifiziert wird.

Abbildung 3: die Objekt-Entität-Karte

Systemname Primärschlüssel Entitätstyp Entitäts-Id
ERP ABC012345 Produkt FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
Anbieter 1 1000 Produkt FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
ERP ABD987655 Produkt 2110F684-C277-47E7-B8B9-6F17A579D0CE
Hersteller 2 1001 Produkt 2110F684-C277-47E7-B8B9-6F17A579D0CE

Die Zuordnung wird durch den Systemnamen und Primärschlüssel aus den Eigenschaften für vermittelte Nachrichten abrufen und erstellen und anschließend eine Entität mit AutoMapper, aufgefüllt, wie gezeigt in Abbildung 4.

Abbildung 4 Auffüllen der Karte

public T MapToEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  string systemName = message.Properties["SystemName"] as string;
  string primaryKey = message.Properties["PrimaryKey"] as string;
  T entity = BuildEntity<T>(message);
  map.Add((systemName, primaryKey), (entity.GetType(), entity.Id));
  return entity;
}
private T BuildEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  var source = JsonConvert.DeserializeObject(message.GetBody<string>());
  T entity = Mapper.Map<T>(source);
  return entity;
}

Eine brokernachricht kann mit eine zusätzliche Eigenschaft ergänzt werden, die die Verleger-Anwendung festgelegt werden soll, vor dem Senden der Nachricht an die ESB:

public async Task SendMessageToServiceBus(object obj)
{
  var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
  var message = new BrokeredMessage(JsonConvert.SerializeObject(obj));
  message.Properties["SystemName"] = "Publisher System Name";
  message.Properties["PrimaryKey"] = "Object Primary Key";
  await client.SendAsync(message);
}

Die bidirektionale Synchronisierung-Muster

Betrachten wir nun die dritte Anforderung: Nachverfolgen an Speicherort hinzufügen Pakete ausgeliefert. Ich möchte mehr generische ausgedrückt das Attribut einer Entität mit zusätzlichen Attributen zu erweitern, die von einer speziellen externe Anwendung bereitgestellt werden. Aus diesem Grund wird dieses Muster manchmal als Ergänzung Muster bezeichnet.

Ein bidirektionaler Synchronisierungsprozess ausgeführt mit einer Drittanbieter-Line-of-Business-Anwendung beteiligten Systeme können ihre Funktionalität jenseits von Umrisslinien erweitern. Beispielsweise können Dynamics 365 ist ein Kunde Beziehung Management-Plattform (und vieles mehr), die systemintern wird mit SharePoint integriert für das Management. Dynamics 365 bleibt weiterhin die master-Datenquelle für alle Datensätze Dokumente gespeichert werden hingegen in SharePoint als eine Erweiterung der Entitäten im CRM-System. Im Wesentlichen Wenn zwei Systeme in bidirektionalen Synchronisierung sind, verhalten sich wie ein System und gleichzeitig ihre eigenen Datasets und, natürlich Funktionalität beibehalten. Daten über die beiden Systeme verteilt ist, aber es werden als einzelne Entität über die nahtlose Integration angezeigt.

Die meisten der Zeit, und wie mit Dynamics 365 und SharePoint, diese direkten und Echtzeit-Synchronisierung nicht mit einem Servicebus implementiert wird. Ein Punkt-Connector existiert in der Regel, die weiß, wie mit entweder ein System mit Mindestkonfiguration kommunizieren kann. Aber was geschieht, wenn eine systemeigene Verbindung zwischen Anwendungen ist nicht vorhanden? Neben dem Erstellen eines benutzerdefinierten Connectors, möglicherweise nicht einfache Aufgabe an alle (nur Stellen Sie sich den Aufwand für die Authentifizierung und API-Aufrufe verstehen als auch bieten die hohe Verfügbarkeit und Übermittlung, die typisch für die ESB-Garantie), ist was Sie tun können implementieren ein Relay.

Azure Service Bus-Relay (bit.ly/2BNTBih) ist eine Erweiterung der Service Bus, die Kommunikation zwischen Systemen in einer hybridkonfiguration erleichtert, weil eine sichere Verbindung mit Systemen kann nicht zugegriffen werden aus der öffentlichen Cloud. Nehmen wir an, dass das GIS-System, dem ich verbunden bin in einem privaten Unternehmensnetzwerk gehostet wird. Übertragung von Daten vom lokalen Dienst initiiert eine Verbindung mit dem Relay über einen ausgehenden Port erstellen einen bidirektionalen Socket für die Kommunikation, die an eine bestimmte rendezvousadresse gebunden sind. E-Commerce-Anwendung, die in Azure gehosteten kann dann mit dem Dienst GIS hinter der Firewall durch Senden von Nachrichten an den Relaydienst, die Verwendung der rendezvousadresse kommunizieren. Der Relaydienst leitet"dann" Daten an den lokalen Dienst über einen dedizierten bidirektionalen Socket. Der e-Commerce-Anwendung eine direkte Verbindung mit dem lokalen GIS-Dienst muss nicht (und kann nicht erstellt werden), und er muss nicht auch wissen, wo sich der Dienst befindet, wie die gesamte Kommunikation nur für das Relay ist.

Nur um ein weiterer Vorteil der Implementierung einer Lösung basierend auf Azure-Relay zu verdeutlichen, unterscheiden sich die Relayfunktionen von Netzwerkebene integrationstechnologien wie VPNs, da sie auf einen einzelnen Anwendungsendpunkt auf einem einzelnen Computer begrenzt werden können. VPN-Technologie ist im Gegensatz dazu wesentlich mehr intrusiv, wie es beruht auf die Umgebung ändern.

Das NuGet-Paket Microsoft.Azure.Relay enthält die gleichnamige-Namespace mit der relevanten Objekte für die Verwaltung von Kommunikation mit Azure Service Bus-Relay. Zunächst definieren zuerst das GIS-Server jedoch die besteht aus:

  • GisObject: Ein Objekt zum Speichern von geografischen Koordinaten (Breitengrad und Längengrad) und ein vollständig aufgelöstes Speicheradresse.
  • GisProcess: Ein Prozess, der eine bidirektionale Verbindung mit dem Server GIS über Azure-Relay verwaltet und überträgt die Instanzen von GisObject zwischen dem GIS-Server und die e-Commerce-Anwendung.
  • ServerListener: Eine Erweiterung der GIS-Server, der als Brücke zwischen dem GIS-Server selbst und Azure-Relay fungiert.

Die bidirektionale Verbindung bleibt in mehreren Schritten:

Zunächst erstellen Sie einen Hybrid-Verbindungs-Client zu Azure-Relay, mittels eines Zugriffsschlüssels-Sicherheit über das Azure Portal abgerufen:

var tokenProvider =
  TokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key);
var client = new HybridConnectionClient(
  new Uri($"sb://{relayNamespace}/{connectionName}"), tokenProvider);
var relayConnection = await client.CreateConnectionAsync();

Nachdem die Verbindung hergestellt ist, werden ich zwei asynchrone Aufgaben nacheinander ausgeführt: Die erste Aufgabe sendet eine Instanz des GisObject mit breiten-und Längenkoordinaten an das Relay; der zweite Task liest dieses Objekt aus dem Relay wieder. Nach Abschluss der beide Aufgaben wird die hybridverbindung geschlossen:

await new Task(
  () => SendToRelay(relayConnection, gisObject)
  .ContinueWith(async (t) =>
  {
    GisObject resolved = await ReadFromRelay(relayConnection);
    ShowAddress(resolved);
  })
  .ContinueWith(async (t) =>
    await relayConnection.CloseAsync(CancellationToken.None))
  .Start());

Senden ein Objekt zu Azure-Relay wird durch eine Nachricht in einen Stream schreiben. Sie können das Objekt in verschiedenen Formaten serialisieren; in der Regel erfolgt dies im JSON-Format:

private async Task SendToRelay(HybridConnectionStream relayConnection,
  GisObject gisObject)
{
  // Write the GIS object to the hybrid connection
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  string message = JsonConvert.SerializeObject(gisObject);
  await writer.WriteAsync(message);
}

Auf ähnliche Weise umfasst das Lesen eines Objekts aus der Azure-Relay aus einem Stream lesen und Deserialisieren der erhaltenen Zeichenfolge von Zeichen in den ursprünglichen Objekttyp:

private async Task<GisObject> ReadFromRelay(HybridConnectionStream relayConnection)
{
  // Read the GIS object from the hybrid connection
  var reader = new StreamReader(relayConnection);
  string message = await reader.ReadToEndAsync();
  GisObject gisObject = JsonConvert.DeserializeObject<GisObject>(message);
  return gisObject;
}

Das GIS-Server auch Lauscht auf Datenverkehr über das Relay, liest die eingehende Nachrichten, die mit einer Instanz von serialisierten GisObject und löst eine Speicheradresse durch den Aufruf eines bestimmten GIS-Diensts (nicht in der vorgeschlagenen Lösung beschrieben):

private async Task Listen(HybridConnectionListener listener,
  CancellationTokenSource cts)
{
  // Accept the next available, pending connection request
  HybridConnectionStream relayConnection;
  do
  {
    relayConnection = await listener.AcceptConnectionAsync();
    if (relayConnection != null)
    {
      ProcessMessage(relayConnection, cts);
    }
  } while (relayConnection != null);
}

Die Verbindung ist ein vollständig bidirektionaler Datenstrom. Als Abbildung 5 gezeigt, die ich einem StreamReader und hinzufügen ein StreamWriter, mir die Leseberechtigung für das GIS JSON-serialisierten Objekt, und es an den relayendpunkt zurückzuschreiben nach dem Beheben der bereitgestellten Geo-Koordinaten in eine Speicheradresse Replikationsaktivität.

Abbildung 5 lesen und Schreiben der JSON-serialisierten GIS-Objekts

private async void ProcessMessage(HybridConnectionStream relayConnection,
  CancellationTokenSource cts)
{
  // Bidirectional streams for reading and writing to the relay
  var reader = new StreamReader(relayConnection);
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  while (!cts.IsCancellationRequested)
  {
    // Read a message in input from the relay
    var message = await reader.ReadToEndAsync();
    // Resolve address by invoking a service on the GIS server
    GisObject gisObject =
      JsonConvert.DeserializeObject<GisObject>(message);
    await new GisServer().ResolveAddressAsync(gisObject);
    // Write the message back to the relay
    message = JsonConvert.SerializeObject(gisObject);
    await writer.WriteLineAsync(message);
  }
  await relayConnection.CloseAsync(cts.Token);
}

Das Muster für die Korrelation

Es wird eine weitere Anforderung zu erfüllen: Ich benötige Partnerorganisationen Kundendaten freigeben. Aber ich möchte nicht an das Offenlegen von Informationen, die Partner sind nicht berechtigt, auf zuzugreifen. Ich muss eine Möglichkeit zum von Synchronisierungsdaten zwischen Systemen nur implementieren, wenn sie miteinander korreliert sind.

Das Muster für Korrelation konzentriert sich auf die Schnittmenge von zwei Datasets und führt eine Synchronisierung das bewertete Dataset, wenn ein Datensatz in beiden Systemen vorhanden ist. Während der Relay-Kommunikation mit dem GIS-Server einen neuen Datensatz erstellen würden, wenn das Objekt im System nicht gefunden werden, erfordert Datenintegration basierend auf der Korrelation-Muster implementieren unbedingt, dass in beiden Systemen für die Synchronisierung auf korrelierte Datensätze vorhanden sind der Fall sein. Dies gilt perfekt für meine Fall, in dem ich Daten gemeinsam mit Partnern, jedoch nur, wenn sie bereits diese Informationen in ihren eigenen System marketing möchte. Es gibt jedoch eine klare Herausforderung – wie identifiziere ich verknüpfte Datensätze, die die gleiche Entität (Kunden) über Systeme hinweg darstellen? Diese Bedingung definiert, ob die Kundendatensätze mit externen Partnern synchronisiert werden können.

Siehe Abbildung 6, der Daten Korrelation Workflow in der e-Commerce-Anwendung sendet einen Kundendatensatz mit einigen Marketinginformationen an ein Azure Service Bus-Thema. Der Kundendatensatz ist eine Aggregation von Daten aus mehreren Entitäten. Es ist nicht ratsam, verwenden Sie das gleiche Objekt (eine datenbankentität) als ein Datenübertragungsobjekt (DTO), wie dies in der quellanwendung eine Abhängigkeit zwischen dem Dienst und das Datenmodell erstellen würden. Die vermittelte Nachricht ergänzt wird auch mit einer Korrelations-ID, die der bestimmten Datensatz in einem Abonnement Thema identifiziert; Diese Korrelations-ID wird weiter unten in der partneranwendung für das Überprüfen, ob bereits ein Kundendatensatz hilfreich sein.

Abbildung 6 die Korrelation-Klasse

public class Correlation
{
  private async Task Execute(CustomerEntity customer)
  {
    // Map the Customer entity in the e-commerce application (source)
    // to Customer record in the partner application (destination)
    CustomerRecord customerRecord = PrepareCustomerRecord(customer);
    // Create a connection to an Azure Service Bus Topic
    // Serialize the customer record and send the message to the Topic
    var client = TopicClient.CreateFromConnectionString(
      connectionString, topicName);
    var message = new BrokeredMessage(
      JsonConvert.SerializeObject(customerRecord));
    // Register the customer record with the Correlation Service
    // and obtain a Correlation ID
    message.Properties["CorrelationId}"] =
      new CorrelationService().RegisterCustomer(customerRecord, subscriptionName);
    await client.SendAsync(message);
  }

Der Dienst für die Korrelation zeigt einfach Methoden zum Abgleich Kundendatensätze zu einem bestimmten Abonnement, und registrieren einen neuen Kunden und die Korrelations-ID zurückgeben:

public class CorrelationService
{
  public Guid RegisterCustomer(CustomerRecord record, string subscription)
  {
    return store.ContainsKey((record, subscription)) ?
      GetCustomerCorrelationId(record, subscription) :
      AddCustomer(record, subscription);
  }
  public bool CustomerExists(Guid correlationId)
  {
    return store.ContainsValue(correlationId);
  }

Partners-Anwendungen für dieses Thema zu abonnieren und den Kundendatensatz und die Korrelations-ID abrufen, wie gezeigt in Abbildung 7. Wenn der Kundendatensatz in ihrem System vorhanden ist, können sie schließlich gespeichert werden.

Abbildung 7 die Partner-Klasse

class Partner
{
  public void ReceiveCustomerRecord()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessageAsync(async message =>
    {
      CustomerRecord customerRecord =
        JsonConvert.DeserializeObject<CustomerRecord>(message.GetBody<string>());
      Guid correlationId = (Guid)message.Properties["CorrelationId"];
      if (CustomerRecordExists(correlationId))
      {
        await SaveAsync(customerRecord);
      }
    });
  }

Die gesamte Projektmappe steht kostenlos zum Herunterladen von meinem GitHub-Repository auf bit.ly/2s0FWow.


Stefano Tempestaist ein Microsoft MVP und MCT und Kapitel führender Anbieter von CRMUG Schweiz festgelegt wurde. Reguläre Referent bei internationalen Konferenzen, einschließlich der Microsoft Ignite "," Tech Summit "und" Woche für Entwickler, Erweitern Stefanos Interessen Office & Dynamics 365, Blockchain und AI-bezogene Technologien.

Unser Dank gilt dem folgenden technischen Experten bei Microsoft für die Durchsicht dieses Artikels: Massimo Bonanni
Massimo Bonanni ist senior im Team moderne Apps von Microsoft-Berater und zwanzig Jahre mit Microsoft-Technologien funktioniert hat. Er ist eine Gründern Mitglied der Gruppe der italienischen Benutzer DomusDotNet und DotNET {Podcast}. Er wurde von einem Microsoft-MVP und ist jetzt ein Intel Software Innovator und Intel Schwarz Fließband.


Diesen Artikel im MSDN Magazine-Forum diskutieren