Kafka Streams 示例（代码见 GitHub：https://github.com/liufengji/kafka_api.git）：实时处理带有“>>>”标记的内容，只保留标记之后的部分。
Code -> GitHub
https://github.com/liufengji/kafka_api.git
1、需求分析
实时处理带有“>>>”前缀标记的内容：只保留“>>>”之后的部分。例如输入“victor>>>mayy”，最终处理成“mayy”。
2、创建主类（先创建一个工程，并添加 Kafka Streams 相关的 jar 包）
import java.util.Properties;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorSupplier;import org.apache.kafka.streams.processor.TopologyBuilder;public class Application { public static void main(String[] args) { // 定义输入的topic String from = "first"; // 定义输出的topic String to = "second"; // 设置参数 Properties settings = new Properties(); //给应用程序设计一个名字 settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); //连接哪台主机 settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); StreamsConfig cOnfig= new StreamsConfig(settings); // 构建拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", from) .addProcessor("PROCESS", new ProcessorSupplier() { @Override public Processor get() { // 具体分析处理 return new LogProcessor(); } }, "SOURCE") .addSink("SINK", to, "PROCESS"); // 创建kafka stream KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); }}
3、具体业务处理
import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processor { private ProcessorContext context;台 @Override public void init(ProcessorContext context) { this.cOntext= context; } @Override public void process(byte[] key, byte[] value) { String input = new String(value); // 如果包含“>>>”则只保留该标记后面的内容 if (input.contains(">>>")) { input = input.split(">>>")[1].trim(); // 输出到下一个topic context.forward("logProcessor".getBytes(), input.getBytes()); }else{ context.forward("logProcessor".getBytes(), input.getBytes()); } } @Override public void punctuate(long timestamp) { } @Override public void close() { }}
4、运行程序，然后在 node3 上启动生产者
[user@node3 kafka]$ bin/kafka-console-producer.sh --broker-list node1:9092 \
--topic first
>hello>>>world
>hei>>>victor
>h>>>victor
>z>>>victor
5、在node2上启动消费者
[user@node2 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper node1:2181 \
--from-beginning \
--topic second
world
victor
victor
victor