2018 年 3 月

第 33 卷,第 3 期

此文章由机器翻译

Azure - 使用 Azure 服务总线的企业数据集成模式

通过Stefano Tempesta

中的大数据和机器学习的存留期,能够购买和管理信息至关重要,但这样做因此可以是一项复杂的任务因为数据通常是可能实现比你变得更加复杂。在考虑如何使应用程序与其他 IT 系统通信时,数据交换的有效设计是成功的关键。本文提供了概述和使用 Azure Service Bus 的应用程序之间的数据集成过程的实现。

数据集成设计模式

数据在多个方向流动跨网络、 应用程序和可变或持久存储库。逐记录或分批情况下,通过实时或计划的同步作业通过互相对话的系统,它可能进行交换。尽管的各种数据集成"旅程",则可以确定如何在企业环境中,以要求包含实现高可用性,保证传递和安全的位置中解决它们的常见设计模式。在软件工程的设计模式是最逻辑和经过验证的一系列步骤来解决任务。数据集成的四个最常见设计模式是广播、 聚合、 双向同步和相关。

在本文中,我将介绍每个这些数据集成设计模式,并描述 Azure Service Bus 的上下文中其应用程序。一种方法,利用企业服务总线 (ESB) 的数据集成简化这些模式的实现方式非常有效,只需定义源和目标系统的通信的频率和输入数据格式和输出。每个模式的说明,以及包含代码示例说明了使用 Azure 服务总线通信。

上下文和要求

我将从数据集成到我旅程开始通过定义企业环境中。在这种情况下,我们假设我生成一个电子商务平台,与联机目录的典型需求和购物车,,并可以我将在 Azure 中发布我的应用程序。电子商务应用程序是 IT 系统,有些公共云上公开,有些仍托管在专用数据中心的更大生态系统的一部分。因此,我将在真正混合的上下文中进行操作。数据集成要求包括以下功能:

  1. 到开票系统和社交媒体平台,请从电子商务应用程序传输产品销售信息。
  2. 接收来自的企业资源规划 (ERP) 应用程序,并且从第三方供应商系统产品说明产品可用性信息。
  3. 添加跟踪装运包的位置。
  4. 与伙伴组织的联合的市场营销活动共享客户数据。

可以使用前面所述的设计模式解决所有这些集成要求。让我们进一步了解一下。

广播的模式

对于第一个功能,我需要提取有关产品销售从电子商务应用程序的数据并将其传送到多个目标系统-颁发发票的财务系统和升级的一个或多个社交媒体平台。数据流方向为单向,应用程序中对外部系统。基本上,我将广播与外部世界的信息。

广播的集成模式描述如何为连续的实时或附近实时流中的多个目标系统的应用程序从传输数据。此过程应为事务:如果事务已成功完成后,将会在目标提交数据。如果事务失败,则中止数据传输。它是显而易见,此广播的集成通道必须高度可用且可靠,为了避免丢失在传输过程中的关键数据。作为队列数据包并保证传递目标位置的机制采用 ESB 变得至关重要。

密切实现广播的模式类似于在基于主题和订阅的 Azure 服务总线中实现发布/订阅模式 (bit.ly/2oOmTtM)。主题表示队列的消息接收方应用程序 (订阅服务器) 订阅发送一条消息时接收更新。我的电子商务应用程序发布到主题的消息。ESB 充当消息代理,并可在目标消息传递保证通过将"推送"消息到该目标,仅包含已订阅的收件人。

实质上从电子商务应用程序广播数据包意味着在主题中,发布消息和具有特定的订阅上侦听的目标应用程序。广播的模式将事务属性添加到数据流中,若要取消该事务,如果传递失败的可能性。因为事务跨系统边界,它们可以受益于的"状态机",保留的中转的消息传输之前的所有订阅的应用程序读取, 快照。如果任何订阅服务器无法检索消息时,整个事务中止时,以确保所涉及的所有系统保持一致。

下面的代码广播到 Azure 服务总线主题的消息,并实现的状态机 (bit.ly/29tKRT3) 用于跟踪消息的传递方式:

public class Broadcast
{
  public async Task Execute(Entity entity)
  {
    var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
    var message = new BrokeredMessage(JsonConvert.SerializeObject(entity));
    await client.SendAsync(message);
  }

如果传递的错误,状态机将消息移到 Azure 服务总线中的"死信"队列。此时,消息将不再有效的数据传输,并且将不会进一步处理。

向 Azure Service Bus 中的主题发送消息需要 TopicClient 连接和将包装原始实体,并将它,以异步方式发送到 bus BrokeredMessage。所有必需的对象,用于连接到 Azure 服务总线将采用 WindowsAzure.ServiceBus NuGet 包分发,并且在 Microsoft.ServiceBus.Messaging 命名空间中可用。

状态机是一个包含事务计数器按主题的单一实例异步字典。字典保留的活动事务数计数-订阅服务器-,正在等待的消息从 Service Bus 特定主题。字典是线程安全的以允许的并发请求:

private static StateMachine _instance;
public static StateMachine Current => _instance ?? (_instance = new StateMachine());
protected ConcurrentDictionary<string, int> transactions =
  new ConcurrentDictionary<string, int>();

中所示图 1,订阅服务器应用程序,读取一条消息服务总线主题中的状态机上的特定主题开始新事务 (使用 BeginTransactionAsync 方法),然后处理OnMessage 事件以获取实体的副本。然后内部; 处理实体例如,它可能由收件人系统保留。发生错误,事务都会被取消。

图 1 从服务总线主题读取消息

public async Task ReadMessageAsync()
{
  await StateMachine.Current.BeginTransactionAsync(topicName);
  var client = SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName,
    subscriptionName);
  client.OnMessageAsync(async message =>
  {
    var entity = JsonConvert.DeserializeObject(message.GetBody<string>());
    try
    {
      Save(entity);
      await StateMachine.Current.SuccessAsync(message , topicName);
    }
    catch
    {
      await StateMachine.Current.CancelAsync(message , topicName);
    }
  });
}

完成或中止的事务由使用两种方法之一的状态机-SuccessAsync 或 CancelAsync。SuccessAsync 调用 CompleteAsync 中转的消息,指示应将消息标记为处理并最终删除从主题。仅当完成所有并发活动的事务时,这会采用位置:

public async Task<bool> SuccessAsync(BrokeredMessage message, string topicName)
{
  bool done = await EndTransactionAsync(topicName);
  int count = Current.transactions[topicName];
  // All concurrent transactions are done
  if (done && count == 0)
  {
     await message.CompleteAsync();
  }
  return done;
}

CancelAsync,与此相反,中止消息广播通过重置主题的事务计数器。通过调用 DeadLetterAsync 方法,中转的消息然后将移动到"死信"队列中,未成功处理的消息的存储位置:

public async Task<bool> CancelAsync(BrokeredMessage message, string topicName)
{
  // Cancel the message broadcast -> Remove all concurrent transactions
  int count = Current.transactions[topicName];
  bool done = Current.transactions.TryUpdate(topicName, 0, count);
  if (done)
  {
    await message.DeadLetterAsync();
  }
  return done;
}

聚合模式

我的电子商务平台的第二个要求是共享来自外部系统的产品有关的信息并将它们整合到 Web 门户。相反,广播模式在这种情况下是数据流的方向。现在需要的聚合数据从各种源到一个位置。简单的方法是将从导入数据直接、 点到点到电子商务应用程序的每个源。但这不是可伸缩的解决方案,它可以看出生成将数据发送到目标存储库的每个外部系统的不同连接。相反,通过聚合 ESB 通过一个进程中的数据,我不需要用于多个单向集成,并且缓解数据准确性和一致性方面的顾虑,在原子事务中处理的数据。

内容中出现,不过,是将数据合并到单个实体,而不必复制信息,也更糟的,损坏的挑战。通常是实现自定义合并逻辑集成过程,以便跟踪聚合的记录源自不同系统和将其存储在应用程序中的一个或多个实体的一部分所必需的。我需要用于将不同的源数据库中的记录 Id 与目标数据库中的实体 ID 相关联的映射表。此映射表通常是保留在高事务数据库中,并在聚合过程中更新的数据合并自定义逻辑。

与广播模式中,此集成模式的实现还反映发布/订阅的 Azure 服务总线主题,不同的在这种情况下,我的电子商务应用程序接收数据 (订阅,目标系统到主题) 通过 ESB 其他从源系统。总体解决方案还需要使用一些数据合并逻辑和数据映射跟踪源记录 Id 和目标位置的实体 ID。

在中可以看到图 2,从主题读取消息组成创建订阅连接 (SubscriptionClient),并处理由订阅客户端提供一条新消息时引发的 OnMessage 事件在主题。收到的消息包含一个对象,发送应用程序外部,假设供应商发送产品详细信息。此对象然后映射到实体在我系统中,使用实体映射,并将我的数据库中已存在该实体,如果其更新;否则,我创建一个新。

图 2 从主题读取消息

public class Aggregation
{
  public void Execute()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessage(message => {
      ProductEntity product =
        EntityMap.Instance.MapToEntity<ProductEntity>(message);
      // Persist the product
      var exists = Find(product.Id) != null;
      if (exists)
        Update(product);
      else
        Create(product);
      });
  }

映射过程中,实体映射类中实现包括两个重要步骤:

  1. 在我的数据库中创建的外部系统中的对象和实体之间的映射。此映射通过具有主键值配对 (例如"ERP,""Vendor1,""Vendor2") 的系统名称标识的外部对象。映射的实体由其类型在我的应用程序 (产品、 客户、 顺序) 和其标识。 标识
  2. 它将生成一个使用外部对象的属性的实体记录。这是自定义合并逻辑,这可以是简单地使用 AutoMapper (automapper.org) 类似的库对象映射。

中所示图 3,对象实体映射是将相关联的系统名称主密钥对实体类型实体 id 对应到一个字典。外部系统中的对象是通过的系统名称和主密钥,组合唯一标识,而我的应用程序中的实体由组合实体类型和实体 id。

图 3 的对象实体映射

系统名称 主键 实体类型 实体 Id
ERP ABC012345 产品 FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
供应商 1 1000 产品 FAE04EC0-301F-11D3-BF4B-00C04F79EFBC
ERP ABD987655 产品 2110F684-C277-47E7-B8B9-6F17A579D0CE
供应商 2 1001 产品 2110F684-C277-47E7-B8B9-6F17A579D0CE

中所示,由从中转的消息的属性中检索系统名称和 primary key,然后生成一个实体具有 AutoMapper,填充地图图 4

图 4 对地图填充

public T MapToEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  string systemName = message.Properties["SystemName"] as string;
  string primaryKey = message.Properties["PrimaryKey"] as string;
  T entity = BuildEntity<T>(message);
  map.Add((systemName, primaryKey), (entity.GetType(), entity.Id));
  return entity;
}
private T BuildEntity<T>(BrokeredMessage message) where T : Entity, new()
{
  var source = JsonConvert.DeserializeObject(message.GetBody<string>());
  T entity = Mapper.Map<T>(source);
  return entity;
}

中转的消息可以增加了任何附加的属性,发布服务器应用程序应在将消息发送到 ESB 前设置:

public async Task SendMessageToServiceBus(object obj)
{
  var client = TopicClient.CreateFromConnectionString(connectionString, topicName);
  var message = new BrokeredMessage(JsonConvert.SerializeObject(obj));
  message.Properties["SystemName"] = "Publisher System Name";
  message.Properties["PrimaryKey"] = "Object Primary Key";
  await client.SendAsync(message);
}

双向同步模式

让我们看一下第三个要求的现在: 添加到跟踪的位置附带包。在更通用术语中,我想要增加的实体具有提供的专用化程度的外部应用程序的其他属性的属性。因此,此模式有时称为扩充模式。

在双向同步过程中与第三方业务线应用程序所涉及的系统可以扩展到其边界之外其功能。例如,Dynamics 365 是客户关系管理平台 (以及更多),以本机模式与 SharePoint 集成以实现集成企业文档管理。Dynamics 365 仍保持所有记录的主数据源但是文档存储在 SharePoint 作为实体的扩展 CRM 系统中。基本上,双向同步两个系统后,它们将作为一个系统同时仍然保留其自己数据集以及,显然,功能。数据分布在两个系统,但被视为单个实体中通过这种无缝集成。

大多数情况下,以及使用 Dynamics 365 和 SharePoint,此直接和实时的同步未实现用于服务总线。点对点连接器通常存在知道如何与具有最小配置的任一系统通信。但不在应用程序之间的本机连接器时,会发生什么情况?除了构建自定义的连接器,这可能不是工作量的一项重要任务在所有 (只是工作量的表示需要了解身份验证和 API 调用,以及提供高可用性和典型的 ESB 传递保证),你可以执行的操作是工作量的实现中继。

Azure Service Bus 中继 (bit.ly/2BNTBih) 是通过安全连接到系统无法访问从公有云便于在混合配置中的系统之间的通信的服务总线的扩展。假设我所连接到 GIS 系统承载在专用的企业网络。由本地服务启动的数据传输到通过出站端口,中继连接创建双向套接字用于绑定到特定集合地址的通信。在 Azure 中托管的电子商务应用程序,然后可以将消息发送到抵达会合地址的中继服务与在防火墙后面的 GIS 服务通信。中继服务然后"中继"到本地服务通过专用的双向套接字的数据。电子商务应用程序不需要 (以及无法建立) 直接连接到本地 GIS 服务,并甚至不需要了解服务所在的位置的整个通信只会应用到该中继。

只需要提到的实现基于 Azure 中继的解决方案的另一个优点,中继功能不同于网络级别集成技术,如 Vpn 因为它们可以作用于一台计算机上的单个应用程序终结点。VPN 技术,与此相反,是更具有侵入性依赖于更改网络环境。

NuGet 程序包 Microsoft.Azure.Relay 包含用于管理与 Azure Service Bus 中继之间的通信的相关对象的这些命名空间。但首先,让我们首先定义 GIS 服务器,其中包括:

  • GisObject:用于存储地理坐标 (纬度和经度) 和完全解析的位置地址的对象。
  • GisProcess:一个过程以维护与 GIS 服务器上,通过 Azure 中继的双向连接并 GIS 服务器和电子商务应用程序之间传输的 GisObject 实例。
  • ServerListener:对作为 GIS 服务器本身和 Azure 中继之间的桥梁的 GIS 服务器的扩展。

双向连接维护在多个步骤:

首先,我创建混合连接客户端到 Azure 中继,使用从 Azure 门户获得的访问安全密钥:

var tokenProvider =
  TokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key);
var client = new HybridConnectionClient(
  new Uri($"sb://{relayNamespace}/{connectionName}"), tokenProvider);
var relayConnection = await client.CreateConnectionAsync();

一旦建立连接时,在序列中运行两个异步任务:第一个任务将具有纬度和经度坐标的 GisObject 实例发送到中继;第二个任务重新从中继读取此对象。在这两项任务结束时,关闭混合连接:

await new Task(
  () => SendToRelay(relayConnection, gisObject)
  .ContinueWith(async (t) =>
  {
    GisObject resolved = await ReadFromRelay(relayConnection);
    ShowAddress(resolved);
  })
  .ContinueWith(async (t) =>
    await relayConnection.CloseAsync(CancellationToken.None))
  .Start());

将对象发送到 Azure 中继是一种向流中写入一条消息。可以在几种格式; 将对象序列化通常,这就是在 JSON 中完成:

private async Task SendToRelay(HybridConnectionStream relayConnection,
  GisObject gisObject)
{
  // Write the GIS object to the hybrid connection
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  string message = JsonConvert.SerializeObject(gisObject);
  await writer.WriteAsync(message);
}

同样,从 Azure 中继读取对象包括从流中读取和反序列的字符获取的字符串化到原始对象类型:

private async Task<GisObject> ReadFromRelay(HybridConnectionStream relayConnection)
{
  // Read the GIS object from the hybrid connection
  var reader = new StreamReader(relayConnection);
  string message = await reader.ReadToEndAsync();
  GisObject gisObject = JsonConvert.DeserializeObject<GisObject>(message);
  return gisObject;
}

GIS 服务器还于流量通过中继侦听、 读取传入消息包含实例的序列化 GisObject,,和通过调用特定 GIS 服务 (不建议的解决方案中所述) 来解析位置地址:

private async Task Listen(HybridConnectionListener listener,
  CancellationTokenSource cts)
{
  // Accept the next available, pending connection request
  HybridConnectionStream relayConnection;
  do
  {
    relayConnection = await listener.AcceptConnectionAsync();
    if (relayConnection != null)
    {
      ProcessMessage(relayConnection, cts);
    }
  } while (relayConnection != null);
}

连接是一个完全双向流。作为图 5所示,我流读取器和一个流写入器向其添加,这样我以读取 JSON 序列化 GIS 对象并写回到中继到位置地址解析提供的地理坐标后。

图 5 读取和写入 JSON 序列化 GIS 对象

private async void ProcessMessage(HybridConnectionStream relayConnection,
  CancellationTokenSource cts)
{
  // Bidirectional streams for reading and writing to the relay
  var reader = new StreamReader(relayConnection);
  var writer = new StreamWriter(relayConnection) { AutoFlush = true };
  while (!cts.IsCancellationRequested)
  {
    // Read a message in input from the relay
    var message = await reader.ReadToEndAsync();
    // Resolve address by invoking a service on the GIS server
    GisObject gisObject =
      JsonConvert.DeserializeObject<GisObject>(message);
    await new GisServer().ResolveAddressAsync(gisObject);
    // Write the message back to the relay
    message = JsonConvert.SerializeObject(gisObject);
    await writer.WriteLineAsync(message);
  }
  await relayConnection.CloseAsync(cts.Token);
}

相关模式

还有一个详细要求,以满足:我需要与合作伙伴组织共享客户数据。但我不想披露合作伙伴的信息未授权访问。我需要实现在系统之间同步数据的方法,仅当它们在与彼此相关。

相关模式中,主要关注的两个数据集的交集和执行的同步。 该作用域的数据集,仅当这两个系统中存在的记录。虽然与 GIS 服务器的中继通信将创建一条新记录,如果在系统中找不到对象,实现基于相关的模式的数据集成严格需要同步到这两个系统中存在相关的记录发生这种情况。这完全适用于我用例,我想要与市场营销合作伙伴,但仅当它们已将此信息在其自己的系统中共享数据。但没有清除质询-如何标识跨多个系统中表示相同的实体 (客户) 的相关的记录?此条件定义是否可以与外部合作伙伴同步客户记录。

中所示图 6,电子商务应用程序中的数据相关工作流将一些市场营销信息的客户记录发送到 Azure 服务总线主题。客户记录是从多个实体的数据的聚合。不建议你使用相同的对象 (数据库实体) 作为数据传输对象 (DTO),否则会在源应用程序中创建服务和数据模型之间的依赖项。使用标识主题订阅; 中的特定记录相关 ID 也修饰中转的消息此相关 ID 将更高版本在验证是否已存在的客户记录的合作伙伴应用程序中有用。

图 6 相关类

public class Correlation
{
  private async Task Execute(CustomerEntity customer)
  {
    // Map the Customer entity in the e-commerce application (source)
    // to Customer record in the partner application (destination)
    CustomerRecord customerRecord = PrepareCustomerRecord(customer);
    // Create a connection to an Azure Service Bus Topic
    // Serialize the customer record and send the message to the Topic
    var client = TopicClient.CreateFromConnectionString(
      connectionString, topicName);
    var message = new BrokeredMessage(
      JsonConvert.SerializeObject(customerRecord));
    // Register the customer record with the Correlation Service
    // and obtain a Correlation ID
    message.Properties["CorrelationId}"] =
      new CorrelationService().RegisterCustomer(customerRecord, subscriptionName);
    await client.SendAsync(message);
  }

相关服务只公开方法以匹配的特定订阅上的客户记录并注册新客户,并返回其相关 ID:

public class CorrelationService
{
  public Guid RegisterCustomer(CustomerRecord record, string subscription)
  {
    return store.ContainsKey((record, subscription)) ?
      GetCustomerCorrelationId(record, subscription) :
      AddCustomer(record, subscription);
  }
  public bool CustomerExists(Guid correlationId)
  {
    return store.ContainsValue(correlationId);
  }

合作伙伴应用程序对该主题订阅和检索客户记录和相关 ID,如所示图 7。如果其系统中存在的客户记录,它可以最终将进行保存。

图 7 合作伙伴类

class Partner
{
  public void ReceiveCustomerRecord()
  {
    var client = SubscriptionClient.CreateFromConnectionString(
      connectionString, topicName, subscriptionName);
    client.OnMessageAsync(async message =>
    {
      CustomerRecord customerRecord =
        JsonConvert.DeserializeObject<CustomerRecord>(message.GetBody<string>());
      Guid correlationId = (Guid)message.Properties["CorrelationId"];
      if (CustomerRecordExists(correlationId))
      {
        await SaveAsync(customerRecord);
      }
    });
  }

整个解决方案可免费从在我 GitHub 存储库下载bit.ly/2s0FWow


Stefano Tempesta是 CRMUG 瑞士 Microsoft MVP 和 MCT 和章领导者。在国际有关参加会议,包括 Microsoft Ignite、 技术峰和开发人员周,正则演讲 Stefano 的兴趣扩展到 Office 和 Dynamics 365、 Blockchain 和 AI 相关技术。

衷心感谢以下 Microsoft 技术专家对本文的审阅:Massimo Bonanni
Massimo Bonanni 是 Microsoft 的现代应用程序组中的高级顾问,并与 Microsoft 技术的 20 多年可以正常运行。他是创始成员意大利语用户组 DomusDotNet 和 dotNET {播客}。他是 Microsoft 最有价值,现在是一种软件创新,Intel 和 Intel 黑带。


在 MSDN 杂志论坛讨论这篇文章