.NET Aspire Apache Kafka integration
Includes: Hosting integration and Client integration
Apache Kafka is an open-source distributed event streaming platform. It's useful for building real-time data pipelines and streaming applications. The .NET Aspire Apache Kafka integration enables you to connect to existing Kafka instances, or create new instances from .NET with the docker.io/confluentinc/confluent-local
container image.
Hosting integration
The Apache Kafka hosting integration models a Kafka server as the KafkaServerResource type. To access this type, install the 📦 Aspire.Hosting.Kafka NuGet package in the app host project, then add it with the builder.
dotnet add package Aspire.Hosting.Kafka
For more information, see dotnet add package or Manage package dependencies in .NET applications.
Add Kafka server resource
In your app host project, call AddKafka on the builder
instance to add a Kafka server resource:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
When .NET Aspire adds a container image to the app host, as shown in the preceding example with the docker.io/confluentinc/confluent-local
image, it creates a new Kafka server instance on your local machine. A reference to your Kafka server (the kafka
variable) is added to the ExampleProject
. The Kafka server resource includes default ports
The WithReference method configures a connection in the ExampleProject
named "kafka"
. For more information, see Container resource lifecycle.
Tip
If you'd rather connect to an existing Kafka server, call AddConnectionString instead. For more information, see Reference existing resources.
Add Kafka UI
To add the Kafka UI to the Kafka server resource, call the WithKafkaUI method:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
The Kafka UI is a free, open-source web UI to monitor and manage Apache Kafka clusters. .NET Aspire adds another container image docker.io/provectuslabs/kafka-ui
to the app host that runs the Kafka UI.
Change the Kafka UI host port
To change the Kafka UI host port, chain a call to the WithHostPort method:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI()
.WithHostPort(9100);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
The Kafka UI is accessible at http://localhost:9100
in the preceding example.
Add Kafka server resource with data volume
To add a data volume to the Kafka server resource, call the WithDataVolume method on the Kafka server resource:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
The data volume is used to persist the Kafka server data outside the lifecycle of its container. The data volume is mounted at the /var/lib/kafka/data
path in the Kafka server container and when a name
parameter isn't provided, the name is generated at random. For more information on data volumes and details on why they're preferred over bind mounts, see Docker docs: Volumes.
Add Kafka server resource with data bind mount
To add a data bind mount to the Kafka server resource, call the WithDataBindMount method:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Tip
Data bind mounts have limited functionality compared to volumes, which offer better performance, portability, and security, making them more suitable for production environments. However, bind mounts allow direct access and modification of files on the host system, ideal for development and testing where real-time changes are needed.
Data bind mounts rely on the host machine's filesystem to persist the Kafka server data across container restarts. The data bind mount is mounted at the C:\Kafka\Data
on Windows (or /Kafka/Data
on Unix) path on the host machine in the Kafka server container. For more information on data bind mounts, see Docker docs: Bind mounts.
Hosting integration health checks
The Kafka hosting integration automatically adds a health check for the Kafka server resource. The health check verifies that a Kafka producer with the specified connection name is able to connect and persist a topic to the Kafka server.
The hosting integration relies on the 📦 AspNetCore.HealthChecks.Kafka NuGet package.
Client integration
To get started with the .NET Aspire Apache Kafka integration, install the 📦 Aspire.Confluent.Kafka NuGet package in the client-consuming project, that is, the project for the application that uses the Apache Kafka client.
dotnet add package Aspire.Confluent.Kafka
Add Kafka producer
In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue>
for use via the dependency injection container. The method takes two generic parameters corresponding to the type of the key and the type of the message to send to the broker. These generic parameters are used by AddKafkaProducer
to create an instance of ProducerBuilder<TKey, TValue>
. This method also takes connection name parameter.
builder.AddKafkaProducer<string, string>("messaging");
You can then retrieve the IProducer<TKey, TValue>
instance using dependency injection. For example, to retrieve the producer from an IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
For more information on workers, see Worker services in .NET.
Add Kafka consumer
To register an IConsumer<TKey, TValue>
for use via the dependency injection container, call the AddKafkaConsumer extension method in the Program.cs file of your client-consuming project. The method takes two generic parameters corresponding to the type of the key and the type of the message to receive from the broker. These generic parameters are used by AddKafkaConsumer
to create an instance of ConsumerBuilder<TKey, TValue>
. This method also takes connection name parameter.
builder.AddKafkaConsumer<string, string>("messaging");
You can then retrieve the IConsumer<TKey, TValue>
instance using dependency injection. For example, to retrieve the consumer from an IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Add keyed Kafka producers or consumers
There might be situations where you want to register multiple producer or consumer instances with different connection names. To register keyed Kafka producers or consumers, call the appropriate API:
- AddKeyedKafkaProducer: Registers a keyed Kafka producer.
- AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.
For more information on keyed services, see .NET dependency injection: Keyed services.
Configuration
The .NET Aspire Apache Kafka integration provides multiple options to configure the connection based on the requirements and conventions of your project.
Use a connection string
When using a connection string from the ConnectionStrings
configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer()
or builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Then the connection string is retrieved from the ConnectionStrings
configuration section:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
The connection string value is set to the BootstrapServers
property of the produced IProducer<TKey, TValue>
or IConsumer<TKey, TValue>
instance. For more information, see BootstrapServers.
Use configuration providers
The .NET Aspire Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration by respectively using the Aspire:Confluent:Kafka:Producer
and Aspire.Confluent:Kafka:Consumer
keys. The following snippet is an example of a appsettings.json file that configures some of the options:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
The Config
properties of both Aspire:Confluent:Kafka:Producer
and Aspire.Confluent:Kafka:Consumer
configuration sections respectively bind to instances of ProducerConfig
and ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
requires the ClientId
property to be set to let the broker track consumed message offsets.
For the complete Kafka client integration JSON schema, see Aspire.Confluent.Kafka/ConfigurationSchema.json.
Use inline delegates
There are several inline delegates available to configure various options.
ConfigureKafkaProducerSettings
and KafkaConsumerSettings
You can pass the Action<KafkaProducerSettings> configureSettings
delegate to set up some or all the options inline, for example to disable health checks from code:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
You can configure inline a consumer from code:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Configure ProducerBuilder<TKey, TValue>
and ConsumerBuilder<TKey, TValue>
To configure Confluent.Kafka
builders, pass an Action<ProducerBuilder<TKey, TValue>>
(or Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
When registering producers and consumers, if you need to access a service registered in the DI container, you can pass an Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
or Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
respectively:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Consider the following producer registration example:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
For more information, see ProducerBuilder<TKey, TValue>
and ConsumerBuilder<TKey, TValue>
API documentation.
Client integration health checks
By default, .NET Aspire integrations enable health checks for all services. For more information, see .NET Aspire integrations overview.
The .NET Aspire Apache Kafka integration handles the following health check scenarios:
- Adds the
Aspire.Confluent.Kafka.Producer
health check when KafkaProducerSettings.DisableHealthChecks isfalse
. - Adds the
Aspire.Confluent.Kafka.Consumer
health check when KafkaConsumerSettings.DisableHealthChecks isfalse
. - Integrates with the
/health
HTTP endpoint, which specifies all registered health checks must pass for app to be considered ready to accept traffic.
Observability and telemetry
.NET Aspire integrations automatically set up Logging, Tracing, and Metrics configurations, which are sometimes known as the pillars of observability. For more information about integration observability and telemetry, see .NET Aspire integrations overview. Depending on the backing service, some integrations may only support some of these features. For example, some integrations support logging and tracing, but not metrics. Telemetry features can also be disabled using the techniques presented in the Configuration section.
Logging
The .NET Aspire Apache Kafka integration uses the following log categories:
Aspire.Confluent.Kafka
Tracing
The .NET Aspire Apache Kafka integration dos not emit distributed traces.
Metrics
The .NET Aspire Apache Kafka integration emits the following metrics using OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received