Azure Event Hubs için Apache Kafka'daki işlemler
Bu makalede, Azure Event Hubs ile Apache Kafka işlem API'sinin nasıl kullanılacağı hakkında ayrıntılı bilgi sağlanır.
Genel bakış
Event Hubs, mevcut Kafka istemci uygulamalarınız tarafından kendi Kafka kümenizi çalıştırmaya alternatif olarak kullanılabilecek bir Kafka uç noktası sağlar. Event Hubs, mevcut Kafka uygulamalarınızın çoğuyla çalışır. Daha fazla bilgi için bkz . Apache Kafka için Event Hubs.
Bu belge, Kafka'nın işlem API'sini Azure Event Hubs ile sorunsuz bir şekilde kullanma konusuna odaklanır.
Not
Kafka İşlemleri şu anda Premium ve Ayrılmış katmanda Genel önizleme aşamasındadır.
Apache Kafka'daki işlemler
Bulutta yerel ortamlarda, uygulamalar ağ kesintilerine ve ad alanı yeniden başlatmalarına ve yükseltmelerine dayanıklı hale getirilmelidir. Katı işleme garantileri gerektiren uygulamalar, tüm işlemlerin yürütülmesini veya hiçbirinin uygulama ve veri durumunun güvenilir bir şekilde yönetilmesini sağlamak için işlem çerçevesi veya API kullanmalıdır. İşlem kümesi başarısız olursa, doğru işleme garantilerini sağlamak için atomik olarak güvenilir bir şekilde yeniden denenebilir.
Not
İşlem garantileri genellikle "tümü veya hiçbir şey" biçiminde işlenmesi gereken birden çok işlem olduğunda gereklidir.
Diğer tüm işlemler için istemci uygulamaları , belirli bir işlem başarısız olursa işlemi üstel geri alma ile yeniden denemek için varsayılan olarak dayanıklıdır.
Apache Kafka, aynı veya farklı konu/bölüm kümesinde bu işlem düzeyi garantilerini sağlamak için bir işlem API'si sağlar.
İşlemler aşağıdaki durumlar için geçerlidir:
- İşlem üreticileri.
- Tam olarak bir kez semantiği işleme.
İşlem Üreticileri
İşlem üreticileri, verilerin farklı konulardaki birden çok bölüme atomik olarak yazılmasını sağlar. Üreticiler bir işlem başlatabilir, aynı konuda veya farklı konularda birden çok bölüme yazabilir ve ardından işlemi işleyebilir veya iptal edebilir.
Bir üreticinin işlemsel olduğundan emin olmak için, enable.idempotence
verilerin tam olarak bir kez yazılmasını sağlamak için true olarak ayarlanmalıdır, böylece gönderme tarafında yinelenen öğelerden kaçınılmalıdır. Ayrıca, transaction.id
üreticiyi benzersiz olarak tanımlamak için ayarlanmalıdır.
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
Üretici başlatıldıktan sonra aşağıdaki çağrı, üreticinin aracıyı işlem üreticisi olarak kaydetmesini sağlar -
producer.initTransactions();
Üreticinin daha sonra açıkça bir işlem başlatması, farklı konu başlıkları ve bölümler arasında gönderme işlemlerini normal şekilde gerçekleştirmesi ve ardından işlemi aşağıdaki çağrıyla işlemesi gerekir:
producer.beginTransaction();
/*
Send to multiple topic partitions.
*/
producer.commitTransaction();
Bir hata veya zaman aşımı nedeniyle işlemin durdurulması gerekiyorsa üretici yöntemini çağırabilir abortTransaction()
.
producer.abortTransaction();
Tam olarak bir kez semantik
Tam olarak bir kez semantik, üreticilerin işlem kapsamına tüketici ekleyerek işlemsel üreticileri temel alır, böylece her kaydın tam olarak bir kez okunması, işlenmesi ve yazılması garanti edilir.
İlk olarak işlem üreticisi örneği oluşturulur -
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<K, V> producer = new KafkaProducer(producerProps);
producer.initTransactions();
Ardından, tüketicinin aşağıdaki özelliği ayarlayarak yalnızca işlem dışı iletileri veya işlenen işlem iletilerini okuyacak şekilde yapılandırılması gerekir:
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);
Tüketici örneği oluşturulurken, kayıtların okunması gereken konuya abone olabilir –
consumer.subscribe(singleton("inputTopic"));
Tüketici giriş konusundan kayıtları yokladıktan sonra, üretici kaydın işlendiği ve çıkış konusuna yazıldığı işlem kapsamını başlatır. Kayıtlar yazıldıktan sonra tüm bölümler için güncelleştirilmiş uzaklık eşlemesi oluşturulur. Ardından üretici, işlemi işlemeden önce bu güncelleştirilmiş uzaklık eşlemesini işlemeye gönderir.
Herhangi bir özel durumda işlem durduruldu ve üretici işlemi atomik olarak bir kez daha yeniden deneniyor.
while (true) {
ConsumerRecords records = consumer.poll(Long.Max_VALUE);
producer.beginTransaction();
try {
for (ConsumerRecord record : records) {
/*
Process record as appropriate
*/
// Write to output topic
producer.send(producerRecord(“outputTopic”, record));
}
/*
Generate the offset map to be committed.
*/
Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
for (TopicPartition partition : records.partitions()) {
// Calculate the offset to commit and populate the map.
offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
}
// send offsets to transaction and then commit the transaction.
producer.sendOffsetsToTransaction(offsetsToCommit, group);
producer.commitTransaction();
} catch (Exception e)
{
producer.abortTransaction();
}
}
Uyarı
İşlem işleminden önce max.transaction.timeout.ms
işlenmez veya durdurulmazsa, işlem Event Hubs tarafından otomatik olarak durdurulacaktır. Event Hubs tarafından varsayılan değer max.transaction.timeout.ms
15 dakika olarak ayarlanır, ancak üretici, üretici yapılandırma özelliklerinde özelliğini ayarlayarak transaction.timeout.ms
bunu daha düşük bir değerle geçersiz kılabilir.
Geçiş Kılavuzu
Azure Event Hubs ile kullanmak istediğiniz mevcut Kafka uygulamalarınız varsa azure event hubs için Kafka geçiş kılavuzunu gözden geçirerek hızla çalışmaya başlayın.
Sonraki adımlar
Kafka için Event Hubs ve Event Hubs hakkında daha fazla bilgi edinmek için aşağıdaki makalelere bakın: