当前位置 博文首页 > python基于mysql实现的简单队列以及跨进程锁实例详解

    python基于mysql实现的简单队列以及跨进程锁实例详解

    作者:admin 时间:2021-07-10 18:42

    通常在我们进行多进程应用开发的过程中,不可避免的会遇到多个进程访问同一个资源(临界资源)的状况,这时候必须通过加一个全局性的锁,来实现资源的同步访问(即:同一时间里只能有一个进程访问资源)。

    举个例子如下:

    假设我们用mysql来实现一个任务队列,实现的过程如下:

    1. 在Mysql中创建Job表,用于储存队列任务,如下:

    create table jobs(
      id auto_increment not null primary key,
      message text not null,
      job_status not null default 0
    );
    
    

    message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列 
     
    2. 有一个生产者进程,往job表中放新的数据,进行排队:

    insert into jobs(message) values('msg1');
    

    3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

    select * from jobs where job_status=0 order by id asc limit 1;
    update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
    

    4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。

    =========================分割线=======================================

    说到跨进程的锁实现,我们主要有几种实现方式:

    (1)信号量
    (2)文件锁fcntl
    (3)socket(端口号绑定)
    (4)signal
    这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。
     
    查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈.
     
    对此用python实现了一个demo,如下:
     
    文件名:glock.py

    #!/usr/bin/env python2.7 
    # 
    # -*- coding:utf-8 -*- 
    # 
    #  Desc  : 
    # 
    import logging, time 
    import MySQLdb 
    class Glock: 
      def __init__(self, db): 
        self.db = db 
      def _execute(self, sql): 
        cursor = self.db.cursor() 
        try: 
          ret = None 
          cursor.execute(sql) 
          if cursor.rowcount != 1: 
            logging.error("Multiple rows returned in mysql lock function.") 
            ret = None 
          else: 
            ret = cursor.fetchone() 
          cursor.close() 
          return ret 
        except Exception, ex: 
          logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex)) 
          cursor.close() 
          return None 
      def lock(self, lockstr, timeout): 
        sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout) 
        ret = self._execute(sql) 
     
        if ret[0] == 0: 
          logging.debug("Another client has previously locked '%s'.", lockstr) 
          return False 
        elif ret[0] == 1: 
          logging.debug("The lock '%s' was obtained successfully.", lockstr) 
          return True 
        else: 
          logging.error("Error occurred!") 
          return None 
      def unlock(self, lockstr): 
        sql = "SELECT RELEASE_LOCK('%s')" % (lockstr) 
        ret = self._execute(sql) 
        if ret[0] == 0: 
          logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr) 
          return False 
        elif ret[0] == 1: 
          logging.debug("The lock '%s' the lock was released.", lockstr) 
          return True 
        else: 
          logging.error("The lock '%s' did not exist.", lockstr) 
          return None 
    #Init logging 
    def init_logging(): 
      sh = logging.StreamHandler() 
      logger = logging.getLogger() 
      logger.setLevel(logging.DEBUG) 
      formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') 
      sh.setFormatter(formatter) 
      logger.addHandler(sh) 
      logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) 
    def main(): 
      init_logging() 
      db = MySQLdb.connect(host='localhost', user='root', passwd='') 
      lock_name = 'queue' 
     
      l = Glock(db) 
     
      ret = l.lock(lock_name, 10) 
      if ret != True: 
        logging.error("Can't get lock! exit!") 
        quit() 
      time.sleep(10) 
      logging.info("You can do some synchronization work across processes!") 
      ##TODO 
      ## you can do something in here ## 
      l.unlock(lock_name) 
    if __name__ == "__main__": 
      main() 
    
    

    在main函数里:

    l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。
     
    在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的.

    2.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

    select * from jobs where job_status=0 order by id asc limit 1;
    update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
    

    这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。
     
    测试的时候,启动两个glock.py, 结果如下:

    [@tj-10-47 test]# ./glock.py  
    2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 
    2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 
    2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 
    2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. 
    
    

    可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。

    [@tj-10-47 test]# ./glock.py 
    2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
    2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
    2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
    2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
    [@tj-10-47 test]#
    

    jsjbwy