使用 Python 操作 HDFS
发布日期:2022-02-14 23:02:59 浏览次数:18 分类:技术文章

本文共 2555 字，阅读大约需要 8 分钟。

from pyhdfs import HdfsClient
import time
from config import config


class OperaHdfs(object):
    """Helper around ``pyhdfs.HdfsClient`` for writing text files into
    (optionally Hive-style partitioned) HDFS directories."""

    def __init__(self, hosts='nn1.example.com:50070,nn2.example.com:50070', user_name='hdfs'):
        """Create the underlying HDFS client.

        :param hosts: comma-separated ``host:port`` NameNode list, passed
            unchanged to ``HdfsClient``.
        :param user_name: user name passed to ``HdfsClient``.
        """
        self.hosts = hosts
        self.user_name = user_name
        self.fs = HdfsClient(hosts=self.hosts, user_name=self.user_name)

    def jug(self, file):
        """Return whether the given HDFS path (file or directory) exists.

        :param file: HDFS path to check.
        :return: result of ``HdfsClient.exists``.
        """
        return self.fs.exists(file)

    def mkpath(self, path):
        """Create a directory (including parents) if it does not exist yet.

        :param path: HDFS directory path.
        :return: result of ``HdfsClient.mkdirs``, or ``None`` when the path
            already existed and nothing was done.
        """
        if not self.jug(path):
            return self.fs.mkdirs(path)

    def delpath(self, path):
        """Recursively delete a directory if it exists.

        :param path: HDFS directory path.
        :return: result of ``HdfsClient.delete``, or ``None`` when the path
            did not exist.
        """
        if self.jug(file=path):
            return self.fs.delete(path, recursive=True)

    def get_file_path(self, hdfs_path, partition_dict):
        """Build (and create if missing) the directory that will hold the file.

        Each ``name: value`` pair of *partition_dict* appends a
        ``/name=value`` segment, in dict iteration order, mirroring Hive's
        partition directory layout.

        :param hdfs_path: base directory on HDFS.
        :param partition_dict: mapping of partition name to value, or a falsy
            value for no partitioning.
        :return: the directory path (the level just above the file).
        """
        if partition_dict:
            for partition_name, partition_value in partition_dict.items():
                hdfs_path += '/{}={}'.format(partition_name, partition_value)
        self.mkpath(path=hdfs_path)
        return hdfs_path

    @staticmethod
    def _timestamped_name(file_name):
        """Return *file_name* with ``_<epoch millis>`` inserted before its last extension."""
        timestamp = round(time.time() * 1000)
        # rpartition tolerates names with several dots (e.g. "a.tar.gz"),
        # where a plain split('.') unpacked into two names would raise.
        stem, dot, ext = file_name.rpartition('.')
        if dot:
            return '{}_{}.{}'.format(stem, timestamp, ext)
        return '{}_{}'.format(file_name, timestamp)

    def write_hdfs(self, info, hdfs_path, file_name, partition_dict=None, write_mode='append'):
        """Write *info* to ``<hdfs_path>[/k=v...]/<file_name>`` on HDFS.

        :param info: data to write; ``str`` is UTF-8 encoded, ``bytes`` is
            written as-is.
        :param hdfs_path: base directory of the file on HDFS.
        :param file_name: file name on HDFS.
        :param partition_dict: optional Hive table partition mapping.
        :param write_mode: what to do when the target file already exists:
            ``'append'`` writes a new sibling file whose name carries a
            millisecond timestamp; ``'nonConflict'`` raises;
            ``'overwrite'`` deletes the target directory and recreates it.
        :raises FileExistsError: in ``'nonConflict'`` mode when the file exists.
        """
        file_path = self.get_file_path(hdfs_path, partition_dict)
        file = '{}/{}'.format(file_path, file_name)
        if self.jug(file=file):
            if write_mode == 'append':
                file_name = self._timestamped_name(file_name)
                file = '{}/{}'.format(file_path, file_name)
            elif write_mode == 'nonConflict':
                raise FileExistsError('file {} is exists'.format(file))
            elif write_mode == 'overwrite':
                # Deletes the whole target directory (every file in the
                # partition), not just this file -- presumably to mimic Hive
                # INSERT OVERWRITE on a partition; confirm with callers.
                self.delpath(path=file_path)
                file_path = self.get_file_path(hdfs_path, partition_dict)
                file = '{}/{}'.format(file_path, file_name)
            # NOTE(review): any other write_mode falls through to create(),
            # which is expected to fail because the file already exists.
        data = info.encode('utf-8') if isinstance(info, str) else info
        self.fs.create(file, data)


opera_hdfs = OperaHdfs()

转载地址：https://blog.csdn.net/fish2009122/article/details/90204119 。如本文侵犯了您的版权，请留言告知原文章地址，我们会删除此文章，给您带来的不便敬请谅解！

上一篇:keras layers笔记
下一篇:hive新建db、授权及权限回收

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2024年03月31日 19时06分39秒