Java kullanarak Azure Event Hubs’daki olayları gönderme veya alma
Bu hızlı başlangıçta azure-messaging-eventhubs Java paketini kullanarak olay hub'ına olay gönderme ve olay hub'ından olay alma işlemleri gösterilmektedir .
İpucu
Bir Spring uygulamasında Azure Event Hubs kaynaklarıyla çalışıyorsanız Spring Cloud Azure'ı alternatif olarak değerlendirmenizi öneririz. Spring Cloud Azure, Azure hizmetleriyle sorunsuz Spring tümleştirmesi sağlayan açık kaynak bir projedir. Spring Cloud Azure hakkında daha fazla bilgi edinmek ve Event Hubs kullanarak bir örnek görmek için bkz . Azure Event Hubs ile Spring Cloud Stream.
Önkoşullar
Azure Event Hubs'da yeniyseniz bu hızlı başlangıcı gerçekleştirmeden önce event hubs'a genel bakış konusuna bakın.
Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:
- Microsoft Azure aboneliği. Azure Event Hubs da dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolabilir veya hesap oluştururken MSDN abone avantajlarınızı kullanabilirsiniz.
- Java geliştirme ortamı. Bu hızlı başlangıçta Eclipse kullanılır. Sürüm 8 veya üzeri olan Java Development Kit (JDK) gereklidir.
- Event Hubs ad alanı ve olay hub'ı oluşturun. İlk adımda Azure portalını kullanarak Event Hubs türünde bir ad alanı oluşturun, ardından uygulamanızın olay hub’ı ile iletişim kurması için gereken yönetim kimlik bilgilerini edinin. Ad alanı ve olay hub'ı oluşturmak için bu makaledeki yordamı izleyin. Ardından, şu makaledeki yönergeleri izleyerek Event Hubs ad alanının bağlantı dizesi alın: bağlantı dizesi alma. Bu hızlı başlangıcın ilerleyen bölümlerinde bağlantı dizesi kullanacaksınız.
Olayları gönderme
Bu bölümde, olayları bir olay hub'ına göndermek için nasıl Java uygulaması oluşturulacağı gösterilmektedir.
Azure Event Hubs kitaplığına başvuru ekleme
İlk olarak, sık kullandığınız Java geliştirme ortamında bir konsol/kabuk uygulaması için yeni bir Maven projesi oluşturun. Dosyayı aşağıdaki gibi güncelleştirin pom.xml
. Event Hubs için Java istemci kitaplığı Maven Central Repository'de kullanılabilir.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.18.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.11.2</version>
<scope>compile</scope>
</dependency>
Not
Sürümü Maven deposunda yayımlanan en son sürüme güncelleştirin.
Azure'da uygulamanın kimliğini doğrulama
Bu hızlı başlangıçta Azure Event Hubs'a bağlanmanın iki yolu gösterilmektedir: parolasız ve bağlantı dizesi. İlk seçenek, Bir Event Hubs ad alanına bağlanmak için Microsoft Entra Id ve rol tabanlı erişim denetiminde (RBAC) güvenlik sorumlunuzu nasıl kullanacağınızı gösterir. Kodunuzda, yapılandırma dosyasında veya Azure Key Vault gibi güvenli bir depolama alanında sabit kodlanmış bağlantı dizesi olması konusunda endişelenmeniz gerekmez. İkinci seçenek, event hubs ad alanına bağlanmak için bir bağlantı dizesi nasıl kullanacağınızı gösterir. Azure'da yeniyseniz bağlantı dizesi seçeneğini daha kolay takip edebilirsiniz. Gerçek dünyadaki uygulamalarda ve üretim ortamlarında parolasız seçeneği kullanmanızı öneririz. Daha fazla bilgi için bkz . Kimlik doğrulaması ve yetkilendirme. Ayrıca, genel bakış sayfasında parolasız kimlik doğrulaması hakkında daha fazla bilgi edinebilirsiniz.
Microsoft Entra kullanıcınıza rol atama
Yerel olarak geliştirme yaparken, Azure Event Hubs'a bağlanan kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. İleti gönderip almak için Azure Event Hubs Veri Sahibi rolüne ihtiyacınız vardır. Kendinize bu rolü atamak için Kullanıcı Erişimi Yönetici istrator rolüne veya eylemi içeren Microsoft.Authorization/roleAssignments/write
başka bir role ihtiyacınız olacaktır. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Kapsam genel bakış sayfasında rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgi edinin.
Aşağıdaki örnekte rol, Azure Event Hubs kaynaklarına tam erişim sağlayan kullanıcı hesabınıza atanır Azure Event Hubs Data Owner
. Gerçek bir senaryoda, kullanıcılara yalnızca daha güvenli bir üretim ortamı için gereken minimum izinleri vermek için En Az Ayrıcalık İlkesi'ni izleyin.
Azure Event Hubs için Azure yerleşik rolleri
Azure Event Hubs için, Azure portalı ve Azure kaynak yönetimi API'sini kullanarak ad alanlarının ve tüm ilgili kaynakların yönetimi Azure RBAC modeli kullanılarak zaten korunur. Azure, Event Hubs ad alanına erişim yetkisi vermek için aşağıdaki Azure yerleşik rollerini sağlar:
- Azure Event Hubs Veri Sahibi: Event Hubs ad alanına ve varlıklarına (kuyruklar, konular, abonelikler ve filtreler) veri erişimini etkinleştirir
- Azure Event Hubs Veri Göndereni: Gönderene Event Hubs ad alanına ve varlıklarına erişim vermek için bu rolü kullanın.
- Azure Event Hubs Veri Alıcısı: Alıcıya Event Hubs ad alanına ve varlıklarına erişim vermek için bu rolü kullanın.
Özel bir rol oluşturmak istiyorsanız bkz . Event Hubs işlemleri için gereken haklar.
Önemli
Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer. Nadir durumlarda, sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.
Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak Event Hubs ad alanınızı bulun.
Genel bakış sayfasında, sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.
Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.
Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.
Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için eşleşen sonucu arayın
Azure Event Hubs Data Owner
ve seçin. Ardından İleri'yi seçin.Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.
İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.
Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.
Olay hub'ına ileti göndermek için kod yazma
adlı Sender
bir sınıf ekleyin ve sınıfına aşağıdaki kodu ekleyin:
Önemli
- Event Hubs ad alanınızın adıyla güncelleştirin
<NAMESPACE NAME>
. - Olay hub'ınızın adıyla güncelleştirin
<EVENT HUB NAME>
.
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hub.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
Programı oluşturun ve hata olmadığından emin olun. Alıcı programını çalıştırdıktan sonra bu programı çalıştıracaksınız.
Olayları alma
Bu öğreticideki kod, GitHub'daki EventProcessorClient örneğini temel alır ve tam çalışan uygulamayı görmek için inceleyebilirsiniz.
Azure Blob Depolama denetim noktası deposu olarak kullanırken şu önerileri izleyin:
- Her tüketici grubu için ayrı bir kapsayıcı kullanın. Aynı depolama hesabını kullanabilirsiniz, ancak her grup için bir kapsayıcı kullanabilirsiniz.
- Kapsayıcıyı başka hiçbir şey için kullanmayın ve depolama hesabını başka hiçbir şey için kullanmayın.
- Depolama hesabın dağıtılan uygulamanın bulunduğu bölgede olması gerekir. Uygulama şirket içindeyse, mümkün olan en yakın bölgeyi seçmeyi deneyin.
Azure portalındaki Depolama hesabı sayfasında, Blob hizmeti bölümünde aşağıdaki ayarların devre dışı bırakıldığından emin olun.
- Hiyerarşik ad alanı
- Blob geçici silme
- Sürüm oluşturma
Azure Depolama ve blob kapsayıcısı oluşturma
Bu hızlı başlangıçta denetim noktası deposu olarak Azure Depolama (özellikle Blob Depolama) kullanacaksınız. Denetim noktası oluşturma, bir olay işlemcisinin bir bölümde başarıyla işlenen son olayın konumunu işaretlediği veya işlediği bir işlemdir. Bir denetim noktasını işaretleme işlemi genellikle olayları işleyen işlev içinde yapılır. Denetim noktası oluşturma hakkında daha fazla bilgi edinmek için bkz . Olay işlemcisi.
Azure Depolama hesabı oluşturmak için bu adımları izleyin.
- Azure Depolama hesabı oluşturma
- Blob kapsayıcısı oluşturma
- Blob kapsayıcısı için kimlik doğrulaması
Yerel olarak geliştirme yaparken blob verilerine erişen kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. Blob verilerini okumak ve yazmak için Depolama Blob Veri Katkıda Bulunanı gerekir. Kendinize bu rolü atamak için Kullanıcı Erişimi Yönetici istrator rolüne veya Microsoft.Authorization/roleAssignments/write eylemini içeren başka bir role atanmalısınız. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgiyi kapsam genel bakış sayfasından öğrenebilirsiniz.
Bu senaryoda, En Az Ayrıcalık İlkesi'ni izlemek için depolama hesabı kapsamındaki kullanıcı hesabınıza izinler atayacaksınız. Bu uygulama kullanıcılara yalnızca gereken minimum izinleri verir ve daha güvenli üretim ortamları oluşturur.
Aşağıdaki örnek, depolama hesabınızdaki blob verilerine hem okuma hem de yazma erişimi sağlayan Depolama Blob Verileri Katkıda Bulunanı rolünü kullanıcı hesabınıza atar.
Önemli
Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer, ancak nadir durumlarda sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.
Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak depolama hesabınızı bulun.
Depolama hesabına genel bakış sayfasında sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.
Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.
Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.
Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için Depolama Blob Veri Katkıda Bulunanı'nı arayın, eşleşen sonucu seçin ve ardından İleri'yi seçin.
Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.
İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.
Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.
Java projenize Event Hubs kitaplıkları ekleme
pom.xml dosyasına aşağıdaki bağımlılıkları ekleyin.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
Java dosyasının en üstüne aşağıdaki
import
deyimleri ekleyin.import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
adlı
Receiver
bir sınıf oluşturun ve sınıfına aşağıdaki dize değişkenlerini ekleyin. Yer tutucuları doğru değerlerle değiştirin.Önemli
Yer tutucuları doğru değerlerle değiştirin.
<NAMESPACE NAME>
yerine Event Hubs ad alanını yazın.<EVENT HUB NAME>
ad alanında olay hub'ınızın adını yazın.
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
Sınıfına aşağıdaki
main
yöntemi ekleyin.Önemli
Yer tutucuları doğru değerlerle değiştirin.
<STORAGE ACCOUNT NAME>
azure Depolama hesabınızın adıyla birlikte.<CONTAINER NAME>
depolama hesabındaki blob kapsayıcısının adıyla
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
Olayları ve hataları işleyen iki yardımcı yöntemi (
PARTITION_PROCESSOR
veERROR_HANDLER
) sınıfınaReceiver
ekleyin.public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
Programı oluşturun ve hata olmadığından emin olun.
Uygulamaları çalıştırma
Önce Alıcı uygulamasını çalıştırın.
Ardından Sender uygulamasını çalıştırın.
Alıcı uygulaması penceresinde, Gönderen uygulaması tarafından yayımlanan olayları gördüğünüzden emin olun.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
Uygulamayı durdurmak için alıcı uygulama penceresinde ENTER tuşuna basın.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
Sonraki adımlar
GitHub'da aşağıdaki örneklere bakın: