手记

使用Python构造一个简单的数据库持久化工具包

想法

写过Java的同学应该用过hibernate和mybatis之类的数据库持久化框架,
这里我主要是想实现类似mybatis中数据库相关注解的部分功能,后续的化时间充分会深入去研究实现,目前这个版本我只是尝试实现一个简单的类似的工具包.

  • 下面看一下Java代码中的应用:

        @Select("select resource_key,sum(a.count) as count " +
                "from (select resource_key, count(1) as count " +
                "from t_alarm_log " +
                "where is_deleted=0 and status=0 and cloud_os = #{cloudId} group by resource_key  " +
                "union  " +
                "select t_resource.resource_key, 0 as count " +
                "from t_resource inner join t_resource_strategy  " +
                "on t_resource.id = t_resource_strategy.resource_id  " +
                "where cloud_os = #{cloudId} " +
                "group by resource_key) a group by resource_key")
        List<AlarmLogCountDTO> selectCloudOSAlarmLogs(@Param("cloudId") String cloudId);
    
  • 我使用python实现:

    @select('select * from t_school WHERE id="#{id}"')
    def get_school_by_id(id):
        pass
    
    @select('select * from t_school WHERE name like "%#{name}%"')
    def get_school_by_name(name):
        pass
    
    print get_school_by_id(id='60')
    print get_school_by_name(name='乌克兰')
    

设计

  • 1.构造select, udpate,delete,insert几种装饰器

  • 2.通过装饰器获取模板sql, 根据函数预留参数来获取传入的参数值,并构造完整sql, 调用sql驱动获取数据库操作信息

  • 待完善:

    • 支持多种数据库,比如MongoDB,mysql,sqlite等等.
    • 自动封装对象.
  • 相关代码(函数抽象):

    def cell(*args):
        """ 
        :param args:
        :return:
        """
        pass
    
    
    def select(sql):
        return cell(sql, 'select')
    
    
    def insert(sql):
        return cell(sql, 'persistent')
    
    
    def update(sql):
        return cell(sql, 'persistent')
    
    
    def delete(sql):
        return cell(sql, 'delete')
    
  • 关于驱动部分的设计:

    • 抽象持久化操作接口:其实区别无非是否commit操作.

    • 简单封装(其他具体驱动实现都要继承自该类):

      class Database(object):
          def __init__(self, *args, **kwargs):
              for k, v in kwargs.iteritems():
                  setattr(self, k, v)
      
              self.args = args
              self.kwargs = kwargs
      
          def select(self, *args, **kwargs):
              raise NotImplementedError
      
          def persistent(self, *args, **kwargs):
              raise NotImplementedError
      
          def delete(self, *args, **kwargs):
              raise NotImplementedError
      
    • 举例MySQL:

      class MySQLUtils(Database):
          def __init__(self, *args, **kwargs):
              for k, v in kwargs.iteritems():
                  setattr(self, k, v)
      
              if not kwargs.get('charset'):
                  kwargs['charset'] = 'utf8'
              super(MySQLUtils, self).__init__(*args, **kwargs)
              self.__connection = None
              self.__cursor = None
      
          def __enter__(self):
              self.open()
              return self
      
          def __exit__(self, exc_type, exc_val, exc_tb):
              self.close()
              if exc_tb:
                  logger.error('[%s]%s' % (exc_type, exc_val))
      
          def open(self):
              if self.__connection:
                  raise MySQLdb.MySQLError("connection already connected.")
              try:
                  self.__connection = MySQLdb.connect(*self.args, **self.kwargs)
              except Exception:
                  logger.error("数据库连接异常, 请设置:sql_annotation.conn.connection 的连接信息.")
                  raise DatabaseConnectionError
      
              if self.__cursor:
                  raise MySQLdb.MySQLError("cursor already opened.")
              self.__cursor = self.__connection.cursor(MySQLdb.cursors.DictCursor)
              # logger.info("connection opened.")
      
          def close(self):
              with _Closing(self.__cursor) as _:
                  pass
              with _Closing(self.__connection) as _:
                  pass
              self.__cursor = None
              self.__connection = None
              # logger.info("connection close success.")
      
          def __execute(self, sql, commit=False):
              if not (self.__connection and self.__cursor):
                  raise MySQLdb.MySQLError("connection already closed.")
              count = self.__cursor.execute(sql)
              result = self.__cursor.fetchall()
              self.__connection.commit() if commit else None
              return count if commit else result
      
          def select(self, sql, formatter_func=None):
              logger.info("Execute SQL: {}".format(sql))
              if formatter_func:
                  return map(formatter_func, self.__execute(sql))
              return self.__execute(sql)
      
          def persistent(self, sql):
              return self.__execute(sql, True)
      
          def delete(self, sql):
              return self.__execute(sql, True)
      
    • 基本上实现这些操作就可以了,一个简单使用装饰器来操作sql语句的工具包就构建完成了. 有兴趣的可以了解一下源代码: https://github.com/tomoncle/sql-annotation, 安装尝试一下

示例:

from sql_annotation.annotation import select
from sql_annotation.conn import connection

connection(username='tom', password='123456', db='test')


@select('select * from t_school WHERE id="#{id}"')
def get_school_by_id(id):
    pass


@select('select * from t_school WHERE name like "%#{name}%"')
def get_school_by_name(name):
    pass


print get_school_by_id(id='60')
print get_school_by_name(name='乌克兰')

为什么要写这个东西?

有时候django这种框架封装的过于复杂,可能你就想通过学生获得所在学校的学生总数,完全可以通过一个sql搞定,不用调用对象了.当是提供一个灵活的方式吧.

0人推荐
随时随地看视频
慕课网APP