共用方式為



2018 年 3 月

第 33 卷,第 3 期

本文章是由機器翻譯。

Azure - 企業資料整合模式與 Azure 服務匯流排

Stefano Tempesta

在巨量資料和機器學習的時代,取得並管理資訊非常重要,但是也可以為複雜的任務因為資料通常是更複雜,您可能知道。考慮如何讓您與其他 IT 系統通訊的應用程式,有效設計的資料交換時,成功的關鍵。本文章提供概觀,以及使用 Azure Service Bus 的應用程式之間的資料整合程序的實作。

資料整合設計模式

在透過網路、 應用程式和暫時性或永續性儲存機制的多個方向的資料流。它可能會記錄或逐一批次,透過系統即時或已排程的同步處理工作透過彼此交談中交換。儘管各種不同的資料整合 」 皆 」 中,很可能識別如何解決在企業環境中,需求也就是包含高可用性,保證傳遞和安全性的常見設計模式。在軟體工程的設計模式是最邏輯且經過實證的序列解決工作的步驟。四個最常見的設計模式的資料整合為廣播、 彙總、 雙向同步處理及相互關聯。

在本文中,我會介紹每個這些資料整合的設計模式,並描述其內容中的 Azure 服務匯流排的應用程式。會利用企業服務匯流排 (ESB) 的資料整合的方法,從而完成這些模式的實作非常有效率的方式,只定義來源和目標系統的通訊時,頻率和輸入的資料格式和輸出。每個模式的描述,以及包含程式碼範例來說明與 Azure 服務匯流排通訊。

內容和需求

首先,我會資料整合我之旅藉由定義企業內容。在此情況下,我們假設我建置電子商務平台,與線上目錄的一般需求和購物車,我會在 Azure 中發佈我的應用程式。電子商務應用程式是較大的生態系統,IT 系統、 公用雲端上公開某些、 部分仍私人資料中心上裝載的一部分。因此,我作業系統在真正的混合式內容中。資料整合需求包括下列功能:

  1. 發票的系統以及社交媒體平台,請從電子商務應用程式傳輸產品銷售資訊。
  2. 從企業資源規劃 (ERP) 應用程式和產品描述從協力廠商系統收到產品可用性資訊。
  3. 加入要出貨 parcels 追蹤的位置。
  4. 共用與夥伴組織的共同行銷活動的客戶資料。

所有這些整合需求可以使用先前所述的設計模式來處理。讓我們更仔細。

廣播的模式

針對第一項功能,需要擷取關於電子商務應用程式從產品銷售資料,並將它傳輸至多個目標系統,發出發票的財務系統與升級的一個或多個社交媒體平台。此資料流程方向是單向,從外部系統應用程式。我基本上廣播外界的資訊。

廣播的整合模式描述如何從多個目標系統中連續即時或接近即時串流應用程式傳送資料。此程序必須是交易式:如果交易成功完成,資料就會認可目的地端。如果交易失敗,則會中止資料傳輸。它是顯而易見此廣播的整合通道,必須是高度可用且可靠的以避免遺失重要資料在傳輸過程中。做為佇列資料封包,並保證傳遞,目的地機制採用 ESB 而言最重要的。

密切實作廣播的模式類似於實作中 Azure 服務匯流排主題和訂用帳戶為基礎的發佈/訂閱模式 (bit.ly/2oOmTtM)。主題代表佇列的訊息至收件者應用程式 (「 訂閱者 」) 將訊息張貼時接收更新的訂閱。我的電子商務應用程式發佈至主題的訊息。ESB 做為訊息代理程式,並位於目的地的訊息傳遞保證的訊息 「 推送 」 到目的地,其中僅包含已訂閱的收件者。

基本上廣播電子商務應用程式的資料封包表示在主題中,發佈訊息,以及取得接聽特定的訂用帳戶的目標應用程式。廣播的模式會將交易的屬性加入至資料流程中,取消故障傳遞交易的可能性。交易跨越系統界限,因為它們會從 「 狀態機器 」 會保留代理的訊息傳輸,它會讀取已訂閱的所有應用程式之前的快照集獲益。如果任何 「 訂閱者 」 無法擷取訊息,整個交易中止,以確保所有相關的系統之間的一致性。

下列程式碼會廣播訊息至 Azure 服務匯流排主題和實作狀態機器 (bit.ly/29tKRT3) 以追蹤訊息的傳遞:

public class Broadcast
{
  public async Task Execute(Entity entity)
  {
    var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
    var message = new BrokeredMessage(JsonConvert.SerializeObject(entity));
    await client.SendAsync(message);
  }

在傳遞的錯誤,狀態機器會將訊息移至 Azure 服務匯流排中的信件 」 佇列。此時訊息已不再有效的資料傳輸,並不會進一步處理。

將訊息傳送到 Azure 服務匯流排中的主題需要 TopicClient 連線,並包裝原始的實體,並將它,以非同步方式傳送至匯流排 BrokeredMessage。所有必要的物件,用於連接到 Azure 服務匯流排 WindowsAzure.ServiceBus NuGet 套件中發佈,並可 Microsoft.ServiceBus.Messaging 命名空間中。

狀態機器是單一的非同步字典,其中包含主題的異動計數器。字典保持使用中交易數目的計數,訂閱者 —,正在等候訊息從服務匯流排的特定主題。字典是安全執行緒,才能允許並行要求:

private static StateMachine _instance;
public static StateMachine Current => _instance ?? (_instance = new StateMachine());
protected ConcurrentDictionary<string, int> transactions =
  new ConcurrentDictionary<string, int>();

中所示圖 1,「 訂閱者 」 應用程式從服務匯流排主題讀取訊息,由特定主題狀態的電腦上,開始新交易 (使用 BeginTransactionAsync 方法),並且再處理若要取得之實體的 OnMessage 事件。在內部,然後處理實體比方說,它會保留收件者的系統。如果發生錯誤,交易被取消。

圖 1 讀取訊息從服務匯流排主題

public async Task ReadMessageAsync()
{
  await StateMachine.Current.BeginTransactionAsync(topicName);
  var client = SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName,
    subscriptionName);
  client.OnMessageAsync(async message =>
  {
    var entity = JsonConvert.DeserializeObject(message.GetBody<string>());
    try
    {
      Save(entity);
      await StateMachine.Current.SuccessAsync(message , topicName);
    }
    catch
    {
      await StateMachine.Current.CancelAsync(message , topicName);
    }
  });
}

完成或中止交易受使用兩種方法之一的狀態機器 — SuccessAsync 或 CancelAsync。SuccessAsync 會 CompleteAsync 叫用在代理的訊息中,這表示,訊息應該會標示為處理,而且從主題,最後刪除。只有當所有並行的使用中交易都完成時,這會發生:

public async Task<bool> SuccessAsync(BrokeredMessage message, string topicName)
{
  bool done = await EndTransactionAsync(topicName);
  int count = Current.transactions[topicName];
  // All concurrent transactions are done
  if (done && count == 0)
  {
     await message.CompleteAsync();
  }
  return done;
}

CancelAsync,相較之下,中止廣播重設交易計數器主題的訊息。藉由呼叫 DeadLetterAsync 方法,代理的訊息接著會移至 「 寄不出的信件 」 佇列中,未成功處理的訊息的儲存位置:

public async Task<bool> CancelAsync(BrokeredMessage message, string topicName)
{
  // Cancel the message broadcast -> Remove all concurrent transactions
  int count = Current.transactions[topicName];
  bool done = Current.transactions.TryUpdate(topicName, 0, count);
  if (done)
  {
    await message.DeadLetterAsync();
  }
  return done;
}

彙總模式

我的電子商務平台的第二個需求是共用產品從外部系統的相關資訊,並將其合併到 Web 入口網站。資料流程的方向在此情況下是見的廣播模式。從單一位置的各種來源彙總資料是現在的需求。簡單的方法是將資料匯入直接、 點,在電子商務應用程式的每一個來源。不過這不是靈活的解決方案,如所示建立不同的連接,每個外部系統傳送資料至目標儲存機制。相反地,由彙總資料透過 ESB 一個處理序中的,我不需要多個單向整合,並且減輕顧慮資料的準確性與一致性,不可部分完成的交易中處理資料時。

什麼是出現,不過,是資料合併到單一實體,而不需要重複資訊或更糟的,損毀的挑戰。通常,它是為了整合程序,以便追蹤來自不同系統的彙總的記錄,並將其儲存在應用程式中的一個或多個實體中實作自訂合併邏輯。我需要對應資料表中不同的來源資料庫的記錄識別碼與目標資料庫內的實體識別碼。此對應資料表通常會保存於高交易資料庫,並彙總程序期間更新的資料合併自訂邏輯。

如同廣播模式中,此整合模式的實作也會反映發行/訂閱的差異,在此情況下,我的電子商務應用程式為目標系統接收資料 (訂閱與 Azure 服務匯流排主題主題中) 從 ESB 透過其他來源系統。整體解決方案也必須使用某些資料合併邏輯和資料對應到追蹤來源記錄識別碼和目的地的實體識別碼。

您可以看到在圖 2,讀取訊息從主題包含建立訂用帳戶連接 (SubscriptionClient),以及處理 OnMessage 事件可以使用新的訊息時引發的訂閱用戶端中的主題。接收的訊息包含由外部應用程式所傳送的物件會假設傳送產品詳細資料的廠商。此物件接著我在系統中,使用的實體對應,對應到實體和實體已存在於資料庫中,如果更新它。否則,我要建立一個新。

圖 2 讀取訊息從主題

public class Aggregation
{
  public void Execute()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessage(message => {
      ProductEntity product =
        EntityMap.Instance.MapToEntity<ProductEntity>(message);
      // Persist the product
      var exists = Find(product.Id) != null;
      if (exists)
        Update(product);
      else
        Create(product);
      });
  }

對應程序、 EntityMap 類別中實作包含兩個重要步驟:

  1. 我的資料庫建立外部系統中的物件與實體之間的對應。此對應會識別主索引鍵配對 (例如"ERP,""Vendor1,""Vendor2") 的系統名稱外部物件。其型別在我的應用程式 (產品、 客戶、 訂單) 和識別碼所識別對應的實體
  2. 它會建置使用外部物件的屬性的實體記錄。這是自訂的合併邏輯,它可以是像是容易使用程式庫如 AutoMapper (automapper.org) 對應物件。

中所示圖 3,物件實體對應是一個字典,將實體型別實體識別碼對應項目使用系統名稱主要金鑰組產生關聯。外部系統中的物件識別唯一的系統名稱和主索引鍵組合而我的應用程式中的實體由組合的實體類型和實體識別碼。

圖 3 物件實體對應

系統名稱 主索引鍵 實體類型 實體識別碼
ERP ABC012345 產品 FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
廠商 1 1000 產品 FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
ERP ABD987655 產品 2110F684-C277-47E7-B8B9-6F17A579D0CE
廠商 2 1001 產品 2110F684-C277-47E7-B8B9-6F17A579D0CE

中所示,對應由從仲介訊息屬性擷取的系統名稱和主索引鍵,然後再建立 AutoMapper,與實體產生圖 4

圖 4 填入對應

public T MapToEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  string systemName = message.Properties["SystemName"] as string;
  string primaryKey = message.Properties["PrimaryKey"] as string;
  T entity = BuildEntity<T>(message);
  map.Add((systemName, primaryKey), (entity.GetType(), entity.Id));
  return entity;
}
private T BuildEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  var source = JsonConvert.DeserializeObject(message.GetBody<string>());
  T entity = Mapper.Map<T>(source);
  return entity;
}

代理的訊息可以增添任何額外的屬性,發行者應用程式應該設定傳送訊息至 ESB 之前:

public async Task SendMessageToServiceBus(object obj)
{
  var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
  var message = new BrokeredMessage(JsonConvert.SerializeObject(obj));
  message.Properties["SystemName"] = "Publisher System Name";
  message.Properties["PrimaryKey"] = "Object Primary Key";
  await client.SendAsync(message);
}

雙向同步處理模式

讓我們看看第三個需求的 [現在: 新增位置來追蹤出貨 parcels。在更泛型的詞彙中,我想要擴充之實體的屬性,以更具特製化的外部應用程式所提供的其他屬性。基於這個理由,這種模式有時稱為增強指定模式。

在雙向同步處理程序與第三方的特定業務應用程式所涉及的系統可以擴充其功能,其界限之外。例如,Dynamics 365 是客戶關係管理平台 (及更多) 的原生與 SharePoint 整合企業文件管理。Dynamics 365 仍有主要資料來源,所有的記錄,但是文件會儲存在 SharePoint 中做為擴充之實體的 CRM 系統中。基本上,這兩個系統都在雙向同步處理之後,它們的行為與一個系統同時仍然保留其自己的資料集和,很明顯地,功能。資料會分散到兩個系統,但被視為是透過此緊密整合的單一實體。

大部分的情況下,以及與 Dynamics 365 和 SharePoint,此直接和即時的同步處理未實作使用服務匯流排。點對點連接器通常存在知道如何向其中一個系統以最低組態。但沒有應用程式之間的原生連接器時,會發生什麼事?除了建置自訂連接器,這可能不是簡單的工作,在所有 (只是想了解驗證和應用程式開發介面呼叫,以及提供高可用性和確保典型的 ESB 傳遞所需投入時間),您可以時實作轉送。

Azure 服務匯流排轉送 (bit.ly/2BNTBih) 是藉由從公用雲端,無法存取的系統啟用安全連線,協助在混合式組態中的系統之間的通訊的服務匯流排的擴充功能。例如,假設我所連接的 GIS 系統裝載在私人公司網路中。在內部部署服務所初始化的資料傳輸會連線至轉送透過輸出連接埠,建立雙向通訊端,繫結至特定會合位址的通訊。電子商務應用程式,裝載在 Azure 中,可以再進行通訊與位於防火牆後面 GIS 服務傳送訊息至以會合位址為目標的轉送服務。轉送服務然後 「 轉送 」 至內部部署服務,透過專用的雙向通訊端的資料。電子商務應用程式不需要 (及無法建立) 直接連線到內部部署 GIS 服務,甚至不需要知道服務所在的位置,因為在整個通訊只是為了轉送。

只是為了提到的實作解決方案,根據 Azure 轉送另一個好處,轉送功能不同於網路層級整合技術,例如 Vpn 因為它們可以限定在單一電腦上的單一應用程式端點。VPN 技術相較之下,為更干擾,因為它是倚賴改變網路環境。

NuGet 封裝 Microsoft.Azure.Relay 包含 namesake 具有命名空間來管理與 Azure 服務匯流排轉送通訊相關的物件。但是,我們先來先定義 GIS 伺服器所組成:

  • GisObject:用於儲存地理座標 (緯度與經度) 與完整解析的位置位址物件。
  • GisProcess:此程序會維持 GIS 伺服器,透過 Azure 轉送雙向連接,與 GIS 伺服器與電子商務應用程式之間傳輸 GisObject 的執行個體。
  • ServerListener:要做為本身 GIS 伺服器與 Azure 轉送之間的橋樑 GIS 伺服器擴充功能。

雙向連接維護多個步驟:

首先,建立混合式連線用戶端到 Azure 轉送,使用取自 Azure 入口網站的存取安全性金鑰:

var tokenProvider =
  TokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key);
var client = new HybridConnectionClient(
  new Uri($"sb://{relayNamespace}/{connectionName}"), tokenProvider);
var relayConnection = await client.CreateConnectionAsync();

一旦建立連接之後,我會執行兩項非同步工作順序中:第一項工作將使用經度和緯度座標 GisObject 的執行個體傳送至轉送;第二項工作從轉送讀取這個物件。在這兩項工作的結束時,混合式連接已關閉:

await new Task(
  () => SendToRelay(relayConnection, gisObject)
  .ContinueWith(async (t) =>
  {
    GisObject resolved = await ReadFromRelay(relayConnection);
    ShowAddress(resolved);
  })
  .ContinueWith(async (t) =>
    await relayConnection.CloseAsync(CancellationToken.None))
  .Start());

將物件傳送至 Azure 轉送是將訊息寫入資料流。您可以將序列化的物件數種格式。通常這是在 JSON 中:

private async Task SendToRelay(HybridConnectionStream relayConnection,
  GisObject gisObject)
{
  // Write the GIS object to the hybrid connection
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  string message = JsonConvert.SerializeObject(gisObject);
  await writer.WriteAsync(message);
}

同樣地,從 Azure 轉送讀取物件包含從資料流讀取和還原序列化成原始的物件類型的 [取得的字元字串:

private async Task<GisObject> ReadFromRelay(HybridConnectionStream relayConnection)
{
  // Read the GIS object from the hybrid connection
  var reader = new StreamReader(relayConnection);
  string message = await reader.ReadToEndAsync();
  GisObject gisObject = JsonConvert.DeserializeObject<GisObject>(message);
  return gisObject;
}

GIS 伺服器也會接聽通過轉送流量、 讀取內送訊息包含的序列化 GisObject,執行個體位址,並解析位置叫用特定 GIS 服務 (不建議的解決方案說明):

private async Task Listen(HybridConnectionListener listener,
  CancellationTokenSource cts)
{
  // Accept the next available, pending connection request
  HybridConnectionStream relayConnection;
  do
  {
    relayConnection = await listener.AcceptConnectionAsync();
    if (relayConnection != null)
    {
      ProcessMessage(relayConnection, cts);
    }
  } while (relayConnection != null);
}

連接是完全雙向資料流。做為圖 5所示,新增資料流讀取器和資料流寫入器,可讓我讀取 JSON 序列化 GIS 物件,並寫回轉送之後的位置位址解析提供的地理座標。

圖 5 讀取和寫入 JSON 序列化 GIS 物件

private async void ProcessMessage(HybridConnectionStream relayConnection,
  CancellationTokenSource cts)
{
  // Bidirectional streams for reading and writing to the relay
  var reader = new StreamReader(relayConnection);
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  while (!cts.IsCancellationRequested)
  {
    // Read a message in input from the relay
    var message = await reader.ReadToEndAsync();
    // Resolve address by invoking a service on the GIS server
    GisObject gisObject =
      JsonConvert.DeserializeObject<GisObject>(message);
    await new GisServer().ResolveAddressAsync(gisObject);
    // Write the message back to the relay
    message = JsonConvert.SerializeObject(gisObject);
    await writer.WriteLineAsync(message);
  }
  await relayConnection.CloseAsync(cts.Token);
}

相互關聯模式

沒有符合其中一個詳細的需求:我需要分享夥伴組織中的客戶資料。但是,我不想透露未授權存取夥伴的資訊。我需要彼此相互關聯時,才實作系統之間的同步處理資料的方法。

相互關聯模式,著重在交集的兩個資料集,而這兩個系統中有記錄存在時,才執行該範圍的資料集的同步處理。雖然與 GIS 伺服器之間的轉送通訊會建立新的記錄,如果系統中找不到物件,實作相互關聯模式為基礎的資料整合嚴格要求同步至這兩個系統中有相互關聯的記錄會發生問題。這適用於完全我我要與行銷協力電腦,但只有如果他們已經有這項資訊在其本身的系統中共用資料的大小寫。但是沒有清除的挑戰 — 如何識別代表相同的實體 (客戶) 跨系統的相關的記錄?這個條件會定義與外部合作夥伴是否可以同步處理的客戶記錄。

中所示圖 6,電子商務應用程式中的資料相互關聯工作流程會將某些行銷資訊的客戶記錄傳送至 Azure 服務匯流排主題。客戶記錄是彙總來自多個實體的資料。因為這樣會建立服務和資料模型之間的相依性的來源應用程式中不建議為資料傳輸物件 (DTO),使用相同的物件 (資料庫實體)。代理的訊息也都有識別主題的訂用帳戶; 中指定的記錄相互關聯識別碼此相互關聯識別碼將會稍後驗證是否已存在於客戶記錄的協力廠商應用程式很有用。

圖 6 相互關聯類別

public class Correlation
{
  private async Task Execute(CustomerEntity customer)
  {
    // Map the Customer entity in the e-commerce application (source)
    // to Customer record in the partner application (destination)
    CustomerRecord customerRecord = PrepareCustomerRecord(customer);
    // Create a connection to an Azure Service Bus Topic
    // Serialize the customer record and send the message to the Topic
    var client = TopicClient.CreateFromConnectionString(
      connectionString, topicName);
    var message = new BrokeredMessage(
      JsonConvert.SerializeObject(customerRecord));
    // Register the customer record with the Correlation Service
    // and obtain a Correlation ID
    message.Properties["CorrelationId}"] =
      new CorrelationService().RegisterCustomer(customerRecord, subscriptionName);
    await client.SendAsync(message);
  }

相互關聯服務只會公開方法以相符的客戶記錄特定的訂用帳戶,註冊新的客戶,並傳回其相互關聯識別碼:

public class CorrelationService
{
  public Guid RegisterCustomer(CustomerRecord record, string subscription)
  {
    return store.ContainsKey((record, subscription)) ?
      GetCustomerCorrelationId(record, subscription) :
      AddCustomer(record, subscription);
  }
  public bool CustomerExists(Guid correlationId)
  {
    return store.ContainsValue(correlationId);
  }

協力廠商應用程式訂閱該主題,並擷取的客戶記錄和相互關聯識別碼,如中所示圖 7。如果客戶記錄存在於系統中,您可以儲存最後。

圖 7 夥伴類別

class Partner
{
  public void ReceiveCustomerRecord()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessageAsync(async message =>
    {
      CustomerRecord customerRecord =
        JsonConvert.DeserializeObject<CustomerRecord>(message.GetBody<string>());
      Guid correlationId = (Guid)message.Properties["CorrelationId"];
      if (CustomerRecordExists(correlationId))
      {
        await SaveAsync(customerRecord);
      }
    });
  }

整個方案是免費下載我 GitHub 儲存機制從bit.ly/2s0FWow


Stefano Tempesta是 Microsoft MVP mct 規範,並章 CRMUG 瑞士的前置字元。國際會議,包括 Microsoft Ignite、 技術 Summit 和開發人員一週,在一般喇叭 Stefano 的興趣擴充 Office 和 Dynamics 365、 Blockchain 和 AI 相關技術。

非常感謝下列 Microsoft 技術專家檢閱這篇文章:Massimo Bonanni
Massimo Bonanni 是在 Microsoft 的現代化應用程式小組的資深顧問,並努力 20 多年來與 Microsoft 技術。他也是發起成員義大利的使用者群組 DomusDotNet 的 dotNET {播客}。他是 Microsoft MVP,並且現在是 Intel 軟體 Innovator 和 Intel 黑色輸送帶。


MSDN Magazine 論壇中的這篇文章的討論