博客信息

批量处理flume向HDFS写文件时产生未释放租约的异常文件

发布时间:『 2017-06-28 13:57』  博客类别:Hadoop/Spark  阅读(1613) 评论(0)
# encoding: utf-8
'''
@function:用于批量处理flume向HDFS写文件时产生未释放租约的异常文件

'''
import os

def get_unrealease_files(prefix = "/data/logs", tmp_suffix = ".tmp"):
    '''
    @function 获取过期未释放租约的文件列表
    @return: list []
    '''
    file_list = []

    cmd = "hdfs fsck / -openforwrite | grep /data"
    info = os.popen(cmd).readlines()

    for row in info:
        f = row.split(":")[0]
        if f.startswith(prefix) and not f.endswith(tmp_suffix):
            file_list.append(f)

    return file_list


def recover_lease(hdfs_path, retries = 3):
    '''
    @param hdfs_path : hdfs文件路径(全路径)
    @param retries :   重试次数
    '''
    cmd = "hdfs debug recoverLease -path %s -retries %s" %(hdfs_path, retries)
    try:
        ret = os.system(cmd)
        if ret == 0:
            print 'Recover lease of [%s] Successful!' %hdfs_path
        else:
            print 'Recover lease of [%s] unsuccessful,please check by manual!' %hdfs_path
    except Exception,e:
        print e

def batch_recover_lease(file_list):
    '''
    @param file_list: 待恢复文件列表
    '''
    if not file_list:
        print 'no file need to be recover lease, ended.'
        return

    for f in file_list:
        recover_lease(f)

if __name__ == '__main__':
    #获取指定条件未释放租约的文件列表,默认/data/logs目录下不是从.tmp结尾的文件
    file_list = get_unrealease_files()
    #批量释放租约,恢复文件为可读
    batch_recover_lease(file_list)


关键字:   无
评论信息
暂无评论
发表评论
验证码: 
Powered by IMZHANGJIE.CN Copyright © 2015-2025 粤ICP备14056181号