Gunakan Java untuk mengirim acara ke atau menerima acara dari Azure Event Hubs

Mulai cepat ini menunjukkan cara mengirim aktivitas ke dan menerima aktivitas dari pusat aktivitas menggunakan paket Java azure-messaging-eventhubs.

Tip

Jika Anda bekerja dengan sumber daya Azure Event Hubs dalam aplikasi Spring, kami sarankan Anda mempertimbangkan Spring Cloud Azure sebagai alternatif. Spring Cloud Azure adalah proyek sumber terbuka yang menyediakan integrasi Spring tanpa hambatan dengan layanan Azure. Untuk mempelajari selengkapnya tentang Spring Cloud Azure, dan untuk melihat contoh menggunakan Azure Event Hubs, lihat Spring Cloud Stream dengan Azure Event Hubs.

Prasyarat

Jika Anda baru menggunakan Azure Event Hubs, lihat Ringkasan Event Hubs sebelum Anda melakukan mulai cepat ini.

Untuk menyelesaikan mulai cepat ini, Anda memerlukan prasyarat berikut:

  • Langganan Microsoft Azure. Untuk menggunakan layanan Azure, termasuk Azure Event Hubs, Anda memerlukan langganan. Jika Anda belum memiliki akun Azure, Anda dapat mendaftar untuk coba gratis atau memanfaatkan pelanggan MSDN saat Anda membuat akun.
  • Lingkungan pengembangan Java. Mulai cepat ini menggunakan Eclipse. Java Development Kit (JDK) dengan versi 8 atau lebih tinggi diperlukan.
  • Membuat ruang nama Azure Event Hubs dan pusat aktivitas. Langkah pertama yaitu gunakan portal Microsoft Azure untuk membuat kumpulan nama Event Hubs jenis, dan dapatkan info masuk manajemen yang diperlukan aplikasi Anda untuk berkomunikasi dengan pusat aktivitas. Untuk membuat namespace layanan dan pusat aktivitas, ikuti prosedur dalam artikel ini. Kemudian, dapatkan string koneksi untuk namespace layanan Azure Event Hubs dengan mengikuti instruksi dari artikel: Mendapatkan string koneksi. Anda menggunakan string koneksi nanti dalam mulai cepat ini.

Mengirim aktivitas

Bagian ini menunjukkan kepada Anda cara membuat aplikasi Java untuk mengirim acara ke hub acara.

Menambahkan referensi ke pustaka Azure Event Hubs

Pertama, buat proyek Maven baru untuk aplikasi konsol/shell di lingkungan pengembangan Java favorit Anda. pom.xml Perbarui file sebagai berikut. Pustaka klien Java untuk Event Hubs tersedia di Repositori Pusat Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Catatan

Perbarui versi ke versi terbaru yang diterbitkan ke repositori Maven.

Mengautentikasi aplikasi ke Azure

Mulai cepat ini menunjukkan kepada Anda dua cara menyambungkan ke Azure Event Hubs: tanpa kata sandi dan string koneksi. Opsi pertama menunjukkan cara menggunakan prinsip keamanan Anda di ID Microsoft Entra dan kontrol akses berbasis peran (RBAC) untuk menyambungkan ke namespace Layanan Pusat Aktivitas. Anda tidak perlu khawatir tentang memiliki string koneksi yang dikodekan secara permanen dalam kode Anda atau dalam file konfigurasi atau dalam penyimpanan aman seperti Azure Key Vault. Opsi kedua menunjukkan cara menggunakan string koneksi untuk menyambungkan ke namespace Layanan Pusat Aktivitas. Jika Anda baru menggunakan Azure, Anda mungkin menemukan opsi string koneksi lebih mudah diikuti. Sebaiknya gunakan opsi tanpa kata sandi di aplikasi dunia nyata dan lingkungan produksi. Untuk informasi selengkapnya, lihat Autentikasi dan otorisasi. Anda juga dapat membaca selengkapnya tentang autentikasi tanpa kata sandi di halaman gambaran umum.

Menetapkan peran ke pengguna Microsoft Entra Anda

Saat mengembangkan secara lokal, pastikan bahwa akun pengguna yang tersambung ke Azure Event Hubs memiliki izin yang benar. Anda memerlukan peran Pemilik Data Azure Event Hubs untuk mengirim dan menerima pesan. Untuk menetapkan sendiri peran ini, Anda memerlukan peran Administrator Akses Pengguna, atau peran lain yang menyertakan tindakan.Microsoft.Authorization/roleAssignments/write Anda dapat menetapkan peran Azure RBAC kepada pengguna menggunakan portal Azure, Azure CLI, atau Azure PowerShell. Pelajari selengkapnya tentang cakupan yang tersedia untuk penetapan peran di halaman gambaran umum cakupan .

Contoh berikut menetapkan peran ke Azure Event Hubs Data Owner akun pengguna Anda, yang menyediakan akses penuh ke sumber daya Azure Event Hubs. Dalam skenario nyata, ikuti Prinsip Hak Istimewa Paling Sedikit untuk memberi pengguna hanya izin minimum yang diperlukan untuk lingkungan produksi yang lebih aman.

Peran bawaan Azure untuk Azure Event Hubs

Untuk Azure Event Hubs, pengelolaan namespace layanan dan semua sumber daya terkait melalui portal Azure dan API manajemen sumber daya Azure sudah dilindungi menggunakan model Azure RBAC. Azure menyediakan peran bawaan Azure di bawah ini untuk mengotorisasi akses ke namespace Layanan Pusat Aktivitas:

  • Pemilik Data Azure Event Hubs: Memungkinkan akses data ke namespace Layanan Pusat Aktivitas dan entitasnya (antrean, topik, langganan, dan filter)
  • Pengirim Data Azure Event Hubs: Gunakan peran ini untuk memberi pengirim akses ke namespace layanan Azure Event Hubs dan entitasnya.
  • Penerima Data Azure Event Hubs: Gunakan peran ini untuk memberi penerima akses ke namespace Layanan Azure Event Hubs dan entitasnya.

Jika Anda ingin membuat peran kustom, lihat Hak yang diperlukan untuk operasi Azure Event Hubs.

Penting

Dalam kebanyakan kasus, akan memakan waktu satu atau dua menit agar penetapan peran disebarluaskan di Azure. Dalam kasus yang jarang terjadi, mungkin perlu waktu hingga delapan menit. Jika Anda menerima kesalahan autentikasi saat pertama kali menjalankan kode, tunggu beberapa saat dan coba lagi.

  1. Di portal Azure, temukan namespace Layanan Pusat Aktivitas Anda menggunakan bilah pencarian utama atau navigasi kiri.

  2. Pada halaman gambaran umum, pilih Kontrol akses (IAM) dari menu sebelah kiri.

  3. Di halaman Kontrol akses (IAM), pilih tab Penetapan peran.

  4. Pilih + Tambahkan dari menu atas lalu Tambahkan penetapan peran dari menu drop-down yang dihasilkan.

    A screenshot showing how to assign a role.

  5. Gunakan kotak pencarian untuk memfilter hasil ke peran yang diinginkan. Untuk contoh ini, cari Azure Event Hubs Data Owner dan pilih hasil yang cocok. Kemudian pilih Berikutnya.

  6. Di bagian Tetapkan akses ke, pilih Pengguna, grup, atau perwakilan layanan, lalu pilih + Pilih anggota.

  7. Dalam dialog, cari nama pengguna Microsoft Entra Anda (biasanya alamat email user@domain Anda) lalu pilih Pilih di bagian bawah dialog.

  8. Pilih Tinjau + tetapkan untuk masuk ke halaman akhir, lalu Tinjau + tetapkan lagi untuk menyelesaikan proses.

Menulis kode untuk mengirim pesan ke pusat aktivitas

Tambahkan kelas bernama Sender, dan tambahkan kode berikut ke kelas:

Penting

  • Perbarui <NAMESPACE NAME> dengan nama namespace Layanan Pusat Aktivitas Anda.
  • Perbarui <EVENT HUB NAME> dengan nama pusat aktivitas Anda.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Buat program, dan pastikan tidak ada kesalahan. Anda akan menjalankan program ini setelah menjalankan program penerima.

Menerima peristiwa

Kode dalam tutorial ini didasarkan pada Sampel EventProcessorClient di GitHub, yang dapat Anda periksa untuk melihat aplikasi aktif selengkapnya.

Ikuti rekomendasi ini saat menggunakan Azure Blob Storage sebagai penyimpanan titik pemeriksaan:

  • Gunakan kontainer terpisah untuk setiap grup konsumen. Anda dapat menggunakan akun penyimpanan yang sama, tetapi menggunakan satu kontainer per setiap grup.
  • Jangan gunakan kontainer untuk hal lain, dan jangan gunakan akun penyimpanan untuk hal lain.
  • Akun penyimpanan harus berada di wilayah yang sama dengan aplikasi yang disebarkan berada. Jika aplikasi lokal, coba pilih wilayah terdekat yang mungkin.

Pada halaman Akun penyimpanan di portal Azure, di bagian Blob service, pastikan bahwa pengaturan berikut dinonaktifkan.

  • Namespace hierarkis
  • Penghapusan sementara blob
  • Penerapan versi

Membuat Azure Storage dan kontainer blob

Dalam mulai cepat ini, Anda menggunakan Azure Storage (khususnya, Blob Storage) sebagai penyimpanan titik pemeriksaan. Titik pemeriksaan adalah proses untuk prosesor aktivitas menandai atau menerapkan posisi kejadian terakhir yang berhasil diproses dalam partisi. Penandaan titik pemeriksaan biasanya diselesaikan di dalam fungsi yang memproses aktivitas. Untuk mempelajari selengkapnya tentang proses titik pemeriksaan, lihat Prosesor aktivitas.

Ikuti langkah-langkah berikut untuk membuat akun Azure Storage.

  1. Membuat akun Microsoft Azure Storage
  2. Membuat kontainer blob
  3. Mengautentikasi ke kontainer blob

Saat mengembangkan secara lokal, pastikan bahwa akun pengguna yang mengakses data blob memiliki izin yang benar. Anda akan memerlukan Kontributor Data Blob Penyimpanan untuk membaca dan menulis data blob. Untuk menetapkan sendiri peran ini, Anda harus diberi peran Administrator Akses Pengguna, atau peran lain yang menyertakan tindakan Microsoft.Authorization/roleAssignments/write . Anda dapat menetapkan peran Azure RBAC kepada pengguna menggunakan portal Azure, Azure CLI, atau Azure PowerShell. Anda dapat mempelajari selengkapnya tentang cakupan yang tersedia untuk penetapan peran di halaman gambaran umum cakupan.

Dalam skenario ini, Anda akan menetapkan izin ke akun pengguna, yang tercakup ke akun penyimpanan, untuk mengikuti Prinsip Hak Istimewa Paling Rendah. Praktik ini hanya memberi pengguna izin minimum yang diperlukan dan menciptakan lingkungan produksi yang lebih aman.

Contoh berikut akan menetapkan peran Kontributor Data Blob Penyimpanan ke akun pengguna Anda, yang menyediakan akses baca dan tulis ke data blob di akun penyimpanan Anda.

Penting

Dalam kebanyakan kasus, dibutuhkan satu atau dua menit agar penetapan peran disebarluaskan di Azure, tetapi dalam kasus yang jarang terjadi mungkin perlu waktu hingga delapan menit. Jika Anda menerima kesalahan autentikasi saat pertama kali menjalankan kode, tunggu beberapa saat dan coba lagi.

  1. Di portal Azure, temukan akun penyimpanan Anda menggunakan bilah pencarian utama atau navigasi kiri.

  2. Di halaman gambaran umum akun penyimpanan, pilih Kontrol akses (IAM) dari menu kiri.

  3. Di halaman Kontrol akses (IAM), pilih tab Penetapan peran.

  4. Pilih + Tambahkan dari menu atas lalu Tambahkan penetapan peran dari menu drop-down yang dihasilkan.

    A screenshot showing how to assign a storage account role.

  5. Gunakan kotak pencarian untuk memfilter hasil ke peran yang diinginkan. Untuk contoh ini, cari Kontributor Data Blob Penyimpanan dan pilih hasil yang cocok, lalu pilih Berikutnya.

  6. Di bagian Tetapkan akses ke, pilih Pengguna, grup, atau perwakilan layanan, lalu pilih + Pilih anggota.

  7. Dalam dialog, cari nama pengguna Microsoft Entra Anda (biasanya alamat email user@domain Anda) lalu pilih Pilih di bagian bawah dialog.

  8. Pilih Tinjau + tetapkan untuk masuk ke halaman akhir, lalu Tinjau + tetapkan lagi untuk menyelesaikan proses.

Menambahkan pustaka Event Hubs ke proyek Java Anda

Menambahkan dependensi berikut dalam file pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Tambahkan pernyataan berikut import di bagian atas file Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Buat kelas bernama Receiver, dan tambahkan variabel string berikut ke kelas. Ganti placeholder dengan nilai yang benar.

    Penting

    Ganti placeholder dengan nilai yang benar.

    • <NAMESPACE NAME> dengan nama namespace layanan Pusat Aktivitas Anda.
    • <EVENT HUB NAME> dengan nama pusat aktivitas Anda di dalam kumpulan nama.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Tambahkan metode main berikut ke kelas.

    Penting

    Ganti placeholder dengan nilai yang benar.

    • <STORAGE ACCOUNT NAME> dengan nama akun Azure Storage Anda.
    • <CONTAINER NAME> dengan nama kontainer blob di akun penyimpanan
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Tambahkan dua metode pembantu (PARTITION_PROCESSOR dan ERROR_HANDLER) yang memproses aktivitas dan kesalahan ke kelas Receiver.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Buat program, dan pastikan tidak ada kesalahan.

Menjalankan aplikasi

  1. Jalankan aplikasi Penerima terlebih dahulu.

  2. Kemudian, jalankan aplikasi Pengirim.

  3. Di jendela aplikasi Penerima, konfirmasikan bahwa Anda melihat aktivitas yang diterbitkan oleh aplikasi Pengirim.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Tekan ENTER di jendela aplikasi penerima untuk menghentikan aplikasi.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Langkah berikutnya

Lihat sampel berikut di GitHub: