Partilhar via


Hubs de Eventos do Azure biblioteca de cliente para JavaScript - versão 5.12.0

Hubs de Eventos do Azure é um serviço de publicação-subscrição altamente dimensionável que pode ingerir milhões de eventos por segundo e transmiti-los em fluxo para vários consumidores. Isto permite-lhe processar e analisar grandes quantidades de dados produzidos pelos seus dispositivos e aplicações ligados. Se quiser saber mais sobre Hubs de Eventos do Azure, poderá querer rever: O que são os Hubs de Eventos?

A biblioteca de cliente Hubs de Eventos do Azure permite-lhe enviar e receber eventos na sua aplicação Node.js.

Ligações principais:

NOTA: se estiver a utilizar a versão 2.1.0 ou inferior e quiser migrar para a versão mais recente deste pacote, veja o nosso guia de migração para mudar do EventHubs V2 para o EventHubs V5

Os exemplos para v2 e documentação ainda estão disponíveis aqui:

Código fonte para v2.1.0 | Pacote para v2.1.0 (npm) | Exemplos para v2.1.0

Introdução

Instalar o pacote

Instalar a biblioteca de cliente do Hubs de Eventos do Azure com o npm

npm install @azure/event-hubs

Ambientes atualmente suportados

Veja a nossa política de suporte para obter mais detalhes.

Pré-requisitos

Configurar TypeScript

Os utilizadores do TypeScript precisam de ter definições de tipo de Nó instaladas:

npm install @types/node

Também tem de ativar compilerOptions.allowSyntheticDefaultImports no seu tsconfig.json. Tenha em atenção que, se tiver ativado compilerOptions.esModuleInterop, allowSyntheticDefaultImports está ativado por predefinição. Veja o manual de opções do compilador do TypeScript para obter mais informações.

Pacote JavaScript

Para utilizar esta biblioteca de cliente no browser, primeiro tem de utilizar um bundler. Para obter detalhes sobre como fazê-lo, veja a nossa documentação de agrupamento.

Além do que é descrito, esta biblioteca também precisa de polifills adicionais para os seguintes módulos incorporados no nodeJS core para funcionar corretamente nos browsers:

  • buffer
  • os
  • path
  • process

Agrupamento com Webpack

Se estiver a utilizar o Webpack v5, pode instalar as seguintes dependências de programador

  • npm install --save-dev os-browserify path-browserify

em seguida, adicione o seguinte à sua webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Agrupamento com Rollup

Se estiver a utilizar o Pacote de rollup, instale as seguintes dependências de programador

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

Em seguida, inclua o seguinte no seu rollup.config.js

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Consulte a documentação do seu bundler favorito para obter mais informações sobre como utilizar polifills.

Suporte do React Native

Semelhante aos browsers, React Native não suporta alguma API JavaScript utilizada por esta biblioteca do SDK, pelo que tem de fornecer polifills para os mesmos. Veja o exemplo React Native mensagens com a Expo para obter mais detalhes.

Autenticar o cliente

A interação com os Hubs de Eventos começa com uma instância da classe EventHubConsumerClient ou uma instância da classe EventHubProducerClient . Existem sobrecargas de construtores para suportar diferentes formas de instanciar estas classes, conforme mostrado abaixo:

Utilizar cadeia de ligação para o espaço de nomes dos Hubs de Eventos

Uma das sobrecargas do construtor leva uma cadeia de ligação do nome do formulário Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; e da entidade para a instância do Hub de Eventos. Pode criar um grupo de consumidores e obter o cadeia de ligação, bem como o nome da entidade do portal do Azure.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Utilizar cadeia de ligação para a política no Hub de Eventos

Outra sobrecarga do construtor utiliza a cadeia de ligação correspondente à política de acesso partilhado que definiu diretamente na instância do Hub de Eventos (e não no espaço de nomes dos Hubs de Eventos). Este cadeia de ligação será do formato Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. A principal diferença no formato cadeia de ligação da sobrecarga do construtor anterior é o ;EntityPath=my-event-hub-name.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Utilizar o espaço de nomes dos Hubs de Eventos e a Identidade do Azure

Esta sobrecarga do construtor utiliza o nome do anfitrião e o nome da entidade da instância e da credencial do Hub de Eventos que implementa a interface TokenCredential. Isto permite-lhe autenticar com um principal do Azure Active Directory. Existem implementações da TokenCredential interface disponíveis no pacote de @azure/identidade . O nome do anfitrião tem o formato <yournamespace>.servicebus.windows.net. Ao utilizar o Azure Active Directory, tem de ser atribuída uma função ao principal que permita o acesso aos Hubs de Eventos, como a função de Proprietário de Dados Hubs de Eventos do Azure. Para obter mais informações sobre como utilizar a autorização do Azure Active Directory com os Hubs de Eventos, veja a documentação associada.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Conceitos-chave

  • Um produtor do Hub de Eventos é uma origem de dados telemétricos, informações de diagnóstico, registos de utilização ou outros dados de registo, como parte de uma solução de dispositivo incorporado, uma aplicação de dispositivo móvel, um título de jogo em execução numa consola ou outro dispositivo, alguma solução empresarial baseada em cliente ou servidor ou um site.

  • Um consumidor do Hub de Eventos recolhe essas informações do Hub de Eventos e processa-as. O processamento pode envolver agregação, computação e filtragem complexas. O processamento também pode envolver a distribuição ou o armazenamento das informações de forma não processada ou transformada. Os consumidores do Hub de Eventos são, muitas vezes, peças de infraestrutura de plataforma robustas e de grande escala com capacidades de análise incorporadas, como o Azure Stream Analytics, o Apache Spark ou o Apache Storm.

  • Uma partição é uma sequência ordenada de eventos que é realizada num Hub de Eventos. As partições são um meio de organização de dados associada ao paralelismo exigido pelos consumidores de eventos. Hubs de Eventos do Azure fornece a transmissão de mensagens através de um padrão de consumidor particionado no qual cada consumidor lê apenas um subconjunto específico, ou partição, do fluxo de mensagens. À medida que chegam novos eventos, eles são adicionados ao final desta sequência. O número de partições é especificado no momento em que um Hub de Eventos é criado e não pode ser alterado.

  • Um grupo de consumidores é uma vista de um Hub de Eventos inteiro. Os grupos de consumidores permitem que cada uma tenha uma vista separada do fluxo de eventos e leia o fluxo de forma independente ao seu próprio ritmo e a partir da sua própria posição. Pode haver, no máximo, 5 leitores simultâneos numa partição por grupo de consumidores; no entanto, recomenda-se que exista apenas um consumidor ativo para uma determinada criação de partições e emparelhamento de grupos de consumidores. Cada leitor ativo recebe todos os eventos da sua partição; Se existirem vários leitores na mesma partição, receberão eventos duplicados.

Para obter mais conceitos e debates mais aprofundados, veja: Funcionalidades dos Hubs de Eventos

Documentação de orientação sobre repetições

E EventHubConsumerClient aceite options onde pode definir o retryOptions que lhe permite ajustar a forma como o SDK processa erros EventHubProducerClient transitórios. Exemplos de erros transitórios incluem problemas temporários de rede ou serviço.

Repetições ao consumir eventos

Se for encontrado um erro transitório (por exemplo, um problema de rede temporário) enquanto o SDK está a receber eventos, repetirá a receção de eventos com base nas opções de repetição transmitidas para o EventHubConsumerClient. Se as tentativas de repetição máximas estiverem esgotadas, a processError função será invocada.

Pode utilizar as definições de repetição para controlar a rapidez com que é informado sobre problemas temporários, como um problema de ligação de rede. Por exemplo, se precisar de saber quando existe um problema de rede imediatamente, pode reduzir os valores para maxRetries e retryDelayInMs.

Depois de executar a processError função, o cliente continua a receber eventos da partição, desde que o erro tenha sido reativado. Caso contrário, o cliente invoca a função fornecida pelo processClose utilizador. Esta função também é invocada quando para a subscrição ou quando o cliente deixa de ler eventos da partição atual devido à sua recolha por outra instância da sua aplicação como parte do balanceamento de carga.

A processClose função proporciona uma oportunidade para atualizar os pontos de verificação, se necessário. Depois de executar processClose, o cliente (ou, no caso do balanceamento de carga, um cliente de outra instância da sua aplicação) invocará a função fornecida pelo processInitialize utilizador para retomar a leitura de eventos do último ponto de verificação atualizado para a mesma partição.

Se quiser parar de tentar ler eventos, tem de chamar close() o subscription devolvido pelo subscribe método .

Exemplos

As secções seguintes fornecem fragmentos de código que abrangem algumas das tarefas comuns com Hubs de Eventos do Azure

Inspecionar um Hub de Eventos

Muitas operações do Hub de Eventos ocorrem no âmbito de uma partição específica. Uma vez que as partições pertencem ao Hub de Eventos, os respetivos nomes são atribuídos no momento da criação. Para compreender que partições estão disponíveis, consulte o Hub de Eventos com um dos dois clientes disponíveis: EventHubProducerClient ou EventHubConsumerClient

No exemplo abaixo, estamos a utilizar um EventHubProducerClient.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Publicar eventos num Hub de Eventos

Para publicar eventos, terá de criar um EventHubProducerClient. Enquanto o exemplo abaixo mostra uma forma de criar o cliente, veja a secção Autenticar o cliente para aprender outras formas de instanciar o cliente.

Pode publicar eventos numa partição específica ou permitir que o serviço Hubs de Eventos decida em que eventos de partição devem ser publicados. É recomendado utilizar o encaminhamento automático quando a publicação de eventos tem de estar altamente disponível ou quando os dados de eventos devem ser distribuídos uniformemente entre as partições. No exemplo abaixo, vamos tirar partido do encaminhamento automático.

  • Create um EventDataBatch objeto com o createBatch
  • Adicione eventos ao lote com o método tryAdd . Pode fazê-lo até que o limite máximo de tamanho do lote seja atingido ou até terminar de adicionar o número de eventos de que gostou, o que ocorrer primeiro. Este método regressaria false para indicar que não podem ser adicionados mais eventos ao lote devido ao tamanho máximo do lote ser atingido.
  • Envie o lote de eventos com o método sendBatch .

No exemplo abaixo, tentamos enviar 10 eventos para Hubs de Eventos do Azure.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Existem opções que pode transmitir em diferentes fases para controlar o processo de envio de eventos para Hubs de Eventos do Azure.

  • O EventHubProducerClient construtor utiliza um parâmetro opcional do tipo EventHubClientOptions que pode utilizar para especificar opções como o número de repetições.
  • O createBatch método utiliza um parâmetro opcional do tipo CreateBatchOptions que pode utilizar para aumentar o tamanho máximo do lote suportado pelo lote que está a ser criado.
  • O sendBatch método utiliza um parâmetro opcional do tipo SendBatchOptions que pode utilizar para especificar abortSignal para cancelar a operação atual.
  • Caso pretenda enviar para uma partição específica, uma sobrecarga do sendBatch método permite-lhe transmitir o ID da partição para onde enviar eventos. O exemplo Inspecionar um Hub de Eventos acima mostra como obter os IDs de partições disponíveis.

Nota: ao trabalhar com o Azure Stream Analytics, o corpo do evento que está a ser enviado também deve ser um objeto JSON. Por exemplo: body: { "message": "Hello World" }

Consumir eventos de um Hub de Eventos

Para consumir eventos de uma instância do Hub de Eventos, também precisa de saber que grupo de consumidores pretende direcionar. Assim que souber, está pronto para criar um EventHubConsumerClient. Enquanto o exemplo abaixo mostra uma forma de criar o cliente, veja a secção Autenticar o cliente para aprender outras formas de instanciar o cliente.

O subscribe método no cliente tem sobrecargas que, combinadas com o construtor, podem atender a várias formas de consumir eventos:

O subscribe método utiliza um parâmetro opcional do tipo SubscriptionOptions que pode utilizar para especificar opções como maxBatchSize (número de eventos a aguardar) e maxWaitTimeInSeconds (quantidade de tempo para aguardar a chegada dos eventos maxBatchSize).

Consumir eventos num único processo

Comece por criar uma instância do EventHubConsumerCliente, em seguida, chame o subscribe() método para começar a consumir eventos.

O subscribe método recebe chamadas de retorno para processar eventos à medida que são recebidos de Hubs de Eventos do Azure. Para parar de receber eventos, pode chamar close() o objeto devolvido pelo subscribe() método .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Consumir eventos com balanceamento de carga em vários processos

Hubs de Eventos do Azure é capaz de lidar com milhões de eventos por segundo. Para dimensionar a sua aplicação de processamento, pode executar várias instâncias da sua aplicação e fazer com que equilibre a carga entre si.

Comece por criar uma instância do EventHubConsumerClient com uma das sobrecargas do construtor que utiliza um CheckpointStoree, em seguida, chame o subscribe() método para começar a consumir eventos. O arquivo de pontos de verificação permitirá que os subscritores dentro de um grupo de consumidores coordenem o processamento entre várias instâncias da sua aplicação.

Neste exemplo, vamos utilizar o BlobCheckpointStore do @azure/eventhubs-checkpointstore-blob pacote que implementa as leituras/escritas necessárias num arquivo durável com Armazenamento de Blobs do Azure.

O subscribe método recebe chamadas de retorno para processar eventos à medida que são recebidos de Hubs de Eventos do Azure. Para parar de receber eventos, pode chamar close() o objeto devolvido pelo subscribe() método .

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Veja Balancear a carga de partição em várias instâncias da sua aplicação para saber mais.

Consumir eventos de uma única partição

Comece por criar uma instância do EventHubConsumerCliente, em seguida, chame o subscribe() método para começar a consumir eventos. Transmita o ID da partição que pretende direcionar para o subscribe() método para consumir apenas a partir dessa partição.

No exemplo abaixo, estamos a utilizar a primeira partição.

O subscribe método recebe chamadas de retorno para processar eventos à medida que são recebidos de Hubs de Eventos do Azure. Para parar de receber eventos, pode chamar close() o objeto devolvido pelo subscribe() método .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Utilizar EventHubConsumerClient para trabalhar com o IotHub

Também pode utilizar EventHubConsumerClient para trabalhar com o IotHub. Isto é útil para receber dados telemétricos do IotHub a partir do EventHub ligado. A cadeia de ligação associada não terá afirmações de envio, pelo que não é possível enviar eventos.

  • Tenha em atenção que o cadeia de ligação tem de ser para um ponto final compatível com o Hub de Eventos (por exemplo, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Resolução de problemas

Dependências AMQP

A biblioteca dos Hubs de Eventos depende da biblioteca rhea-promise para gerir ligações, enviar e receber eventos através do protocolo AMQP .

Registo

Pode definir a variável de ambiente para ativar o AZURE_LOG_LEVEL registo como stderr:

export AZURE_LOG_LEVEL=verbose

Para obter instruções mais detalhadas sobre como ativar os registos, pode ver os documentos do pacote @azure/logger.

Em alternativa, pode definir a DEBUG variável de ambiente para obter registos ao utilizar esta biblioteca. Isto pode ser útil se também pretender emitir registos das dependências rhea-promise e rhea também.

Nota: AZURE_LOG_LEVEL, se definido, tem precedência sobre DEBUG. Não especifique quaisquer azure bibliotecas através de DEBUG ao especificar também AZURE_LOG_LEVEL ou chamar setLogLevel.

  • Obter apenas registos de depuração ao nível das informações a partir do SDK dos Hubs de Eventos.
export DEBUG=azure:*:info
  • Obter registos de depuração do SDK dos Hubs de Eventos e da biblioteca de nível de protocolo.
export DEBUG=azure*,rhea*
  • Se não estiver interessado em ver os dados de eventos não processados (que consomem uma grande quantidade de espaço em disco/consola), pode definir a variável de ambiente da DEBUG seguinte forma:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Se estiver interessado apenas em erros e avisos do SDK, pode definir a variável de ambiente da DEBUG seguinte forma:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Passos seguintes

Mais código de exemplo

Veja o diretório de exemplos para obter exemplos detalhados sobre como utilizar esta biblioteca para enviar e receber eventos de/para os Hubs de Eventos.

Contribuir

Se quiser contribuir para esta biblioteca, leia o guia de contribuição para saber mais sobre como criar e testar o código.

Impressões