# encoding: utf-8
'''
@function: Batch-recover HDFS leases that were never released on files Flume
wrote, using `hdfs fsck -openforwrite` to find them and
`hdfs debug recoverLease` to recover each one.
'''
import os
import re
import subprocess

# Extracts the HDFS path at the start of an `hdfs fsck -openforwrite` report
# line. Two line shapes are handled:
#   "/dir/file: <block status ...>"
#   "/dir/file <N> bytes, <M> block(s), OPENFORWRITE: ..."
# HDFS path components may not contain ':', so the first ':' ends the path.
# Lines that do not start with '/' (banner, summary, progress) do not match.
# NOTE(review): the second shape is what some Hadoop versions print for
# files open for write; confirm against this cluster's fsck output.
_FSCK_PATH_RE = re.compile(r"^(/.*?)(?=:|\s+\d+ bytes,|\s*$)")


def get_unrealease_files(prefix="/data/logs", tmp_suffix=".tmp"):
    '''
    @function: List files that fsck reports as still open for write
        (lease not released).

    Runs `hdfs fsck / -openforwrite` and keeps every reported path that
    starts with ``prefix`` and does not end with ``tmp_suffix``. Each path is
    returned once, in the order fsck first reports it.

    @param prefix: only paths starting with this string are returned
    @param tmp_suffix: paths ending with this suffix are skipped (presumably
        because it is Flume's in-use suffix for files still being written --
        confirm against the sink's hdfs.inUseSuffix setting)
    @return: list of HDFS paths; empty if none match or the `hdfs`
        executable cannot be started
    '''
    cmd = ["hdfs", "fsck", "/", "-openforwrite"]
    try:
        # stderr stays attached to the terminal. fsck's exit status is
        # ignored on purpose: it can be non-zero when fsck reports problems,
        # which is exactly when this listing is wanted.
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                universal_newlines=True)
        output = proc.communicate()[0]  # also closes the pipe
    except OSError as e:
        print(e)
        return []

    file_list = []
    seen = set()
    for row in output.splitlines():
        match = _FSCK_PATH_RE.match(row)
        if not match:
            continue
        f = match.group(1)
        # fsck emits one line per problem block, so a file can repeat.
        if f.startswith(prefix) and not f.endswith(tmp_suffix) and f not in seen:
            seen.add(f)
            file_list.append(f)
    return file_list


def recover_lease(hdfs_path, retries=3):
    '''
    @function: Run `hdfs debug recoverLease` on one file and print the result.

    @param hdfs_path: full HDFS path of the file
    @param retries: value passed to the command's -retries option
    @return: True if the command exited with status 0, False otherwise
        (including when the `hdfs` executable cannot be started)
    '''
    # Argument list instead of a shell string: paths containing spaces or
    # shell metacharacters reach hdfs intact and cannot inject commands.
    cmd = ["hdfs", "debug", "recoverLease", "-path", hdfs_path,
           "-retries", str(retries)]
    try:
        ret = subprocess.call(cmd)
    except OSError as e:
        print(e)
        ret = None
    if ret == 0:
        print('Recover lease of [%s] Successful!' % hdfs_path)
        return True
    print('Recover lease of [%s] unsuccessful,please check by manual!' % hdfs_path)
    return False


def batch_recover_lease(file_list):
    '''
    @function: Call recover_lease on every path in file_list, in order.

    @param file_list: HDFS paths whose leases should be recovered
    '''
    if not file_list:
        print('no file need to be recover lease, ended.')
        return
    for f in file_list:
        recover_lease(f)


if __name__ == '__main__':
    # Collect files with unreleased leases using the default filter:
    # paths under /data/logs that do not end with .tmp.
    file_list = get_unrealease_files()
    # Recover their leases in batch so the files can be read again.
    batch_recover_lease(file_list)