Produced and consumed using Apache Kafka

Creating a Kafka event-driven project involves setting up a system where events are produced and consumed using Apache Kafka. Here's a step-by-step guide to help you create a Kafka event-driven project using .NET Core:

Step 1: Set Up Your Development Environment

  1. Install .NET Core SDK: Make sure you have the .NET Core SDK installed. You can download it from the .NET website.

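  2. Install Kafka: Download and install Apache Kafka from the Kafka website. Follow the quickstart instructions to start the services in the right order: on ZooKeeper-based releases, start ZooKeeper first and then the Kafka server (the broker cannot start until ZooKeeper is running); releases running in KRaft mode do not need ZooKeeper.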

Step 2: Create a New .NET Core Project

  1. Open your terminal or command prompt.

  2. Create a new .NET Core Web API project:

     dotnet new webapi -n KafkaEventDrivenProject
     cd KafkaEventDrivenProject
    

Step 3: Add Required NuGet Packages

Add the necessary NuGet packages for Kafka and SQL Server:

dotnet add package Confluent.Kafka
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Microsoft.EntityFrameworkCore.Tools

Step 4: Configure SQL Server Database

  1. Create the EventMessage model:

     /// <summary>
     /// A consumed Kafka event in the shape stored through <c>AppDbContext.EventMessages</c>.
     /// </summary>
     public class EventMessage
     {
         /// <summary>Identifier used to look the row up (see the <c>Find(id)</c> calls in the controller).</summary>
         public int Id { get; set; }

         /// <summary>Name of the Kafka topic the message was read from. Never null; defaults to an empty string.</summary>
         public string Topic { get; set; } = string.Empty;

         /// <summary>Message payload as text. Never null; defaults to an empty string.</summary>
         public string Message { get; set; } = string.Empty;

         /// <summary>When the event was recorded; the consumer stores <c>DateTime.UtcNow</c> here.</summary>
         public DateTime Timestamp { get; set; }
     }
    
  2. Create the AppDbContext class:

     using Microsoft.EntityFrameworkCore;
    
     /// <summary>
     /// Entity Framework Core context for the application. It exposes a single set,
     /// <see cref="EventMessages"/>, holding the stored <see cref="EventMessage"/> entities.
     /// </summary>
     public class AppDbContext : DbContext
     {
         /// <summary>Queryable and updatable set of stored <see cref="EventMessage"/> entities.</summary>
         public DbSet<EventMessage> EventMessages { get; set; }

         /// <summary>
         /// Creates the context and forwards <paramref name="options"/> unchanged to the
         /// <see cref="DbContext"/> base constructor.
         /// </summary>
         /// <param name="options">Context options (provider, connection string, ...).</param>
         public AppDbContext(DbContextOptions<AppDbContext> options)
             : base(options)
         {
         }
     }
    
  3. Configure the connection string in appsettings.json:

     {
         "ConnectionStrings": {
             "DefaultConnection": "Server=your_server;Database=your_database;User Id=your_user;Password=your_password;"
         },
         "Kafka": {
             "BootstrapServers": "localhost:9092",
             "Topic": "events",
             "GroupId": "kafka-event-driven-project"
         }
     }
    
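  4. Update Startup.cs to add the DbContext (project templates for .NET 6 and later have no Startup class; there, make the same registrations on builder.Services in Program.cs):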

     /// <summary>
     /// Registers the application's services with the dependency-injection container.
     /// </summary>
     /// <param name="services">Service collection that receives the registrations.</param>
     public void ConfigureServices(IServiceCollection services)
     {
         services.AddDbContext<AppDbContext>(options =>
             options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));

         // KafkaController takes KafkaProducer and KafkaConsumer as constructor
         // dependencies; without these registrations the controller cannot be
         // activated and every request to it fails.
         // One producer instance is shared for the lifetime of the application.
         services.AddSingleton<KafkaProducer>();

         // KafkaConsumer depends on AppDbContext, which AddDbContext registers as a
         // scoped service, so the consumer must not outlive the request scope either.
         services.AddScoped<KafkaConsumer>();

         services.AddControllers();
     }
    

Step 5: Implement Kafka Producer

  1. Create the KafkaProducer class:

     using Confluent.Kafka;
     using System.Threading.Tasks;
    
     /// <summary>
     /// Wraps a Confluent.Kafka producer that publishes string messages to the single
     /// topic named by the "Kafka:Topic" configuration key.
     /// </summary>
     /// <remarks>
     /// The class owns the underlying producer, so it implements
     /// <see cref="System.IDisposable"/> to release it.
     /// </remarks>
     public class KafkaProducer : System.IDisposable
     {
         private readonly IProducer<string, string> _producer;
         private readonly string _topic;

         /// <summary>
         /// Builds the producer from the "Kafka:BootstrapServers" and "Kafka:Topic" settings.
         /// </summary>
         /// <param name="configuration">Application configuration to read the Kafka settings from.</param>
         /// <exception cref="System.ArgumentNullException"><paramref name="configuration"/> is null.</exception>
         /// <exception cref="System.InvalidOperationException">A required Kafka setting is missing or blank.</exception>
         public KafkaProducer(IConfiguration configuration)
         {
             if (configuration == null)
             {
                 throw new System.ArgumentNullException(nameof(configuration));
             }

             // Validate before building the producer so a bad configuration fails here,
             // with a clear message, and no producer is created only to be abandoned.
             var bootstrapServers = configuration["Kafka:BootstrapServers"];
             if (string.IsNullOrWhiteSpace(bootstrapServers))
             {
                 throw new System.InvalidOperationException("Configuration value 'Kafka:BootstrapServers' is missing.");
             }

             var topic = configuration["Kafka:Topic"];
             if (string.IsNullOrWhiteSpace(topic))
             {
                 throw new System.InvalidOperationException("Configuration value 'Kafka:Topic' is missing.");
             }

             var config = new ProducerConfig { BootstrapServers = bootstrapServers };
             _producer = new ProducerBuilder<string, string>(config).Build();
             _topic = topic;
         }

         /// <summary>
         /// Publishes <paramref name="message"/> as the value of a new record on the configured topic.
         /// </summary>
         /// <param name="message">Payload to send; no key is set on the record.</param>
         /// <returns>A task that completes when the awaited produce call completes.</returns>
         public async Task ProduceAsync(string message)
         {
             await _producer.ProduceAsync(_topic, new Message<string, string> { Value = message });
         }

         /// <summary>
         /// Flushes any messages still in flight (waiting at most ten seconds) and
         /// releases the underlying producer.
         /// </summary>
         public void Dispose()
         {
             // Covers callers that started ProduceAsync without awaiting it.
             _producer.Flush(System.TimeSpan.FromSeconds(10));
             _producer.Dispose();
         }
     }
    

Step 6: Implement Kafka Consumer

  1. Create the KafkaConsumer class:

     using Confluent.Kafka;
     using System;
     using System.Threading;
     using System.Threading.Tasks;
    
     /// <summary>
     /// Reads string messages from the topic named by the "Kafka:Topic" configuration key
     /// and stores each one as an <see cref="EventMessage"/> through <see cref="AppDbContext"/>.
     /// </summary>
     /// <remarks>
     /// The class owns the underlying Kafka consumer, so it implements
     /// <see cref="IDisposable"/> to release it.
     /// </remarks>
     public class KafkaConsumer : IDisposable
     {
         private readonly IConsumer<string, string> _consumer;
         private readonly AppDbContext _context;
         private readonly string _topic;

         /// <summary>
         /// Builds the consumer from the "Kafka:BootstrapServers", "Kafka:GroupId" and
         /// "Kafka:Topic" settings.
         /// </summary>
         /// <param name="configuration">Application configuration to read the Kafka settings from.</param>
         /// <param name="context">Database context used to persist consumed messages.</param>
         /// <exception cref="ArgumentNullException">An argument is null.</exception>
         /// <exception cref="InvalidOperationException">"Kafka:Topic" is missing or blank.</exception>
         public KafkaConsumer(IConfiguration configuration, AppDbContext context)
         {
             if (configuration == null)
             {
                 throw new ArgumentNullException(nameof(configuration));
             }

             if (context == null)
             {
                 throw new ArgumentNullException(nameof(context));
             }

             // The topic must come from IConfiguration: the DbContext has no
             // Configuration member, so it cannot be read from _context later.
             var topic = configuration["Kafka:Topic"];
             if (string.IsNullOrWhiteSpace(topic))
             {
                 throw new InvalidOperationException("Configuration value 'Kafka:Topic' is missing.");
             }

             var config = new ConsumerConfig
             {
                 BootstrapServers = configuration["Kafka:BootstrapServers"],
                 GroupId = configuration["Kafka:GroupId"],
                 AutoOffsetReset = AutoOffsetReset.Earliest
             };
             _consumer = new ConsumerBuilder<string, string>(config).Build();
             _context = context;
             _topic = topic;
         }

         /// <summary>
         /// Subscribes to the configured topic and, until <paramref name="cancellationToken"/>
         /// is cancelled, blocks reading messages, logging each to the console and saving it
         /// to the database.
         /// </summary>
         /// <param name="cancellationToken">Token that stops the loop; cancellation is treated as a normal exit.</param>
         public void Consume(CancellationToken cancellationToken)
         {
             _consumer.Subscribe(_topic);
             try
             {
                 while (!cancellationToken.IsCancellationRequested)
                 {
                     var consumeResult = _consumer.Consume(cancellationToken);
                     Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");

                     // Save the consumed message to the database
                     var eventMessage = new EventMessage
                     {
                         Topic = consumeResult.Topic,
                         Message = consumeResult.Message.Value,
                         Timestamp = DateTime.UtcNow
                     };
                     _context.EventMessages.Add(eventMessage);
                     _context.SaveChanges();
                 }
             }
             catch (OperationCanceledException)
             {
                 // Expected when the token is cancelled while waiting for a message.
             }
             finally
             {
                 // Close on every exit path (cancellation or failure, e.g. in
                 // SaveChanges), not only on cancellation, so the consumer always
                 // leaves its group cleanly.
                 _consumer.Close();
             }
         }

         /// <summary>Releases the underlying Kafka consumer.</summary>
         public void Dispose()
         {
             _consumer.Dispose();
         }
     }
    

Step 7: Create API Endpoints

  1. Create the KafkaController class:

     using Microsoft.AspNetCore.Mvc;
     using System.Threading.Tasks;
     using System.Linq;
    
     [Route("api/[controller]")]
     [ApiController]
     /// <summary>
     /// HTTP API for publishing messages to Kafka, running the consumer, and performing
     /// read/update/delete operations on the stored <see cref="EventMessage"/> rows.
     /// </summary>
     public class KafkaController : ControllerBase
     {
         private readonly KafkaProducer _producer;
         private readonly KafkaConsumer _consumer;
         private readonly AppDbContext _context;

         /// <summary>Stores the injected producer, consumer and database context.</summary>
         public KafkaController(KafkaProducer producer, KafkaConsumer consumer, AppDbContext context)
         {
             _producer = producer;
             _consumer = consumer;
             _context = context;
         }

         /// <summary>Publishes the request body as a message on the configured Kafka topic.</summary>
         /// <param name="message">Message text taken from the request body.</param>
         /// <returns>200 OK once published; 400 Bad Request when the message is missing or blank.</returns>
         [HttpPost("publish")]
         public async Task<IActionResult> Publish([FromBody] string message)
         {
             if (string.IsNullOrWhiteSpace(message))
             {
                 return BadRequest("A non-empty message is required.");
             }

             await _producer.ProduceAsync(message);
             return Ok();
         }

         /// <summary>
         /// Runs the consumer loop for as long as the HTTP request stays open.
         /// </summary>
         /// <remarks>
         /// This call blocks until the client disconnects or the request is aborted.
         /// For continuous consumption, host the consumer in a background service instead.
         /// </remarks>
         /// <returns>200 OK after the loop has stopped.</returns>
         [HttpGet("consume")]
         public IActionResult Consume()
         {
             // Tie the loop to the request: a token that nothing ever cancels would keep
             // this request, and the thread serving it, blocked forever.
             // HttpContext is null only when the action is invoked outside a request
             // (for example from a unit test).
             var requestAborted = HttpContext?.RequestAborted ?? System.Threading.CancellationToken.None;
             _consumer.Consume(requestAborted);
             return Ok();
         }

         /// <summary>Returns every stored event.</summary>
         [HttpGet("events")]
         public IActionResult GetEvents()
         {
             var events = _context.EventMessages.ToList();
             return Ok(events);
         }

         /// <summary>Returns the event with the given id, or 404 Not Found.</summary>
         /// <param name="id">Identifier of the event to fetch.</param>
         [HttpGet("events/{id}")]
         public IActionResult GetEventById(int id)
         {
             var eventMessage = _context.EventMessages.Find(id);
             if (eventMessage == null)
             {
                 return NotFound();
             }
             return Ok(eventMessage);
         }

         /// <summary>Overwrites the topic, message and timestamp of an existing event.</summary>
         /// <param name="id">Identifier of the event to update.</param>
         /// <param name="updatedEvent">New values taken from the request body; its Id is ignored.</param>
         /// <returns>204 No Content on success; 400 when the body is missing; 404 when the id is unknown.</returns>
         [HttpPut("events/{id}")]
         public IActionResult UpdateEvent(int id, [FromBody] EventMessage updatedEvent)
         {
             if (updatedEvent == null)
             {
                 return BadRequest();
             }

             var eventMessage = _context.EventMessages.Find(id);
             if (eventMessage == null)
             {
                 return NotFound();
             }

             // The entity returned by Find is already tracked by the context, so
             // assigning its properties is enough; SaveChanges persists the changes
             // without an explicit Update call.
             eventMessage.Topic = updatedEvent.Topic;
             eventMessage.Message = updatedEvent.Message;
             eventMessage.Timestamp = updatedEvent.Timestamp;

             _context.SaveChanges();

             return NoContent();
         }

         /// <summary>Deletes the event with the given id.</summary>
         /// <param name="id">Identifier of the event to delete.</param>
         /// <returns>204 No Content on success; 404 when the id is unknown.</returns>
         [HttpDelete("events/{id}")]
         public IActionResult DeleteEvent(int id)
         {
             var eventMessage = _context.EventMessages.Find(id);
             if (eventMessage == null)
             {
                 return NotFound();
             }

             _context.EventMessages.Remove(eventMessage);
             _context.SaveChanges();

             return NoContent();
         }
     }
    

Step 8: Run Migrations and Update Database

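Run the following commands to create and apply migrations (the dotnet ef command is a separate CLI tool; if it is not installed yet, install it once with "dotnet tool install --global dotnet-ef"):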

dotnet ef migrations add InitialCreate
dotnet ef database update

Step 9: Run the Application

Run the application:

dotnet run

Integrating with Azure

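To integrate with Azure, you can use Azure Event Hubs as a Kafka-compatible endpoint. Create an Event Hubs namespace on a tier that exposes the Kafka endpoint (Standard or higher) and an event hub to act as the topic. Then point the application at it: set Kafka:BootstrapServers to your-namespace.servicebus.windows.net:9093 and Kafka:Topic to the event hub name, and configure both the producer and the consumer with SecurityProtocol = SaslSsl, SaslMechanism = Plain, SaslUsername = "$ConnectionString", and SaslPassword set to the Event Hubs namespace connection string.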