Creating a Kafka Event-Driven Project with .NET Core
Table of contents
- Step 1: Set Up Your Development Environment
- Step 2: Create a New .NET Core Project
- Step 3: Add Required NuGet Packages
- Step 4: Configure SQL Server Database
- Step 5: Implement Kafka Producer
- Step 6: Implement Kafka Consumer
- Step 7: Create API Endpoints
- Step 8: Run Migrations and Update Database
- Step 9: Run the Application
- Integrating with Azure
Creating a Kafka event-driven project involves setting up a system where events are produced and consumed using Apache Kafka. Here's a step-by-step guide to help you create a Kafka event-driven project using .NET Core:
Step 1: Set Up Your Development Environment
Install .NET Core SDK: Make sure you have the .NET Core SDK installed. You can download it from the .NET website.
Install Kafka: Download and install Apache Kafka from the Kafka website. Follow the instructions to start the Kafka server and Zookeeper.
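The Kafka distribution ships with quickstart scripts; from the extracted Kafka directory, the typical local startup on Linux/macOS looks like the following, each command in its own terminal (Windows equivalents live under bin\windows as .bat files):

```bash
# Start ZooKeeper first, then the Kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
```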
Step 2: Create a New .NET Core Project
Open your terminal or command prompt.
Create a new .NET Core Web API project:
```bash
dotnet new webapi -n KafkaEventDrivenProject
cd KafkaEventDrivenProject
```
Step 3: Add Required NuGet Packages
Add the necessary NuGet packages for Kafka and SQL Server:
```bash
dotnet add package Confluent.Kafka
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Microsoft.EntityFrameworkCore.Tools
```
Step 4: Configure SQL Server Database
Create the `EventMessage` model:

```csharp
using System;

public class EventMessage
{
    public int Id { get; set; }
    public string Topic { get; set; }
    public string Message { get; set; }
    public DateTime Timestamp { get; set; }
}
```
Create the `AppDbContext` class:

```csharp
using Microsoft.EntityFrameworkCore;

public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    public DbSet<EventMessage> EventMessages { get; set; }
}
```
Configure the connection string in `appsettings.json`:

```json
{
  "ConnectionStrings": {
    "DefaultConnection": "Server=your_server;Database=your_database;User Id=your_user;Password=your_password;"
  }
}
```
Update `Startup.cs` to add the DbContext, and register the producer and consumer so the controller in Step 7 can resolve them through dependency injection (the original listing omitted these registrations, without which the controller cannot be constructed):

```csharp
public void ConfigureServices(IServiceCollection services)
{
    services.AddDbContext<AppDbContext>(options =>
        options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));

    // Register the Kafka classes used by KafkaController below.
    services.AddSingleton<KafkaProducer>();
    services.AddScoped<KafkaConsumer>(); // scoped because it depends on AppDbContext

    services.AddControllers();
}
```
Step 5: Implement Kafka Producer
Create the `KafkaProducer` class:

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

public class KafkaProducer
{
    private readonly IProducer<string, string> _producer;
    private readonly string _topic;

    public KafkaProducer(IConfiguration configuration)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"]
        };
        _producer = new ProducerBuilder<string, string>(config).Build();
        _topic = configuration["Kafka:Topic"];
    }

    public async Task ProduceAsync(string message)
    {
        await _producer.ProduceAsync(_topic, new Message<string, string> { Value = message });
    }
}
```
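`ProduceAsync` completes once the broker acknowledges the message and returns a delivery report. If you want keyed messages (Kafka uses the key to choose a partition), a variant like the following could be added to the class; the method name and the offset logging are illustrative additions, not part of the design above:

```csharp
// Illustrative addition to KafkaProducer: keyed produce with delivery logging.
// Requires "using System;" for Console.
public async Task ProduceWithKeyAsync(string key, string message)
{
    var result = await _producer.ProduceAsync(
        _topic, new Message<string, string> { Key = key, Value = message });
    Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
}
```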
Step 6: Implement Kafka Consumer
Create the `KafkaConsumer` class. Note that the topic name is read from `IConfiguration` in the constructor; the original listing tried to read it through the DbContext, which has no such property:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

public class KafkaConsumer
{
    private readonly IConsumer<string, string> _consumer;
    private readonly AppDbContext _context;
    private readonly string _topic;

    public KafkaConsumer(IConfiguration configuration, AppDbContext context)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            GroupId = configuration["Kafka:GroupId"],
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _context = context;
        _topic = configuration["Kafka:Topic"];
    }

    public void Consume(CancellationToken cancellationToken)
    {
        _consumer.Subscribe(_topic);
        try
        {
            while (true)
            {
                var consumeResult = _consumer.Consume(cancellationToken);
                Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");

                // Save the consumed message to the database
                var eventMessage = new EventMessage
                {
                    Topic = consumeResult.Topic,
                    Message = consumeResult.Message.Value,
                    Timestamp = DateTime.UtcNow
                };
                _context.EventMessages.Add(eventMessage);
                _context.SaveChanges();
            }
        }
        catch (OperationCanceledException)
        {
            _consumer.Close();
        }
    }
}
```
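The `Consume` loop blocks until its token is cancelled, so triggering it from an HTTP request (as the controller below does for demonstration) ties up that request indefinitely. In a real service you would more likely run the loop in a hosted background service; here is a minimal sketch, assuming the DI registrations from Step 4 (the class name is hypothetical):

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Runs the Kafka consume loop for the lifetime of the application.
public class KafkaConsumerHostedService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;

    public KafkaConsumerHostedService(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // KafkaConsumer depends on the scoped AppDbContext, so resolve it inside a scope,
        // and move the blocking loop off the startup thread.
        return Task.Run(() =>
        {
            using var scope = _scopeFactory.CreateScope();
            var consumer = scope.ServiceProvider.GetRequiredService<KafkaConsumer>();
            consumer.Consume(stoppingToken);
        }, stoppingToken);
    }
}

// Register it in ConfigureServices with:
// services.AddHostedService<KafkaConsumerHostedService>();
```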
Step 7: Create API Endpoints
Create the `KafkaController` class:

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

[Route("api/[controller]")]
[ApiController]
public class KafkaController : ControllerBase
{
    private readonly KafkaProducer _producer;
    private readonly KafkaConsumer _consumer;
    private readonly AppDbContext _context;

    public KafkaController(KafkaProducer producer, KafkaConsumer consumer, AppDbContext context)
    {
        _producer = producer;
        _consumer = consumer;
        _context = context;
    }

    [HttpPost("publish")]
    public async Task<IActionResult> Publish([FromBody] string message)
    {
        await _producer.ProduceAsync(message);
        return Ok();
    }

    // Demonstration only: this blocks the request until the token is cancelled;
    // see the hosted-service sketch in Step 6 for a production-style alternative.
    [HttpGet("consume")]
    public IActionResult Consume()
    {
        var cancellationTokenSource = new CancellationTokenSource();
        _consumer.Consume(cancellationTokenSource.Token);
        return Ok();
    }

    [HttpGet("events")]
    public IActionResult GetEvents()
    {
        var events = _context.EventMessages.ToList();
        return Ok(events);
    }

    [HttpGet("events/{id}")]
    public IActionResult GetEventById(int id)
    {
        var eventMessage = _context.EventMessages.Find(id);
        if (eventMessage == null)
        {
            return NotFound();
        }
        return Ok(eventMessage);
    }

    [HttpPut("events/{id}")]
    public IActionResult UpdateEvent(int id, [FromBody] EventMessage updatedEvent)
    {
        var eventMessage = _context.EventMessages.Find(id);
        if (eventMessage == null)
        {
            return NotFound();
        }
        eventMessage.Topic = updatedEvent.Topic;
        eventMessage.Message = updatedEvent.Message;
        eventMessage.Timestamp = updatedEvent.Timestamp;
        _context.SaveChanges();
        return NoContent();
    }

    [HttpDelete("events/{id}")]
    public IActionResult DeleteEvent(int id)
    {
        var eventMessage = _context.EventMessages.Find(id);
        if (eventMessage == null)
        {
            return NotFound();
        }
        _context.EventMessages.Remove(eventMessage);
        _context.SaveChanges();
        return NoContent();
    }
}
```
Step 8: Run Migrations and Update Database
Run the following commands to create and apply migrations:
```bash
dotnet ef migrations add InitialCreate
dotnet ef database update
```
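If these commands fail because the tooling is missing, note that `dotnet ef` is a separate global tool and design-time support comes from an additional package:

```bash
dotnet tool install --global dotnet-ef
dotnet add package Microsoft.EntityFrameworkCore.Design
```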
Step 9: Run the Application
Run the application:
```bash
dotnet run
```
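Once it is running, you can exercise the endpoints from another terminal; the port below is a placeholder for whatever `dotnet run` reports:

```bash
# Publish a message (the JSON string body is the message value)
curl -X POST http://localhost:5000/api/kafka/publish \
     -H "Content-Type: application/json" -d '"hello kafka"'

# Start the consume loop (note: this request blocks), then list stored events
curl http://localhost:5000/api/kafka/consume
curl http://localhost:5000/api/kafka/events
```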
Integrating with Azure
To integrate with Azure, you can use Azure Event Hubs, which exposes a Kafka-compatible endpoint: create an Event Hubs namespace in the Azure portal (Standard tier or above, since the Basic tier does not support the Kafka endpoint), then point your application's Kafka settings at that namespace instead of the local broker.
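With Confluent.Kafka this usually amounts to changing only the connection settings. A sketch, assuming a namespace named `yournamespace` and using its connection string as the SASL password (copy the real connection string from the portal; the key values below are placeholders):

```csharp
using Confluent.Kafka;

// Event Hubs speaks the Kafka protocol on port 9093 over SASL_SSL.
// The literal username "$ConnectionString" tells Event Hubs to read
// the namespace connection string from the password field.
var config = new ProducerConfig
{
    BootstrapServers = "yournamespace.servicebus.windows.net:9093",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "$ConnectionString",
    SaslPassword = "Endpoint=sb://yournamespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."
};
```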