Udostępnij przez


Transakcje na platformie Apache Kafka dla usługi Azure Event Hubs

Ten artykuł zawiera szczegółowe informacje na temat używania transakcyjnego interfejsu API platformy Apache Kafka z usługą Azure Event Hubs.

Omówienie

Usługa Event Hubs udostępnia punkt końcowy platformy Kafka, który może być używany przez istniejące aplikacje klienckie platformy Kafka jako alternatywę dla uruchamiania własnego klastra platformy Kafka. Usługa Event Hubs współpracuje z wieloma istniejącymi aplikacjami platformy Kafka. Aby uzyskać więcej informacji, zobacz Event Hubs for Apache Kafka (Usługa Event Hubs dla platformy Apache Kafka).

Ten dokument koncentruje się na tym, jak bezproblemowo korzystać z transakcyjnego interfejsu API platformy Kafka z usługą Azure Event Hubs.

Uwaga

Kafka Transactions są obecnie dostępne w publicznej wersji zapoznawczej w warstwie Premium i Dedykowanej.

Transakcje na platformie Apache Kafka

W środowiskach chmurowych aplikacje muszą być odporne na zakłócenia sieci oraz ponowne uruchamianie i uaktualnienia przestrzeni nazw. Aplikacje wymagające rygorystycznych gwarancji przetwarzania muszą korzystać z platformy transakcyjnej lub interfejsu API, aby upewnić się, że wszystkie operacje są wykonywane lub żadna nie jest, tak by stan aplikacji i danych był niezawodnie zarządzany. Jeśli zestaw operacji nie powiedzie się, można je niezawodnie próbować ponownie niepodzielnie, aby zagwarantować właściwe gwarancje przetwarzania.

Uwaga

Gwarancje transakcyjne są zwykle wymagane, gdy istnieje wiele operacji, które należy przetworzyć w sposób "wszystkie lub nic".

W przypadku wszystkich innych operacji aplikacje klienckie są domyślnie odporne, aby ponowić próbę wykonania operacji z wycofywaniem wykładniczym, jeśli określona operacja nie powiodła się.

Platforma Apache Kafka udostępnia transakcyjny interfejs API w celu zapewnienia tego poziomu gwarancji przetwarzania w tym samym lub innym zestawie tematów/partycji.

Transakcje mają zastosowanie do następujących przypadków:

  • Producenci transakcyjni.
  • Dokładnie raz przetwarzania semantyki.

Producenci transakcyjni

Producenci transakcyjni zapewniają, że dane są zapisywane niepodzielnie w wielu partycjach w różnych tematach. Producenci mogą zainicjować transakcję, zapisać na wielu partycjach w tym samym temacie lub w różnych tematach, a następnie zatwierdzić lub przerwać transakcję.

Aby zapewnić, że producent jest transakcyjny, należy ustawić wartość true, enable.idempotence aby upewnić się, że dane są zapisywane dokładnie raz, unikając w ten sposób duplikatów po stronie wysyłania. transaction.id Ponadto należy ustawić opcję unikatowego identyfikowania producenta.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Po zainicjowaniu producenta następujące wywołanie gwarantuje, że producent rejestruje się w brokerze jako producent transakcyjny -

    producer.initTransactions();

Producent musi następnie jawnie rozpocząć transakcję, wykonać operacje wysyłania w różnych tematach i partycjach w zwykły sposób, a następnie zatwierdzić transakcję za pomocą następującego wywołania —

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Jeśli transakcja musi zostać przerwana z powodu błędu lub przekroczenia limitu czasu, producent może wywołać metodę abortTransaction() .

	producer.abortTransaction();

Dokładnie raz semantyka

Semantyka dokładnie raz opiera się na producentach transakcyjnych poprzez dodanie konsumentów do zakresu transakcyjnego producentów, aby każdy rekord był odczytywany, przetwarzany i zapisywany dokładnie raz.

Najpierw tworzone jest wystąpienie producenta transakcyjnego -


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Następnie użytkownik musi być skonfigurowany do odczytu tylko nietransakcyjnych komunikatów lub zatwierdzonych komunikatów transakcyjnych, ustawiając następującą właściwość —


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Po utworzeniu wystąpienia konsument może subskrybować temat, z którego należy odczytywać rekordy —


    consumer.subscribe(singleton("inputTopic"));

Gdy konsument pobiera rekordy z tematu wejściowego, producent rozpoczyna kontekst transakcyjny, w którym rekord jest przetwarzany i zapisywany w temacie wyjściowym. Po zapisaniu rekordów zostanie utworzona zaktualizowana mapa przesunięć dla wszystkich partycji. Następnie producent wysyła tę zaktualizowaną mapę przesunięcia do transakcji przed zatwierdzeniem transakcji.

W przypadku dowolnego wyjątku transakcja zostaje przerwana, a producent ponawia próbę przetwarzania po raz kolejny niepodzielnie.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Ostrzeżenie

Jeśli transakcja nie zostanie zatwierdzona lub przerwana przed max.transaction.timeout.ms, transakcja zostanie przerwana automatycznie przez usługę Event Hubs. Wartość domyślna max.transaction.timeout.ms to 15 minut przez usługę Event Hubs, ale producent może zastąpić ją niższą wartością, ustawiając transaction.timeout.ms właściwość we właściwościach konfiguracji producenta.

Przewodnik migracji

Jeśli masz istniejące aplikacje platformy Kafka, których chcesz używać z usługą Azure Event Hubs, zapoznaj się z przewodnikiem migracji platformy Kafka dla usługi Azure Event Hubs , aby szybko uruchomić usługę Azure Event Hubs.

Następne kroki

Aby dowiedzieć się więcej o usłudze Event Hubs i usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: