Condividi tramite


Transazioni in Apache Kafka per Hub eventi di Azure

Questo articolo fornisce informazioni dettagliate su come usare l'API transazionale Apache Kafka con Hub eventi di Azure.

Panoramica

Hub eventi fornisce un endpoint Kafka che può essere usato dalle applicazioni client Kafka esistenti come alternativa all'esecuzione del proprio cluster Kafka. Hub eventi funziona con molte delle applicazioni Kafka esistenti. Per altre informazioni, vedere Hub eventi per Apache Kafka.

Questo documento è incentrato su come usare l'API transazionale di Kafka con Hub eventi di Azure senza problemi.

Nota

Le transazioni Kafka sono attualmente in anteprima pubblica nel livello Premium e Dedicato.

Transazioni in Apache Kafka

Negli ambienti nativi del cloud, le applicazioni devono essere rese resilienti alle interruzioni di rete e ai riavvii e agli aggiornamenti dello spazio dei nomi. Le applicazioni che richiedono garanzie di elaborazione rigorose devono usare un framework transazionale o un'API per garantire che tutte le operazioni vengano eseguite o che nessuno sia in modo che l'applicazione e lo stato dei dati siano gestiti in modo affidabile. Se il set di operazioni ha esito negativo, è possibile riprovare in modo affidabile in modo atomico per garantire le garanzie di elaborazione corrette.

Nota

Le garanzie transazionali sono in genere necessarie quando sono presenti più operazioni che devono essere elaborate in modo "tutto o niente".

Per tutte le altre operazioni, le applicazioni client sono resilienti per impostazione predefinita per ritentare l'operazione con un backoff esponenziale, se l'operazione specifica non è riuscita.

Apache Kafka fornisce un'API transazionale per garantire questo livello di garanzia di elaborazione nello stesso set di argomenti o partizioni diverso.

Le transazioni si applicano ai casi seguenti:

  • Producer transazionali.
  • Esattamente una volta la semantica di elaborazione.

Producer transazionali

I producer transazionali assicurano che i dati vengano scritti in modo atomico in più partizioni in diversi argomenti. I producer possono avviare una transazione, scrivere in più partizioni nello stesso argomento o in argomenti diversi e quindi eseguire il commit o interrompere la transazione.

Per assicurarsi che un producer sia transazionale, enable.idempotence deve essere impostato su true per assicurarsi che i dati vengano scritti esattamente una volta, evitando così duplicati sul lato di invio . Inoltre, transaction.id deve essere impostato per identificare in modo univoco il producer.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Dopo l'inizializzazione del producer, la chiamata seguente garantisce che il producer si registri con il broker come producer transazionale:

    producer.initTransactions();

Il producer deve quindi avviare una transazione in modo esplicito, eseguire operazioni di invio tra diversi argomenti e partizioni come di consueto e quindi eseguire il commit della transazione con la chiamata seguente:

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Se la transazione deve essere interrotta a causa di un errore o di un timeout, il producer può chiamare il abortTransaction() metodo .

	producer.abortTransaction();

Semantica esattamente una volta

Esattamente una volta la semantica si basa sui producer transazionali aggiungendo consumer nell'ambito transazionale dei producer, in modo che ogni record sia garantito di essere letto, elaborato e scritto esattamente una volta.

Viene creata un'istanza del producer transazionale :


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Il consumer deve quindi essere configurato per leggere solo messaggi non transazionali o messaggi transazionali di cui è stato eseguito il commit impostando la proprietà seguente:


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Dopo aver creato un'istanza del consumer, può sottoscrivere l'argomento da cui devono essere letti i record:


    consumer.subscribe(singleton("inputTopic"));

Dopo che il consumer esegue il polling dei record dall'argomento di input, il producer inizia l'ambito transazionale all'interno del quale viene elaborato e scritto nell'argomento di output. Dopo aver scritto i record, viene creata la mappa aggiornata degli offset per tutte le partizioni. Il producer invia quindi questo mapping di offset aggiornato alla transazione prima di eseguire il commit della transazione.

In qualsiasi eccezione, la transazione viene interrotta e il producer ritenta nuovamente l'elaborazione in modo atomico.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Avviso

Se non viene eseguito il commit o l'interruzione della transazione prima di max.transaction.timeout.ms, la transazione verrà interrotta automaticamente da Hub eventi. Il valore predefinito max.transaction.timeout.ms è impostato su 15 minuti da Hub eventi, ma il producer può eseguirne l'override su un valore inferiore impostando la transaction.timeout.ms proprietà nelle proprietà di configurazione del producer.

Guida alla migrazione

Se si dispone di applicazioni Kafka esistenti che si vuole usare con Hub eventi di Azure, vedere la guida alla migrazione di Kafka per Hub eventi di Azure per raggiungere rapidamente il livello di esecuzione.

Passaggi successivi

Per altre informazioni su Hub eventi e Hub eventi per Kafka, vedere gli articoli seguenti: