.NET Aspire Apache Kafka integration

Includes: Hosting integration and Client integration

Apache Kafka is an open-source distributed event streaming platform. It's useful for building real-time data pipelines and streaming applications. The .NET Aspire Apache Kafka integration enables you to connect to existing Kafka instances, or create new instances from .NET with the docker.io/confluentinc/confluent-local container image.

Hosting integration

The Apache Kafka hosting integration models a Kafka server as the KafkaServerResource type. To access this type, install the 📦 Aspire.Hosting.Kafka NuGet package in the app host project, then add it with the builder.

dotnet add package Aspire.Hosting.Kafka

For more information, see dotnet add package or Manage package dependencies in .NET applications.

Add Kafka server resource

In your app host project, call AddKafka on the builder instance to add a Kafka server resource:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>("exampleproject")
       .WithReference(kafka);

// After adding all resources, run the app...

When .NET Aspire adds a container image to the app host, as shown in the preceding example with the docker.io/confluentinc/confluent-local image, it creates a new Kafka server instance on your local machine. A reference to your Kafka server (the kafka variable) is added to the ExampleProject. The Kafka server resource includes default ports.

The WithReference method configures a connection in the ExampleProject named "kafka". For more information, see Container resource lifecycle.

Tip

If you'd rather connect to an existing Kafka server, call AddConnectionString instead. For more information, see Reference existing resources.
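As a minimal sketch of the tip above, the app host can reference an existing broker by connection string instead of starting a container. This assumes the app host configuration contains a ConnectionStrings:kafka entry (for example, a bootstrap server address such as localhost:9092, which is a placeholder) and that the project resource name "exampleproject" is an arbitrary choice:

```csharp
var builder = DistributedApplication.CreateBuilder(args);

// Reads the "kafka" entry from the ConnectionStrings section of the
// app host configuration instead of creating a container resource.
var kafka = builder.AddConnectionString("kafka");

// "exampleproject" is an illustrative resource name.
builder.AddProject<Projects.ExampleProject>("exampleproject")
       .WithReference(kafka);

builder.Build().Run();
```

The consuming project sees the same "kafka" connection name either way, so its client registration doesn't need to change.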

Add Kafka UI

To add the Kafka UI to the Kafka server resource, call the WithKafkaUI method:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>("exampleproject")
       .WithReference(kafka);

// After adding all resources, run the app...

The Kafka UI is a free, open-source web UI for monitoring and managing Apache Kafka clusters. .NET Aspire adds another container image, docker.io/provectuslabs/kafka-ui, to the app host to run the Kafka UI.

Change the Kafka UI host port

To change the Kafka UI host port, call the WithHostPort method on the Kafka UI resource builder inside the WithKafkaUI callback:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>("exampleproject")
       .WithReference(kafka);

// After adding all resources, run the app...

The Kafka UI is accessible at http://localhost:9100 in the preceding example.

Add Kafka server resource with data volume

To add a data volume to the Kafka server resource, call the WithDataVolume method on the Kafka server resource:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>("exampleproject")
       .WithReference(kafka);

// After adding all resources, run the app...

The data volume is used to persist the Kafka server data outside the lifecycle of its container. The data volume is mounted at the /var/lib/kafka/data path in the Kafka server container and when a name parameter isn't provided, the name is generated at random. For more information on data volumes and details on why they're preferred over bind mounts, see Docker docs: Volumes.

Add Kafka server resource with data bind mount

To add a data bind mount to the Kafka server resource, call the WithDataBindMount method:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>("exampleproject")
       .WithReference(kafka);

// After adding all resources, run the app...

Tip

Data bind mounts have limited functionality compared to volumes, which offer better performance, portability, and security, making them more suitable for production environments. However, bind mounts allow direct access and modification of files on the host system, ideal for development and testing where real-time changes are needed.

Data bind mounts rely on the host machine's filesystem to persist the Kafka server data across container restarts. In the preceding example, the host directory C:\Kafka\Data on Windows (or /Kafka/Data on Unix) is mounted into the Kafka server container. For more information on data bind mounts, see Docker docs: Bind mounts.

Hosting integration health checks

The Kafka hosting integration automatically adds a health check for the Kafka server resource. The health check verifies that a Kafka producer with the specified connection name is able to connect and persist a topic to the Kafka server.

The hosting integration relies on the 📦 AspNetCore.HealthChecks.Kafka NuGet package.

Client integration

To get started with the .NET Aspire Apache Kafka integration, install the 📦 Aspire.Confluent.Kafka NuGet package in the client-consuming project, that is, the project for the application that uses the Apache Kafka client.

dotnet add package Aspire.Confluent.Kafka

Add Kafka producer

In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container. The method takes two generic parameters corresponding to the type of the key and the type of the message to send to the broker. These generic parameters are used by AddKafkaProducer to create an instance of ProducerBuilder<TKey, TValue>. This method also takes a connection name parameter.

builder.AddKafkaProducer<string, string>("messaging");

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection. For example, to retrieve the producer from an IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

For more information on workers, see Worker services in .NET.
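To make the "Use producer..." placeholder concrete, the following is a minimal sketch of a worker that publishes a message every few seconds. The class name ProducerWorker, the topic name "example-topic", and the five-second interval are assumptions made for illustration; only IProducer<TKey, TValue>.ProduceAsync and Message<TKey, TValue> come from Confluent.Kafka:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical worker name; the producer is injected by the DI container.
internal sealed class ProducerWorker(IProducer<string, string> producer) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));

        // The loop ends when the host cancels stoppingToken.
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Hello at {DateTimeOffset.UtcNow:O}"
            };

            // "example-topic" is an illustrative topic name.
            await producer.ProduceAsync("example-topic", message, stoppingToken);
        }
    }
}
```

A worker like this would typically be registered with builder.Services.AddHostedService<ProducerWorker>() after the AddKafkaProducer call.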

Add Kafka consumer

To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method in the Program.cs file of your client-consuming project. The method takes two generic parameters corresponding to the type of the key and the type of the message to receive from the broker. These generic parameters are used by AddKafkaConsumer to create an instance of ConsumerBuilder<TKey, TValue>. This method also takes a connection name parameter.

builder.AddKafkaConsumer<string, string>("messaging");

You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection. For example, to retrieve the consumer from an IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}
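As one possible way to fill in the "Use consumer..." placeholder, the sketch below subscribes to a topic and logs each record. The class name ConsumerWorker and the topic "example-topic" are hypothetical. Because IConsumer<TKey, TValue>.Consume blocks the calling thread, the loop runs on a background task here; other designs are equally valid. The sketch also assumes the consumer has been configured with whatever identifiers the broker requires (such as a consumer group):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical worker name; the consumer is injected by the DI container.
internal sealed class ConsumerWorker(
    IConsumer<string, string> consumer,
    ILogger<ConsumerWorker> logger) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume blocks, so keep the loop off the host's startup path.
        return Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);
    }

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        // "example-topic" is an illustrative topic name.
        consumer.Subscribe("example-topic");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                ConsumeResult<string, string> result = consumer.Consume(stoppingToken);
                logger.LogInformation(
                    "Received {Key}: {Value}", result.Message.Key, result.Message.Value);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host shuts down.
        }
        finally
        {
            // Leave the consumer group cleanly before disposal.
            consumer.Close();
        }
    }
}
```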

Add keyed Kafka producers or consumers

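There might be situations where you want to register multiple producer or consumer instances with different connection names. To register keyed Kafka producers or consumers, call the appropriate API:

  • AddKeyedKafkaProducer: Registers a keyed Kafka producer.
  • AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.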

For more information on keyed services, see .NET dependency injection: Keyed services.
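The following sketch shows how two keyed producers might be registered and resolved, assuming the AddKeyedKafkaProducer extension method from Aspire.Confluent.Kafka. The connection names "orders" and "audit", the topic names, and the OrderPublisher class are hypothetical, and each connection name is assumed to have a matching connection string:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Each call registers a producer keyed by its connection name.
builder.AddKeyedKafkaProducer<string, string>("orders");
builder.AddKeyedKafkaProducer<string, string>("audit");

builder.Services.AddHostedService<OrderPublisher>();

builder.Build().Run();

// Hypothetical worker that resolves each producer by its key.
internal sealed class OrderPublisher(
    [FromKeyedServices("orders")] IProducer<string, string> orders,
    [FromKeyedServices("audit")] IProducer<string, string> audit) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var order = new Message<string, string> { Key = "order-1", Value = "created" };
        await orders.ProduceAsync("orders-topic", order, stoppingToken);

        var entry = new Message<string, string> { Key = "order-1", Value = "order created" };
        await audit.ProduceAsync("audit-topic", entry, stoppingToken);
    }
}
```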

Configuration

The .NET Aspire Apache Kafka integration provides multiple options to configure the connection based on the requirements and conventions of your project.

Use a connection string

When using a connection string from the ConnectionStrings configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Then the connection string is retrieved from the ConnectionStrings configuration section:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

The connection string value is set to the BootstrapServers property of the produced IProducer<TKey, TValue> or IConsumer<TKey, TValue> instance. For more information, see BootstrapServers.

Use configuration providers

The .NET Aspire Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration by using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys, respectively. The following snippet is an example of an appsettings.json file that configures some of the options:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties of the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer configuration sections bind to instances of ProducerConfig and ConsumerConfig, respectively.

Confluent.Kafka.Consumer<TKey, TValue> requires the ClientId property to be set to let the broker track consumed message offsets.
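The same client properties can also be set from code through the settings delegate. The sketch below assumes that KafkaConsumerSettings exposes the bound ConsumerConfig through a Config property, mirroring the Config configuration section described above. The identifier values are placeholders, and GroupId is included because Confluent.Kafka consumers generally expect a consumer group to be configured as well:

```csharp
using Confluent.Kafka;

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings =>
    {
        // Placeholder identifiers; choose values that suit your app.
        settings.Config.ClientId = "example-client";
        settings.Config.GroupId = "example-group";

        // Start from the earliest offset when no committed offset exists.
        settings.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
    });
```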

For the complete Kafka client integration JSON schema, see Aspire.Confluent.Kafka/ConfigurationSchema.json.

Use inline delegates

There are several inline delegates available to configure various options.

Configure KafkaProducerSettings and KafkaConsumerSettings

You can pass the Action<KafkaProducerSettings> configureSettings delegate to set up some or all the options inline, for example to disable health checks from code:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

You can also configure a consumer inline from code:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);

Configure ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue>

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>> (or Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });

When registering producers and consumers, if you need to access a service registered in the DI container, you can pass an Action<IServiceProvider, ProducerBuilder<TKey, TValue>> or Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> respectively. Consider the following producer registration example:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredService<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    });

For more information, see ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue> API documentation.
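The preceding examples reference MyMessage and MyMessageSerializer without defining them. One way they might look is sketched below, using System.Text.Json to encode the message as UTF-8 JSON. The shape of MyMessage (an Id and a Quantity) is purely hypothetical; only the ISerializer<T> interface comes from Confluent.Kafka:

```csharp
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical message type used only for illustration.
public sealed record MyMessage(string Id, int Quantity);

// Serializes MyMessage values as UTF-8 encoded JSON.
public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    public byte[] Serialize(MyMessage data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}
```

For the IServiceProvider overload, the serializer would also need to be registered in the container, for example with builder.Services.AddSingleton<MyMessageSerializer>().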

Client integration health checks

By default, .NET Aspire integrations enable health checks for all services. For more information, see .NET Aspire integrations overview.

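The .NET Aspire Apache Kafka integration adds a health check for each registered producer or consumer unless DisableHealthChecks is set to true in the corresponding KafkaProducerSettings or KafkaConsumerSettings.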

Observability and telemetry

.NET Aspire integrations automatically set up Logging, Tracing, and Metrics configurations, which are sometimes known as the pillars of observability. For more information about integration observability and telemetry, see .NET Aspire integrations overview. Depending on the backing service, some integrations may only support some of these features. For example, some integrations support logging and tracing, but not metrics. Telemetry features can also be disabled using the techniques presented in the Configuration section.

Logging

The .NET Aspire Apache Kafka integration uses the following log categories:

  • Aspire.Confluent.Kafka

Tracing

The .NET Aspire Apache Kafka integration does not emit distributed traces.

Metrics

The .NET Aspire Apache Kafka integration emits the following metrics using OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

See also