Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ten artykuł zawiera szczegółowe informacje na temat używania transakcyjnego interfejsu API platformy Apache Kafka z usługą Azure Event Hubs.
Omówienie
Usługa Event Hubs udostępnia punkt końcowy platformy Kafka, który może być używany przez istniejące aplikacje klienckie platformy Kafka jako alternatywę dla uruchamiania własnego klastra platformy Kafka. Usługa Event Hubs współpracuje z wieloma istniejącymi aplikacjami platformy Kafka. Aby uzyskać więcej informacji, zobacz Event Hubs for Apache Kafka (Usługa Event Hubs dla platformy Apache Kafka).
Ten dokument koncentruje się na tym, jak bezproblemowo korzystać z transakcyjnego interfejsu API platformy Kafka z usługą Azure Event Hubs.
Uwaga
Kafka Transactions są obecnie dostępne w publicznej wersji zapoznawczej w warstwie Premium i Dedykowanej.
Transakcje na platformie Apache Kafka
W środowiskach chmurowych aplikacje muszą być odporne na zakłócenia sieci oraz ponowne uruchamianie i uaktualnienia przestrzeni nazw. Aplikacje wymagające rygorystycznych gwarancji przetwarzania muszą korzystać z platformy transakcyjnej lub interfejsu API, aby upewnić się, że wszystkie operacje są wykonywane lub żadna nie jest, tak by stan aplikacji i danych był niezawodnie zarządzany. Jeśli zestaw operacji nie powiedzie się, można je niezawodnie próbować ponownie niepodzielnie, aby zagwarantować właściwe gwarancje przetwarzania.
Uwaga
Gwarancje transakcyjne są zwykle wymagane, gdy istnieje wiele operacji, które należy przetworzyć w sposób "wszystkie lub nic".
W przypadku wszystkich innych operacji aplikacje klienckie są domyślnie odporne, aby ponowić próbę wykonania operacji z wycofywaniem wykładniczym, jeśli określona operacja nie powiodła się.
Platforma Apache Kafka udostępnia transakcyjny interfejs API w celu zapewnienia tego poziomu gwarancji przetwarzania w tym samym lub innym zestawie tematów/partycji.
Transakcje mają zastosowanie do następujących przypadków:
- Producenci transakcyjni.
- Dokładnie raz przetwarzania semantyki.
Producenci transakcyjni
Producenci transakcyjni zapewniają, że dane są zapisywane niepodzielnie w wielu partycjach w różnych tematach. Producenci mogą zainicjować transakcję, zapisać na wielu partycjach w tym samym temacie lub w różnych tematach, a następnie zatwierdzić lub przerwać transakcję.
Aby zapewnić, że producent jest transakcyjny, należy ustawić wartość true, enable.idempotence aby upewnić się, że dane są zapisywane dokładnie raz, unikając w ten sposób duplikatów po stronie wysyłania.
transaction.id Ponadto należy ustawić opcję unikatowego identyfikowania producenta.
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
Po zainicjowaniu producenta następujące wywołanie gwarantuje, że producent rejestruje się w brokerze jako producent transakcyjny -
producer.initTransactions();
Producent musi następnie jawnie rozpocząć transakcję, wykonać operacje wysyłania w różnych tematach i partycjach w zwykły sposób, a następnie zatwierdzić transakcję za pomocą następującego wywołania —
producer.beginTransaction();
/*
Send to multiple topic partitions.
*/
producer.commitTransaction();
Jeśli transakcja musi zostać przerwana z powodu błędu lub przekroczenia limitu czasu, producent może wywołać metodę abortTransaction() .
producer.abortTransaction();
Dokładnie raz semantyka
Semantyka dokładnie raz opiera się na producentach transakcyjnych poprzez dodanie konsumentów do zakresu transakcyjnego producentów, aby każdy rekord był odczytywany, przetwarzany i zapisywany dokładnie raz.
Najpierw tworzone jest wystąpienie producenta transakcyjnego -
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<K, V> producer = new KafkaProducer(producerProps);
producer.initTransactions();
Następnie użytkownik musi być skonfigurowany do odczytu tylko nietransakcyjnych komunikatów lub zatwierdzonych komunikatów transakcyjnych, ustawiając następującą właściwość —
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);
Po utworzeniu wystąpienia konsument może subskrybować temat, z którego należy odczytywać rekordy —
consumer.subscribe(singleton("inputTopic"));
Gdy konsument pobiera rekordy z tematu wejściowego, producent rozpoczyna kontekst transakcyjny, w którym rekord jest przetwarzany i zapisywany w temacie wyjściowym. Po zapisaniu rekordów zostanie utworzona zaktualizowana mapa przesunięć dla wszystkich partycji. Następnie producent wysyła tę zaktualizowaną mapę przesunięcia do transakcji przed zatwierdzeniem transakcji.
W przypadku dowolnego wyjątku transakcja zostaje przerwana, a producent ponawia próbę przetwarzania po raz kolejny niepodzielnie.
while (true) {
ConsumerRecords records = consumer.poll(Long.Max_VALUE);
producer.beginTransaction();
try {
for (ConsumerRecord record : records) {
/*
Process record as appropriate
*/
// Write to output topic
producer.send(producerRecord(“outputTopic”, record));
}
/*
Generate the offset map to be committed.
*/
Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
for (TopicPartition partition : records.partitions()) {
// Calculate the offset to commit and populate the map.
offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
}
// send offsets to transaction and then commit the transaction.
producer.sendOffsetsToTransaction(offsetsToCommit, group);
producer.commitTransaction();
} catch (Exception e)
{
producer.abortTransaction();
}
}
Ostrzeżenie
Jeśli transakcja nie zostanie zatwierdzona lub przerwana przed max.transaction.timeout.ms, transakcja zostanie przerwana automatycznie przez usługę Event Hubs. Wartość domyślna max.transaction.timeout.ms to 15 minut przez usługę Event Hubs, ale producent może zastąpić ją niższą wartością, ustawiając transaction.timeout.ms właściwość we właściwościach konfiguracji producenta.
Przewodnik migracji
Jeśli masz istniejące aplikacje platformy Kafka, których chcesz używać z usługą Azure Event Hubs, zapoznaj się z przewodnikiem migracji platformy Kafka dla usługi Azure Event Hubs , aby szybko uruchomić usługę Azure Event Hubs.
Następne kroki
Aby dowiedzieć się więcej o usłudze Event Hubs i usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: