Receive events using pull delivery with Java
This article provides a quick, step-by-step guide to receiving CloudEvents using Event Grid's pull delivery. It provides sample code to receive, acknowledge (delete from Event Grid), release, and reject events.
Prerequisites
Before you proceed, you need the following:
An understanding of what pull delivery is. For more information, see pull delivery concepts and pull delivery overview.
A namespace, a topic, and an event subscription.
- Create and manage a namespace
- Create and manage a namespace topic
- Create and manage an event subscription
The latest beta SDK package. If you're using Maven, you can consult the Maven central repository.
Important
Pull delivery data plane SDK support is available in beta packages. You should use the latest beta package in your project.
An IDE that supports Java, such as IntelliJ IDEA, Eclipse IDE, or Visual Studio Code.
A Java JRE running at Java 8 language level.
Events available on a topic. See publish events to namespace topics.
Receive events using pull delivery
You read events from Event Grid by specifying a namespace topic and a queue event subscription with the receive operation. The event subscription is the resource that effectively defines the collection of CloudEvents that a consumer client can read. This sample code uses key-based authentication because it provides a quick and simple approach to authentication. For production scenarios, you should use Microsoft Entra ID authentication because it provides a much more robust authentication mechanism.
package com.azure.messaging.eventgrid.samples;

import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.models.CloudEvent;
import com.azure.messaging.eventgrid.EventGridClient;
import com.azure.messaging.eventgrid.EventGridClientBuilder;
import com.azure.messaging.eventgrid.EventGridMessagingServiceVersion;
import com.azure.messaging.eventgrid.models.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * <p>Simple demo consumer app of CloudEvents from queue event subscriptions created for namespace topics.
 * This code sample should use Java 1.8 level or above to avoid compilation errors.
 * You should consult the resources below to use the client SDK and set up your project using maven.
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventgrid/azure-messaging-eventgrid">Event Grid data plane client SDK documentation</a>
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/boms/azure-sdk-bom/README.md">Azure BOM for client libraries</a>
 * @see <a href="https://aka.ms/spring/versions">Spring Version Mapping</a> if you are using Spring.
 * @see <a href="https://aka.ms/azsdk">Tool with links to control plane and data plane SDKs across all languages supported</a>.
 *</p>
 */
public class NamespaceTopicConsumer {
    private static final String TOPIC_NAME = "<yourNamespaceTopicName>";
    public static final String EVENT_SUBSCRIPTION_NAME = "<yourEventSubscriptionName>";
    public static final String ENDPOINT = "<yourFullHttpsUrlToTheNamespaceEndpoint>";
    public static final int MAX_NUMBER_OF_EVENTS_TO_RECEIVE = 10;
    public static final Duration MAX_WAIT_TIME_FOR_EVENTS = Duration.ofSeconds(10);

    private static EventGridClient eventGridClient;
    private static List<String> receivedCloudEventLockTokens = new ArrayList<>();
    private static List<CloudEvent> receivedCloudEvents = new ArrayList<>();

    //TODO Do NOT include keys in source code. This code's objective is to give you a succinct sample about using Event Grid, not to provide an authoritative example for handling secrets in applications.
    /**
     * For security concerns, you should not have keys or any other secret in any part of the application code.
     * You should use services like Azure Key Vault for managing your keys.
     */
    public static final AzureKeyCredential CREDENTIAL = new AzureKeyCredential("<namespace key>");

    public static void main(String[] args) {
        //TODO Update Event Grid version number to your desired version. You can find more information on data plane APIs here:
        //https://learn.microsoft.com/en-us/rest/api/eventgrid/.
        eventGridClient = new EventGridClientBuilder()
                .httpClient(HttpClient.createDefault()) // Requires Java 1.8 level
                .endpoint(ENDPOINT)
                .serviceVersion(EventGridMessagingServiceVersion.V2023_06_01_PREVIEW)
                .credential(CREDENTIAL).buildClient(); // you may want to use .buildAsyncClient() for an asynchronous (project reactor) client.

        // Duration.getSeconds() is used because it is available at Java 8 level
        System.out.println("Waiting " + MAX_WAIT_TIME_FOR_EVENTS.getSeconds() + " seconds for events to be read...");
        List<ReceiveDetails> receiveDetails = eventGridClient.receiveCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME,
                MAX_NUMBER_OF_EVENTS_TO_RECEIVE, MAX_WAIT_TIME_FOR_EVENTS).getValue();

        for (ReceiveDetails detail : receiveDetails) {
            // Add order message received to a tracking list
            CloudEvent orderCloudEvent = detail.getEvent();
            receivedCloudEvents.add(orderCloudEvent);
            // Add lock token to a tracking list. Lock token functions like an identifier to a cloudEvent
            BrokerProperties metadataForCloudEventReceived = detail.getBrokerProperties();
            String lockToken = metadataForCloudEventReceived.getLockToken();
            receivedCloudEventLockTokens.add(lockToken);
        }
        System.out.println("<-- Number of events received: " + receivedCloudEvents.size());
        // Calls to the acknowledge, release, or reject methods from the following sections go here.
    }

    // The acknowledge, release, reject, and writeFailedLockTokens methods from the following sections go here.
}
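The comments in the sample above warn against keeping keys in source code. As one possible alternative for local experiments, the following minimal sketch reads the key from an environment variable. The class name `NamespaceKeyLookup`, the method `findKey`, and the variable name `EVENT_GRID_NAMESPACE_KEY` are assumptions made for this illustration; they are not part of the Event Grid SDK.

```java
import java.util.Map;
import java.util.Optional;

public class NamespaceKeyLookup {
    // Hypothetical variable name chosen for this sketch; use whatever name fits your environment.
    static final String KEY_VARIABLE = "EVENT_GRID_NAMESPACE_KEY";

    /** Returns the trimmed key if the given environment contains a non-empty value for it. */
    static Optional<String> findKey(Map<String, String> environment) {
        String value = environment.get(KEY_VARIABLE);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(value.trim());
    }

    public static void main(String[] args) {
        Optional<String> key = findKey(System.getenv());
        if (key.isPresent()) {
            System.out.println("Namespace key found in the environment.");
        } else {
            System.out.println("Set " + KEY_VARIABLE + " before running the sample.");
        }
    }
}
```

The string returned by `findKey` could then be passed to `new AzureKeyCredential(...)` in place of the hard-coded placeholder. For production scenarios, a secret store such as Azure Key Vault or Microsoft Entra ID authentication remains the recommended approach.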
Acknowledge events
To acknowledge events, use the same code used for receiving events and add the following lines to call the private acknowledge method:
// Acknowledge the events (that is, delete them from Event Grid)
acknowledge(receivedCloudEventLockTokens);
A sample implementation of the acknowledge method, along with a utility method that prints information about failed lock tokens, follows:
private static void acknowledge(List<String> lockTokens) {
    AcknowledgeResult acknowledgeResult = eventGridClient.acknowledgeCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new AcknowledgeOptions(lockTokens));
    List<String> succeededLockTokens = acknowledgeResult.getSucceededLockTokens();
    // Treat a missing list as "nothing succeeded" so the checks below cannot throw a NullPointerException
    int succeededCount = (succeededLockTokens == null) ? 0 : succeededLockTokens.size();
    if (succeededCount >= 1) {
        System.out.println("@@@ " + succeededCount + " events were successfully acknowledged:");
        for (String lockToken : succeededLockTokens) {
            System.out.println(" Acknowledged event lock token: " + lockToken);
        }
    }
    // Print the information about failed lock tokens
    if (succeededCount < lockTokens.size()) {
        System.out.println(" At least one event was not acknowledged (deleted from Event Grid)");
        writeFailedLockTokens(acknowledgeResult.getFailedLockTokens());
    }
}

private static void writeFailedLockTokens(List<FailedLockToken> failedLockTokens) {
    for (FailedLockToken failedLockToken : failedLockTokens) {
        System.out.println(" Failed lock token: " + failedLockToken.getLockToken());
        System.out.println(" Error code: " + failedLockToken.getErrorCode());
        System.out.println(" Error description: " + failedLockToken.getErrorDescription());
    }
}
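The acknowledge method above detects a partial failure by comparing the number of succeeded lock tokens with the number requested. If you also want to know which of the requested tokens did not succeed, for example to retry them, one option is a simple set difference. The following is a minimal sketch using only the Java standard library; the class `LockTokenDiff` and the method `notSucceeded` are hypothetical names used for illustration, not SDK APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LockTokenDiff {
    /** Returns the requested tokens that are absent from the succeeded list, in request order. */
    static List<String> notSucceeded(List<String> requested, List<String> succeeded) {
        Set<String> done = new HashSet<>();
        if (succeeded != null) {
            done.addAll(succeeded);
        }
        List<String> remaining = new ArrayList<>();
        for (String token : requested) {
            if (!done.contains(token)) {
                remaining.add(token);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<String> requested = Arrays.asList("token-a", "token-b", "token-c");
        List<String> succeeded = Arrays.asList("token-a", "token-c");
        System.out.println(notSucceeded(requested, succeeded)); // prints [token-b]
    }
}
```

In the sample, `requested` would be the `lockTokens` argument and `succeeded` the list returned by `getSucceededLockTokens()`.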
Release events
Release events to make them available for redelivery. Similar to what you did to acknowledge events, you can add the following static method, and a line that calls it, to release the events identified by the lock tokens passed as an argument. You need the writeFailedLockTokens method for this method to compile.
private static void release(List<String> lockTokens) {
    ReleaseResult releaseResult = eventGridClient.releaseCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new ReleaseOptions(lockTokens));
    List<String> succeededLockTokens = releaseResult.getSucceededLockTokens();
    // Treat a missing list as "nothing succeeded" so the checks below cannot throw a NullPointerException
    int succeededCount = (succeededLockTokens == null) ? 0 : succeededLockTokens.size();
    if (succeededCount >= 1) {
        System.out.println("^^^ " + succeededCount + " events were successfully released:");
        for (String lockToken : succeededLockTokens) {
            System.out.println(" Released event lock token: " + lockToken);
        }
    }
    // Print the information about failed lock tokens
    if (succeededCount < lockTokens.size()) {
        System.out.println(" At least one event was not released back to Event Grid.");
        writeFailedLockTokens(releaseResult.getFailedLockTokens());
    }
}
Reject events
Reject events that your consumer application can't process. Conditions under which you reject an event include a malformed event that can't be parsed or a problem with the application that processes the events.
private static void reject(List<String> lockTokens) {
    RejectResult rejectResult = eventGridClient.rejectCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new RejectOptions(lockTokens));
    List<String> succeededLockTokens = rejectResult.getSucceededLockTokens();
    // Treat a missing list as "nothing succeeded" so the checks below cannot throw a NullPointerException
    int succeededCount = (succeededLockTokens == null) ? 0 : succeededLockTokens.size();
    if (succeededCount >= 1) {
        System.out.println("--- " + succeededCount + " events were successfully rejected:");
        for (String lockToken : succeededLockTokens) {
            System.out.println(" Rejected event lock token: " + lockToken);
        }
    }
    // Print the information about failed lock tokens
    if (succeededCount < lockTokens.size()) {
        System.out.println(" At least one event was not rejected.");
        writeFailedLockTokens(rejectResult.getFailedLockTokens());
    }
}
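The acknowledge, release, and reject methods each take a list of lock tokens, so a consumer needs some way to decide which tokens go to which method. The following minimal sketch shows one possible way to group lock tokens by processing outcome. The `Outcome` and `Settlement` enums, the class `SettlementPlanner`, and the mapping of a transient failure to a release are assumptions made for this illustration; your application may classify failures differently.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SettlementPlanner {
    /** The three settlement operations covered in this article. */
    enum Settlement { ACKNOWLEDGE, RELEASE, REJECT }

    /** Hypothetical outcomes reported by the consumer's own processing logic. */
    enum Outcome { PROCESSED, TRANSIENT_FAILURE, MALFORMED }

    /** Assumed policy: success is acknowledged, a transient failure is released, anything else is rejected. */
    static Settlement decide(Outcome outcome) {
        switch (outcome) {
            case PROCESSED:
                return Settlement.ACKNOWLEDGE;
            case TRANSIENT_FAILURE:
                return Settlement.RELEASE;
            default:
                return Settlement.REJECT;
        }
    }

    /** Groups lock tokens by the settlement operation chosen for each one. */
    static Map<Settlement, List<String>> plan(Map<String, Outcome> outcomesByLockToken) {
        Map<Settlement, List<String>> grouped = new EnumMap<>(Settlement.class);
        for (Settlement settlement : Settlement.values()) {
            grouped.put(settlement, new ArrayList<>());
        }
        for (Map.Entry<String, Outcome> entry : outcomesByLockToken.entrySet()) {
            grouped.get(decide(entry.getValue())).add(entry.getKey());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, Outcome> outcomes = new LinkedHashMap<>();
        outcomes.put("token-1", Outcome.PROCESSED);
        outcomes.put("token-2", Outcome.MALFORMED);
        outcomes.put("token-3", Outcome.TRANSIENT_FAILURE);
        System.out.println(plan(outcomes));
    }
}
```

Each resulting list could then be passed to the matching method from the sample: `acknowledge`, `release`, or `reject`.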
Next steps
- See the Java API reference.
- To learn more about the pull delivery model, see Pull delivery overview.