Aracılığıyla paylaş


Apache Kafka uygulamalarıyla JSON Şeması kullanma

Bu öğreticide, Event Hubs'da Azure Schema Registry kullanarak olayı seri hale getirmek ve seri durumdan çıkarmak için JSON Şemalarını kullandığınız bir senaryo boyunca size yol gösterilir.

Bu kullanım örneğinde Bir Kafka üretici uygulaması, Azure Schema Registry'de depolanan JSON şemasını kullanarak olayı seri hale getirerek Azure Event Hubs'daki bir Kafka konu/olay hub'ında yayımlar. Kafka tüketicisi Event Hubs'dan tükettiği olayları seri durumdan kaldırır. Bunun için olayın şema kimliğini ve Azure Schema Registry'de depolanan JSON şemasını kullanır. JSON şemasını kullanan Kafka uygulamaları için şema serileştirme/serileştirmeyi kaldırmayı gösteren diyagram.

Önkoşullar

Azure Event Hubs'da yeniyseniz bu hızlı başlangıcı gerçekleştirmeden önce event hubs'a genel bakış konusuna bakın.

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:

Olay hub’ı oluşturma

Hızlı başlangıçtaki yönergeleri izleyin: Event Hubs ad alanı ve olay hub'ı oluşturmak için Event Hubs ad alanı ve olay hub'ı oluşturma. Ardından, Event Hubs ad alanınıza bir bağlantı dizesi almak için bağlantı dizesi alma başlığındaki yönergeleri izleyin.

Geçerli hızlı başlangıçta kullandığınız aşağıdaki ayarları not edin:

  • Event Hubs ad alanı için Bağlan ion dizesi
  • Olay hub'ının adı

Şema oluşturma

Şema grubu ve şema oluşturmak için Şema Kayıt Defteri'ni kullanarak şema oluşturma başlığındaki yönergeleri izleyin.

  1. Schema Registry portalını kullanarak contoso-sg adlı bir şema grubu oluşturun. Serileştirme türü olarak JSON Şeması'nı kullanın.

  2. Bu şema grubunda, aşağıdaki şema içeriğini kullanarak şema adıyla Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice yeni bir JSON şeması oluşturun.

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

Şema kayıt defterine erişmek için bir uygulama kaydetme

Kafka üretici ve tüketici uygulamanızı Azure Schema Registry kaynaklarına erişme yetkisi vermek için Microsoft Entra Id kullanabilirsiniz. Bunu etkinleştirmek için istemci uygulamanızı Azure portalından bir Microsoft Entra kiracısına kaydetmeniz gerekir.

adlı example-app bir Microsoft Entra uygulamasını kaydetmek için bkz . Uygulamanızı Bir Microsoft Entra kiracısına kaydetme.

  • tenant.id - uygulamanın kiracı kimliğini ayarlar
  • client.id - uygulamanın istemci kimliğini ayarlar
  • client.secret - kimlik doğrulaması için istemci gizli dizisini ayarlar

Yönetilen kimlik kullanıyorsanız şunları yapmanız gerekir:

  • use.managed.identity.credential - MSI kimlik bilgilerinin kullanılması gerektiğini, MSI özellikli VM için kullanılması gerektiğini gösterir
  • managed.identity.clientId - belirtilirse, belirtilen istemci kimliği managed.identity.resourceId ile MSI kimlik bilgilerini oluşturur - belirtilirse, belirtilen kaynak kimliğiyle MSI kimlik bilgileri oluşturur

Şema Kayıt Defteri Okuyucusu rolüne kullanıcı ekleme

Kullanıcı hesabınızı ad alanı düzeyinde Şema Kayıt Defteri Okuyucusu rolüne ekleyin. Şema Kayıt Defteri Katkıda Bulunanı rolünü de kullanabilirsiniz, ancak bu hızlı başlangıç için gerekli değildir.

  1. Event Hubs Ad Alanı sayfasında, soldaki menüden Erişim denetimi (IAM) öğesini seçin.
  2. Erişim denetimi (IAM) sayfasında, menüde + Ekle ->Rol ataması ekle'yi seçin.
  3. Atama türü sayfasında İleri'yi seçin.
  4. Roller sayfasında Şema Kayıt Defteri Okuyucusu'na tıklayın ve ardından sayfanın alt kısmındaki İleri'ye tıklayın.
  5. Önceki adımda oluşturduğunuz uygulamayı role eklemek example-app için + Üye seç bağlantısını kullanın ve ardından İleri'yi seçin.
  6. Gözden geçir ve ata sayfasında Gözden geçir ve ata'yı seçin.

Kafka uygulamalarının istemci uygulama yapılandırmasını güncelleştirme

Kafka üretici ve tüketici uygulamalarının istemci yapılandırmasını Microsoft Entra uygulama ayrıntılarıyla ve şema kayıt defteri bilgileriyle güncelleştirmeniz gerekir.

Kafka Üretici yapılandırmasını güncelleştirmek için azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer adresine gidin.

  1. Event Hubs için Kafka Hızlı Başlangıç kılavuzunu izleyerek src/main/resources/app.properties içindeki Kafka uygulamasının yapılandırmasını güncelleştirin.

  2. Şema kayıt defteriyle ilgili yapılandırmayı ve önceki adımda oluşturduğunuz Microsoft Entra uygulamasını kullanarak src/main/resources/app.properties içindeki üreticinin yapılandırma ayrıntılarını aşağıdaki gibi güncelleştirin:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Aynı yönergeleri izleyin ve azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer yapılandırmasını da güncelleştirin.

  4. Hem Kafka üretici hem de tüketici uygulamaları için aşağıdaki JSON şeması kullanılır:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

JSON şema doğrulaması ile Kafka üreticisini kullanma

Kafka üretici uygulamasını çalıştırmak için azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer adresine gidin.

  1. JSON Şemasına özgü kayıtlar veya genel kayıtlar üretebilmesi için üretici uygulamasını çalıştırabilirsiniz. Belirli kayıt modu için önce aşağıdaki maven komutunu kullanarak üretici şemasında sınıfları oluşturmanız gerekir:

    mvn generate-sources
    
  2. Ardından aşağıdaki komutları kullanarak üretici uygulamasını çalıştırabilirsiniz.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. Üretici uygulamasının başarıyla yürütülmesinden sonra, üretici senaryosunu seçmeniz istenir. Bu hızlı başlangıç için 1 seçeneğini belirleyebilirsiniz: SpecificRecords üretin.

    Enter case number:
    1 - produce SpecificRecords
    
  4. Başarılı veri serileştirme ve yayımlama sonrasında üretici uygulamanızda aşağıdaki konsol günlüklerini görmeniz gerekir:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

JSON şema doğrulaması ile Kafka tüketicisi kullanma

Kafka tüketici uygulamasını çalıştırmak için azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer adresine gidin.

  1. JSON Şemasına özgü kayıtları veya genel kayıtları tüketmesi için tüketici uygulamasını çalıştırabilirsiniz. Belirli kayıt modu için önce aşağıdaki maven komutunu kullanarak üretici şemasında sınıfları oluşturmanız gerekir:

    mvn generate-sources
    
  2. Ardından aşağıdaki komutu kullanarak tüketici uygulamasını çalıştırabilirsiniz.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. Tüketici uygulamasının başarıyla yürütülmesinden sonra, üretici senaryosunu seçmeniz istenir. Bu hızlı başlangıç için 1 - SpecificRecords'u kullanma seçeneğini belirleyebilirsiniz.

    Enter case number:
    1 - consume SpecificRecords
    
  4. Başarılı veri tüketimi ve seri durumdan çıkarma işleminden sonra üretici uygulamanızda aşağıdaki konsol günlüklerini görmeniz gerekir:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

Kaynakları temizleme

Event Hubs ad alanını silin veya ad alanını içeren kaynak grubunu silin.