共用方式為


Azure 事件中樞 JavaScript 的用戶端連結庫 - 5.12.0 版

Azure 事件中樞 是可高度調整的發佈訂閱服務,每秒可以擷取數百萬個事件,並將其串流處理至多個取用者。 這可讓您處理和分析連線裝置和應用程式所產生的大量數據。 如果您想要深入瞭解 Azure 事件中樞,您可能會想要檢閱:什麼是事件中樞

Azure 事件中樞 客戶端連結庫可讓您在 Node.js 應用程式中傳送和接收事件。

重要連結:

注意:如果您使用 2.1.0 版或更低版本,而且想要移轉至此套件的最新版本,請參閱我們的 移轉指南,以從 EventHubs V2 移至 EventHubs V5

v2 和文件的範例仍可在這裡取得:

v2.1.0 | 的原始程式碼v2.1.0 (npm) | 套件v2.1.0 的範例

開始使用

安裝套件

使用 npm 安裝 Azure 事件中樞 客戶端連結庫

npm install @azure/event-hubs

目前支援的環境

如需詳細資訊,請參閱我們的支援原則

必要條件

設定 TypeScript

TypeScript 用戶必須安裝節點類型定義:

npm install @types/node

您也需要在tsconfig.json中開啟 compilerOptions.allowSyntheticDefaultImports 。 請注意,如果您已啟用 compilerOptions.esModuleInteropallowSyntheticDefaultImports 預設會啟用 。 如需詳細資訊,請參閱 TypeScript 的編譯程式選項手冊

JavaScript 套件組合

若要在瀏覽器中使用此用戶端連結庫,您必須先使用套件組合器。 如需如何執行這項操作的詳細資訊,請參閱我們的 統合檔

除了在此描述的內容之外,此連結庫還需要下列 NodeJS 核心內建模組的額外聚合填滿,才能在瀏覽器中正常運作:

  • buffer
  • os
  • path
  • process

使用 Webpack 統合

如果您使用 Webpack v5,您可以安裝下列開發人員相依性

  • npm install --save-dev os-browserify path-browserify

然後將下列內容新增至您的 webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

使用匯總組合

如果您使用匯總套件組合器,請安裝下列開發人員相依性

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

然後在您的 rollup.config.js 中包含下列內容

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

如需使用 polyfill 的詳細資訊,請參閱您最愛的配套工具檔。

React Native支援

與瀏覽器類似,React Native 不支援此 SDK 連結庫所使用的一些 JavaScript API,因此您必須為其提供聚合填滿。 如需詳細資訊,請參閱使用 Expo 的傳訊 React Native 範例

驗證用戶端

與事件中樞的互動會從 EventHubConsumerClient 類別的實例或 EventHubProducerClient 類別的實例開始。 有建構函式多載可支援具現化這些類別的不同方式,如下所示:

針對事件中樞命名空間使用 連接字串

其中一個建構函式多載會將表單Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;和實體名稱 連接字串 至事件中樞實例。 您可以建立取用者群組,並從 Azure 入口網站 取得 連接字串 和實體名稱。

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

針對事件中樞的原則使用 連接字串

另一個建構函式多載會採用對應至直接在事件中樞實例上定義的共用存取原則 (連接字串,而不是事件中樞命名空間) 。 此 連接字串 格式為 Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name。 前任建構函式多載 連接字串 格式的主要差異是 ;EntityPath=my-event-hub-name

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

使用事件中樞命名空間和 Azure 身分識別

此建構函式多載會採用事件中樞實例的主機名和實體名稱,以及實作 TokenCredential 介面的認證。 這可讓您使用 Azure Active Directory 主體進行驗證。 TokenCredential@azure /身分識別套件中有可用的介面實作。 主機名稱的格式為 <yournamespace>.servicebus.windows.net。 使用 Azure Active Directory 時,您的主體必須獲指派允許存取事件中樞的角色,例如 Azure 事件中樞 數據擁有者角色。 如需搭配事件中樞使用 Azure Active Directory 授權的詳細資訊,請參閱 相關聯的檔

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

重要概念

  • 事件中樞產生者是遙測數據、診斷資訊、使用記錄或其他記錄數據的來源,作為內嵌裝置解決方案、行動裝置應用程式、在控制台或其他裝置上執行的遊戲標題、某些用戶端或伺服器型商務解決方案或網站。

  • 事件中樞取用者會從事件中樞挑選這類資訊並加以處理。 處理時可能涉及彙總、複雜的計算和篩選。 處理也可能涉及以未經處理的方式或經過轉換的方式散發或儲存資訊。 事件中樞取用者通常是強固且大規模的平台基礎結構組件,且具有內建分析功能,例如 Azure 串流分析、Apache Spark 或 Apache Storm。

  • 資料分割是經過排序且保存在事件中樞內的事件序列。 資料分割是與事件取用者所需平行處理原則相關聯的資料組織方式。 Azure 事件中樞透過分割取用者模式來提供訊息串流,每位取用者只會讀取訊息串流的特定子集 (即資料分割)。 當較新的事件送達時,系統會將它們加入序列的結尾。 建立事件中樞時會指定資料分割數目,且無法加以變更。

  • 取用者群組可讓您檢視整個事件中樞。 取用者群組能讓多個取用應用程式擁有自己的事件串流檢視,以及按照自己的步調和從自己的位置自行讀取串流。 每一取用者群組的一個資料分割上最多可以有 5 個並行讀取器;不過,建議給定的資料分割和取用者群組配對只有一個作用中的取用者。 每個作用中讀取器都會從其分割區接收所有事件;如果相同分割區上有多個讀取器,它們將會收到重複的事件。

如需更多概念和更深入的討論,請參閱: 事件中樞功能

重試的指引

EventHubConsumerClientEventHubProducerClient 接受 options ,您可以在其中設定 retryOptions ,讓您微調 SDK 處理暫時性錯誤的方式。 暫時性錯誤的範例包括暫時性網路或服務問題。

取用事件時重試

如果暫時性錯誤 (例如,SDK 接收事件時遇到暫時性網路問題) ,它會根據傳入 EventHubConsumerClient的重試選項重試接收事件。 如果重試嘗試次數上限已用盡,則會叫用函 processError 式。

您可以使用重試設定來控制有關暫時性問題的通知速度,例如網路連線問題。 例如,如果您需要知道何時有網路問題,您可以降低 和 retryDelayInMs的值maxRetries

執行函 processError 式之後,只要錯誤是可重試的事件,用戶端就會繼續從分割區接收事件。 否則,用戶端會叫用使用者提供的 processClose 函式。 當您停止訂用帳戶,或客戶端因為應用程式的另一個實例在負載平衡時,停止讀取目前分割區中的事件時,也會叫用此函式。

processClose 式可讓您視需要更新檢查點。 執行 processClose之後,用戶端 (或在負載平衡的情況下,來自另一個應用程式實例的用戶端) 會叫用使用者提供的 processInitialize 函式,以繼續從相同分割區的上次更新檢查點讀取事件。

如果您要停止嘗試讀取事件,您必須在 方法傳回的 subscribesubscription呼叫 close()

範例

下列各節提供代碼段,這些代碼段涵蓋使用 Azure 事件中樞的一些常見工作

检查事件中心

许多事件中心操作都在特定分区范围内进行。 由於分割區是由事件中樞所擁有,因此會在建立時指派其名稱。 若要瞭解可用的分割區,您可以使用兩個可用的其中一個用戶端來查詢事件中樞: EventHubProducerClientEventHubConsumerClient

在下列範例中,我們使用 EventHubProducerClient

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

将事件发布到事件中心

為了發佈事件,您將必須建立 EventHubProducerClient。 雖然下列範例顯示建立用戶端的其中一種方式,請參閱 驗證客戶端 一節,以瞭解如何具現化用戶端的其他方式。

您可以將事件發佈至特定分割區,或允許事件中樞服務決定應該發行至哪個分割區事件。 建議在發佈事件需要高度可用時,或事件數據應該平均分散到分割區時,使用自動路由。 在下列範例中,我們將利用自動路由。

  • 使用 createBatch Create EventDataBatch 物件
  • 使用 tryAdd 方法將事件新增至批次。 您可以執行此動作,直到達到最大批次大小限制,或直到您完成新增您想要的事件數目為止,不論第一個事件為何。 這個方法會傳回 false ,指出由於達到最大批次大小,無法再將事件新增至批次。
  • 使用 sendBatch 方法傳送事件的批次。

在下列範例中,我們嘗試將10個事件傳送至 Azure 事件中樞。

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

您可以選擇在不同的階段傳遞,以控制將事件傳送至 Azure 事件中樞 的程式。

  • EventHubProducerClient 構函式會採用類型的 EventHubClientOptions 選擇性參數,可用來指定重試次數之類的選項。
  • 方法 createBatch 會採用類型的 CreateBatchOptions 選擇性參數,可用來指定所建立批次所支援的批次大小上限。
  • 方法 sendBatch 會採用型 SendBatchOptions 別的選擇性參數,您可以用來指定 abortSignal 來取消目前的作業。
  • 如果您想要傳送至特定數據分割,方法的多 sendBatch 載可讓您傳遞要傳送事件之分割區的標識符。 上述 檢查事件中樞 範例示範如何擷取可用的分割區標識碼。

注意:使用 Azure Stream Analytics 時,所傳送事件的本文也應該是 JSON 物件。 例如:body: { "message": "Hello World" }

從事件中樞取用事件

若要從事件中樞實例取用事件,您也必須知道您想要鎖定的取用者群組。 一旦您知道這一點,您就可以開始建立 EventHubConsumerClient。 雖然下列範例顯示建立用戶端的其中一種方式,請參閱 驗證客戶端 一節,以瞭解如何具現化用戶端的其他方式。

subscribe用戶端上的 方法具有多載,與建構函式結合,可因應數種方式來取用事件:

方法 subscribe 會採用類型的 SubscriptionOptions 選擇性參數,您可以用來指定 maxBatchSize 之類的選項 (要等候) 和 maxWaitTimeInSeconds (等候 maxBatchSize 事件到達) 的時間量。

在單一進程中取用事件

首先建立的 EventHubConsumerClient實例,然後在其上呼叫 subscribe() 方法來開始取用事件。

方法subscribe會接受回呼來處理事件,因為它們是從 Azure 事件中樞 接收。 若要停止接收事件,您可以在 方法所subscribe()傳回的物件上呼叫 close()

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

在多個進程之間使用負載平衡的事件

Azure 事件中樞 每秒可以處理數百萬個事件。 若要調整處理應用程式,您可以執行應用程式的多個實例,並讓它自行平衡負載。

首先,使用採用 CheckpointStore的其中一個建構函式多載來建立 的EventHubConsumerClient實例,然後呼叫 subscribe() 方法來開始取用事件。 檢查點存放區可讓取用者群組內的訂閱者協調應用程式多個實例之間的處理。

在此範例中,我們將使用BlobCheckpointStore封裝中的 @azure/eventhubs-checkpointstore-blob ,其會使用 Azure Blob 儲存體,實作必要的讀取/寫入至長期存放區。

方法subscribe會接受回呼來處理事件,因為它們是從 Azure 事件中樞 接收。 若要停止接收事件,您可以在 方法所subscribe()傳回的物件上呼叫 close()

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

如需詳細資訊,請參閱 平衡應用程式多個實例的數據分割負載

從單一分割區取用事件

首先建立的 EventHubConsumerClient實例,然後在其上呼叫 subscribe() 方法來開始取用事件。 將您想要設為目標的分割區標識碼傳遞至 subscribe() 方法,以便只從該分割區取用。

在下列範例中,我們使用第一個數據分割。

方法subscribe會接受回呼來處理事件,因為它們是從 Azure 事件中樞 接收。 若要停止接收事件,您可以在 方法所subscribe()傳回的物件上呼叫 close()

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

使用 EventHubConsumerClient 來處理 IotHub

您也可以使用 EventHubConsumerClient 來使用 IotHub。 這適用於從連結的 EventHub 接收 IotHub 的遙測數據。 相關聯的 連接字串 不會有傳送宣告,因此無法傳送事件。

  • 請注意,連接字串 必須是事件中樞兼容端點 (例如 “Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name“)
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

疑難排解

AMQP 相依性

事件中樞連結庫取決於 rhea-promise 連結庫來管理連線、透過 AMQP 通訊協定傳送和接收事件。

記錄

您可以將環境變數設定 AZURE_LOG_LEVEL 為 ,以啟用記錄至 stderr

export AZURE_LOG_LEVEL=verbose

如需如何啟用記錄的詳細指示,可參閱 @azure/logger 套件文件

您也可以設定 DEBUG 環境變數,以在使用這個連結庫時取得記錄。 如果您也想要從相依性 rhea-promise 發出記錄, rhea 這也很有用。

注意: 如果設定,AZURE_LOG_LEVEL優先順序高於 DEBUG。 當同時指定AZURE_LOG_LEVEL或呼叫 setLogLevel 時,請勿透過 DEBUG 指定任何 azure 連結庫。

  • 只從事件中樞 SDK 取得資訊層級偵錯記錄。
export DEBUG=azure:*:info
  • 從事件中樞 SDK 和通訊協定層級連結庫取得偵錯記錄。
export DEBUG=azure*,rhea*
  • 如果您 不想要檢視原始事件數據 (耗用大量控制台/磁碟空間) ,您可以如下所示設定 DEBUG 環境變數:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • 如果您只對 錯誤 和 SDK 警告感興趣,則可以設定 DEBUG 環境變數,如下所示:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

下一步

更多的程式碼範例

如需如何使用此連結庫來傳送和接收事件中樞事件的詳細範例,請參閱範例目錄。

參與

如果您希望向此程式庫投稿,請參閱投稿指南,深入瞭解如何組建與測試程式碼。

曝光數