// WordCountInterceptor.java
package com.bd17;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * Custom Flume interceptor that decodes each event body as UTF-8 text and replaces the body with
 * the number of words it contains (as a decimal string, UTF-8 encoded).
 *
 * <p>Words are the comma-separated tokens of the body. Surrounding whitespace is trimmed from each
 * token, and empty tokens are ignored. The {@code excludeWords} parameter lists words that are not
 * counted.
 *
 * @author Administrator
 */
public class WordCountInterceptor implements Interceptor {

    /** Separator between words in an event body and between entries of {@code excludeWords}. */
    private static final String SEPARATOR = ",";

    /**
     * Trimmed, non-empty words parsed from the {@code excludeWords} parameter. This set is empty
     * when nothing is excluded and is never null. It is built once so that per-event lookups are
     * constant time.
     */
    private final Set<String> excludeWordSet;

    /**
     * Creates the interceptor.
     *
     * @param excludeWords comma-separated words to leave out of the count; {@code null} or empty
     *     means every word is counted
     */
    public WordCountInterceptor(String excludeWords) {
        this.excludeWordSet = parseExcludeWords(excludeWords);
    }

    /** Splits the raw exclude parameter into a set of trimmed, non-empty words. */
    private static Set<String> parseExcludeWords(String raw) {
        if (raw == null || raw.isEmpty()) {
            return Collections.emptySet();
        }
        Set<String> words = new HashSet<>();
        for (String word : raw.split(SEPARATOR)) {
            String trimmed = word.trim();
            if (!trimmed.isEmpty()) {
                words.add(trimmed);
            }
        }
        return words;
    }

    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Replaces the event body with the count of its non-excluded words.
     *
     * @param event the event to rewrite in place
     * @return the same event instance, with its body replaced
     */
    @Override
    public Event intercept(Event event) {
        byte[] raw = event.getBody();
        // Decode and encode with an explicit charset; the platform default may differ between hosts.
        String text = raw == null ? "" : new String(raw, StandardCharsets.UTF_8);
        int count = countWords(text);
        event.setBody(String.valueOf(count).getBytes(StandardCharsets.UTF_8));
        return event;
    }

    /** Counts the trimmed, non-empty comma-separated tokens of {@code text} that are not excluded. */
    private int countWords(String text) {
        // The count is a local rather than a field, so intercept keeps no per-event state on the instance.
        int count = 0;
        for (String token : text.split(SEPARATOR)) {
            String word = token.trim();
            // Skip empty tokens: "".split(",") yields [""], and "a,,b" yields an empty middle token.
            if (!word.isEmpty() && !excludeWordSet.contains(word)) {
                count++;
            }
        }
        return count;
    }

    /**
     * Processes a batch of events by applying the single-event logic to each one in place.
     *
     * @param events the batch to rewrite
     * @return the same list, whose events now carry word counts
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Implementation of {@link Interceptor.Builder}, nested inside the interceptor. It reads the
     * {@code excludeWords} setting from the interceptor's configuration context.
     *
     * @author Administrator
     */
    public static class Builder implements Interceptor.Builder {

        /** Raw {@code excludeWords} value from the context; null when the key is absent. */
        private String excludeWords;

        @Override
        public void configure(Context context) {
            excludeWords = context.getString("excludeWords");
        }

        @Override
        public Interceptor build() {
            return new WordCountInterceptor(excludeWords);
        }
    }
}