implements org.apache.flume.interceptor.Interceptor
实现 Event intercept(Event arg0) 方法和 List<Event> intercept(List<Event> events) 方法
eg:实现拦截器获取并传递数据所在的分区名
Java实现代码
public class TimestampInterceptor implements Interceptor{ private static final Logger logger = LoggerFactory.getLogger(Md5ConvertInterceptor.class); private DateFormat df=new SimpleDateFormat("yyyyMMdd"); @Override public void close() { } @Override public void initialize() { } @Override public Event intercept(Event e) { byte[] bs=e.getBody(); String time; if(bs!=null){ try { String line=new String(bs,"UTF-8"); time = line.substring(0,8); Map<String, String>p=new HashMap<String, String>(); p.put("day",time); e.setHeaders(p); } catch (Exception e1) { return null; } } return e; } @Override public List<Event> intercept(List<Event> events) { List<Event> list = Lists.newArrayListWithCapacity(events.size()); for (Event event : events) { Event e = intercept(event); if (e != null) { list.add(e); } } return list; } public static class Builder implements Interceptor.Builder { //使用Builder初始化Interceptor @Override public Interceptor build() { return new TimestampInterceptor(); } @Override public void configure(Context arg0) { } }
在Flume中配置拦截器:
producer01.sources.sourcename.interceptors=i3
producer01.sources.sourcename.interceptors.i3.type=org.apache.flume.interceptor.TimestampInterceptor$Builder
producer01.sinks.sink_name.type=hdfs
producer01.sinks.sink_name.hdfs.path=hdfs://dmp/data/logs/name/%{day}
producer01.sinks.sink_name.hdfs.rollInterval=0
producer01.sinks.sink_name.hdfs.rollSize=0
producer01.sinks.sink_name.hdfs.idleTimeout=54000
producer01.sinks.sink_name.hdfs.filePrefix=pad
producer01.sinks.sink_name.hdfs.rollCount=50000000
producer01.sinks.sink_name.hdfs.writeFormat=Text
producer01.sinks.sink_name.hdfs.batchSize=1000
producer01.sinks.sink_name.hdfs.fileType=DataStream
producer01.sinks.sink_name.channel=channel_name