探索生活百科

Kafka入门实战教程(7):Kafka Streams

探索生活百科

1 关于流处理

流系统是数据处理引擎,处理无限数据集(Unbounded Dataset),流处理与批处理相关。 对应于批处理。所谓无线数据,是指永无止境的数据。流处理平台是专门处理这类数据集的系统或框架。下图形象地展示了流处理和批处理的区别:

总体而言,流处理给人的印象是低延迟,但结果可能不太精确。另一方面,批处理可以提供准确的结果,但经常会出现混乱。

最简单的Streaming结构如下图:

从一个Topic读取数据,经过一些处理操作后,写入到另一个Topic。好了,这就是最简单的Streaming计算了。其中,Source Topic中的数据会不断产生新的数据。

那么,我们对上面的结构进行扩展。假设定义了多个Source Topic和Destination Topic,将会形成一个更复杂的拓扑,如下图所示:

2 关于Kafka Streams

近年来,开源流处理领域涌现出许多优秀的框架。仅Apache基金会孵化的项目中,就有十几个流处理的大数据框架,比如早期的Apache Samza、Apache Storm,以及近年来流行的Spark、Flink等。

Kafka Streams的特点

与其他流处理平台相比,Kafka Streams最大的特点是它不是一个平台,至少它不是一个功能齐全(Full-Fledged)的平台,比如那些其他框架附带的调度器和资源管理器不是 Kafka Streams 提供的。 Kafka官网明确将Kafka Streams定义为客户端库。我们可以使用这个库来构建高度可扩展、弹性和容错的分布式应用程序和微服务。使用Kafka Streams API构建的应用程序是一个普通的应用程序。我们可以选择任何熟悉的技术或框架来编译、打包、部署并上线。遗憾的是,目前除 Java 之外的其他主流开发语言的 SDK 上均未提供 Kafka Streams。 Kafka Streams最大的特点就是上下游数据源的限制。目前,Kafka Streams仅支持与Kafka集群交互,并且不提供开箱即用的外部数据源连接器。

Kafka Streams应用执行

Kafka Streams号称实现了Exactly Once Semantics(EOS,下文将使用EOS的缩写)。所谓EOS是指一条消息或事件只影响应用程序状态一次。 。事实上,对于Kafka Streams来说,它天然支持端到端的EOS,因为它本质上与Kafka紧密相连。下图展示了一个典型的Kafka Streams应用程序的执行逻辑:

通常,Kafka Streams 需要 5 个步骤:

  • 读取最新处理的消息位移;

  • 读取消息数据;

  • 执行处理逻辑;

  • 将处理结果写回Kafka;

  • 保存位置信息。

这五个步骤的执行必须是原子的,否则无法实现精确的一次性处理语义。在设计上,Kafka Streams利用Kafka的事务机制和底层的幂等Producer来实现多分区写入。而且由于只能读写Kafka,Kafka Streams可以轻松实现端到端的EOS。 。

3 Kafka Streams客户端

Confluence.Kafka是.NET圈目前主流的Kafka客户端,不提供Streams功能。事实上,目前Kafka Streams只提供了Java客户端上的Streams功能,并没有其他语言提供。

画外音:毕竟Kafka是用JVM语言(Scala+Java)编写的,Java是嫡系,是一等公民。

那么Confluence.Kafka团队有计划提供这个功能吗?我在issue list里找到了一些评论,结果是目前没有这方面的计划,涉及的工作量太大了,WTF。那么,.NET 真的没有可用的 Kafka Streams 客户端吗?事实上,是的,我在Confluence.Kafka的issue内容中发现了以下Kafka Streams客户端:m.scooter-sidecars.com。

m.scooter-sidecars.com:https://m.scooter-sidecars.com/LGouellec/kafka-streams-dotnet

目前m.scooter-sidecars.com项目还处于不断开发和完善的阶段。星星数为278,估计不能直接用于生产环境,但可以用于学习和实践。最新版本是:1.3.0。其实m.scooter-sidecars.com也是基于Confluence.Kafka开发的,相当于对Confluence.Kafka做了一些DSL扩展。其接口名称和用法与Java API几乎相同。

4 第一篇 Streaming 应用

如果您不了解 Streaming 的概念,建议先阅读上一篇文章。

应用程序部分

首先,创建 .NET Core 或 .NET 5/6 控制台应用程序。

然后,通过 Nuget 安装 m.scooter-sidecars.com 包:

PM>Install-Package m.scooter-sidecars.com

然后,开始编写你的第一个流应用程序:

使用m.scooter-sidecars.com;使用Streamiz.Kafka.Net.SerDes;使用m.scooter-sidecars.com;使用 Streamiz.Kafka.Net.Table;使用系统;使用System.Threading.Tasks;命名空间EDT.Kafka.Streams.Demo{public  班级 计划 { 公共 静态 a同步 任务主(string[] args) {  //流配置 var config = new StreamConfig(); config.ApplicationId = \"test-streams-app\"; config.BootstrapServers = \"kafka1:9091,kafka2:9092,kafka3:9093\"; StreamBuilder 构建器 = new StreamBuilder(); // 使用 filterNot 条件流式传输 \"test-stream-input\" 主题,并保留在 \"test-stream-output\" 主题中。 m.scooter-sidecars.com<string , 字符串>(\"测试流输入\") .FilterNot((k, v) => v.Contains(\"测试 \ ")) .To(\"测试流输出\"); // 创建一个包含“test-ktable”主题的表,并在名为“test-store”的内存存储中实现该表 builder.Table(\"测试流-ktable\",内存中<字符串,字符串>.As( \"测试流存储\")); //构建拓扑拓扑t = m.scooter-sidecars.com(); // 创建具有拓扑和配置的流实例 KafkaStream Stream = new KafkaStream(t, config); //订阅CTRL + C退出流应用程序 Console.CancelKeyPress += (o, e) => {stream.Dispose(); }; // 使用可取消令牌启动流实例 awaitstream.StartAsync(); } }}

这个示例流应用程序非常简单。它实现了最简单的处理流程如下图所示:

Source Topic 为 test-stream-input,Destination Topic 为 test-stream-output,分别对应输入源和输出地。在处理过程中,将创建一个名为 test-stream-ktable 的表,它将作为输入流和输出流之间的中间状态。在Kafka Streams中,流被聚合成时间维度的表,表不断更新成时间维度的流。也就是说,表会被转换成流,流又会被转换成表,如此循环往复,完成所谓的流式计算。

这个test-stream-ktable将被存储在内存中一个名为test-stream-kstore的区域中。这足以让我们明白。最后回到最关键的那句代码,如下图。在处理输入源时,使用DSL进行快速过滤,即判断输入消息中是否包含字符串test。如果是,则不会被过滤。如果没有,则会进行处理,即传递给test-stream-output。 。

最后回到最关键的那行代码,如下图。在处理输入源时,使用DSL进行快速过滤,即判断输入消息中是否包含字符串test。如果是,则不会被过滤。如果没有,则会进行处理,即传递给test-stream-output。 。

m.scooter-sidecars.com<string, string>(\"测试流输入\") .FilterNot(( k, v) => v.包含(\"测试\")) .To(\"测试流输出\");

Broker部分

为了完成这个demo,我们提前在Kafka Broker端创建了下图红线框内的几个topic。

为了演示和验证方便,我们暂时将它们设置为单个分区,不再额外复制。

测试效果

首先我们启动.NET控制台程序。

然后,我们在Broker端打开一个Producer命令行,依次手动输入一些数据源:

# kafka-console- m.scooter-sidecars.com --topic=test-stream-input --broker-list kafka1:9091,kafka2:9092,kafka3:9093>哈哈   > test9898>xifejlrkewl>xjkfldsjoifdsfjods >xjoijfosifjlkdsjflkds> jlfjdslkjdslfjds> ,输入数据源包含3 条包含 test 关键字的字符串消息。期望的结果是,在 Streams 应用程序处理逻辑中,这 3 条消息被过滤掉,其余消息被处理并传递到输出。 

然后,我们可以使用Kafka Tool查看两个主题输入和输出的数据:

(1)测试流输入

(2)测试流输出

可以看到test-stream-output中没有包含test关键字的消息,第一个Streaming应用程序运行成功。

5 经典WordCount应用程序

所谓wordcount就是一款经典的字数统计应用程序,可以统计每个单词在指定数据源中出现的次数。在Streaming流式计算和MapReduce分布式计算中,经常出现在示例代码中。

应用部分

重写上面的演示示例代码:

var配置=newStreamConfig();config.ApplicationId=\" 测试字数统计应用程序\";config.BootstrapServers = \"kafka1:9091,kafka2:9092,kafka3:9093\";StreamBuilder 生成器=  StreamBuilder();m.scooter-sidecars.com<string, string>(\"测试-单词输入\") .FlatMapValues(值 => value.Split(\" \", StringSplitOptions.RemoveEmptyEntries).ToList()) //以空格分隔多个单词.map(((键,value)=> keyvaluepair.create(value,\\“1\”))//转换为(word,1 word,1 ) 键值对形式  .GroupByKey() // 根据单词进行分组  .Count() // 计算值的个数​​每组中。 ToStream() .Map((key, value) => KeyValuePair.Create(key, $\"{key} : {value.ToString()}\" )) .To(\"test-word-out\");// 创建一个包含 \"test-ktable\" 主题的表,并在名为的内存存储中实现这一点\"test-store\"builder.Table(\"test-word-ktable\",内存中<string ,  string >.As(\"test-word-store\"));//构建拓扑 拓扑 t = m.scooter-sidecars.com();//创建具有拓扑和配置的流实例KafkaStream Stream = new KafkaStream(t, config);//订阅CTRL + C退出流应用程序Console.CancelKeyPress += (o, e) =>{stream.Dispose();};//启动可取消的流实例tokenawaitstream.StartAsync();

Broker侧部分

添加示例代码中需要使用的几个主题:test-word-in、test-word-out和test-字-ktable 。

测试效果

首先我们启动.NET控制台程序。

然后,我们在Broker端打开一个Producer命令行,依次手动输入一些数据源:

# kafka-console- m.scooter-sidecars.com --topic=test-word-in --broker-list kafka1:9091,kafka2:9092,kafka3:9093>hello world>hello jav^H>hello csharp>hello golang

是如您所见,我们的hello在这里出现了4次,其他的话是只出现过1次。

那么,我们可以直接进入主题test-word-out来验证:

6 总结

本文总结了Kafka Streams的基本概念和执行流程,并给出了Kafka Streams应用程序与.NET客户端结合的示例。

参考资料

kafka-streams-dotnet:https://m.scooter-sidecars.com/kafka-streams-dotnet

极客时间胡夕《Kafka核心技术与实战》

Bilibili,硅谷《Kafka 3.x入门到精通教程》

作者:周旭龙

来源:https://m.scooter-sidecars.com

本文版权归作者及博客园所有。欢迎转载,但未经作者同意必须保留本声明,并须在文章页面明显位置提供原文链接。

发表评论 (已有0条评论)

还木有评论哦,快来抢沙发吧~