当前位置 主页 > 网站技术 > 代码类 >

    python3.7通过thrift操作hbase的示例代码

    栏目:代码类 时间:2020-01-14 12:10

    HBase是一个分布式的、面向列的开源数据库,其是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。其数据结构类似与Redis的key-value模式。

     

    python3.7 通过 thrift , rpc 接口操作 hbase ,指定依赖库为: thrift 和 hbase-thrift 。 然而我们 在 python3.7 环境中发现 hbase-thrift-0.20.4 无法被支持, hbase-thrift 官方仅推荐用于 python2.x 。 于是有了下边的 patch 版本 和 patch 版本写法的客户端。

    patch 版本下载,适用于 python 3.x : http://dl.cpp.la/Archive/hbase-thrift-0.20.4.patch.tgz

    卸载 hbase-thrift-0.20.4 版本

    # pip3 list | grep hbase-thrift
    >> hbase-thrift    0.20.4
    # pip3 uninstall hbase-thrift -y
    >> Successfully uninstalled hbase-thrift-0.20.4

    安装 hbase-thrift-0.20.4.patch 版本(支持 python3.x )

    wget http://dl.cpp.la/Archive/hbase-thrift-0.20.4.patch.tgz
    tar -zxvf hbase-thrift-0.20.4.patch.tgz
    cd hbase-thrift-0.20.4.patch
    python3 setup.py install

    检测安装是否成功

    # pip3 list | grep hbase-thrift
    >> hbase-thrift    0.20.4.patch
    Python3.7 操作 hbase-thrift-patch 客户端代码示例
    from thrift.transport import TSocket
    from thrift.transport.TTransport import TBufferedTransport
    from thrift.protocol import TBinaryProtocol
     
    from hbase import Hbase
    from hbase.ttypes import ColumnDescriptor
    from hbase.ttypes import Mutation
     
    class HBaseClient(object):
     
      def __init__(self):
        self.__ip = HBASE_URI.get("HOST")
        self.__port = HBASE_URI.get("PORT")
        self.__transport = self.createSocket
        protocol = TBinaryProtocol.TBinaryProtocol(self.__transport)
        self.__client = Hbase.Client(protocol)
        self.__transport.open()
     
      @property
      def createSocket(self):
        CS = TSocket.TSocket(self.__ip, self.__port)
        CS.setTimeout(60*1000)
        return TBufferedTransport(CS)
     
      def __del__(self):
        self.__transport.close()
     
      def get_tables(self):
        """
        get all table name
        :return: table name list
        """
        return self.__client.getTableNames()
     
      def create_table(self, table, *columns):
        """
        create table
        :param table: table name
        :param columns: columns name , variable parameter
        """
        func = lambda col: ColumnDescriptor(col)
        column_families = list(map(func, columns))
        self.__client.createTable(table, column_families)
     
      def delete_table(self, table):
        '''
        delete table in hbase
        :param table: tableName
        :return:
        '''
        if self.__client.isTableEnabled(table):
          self.__client.disableTable(table)
        self.__client.deleteTable(table)
     
      def put(self, table, row, columns):
        """
        add record
        :param table: table name
        :param row:
        :param columns:
        :return:
        """
        self.__client.mutateRow(table, row, [Mutation(column=k, value=v) for k, v in columns.items()])
     
      def delete(self, table, row, column):
        """
        delete record
        :param table: table name
        :param row:
        """
        self.__client.deleteAll(table, row, column)
     
      def scan(self, table, start_row="", columns=None):
        """
        get record
        :param table: table name
        :param start_row:
        :param columns:
        """
        scanner = self.__client.scannerOpen(table, start_row, columns)
        while True:
          r = self.__client.scannerGet(scanner)
          if not r:
            break
          yield dict([(k, v.value) for k, v in r[0].columns.items()])
    if __name__ == "__main__":
      client = HBaseClient()
      for v in client.scan('studentd', columns={"cpp.la":"https://cpp.la"}):
        print(v)
    by:cpp.la