当前位置 : 主页 > 编程语言 > java >

flume 自定义拦截器 interceptor

来源:互联网 收集:自由互联 发布时间:2021-06-28
WordCountInterceptor.java package com.bd17;import java.util.Arrays;import java.util.List;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;/** * 自定义 interceptor 把接收到的
WordCountInterceptor.java
package com.bd17;

import java.util.Arrays;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * 自定义 interceptor 把接收到的数据转换成文本,把 event 里面的内容替换成文本的单词数 定义一个 exclude
 * 参数,排除某些单词不计算在内
 * 
 * @author Administrator
 *
 */
public class WordCountInterceptor implements Interceptor {

	// 参数 excludeWords 中可以填写多个单词,当填写多个单词的时候用逗号分隔
	private String excludeWords;
	private String[] excludeWordsArray;
	private int eventCount;

	public WordCountInterceptor(String excludeWords) {

		this.excludeWords = excludeWords;
		if (excludeWords!=null && !excludeWords.equals("")) {
			
			this.excludeWordsArray = this.excludeWords.split(",");
		}
	}

	public void initialize() {
		// no-op
	}

	// 拦截过程数据处理逻辑
	public Event intercept(Event event) {

		this.eventCount=0;
		
		String[] words = new String(event.getBody()).split(",");
		
		if (excludeWordsArray == null || excludeWordsArray.length < 1) {
			
			eventCount = words.length;
		}else{
			
			List
 
   execludeList = Arrays.asList(excludeWordsArray);
			for(String word : words) {
				
				if (!execludeList.contains(word)) {
					
					eventCount += 1;
				}
			}
		}
		event.setBody(String.valueOf(eventCount).getBytes());
		return event;
	}

	// 使用单个 event 拦截处理过程逻辑来实现 list 列表 event 的处理过程
	public List
  
    intercept(List
   
     events) { for (Event event : events) { intercept(event); } return events; } public void close() { // no-op } /** * 定义 Interceptor.Builder 接口的实现类,并且是 interceptor 的内部类 * @author Administrator * */ public static class Builder implements Interceptor.Builder{ private String excludeWords; public void configure(Context context) { excludeWords = context.getString("excludeWords"); } public Interceptor build() { return new WordCountInterceptor(excludeWords); } } }
   
  
 
上一篇:Pipeline & Valve
下一篇:HashMap的工作原理
网友评论