// WordCountInterceptor.java
package com.bd17;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
/**
* 自定义 interceptor 把接收到的数据转换成文本,把 event 里面的内容替换成文本的单词数 定义一个 exclude
* 参数,排除某些单词不计算在内
*
* @author Administrator
*
*/
public class WordCountInterceptor implements Interceptor {
// 参数 excludeWords 中可以填写多个单词,当填写多个单词的时候用逗号分隔
private String excludeWords;
private String[] excludeWordsArray;
private int eventCount;
public WordCountInterceptor(String excludeWords) {
this.excludeWords = excludeWords;
if (excludeWords!=null && !excludeWords.equals("")) {
this.excludeWordsArray = this.excludeWords.split(",");
}
}
public void initialize() {
// no-op
}
// 拦截过程数据处理逻辑
public Event intercept(Event event) {
this.eventCount=0;
String[] words = new String(event.getBody()).split(",");
if (excludeWordsArray == null || excludeWordsArray.length < 1) {
eventCount = words.length;
}else{
List
execludeList = Arrays.asList(excludeWordsArray);
for(String word : words) {
if (!execludeList.contains(word)) {
eventCount += 1;
}
}
}
event.setBody(String.valueOf(eventCount).getBytes());
return event;
}
// 使用单个 event 拦截处理过程逻辑来实现 list 列表 event 的处理过程
public List
intercept(List
events) { for (Event event : events) { intercept(event); } return events; } public void close() { // no-op } /** * 定义 Interceptor.Builder 接口的实现类,并且是 interceptor 的内部类 * @author Administrator * */ public static class Builder implements Interceptor.Builder{ private String excludeWords; public void configure(Context context) { excludeWords = context.getString("excludeWords"); } public Interceptor build() { return new WordCountInterceptor(excludeWords); } } }
