
Sinking data into Hive partition directories with a custom Flume interceptor

Published: 2017-05-24 19:49  Category: Hadoop/Spark

Implementing a Flume interceptor

The class implements org.apache.flume.interceptor.Interceptor.

It must provide the Event intercept(Event arg0) method and the List<Event> intercept(List<Event> arg0) method.

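Example: an interceptor that reads the partition name from each record and passes it along in an event header.

Java implementation: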

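// Package taken from the interceptor type used in the Flume configuration.
package org.apache.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimestampInterceptor implements Interceptor {
	private static final Logger logger = LoggerFactory.getLogger(TimestampInterceptor.class);

	@Override
	public void initialize() {
		// No resources to set up.
	}

	@Override
	public void close() {
		// No resources to release.
	}

	// Copies the first 8 characters of the body (yyyyMMdd) into the "day" header.
	// Returning null drops the event.
	@Override
	public Event intercept(Event e) {
		byte[] bs = e.getBody();
		if (bs == null) {
			// An event without a body passes through without a "day" header.
			return e;
		}
		String line = new String(bs, StandardCharsets.UTF_8);
		if (line.length() < 8) {
			logger.warn("Dropping event: body is shorter than 8 characters");
			return null;
		}
		// Add to the existing headers; setHeaders() would replace all of them.
		e.getHeaders().put("day", line.substring(0, 8));
		return e;
	}

	// Applies intercept(Event) to a batch and keeps only the events that were not dropped.
	@Override
	public List<Event> intercept(List<Event> events) {
		List<Event> list = new ArrayList<Event>(events.size());
		for (Event event : events) {
			Event e = intercept(event);
			if (e != null) {
				list.add(e);
			}
		}
		return list;
	}

	// Flume creates the interceptor through this Builder.
	public static class Builder implements Interceptor.Builder {
		@Override
		public Interceptor build() {
			return new TimestampInterceptor();
		}

		@Override
		public void configure(Context context) {
			// No configuration parameters.
		}
	}
}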
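
The header logic can be tried out without a running agent. Below is a minimal sketch that uses only the JDK. DayHeaderSketch, extractDay and tagWithDay are hypothetical names introduced here, not Flume APIs, and the sample log line is made up. It assumes each record starts with a yyyyMMdd date, as in the interceptor, and additionally rejects prefixes that are not 8 digits, which the interceptor itself does not check.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flume: mirrors the header logic of the
// interceptor so it can be exercised without a running agent.
class DayHeaderSketch {

    // Returns the first 8 characters of the body if they are all ASCII digits
    // (yyyyMMdd), or null when the body is missing, too short, or malformed.
    static String extractDay(byte[] body) {
        if (body == null) {
            return null;
        }
        String line = new String(body, StandardCharsets.UTF_8);
        if (line.length() < 8) {
            return null;
        }
        String day = line.substring(0, 8);
        for (int i = 0; i < day.length(); i++) {
            char c = day.charAt(i);
            if (c < '0' || c > '9') {
                return null;
            }
        }
        return day;
    }

    // Adds the "day" entry to an existing header map and leaves other entries alone.
    // Returns false (and changes nothing) when no valid day can be extracted.
    static boolean tagWithDay(Map<String, String> headers, byte[] body) {
        String day = extractDay(body);
        if (day == null) {
            return false;
        }
        headers.put("day", day);
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "web01"); // made-up header set by an upstream component
        byte[] body = "20170524 19:49:00 GET /index.html".getBytes(StandardCharsets.UTF_8);
        System.out.println(tagWithDay(headers, body) + " " + headers);
        System.out.println(extractDay("bad".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Writing into the existing map matters when other interceptors or the source have already set headers: calling setHeaders with a fresh map would discard them.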

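Configure the interceptor in Flume. The type is the fully qualified name of the Builder class, and %{day} in hdfs.path is replaced with the value of the day header set by the interceptor. Flume also ships a built-in org.apache.flume.interceptor.TimestampInterceptor, so a package and class name of your own avoids ambiguity on the classpath.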

producer01.sources.sourcename.interceptors=i3
producer01.sources.sourcename.interceptors.i3.type=org.apache.flume.interceptor.TimestampInterceptor$Builder
producer01.sinks.sink_name.type=hdfs
producer01.sinks.sink_name.hdfs.path=hdfs://dmp/data/logs/name/%{day}
producer01.sinks.sink_name.hdfs.rollInterval=0
producer01.sinks.sink_name.hdfs.rollSize=0
producer01.sinks.sink_name.hdfs.idleTimeout=54000
producer01.sinks.sink_name.hdfs.filePrefix=pad
producer01.sinks.sink_name.hdfs.rollCount=50000000
producer01.sinks.sink_name.hdfs.writeFormat=Text
producer01.sinks.sink_name.hdfs.batchSize=1000
producer01.sinks.sink_name.hdfs.fileType=DataStream
producer01.sinks.sink_name.channel=channel_name
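
To make the role of %{day} concrete, here is a minimal sketch of how a header reference in hdfs.path turns into a per-day directory. It is a simplified stand-in written for this post, not Flume's own code, and HeaderPathSketch and resolve are hypothetical names. Replacing an unknown header with an empty string is an assumption about the sink's behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: a simplified stand-in for the header substitution that
// the HDFS sink applies to hdfs.path. This is not Flume's implementation.
class HeaderPathSketch {
    private static final Pattern HEADER_REF = Pattern.compile("%\\{([^}]+)\\}");

    // Replaces every %{name} with headers.get(name).
    // Assumption: a header that is not present becomes an empty string.
    static String resolve(String template, Map<String, String> headers) {
        Matcher m = HEADER_REF.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("day", "20170524");
        // prints hdfs://dmp/data/logs/name/20170524
        System.out.println(resolve("hdfs://dmp/data/logs/name/%{day}", headers));
    }
}
```

Each distinct day value therefore gets its own directory under hdfs://dmp/data/logs/name/, which is what lets the files line up with a day partition. Under the sketch's assumption, an event that reaches the sink without the header would be written directly under the parent directory.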
Keywords: Flume interceptor