作为一个.NET Developer,自然想要在.NET项目中集成Kafka实现发布订阅功能。那么,目前可用的Kafka客户端有哪些呢?
目前.NET圈子主流使用的是 Confluent.Kafka
confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet
其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。
因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。
NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:
接下来,本文就来在.NET Core项目下通过Confluent.Kafka和CAP两个主流开源项目来操作一下Kafka,实现一下发布订阅的功能。
2 基于Confluent.Kafka的示例要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。
安装相关组件在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:
PM>Install-Package Confluent.Kafka编写KafkaService
编写IKafkaService接口:
namespace EDT.Kafka.Core { public interface IKafkaService { Task PublishAsync<T>(string topicName, T message) where T : class; Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class; } }
编写KafkaService实现类:
namespace EDT.Kafka.Core { public class KafkaService : IKafkaService { public static string KAFKA_SERVERS = "127.0.0.1:9091"; public async Task PublishAsync<T>(string topicName, T message) where T : class { var config = new ProducerConfig { BootstrapServers = KAFKA_SERVERS, BatchSize = 16384, // 修改批次大小为16K LingerMs = 20 // 修改等待时间为20ms }; using (var producer = new ProducerBuilder<string, string>(config).Build()) { await producer.ProduceAsync(topicName, new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = JsonConvert.SerializeObject(message) }); ; } } public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class { var config = new ConsumerConfig { BootstrapServers = KAFKA_SERVERS, GroupId = "Consumer", EnableAutoCommit = false, // 禁止AutoCommit Acks = Acks.Leader, // 假设只需要Leader响应即可 AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起 }; using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) { consumer.Subscribe(topics); try { while (true) { try { var consumeResult = consumer.Consume(cancellationToken); Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF) { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } T messageResult = null; try { messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value); } catch (Exception ex) { var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}"; Console.WriteLine(errorMessage); messageResult = null; } if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) { messageFunc(messageResult); try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine(e.Message); } } } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { Console.WriteLine("Closing consumer."); consumer.Close(); } } await Task.CompletedTask; } } }
为了方便后续的演示,在此项目中再创建一个类 EventData:
public class EventData { public string TopicName { get; set; } public string Message { get; set; } public DateTime EventTime { get; set; } }编写Producer
新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:
namespace EDT.Kafka.Demo.Producer { public class Program { static async Task Main(string[] args) { KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093"; var kafkaService = new KafkaService(); for (int i = 0; i < 50; i++) { var eventData = new EventData { TopicName = "testtopic", Message = $"This is a message from Producer, Index : {i + 1}", EventTime = DateTime.Now }; await kafkaService.PublishAsync(eventData.TopicName, eventData); } Console.WriteLine("Publish Done!"); Console.ReadKey(); } } }编写Consumer
新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:
namespace EDT.Kafka.Demo.Consumer { public class Program { static async Task Main(string[] args) { KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093"; var kafkaService = new KafkaService(); var topics = new List<string> { "testtopic" }; await kafkaService.SubscribeAsync<EventData>(topics, (eventData) => { Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理"); }); } } }测试Pub/Sub效果
将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。
3 基于DotNetCore.CAP的示例 模拟场景说明假设我们有两个微服务,一个是Catalog微服务,一个是Basket微服务,当Catalog微服务产生了Product价格更新的事件,就会将其发布到Kafka,Basket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。
Catalog API新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:
PM>Install Package DotNetCore.CAP PM>Install Package DotNetCore.CAP.MongoDB PM>Install Package DotNetCore.CAP.Kafka
在Startup中的ConfigureServices方法中注入CAP:
public void ConfigureServices(IServiceCollection services) { ...... services.AddCap(x => { x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin"); x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093"); }); }
新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:
namespace EDT.Demo.Catalog.API.Controllers { [ApiController] [Route("[controller]")] public class ProductController : ControllerBase { private static readonly IList<Product> Products = new List<Product> { new Product { Id = "0001", Name = "电动牙刷A", Price = 99.90M, Introduction = "暂无介绍" }, new Product { Id = "0002", Name = "电动牙刷B", Price = 199.90M, Introduction = "暂无介绍" }, new Product { Id = "0003", Name = "洗衣机A", Price = 2999.90M, Introduction = "暂无介绍" }, new Product { Id = "0004", Name = "洗衣机B", Price = 3999.90M, Introduction = "暂无介绍" }, new Product { Id = "0005", Name = "电视机A", Price = 1899.90M, Introduction = "暂无介绍" }, }; private readonly ICapPublisher _publisher; private readonly IMapper _mapper; public ProductController(ICapPublisher publisher, IMapper mapper) { _publisher = publisher; _mapper = mapper; } [HttpGet] public IList<ProductDTO> Get() { return _mapper.Map<IList<ProductDTO>>(Products); ; } [HttpPut] public async Task<IActionResult> UpdatePrice(string id, decimal newPrice) { // 业务代码 var product = Products.FirstOrDefault(p => p.Id == id); product.Price = newPrice; // 发布消息 await _publisher.PublishAsync("ProductPriceChanged", new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price}); return NoContent(); } } }Basket API
参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。
namespace EDT.Demo.Basket.API.Controllers { [ApiController] [Route("[controller]")] public class BasketController : ControllerBase { private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO> { new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog> { new Catalog { Product = new ProductDTO { Id = "0001", Name = "电动牙刷A", Price = 99.90M }, Count = 2 }, new Catalog { Product = new ProductDTO { Id = "0005", Name = "电视机A", Price = 1899.90M }, Count = 1 }, } }, new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog> { new Catalog { Product = new ProductDTO { Id = "0002", Name = "电动牙刷B", Price = 199.90M }, Count = 2 }, new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣机B", Price = 3999.90M }, Count = 1 }, } } }; [HttpGet] public IList<MyBasketDTO> Get() { return Baskets; } [NonAction] [CapSubscribe("ProductPriceChanged")] public async Task RefreshBasketProductPrice(ProductDTO productDTO) { if (productDTO == null) return; foreach (var basket in Baskets) { foreach (var catalog in basket.Catalogs) { if (catalog.Product.Id == productDTO.Id) { catalog.Product.Price = productDTO.Price; break; } } } await Task.CompletedTask; } } }测试效果
同时启动Catalog API 和 Basket API两个项目。首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。
然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。
最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。
4 总结本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。
参考资料阿星Plus,《.NET Core下使用Kafka》
麦比乌斯皇,《.NET使用Kafka小结》
极客时间,胡夕《Kafka核心技术与实战》
B站,尚硅谷《Kafka 3.x入门到精通教程》