Partager via


Azure Event Hubs bibliothèque cliente du magasin de points de contrôle pour Java - version 1.17.0

utilisation d’objets blob de stockage

Azure Event Hubs magasin de points de contrôle peut être utilisé pour stocker des points de contrôle lors du traitement des événements à partir de Azure Event Hubs. Ce package utilise des objets blob de stockage comme magasin persistant pour la gestion des points de contrôle et des informations de propriété de partition. Le BlobCheckpointStore fourni dans ce package peut être branché à EventProcessor.

Code source | Documentation de référence de l’API | Documentation du produit | Exemples

Prise en main

Prérequis

Inclure le package

Inclure le fichier de nomenclature

Incluez azure-sdk-bom dans votre projet pour dépendre de la version de disponibilité générale de la bibliothèque. Dans l’extrait de code suivant, remplacez l’espace réservé {bom_version_to_target} par le numéro de version. Pour en savoir plus sur la nomenclature, consultez le README BOM du KIT DE DÉVELOPPEMENT LOGICIEL AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

puis incluez la dépendance directe dans la section dépendances sans la balise de version, comme indiqué ci-dessous.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
  </dependency>
</dependencies>

Inclure une dépendance directe

Si vous souhaitez dépendre d’une version particulière de la bibliothèque qui n’est pas présente dans la nomenclature, ajoutez la dépendance directe à votre projet comme suit.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.17.0</version>
</dependency>

Authentifier le client de conteneur de stockage

Pour créer un instance de , un doit d’abord être créé avec un jeton SAS approprié avec un accès en écriture et une ContainerAsyncClient chaîne de BlobCheckpointStoreconnexion. Pour ce faire, vous aurez besoin de la chaîne SAS de compte (signature d’accès partagé) du compte de stockage. Pour plus d’informations , consultez Jeton SAS.

Concepts clés

Points de contrôle

Les points de contrôle constituent un processus par lequel les lecteurs marquent ou valident leur position dans une séquence d’événements de partition. La réalisation des points de contrôle est la responsabilité du consommateur et se produit sur une base par partition dans un groupe de consommateurs. Cette responsabilité signifie que pour chaque groupe de consommateurs, chaque lecteur de partition doit conserver une trace de sa position actuelle dans le flux d’événements. Il peut informer le service lorsqu’il considère que le flux de données est complet. Si un lecteur se déconnecte d'une partition, lorsqu'il se reconnecte il commence la lecture au point de contrôle qui a été précédemment soumis par le dernier lecteur de cette partition dans ce groupe de consommateurs. Lorsque le lecteur se connecte, il transmet le décalage à l’Event Hub pour spécifier l’emplacement où commencer la lecture. De cette façon, vous pouvez utiliser les points de contrôle pour marquer les événements comme « terminés » par les applications en aval et pour assurer la résilience si un basculement se produit entre des lecteurs en cours d’exécution sur des ordinateurs différents. Il est possible de revenir à des données plus anciennes en spécifiant un décalage inférieur à partir de ce processus de vérification. Grâce à ce mécanisme, les points de contrôle permettent une résilience au basculement renforcée, mais également la relecture du flux d’événements.

Décalages des & numéros de séquence

Les deux numéros de séquence de décalage & font référence à la position d’un événement dans une partition. Vous pouvez les considérer comme un curseur côté client. Le décalage est une numérotation en octets de l'événement. Le décalage/numéro de séquence permet à un consommateur d’événements (lecteur) de spécifier un point dans le flux d’événements à partir duquel il souhaite commencer la lecture des événements. Vous pouvez spécifier l’horodatage de sorte que vous recevez des événements qui ont été mis en file d’attente uniquement après l’horodatage donné. Les consommateurs sont responsables du stockage de leurs propres valeurs de décalage en dehors du service Event Hubs. Dans une partition, chaque événement inclut un décalage, un numéro de séquence et l’horodatage de la mise en file d’attente.

Exemples

Créer un instance de conteneur de stockage avec un jeton SAS

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

Consommer des événements à l’aide d’un client de processeur d’événements

Pour consommer des événements pour toutes les partitions d’un Event Hub, vous allez créer un EventProcessorClient pour un groupe de consommateurs spécifique. Lorsqu’un hub d’événements est créé, il fournit un groupe de consommateurs par défaut qui peut être utilisé pour commencer.

Délègue EventProcessorClient le traitement des événements à une fonction de rappel que vous fournissez, ce qui vous permet de vous concentrer sur la logique nécessaire pour fournir de la valeur, tandis que le processeur est responsable de la gestion des opérations de consommateur sous-jacentes.

Dans notre exemple, nous allons nous concentrer sur la création du EventProcessor, utiliser et BlobCheckpointStoreune fonction de rappel simple pour traiter les événements reçus d’Event Hubs, écrire dans la console et mettre à jour le point de contrôle dans le stockage Blob après chaque événement.

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .connectionString("<< EVENT HUB CONNECTION STRING >>")
    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
    .processEvent(eventContext -> {
        System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
            + "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
    })
    .buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();

Dépannage

Activer la journalisation du client

Le Kit de développement logiciel (SDK) Azure pour Java offre un article de journalisation cohérent pour faciliter la résolution des erreurs d’application et accélérer leur résolution. Les journaux produits capturent le flux d’une application avant d’atteindre l’état terminal pour faciliter la localisation du problème racine. Consultez le wiki de journalisation pour obtenir des conseils sur l’activation de la journalisation.

Bibliothèque SSL par défaut

Toutes les bibliothèques de client utilisent par défaut la bibliothèque BoringSSL Tomcat native pour permettre des performances de niveau natif pour les opérations SSL. La bibliothèque BoringSSL est un fichier uber jar contenant des bibliothèques natives pour Linux/macOS/Windows. Elle offre de meilleures performances que l’implémentation SSL par défaut au sein du JDK. Pour plus d’informations, notamment sur la réduction de la taille des dépendances, consultez la section du wiki consacrée à l’optimisation des performances.

Étapes suivantes

Commencez par explorer les exemples ici.

Contribution

Si vous souhaitez devenir un contributeur actif de ce projet, reportez-vous à nos Lignes directrices sur les contributions pour plus d’informations.

Impressions