当前位置 博文首页 > python实现多进程并发控制Semaphore与互斥锁LOCK

    python实现多进程并发控制Semaphore与互斥锁LOCK

    作者:知无涯学无尽 时间:2021-06-02 17:43

    一、了解锁

    应用场景举例描述: Lock 互斥锁:举例说明–有三个同事同时需要上厕所,但是只有一间厕所,将同事比作进程,多个进程并打抢占一个厕所,我们要保证顺序优先, 一个个来,那么就必须串行,先到先得,有人使用了,就锁住,结束后剩余的人继续争抢

    1、利用Lock来处理

    模拟三个同事抢占厕所

    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import random
    
    def task1(p, lock):
        # 上锁
        lock.acquire()
        print(f'{p} 开始排泄')
        time.sleep(random.randint(1, 3))
        print(f'{p} 排泄结束')
        # 解锁
        lock.release()
    
    def task2(p, lock):
        lock.acquire()
        print(f'{p} 开始排泄')
        time.sleep(random.randint(1, 3))
        print(f'{p} 排泄结束')
        lock.release()
    
    def task3(p, lock):
        lock.acquire()
        print(f'{p} 开始排泄')
        time.sleep(random.randint(1, 3))
        print(f'{p} 排泄结束')
        lock.release()
    
    
    if __name__ == '__main__':
        # 实例化一个锁对象
        mutex = Lock()
        # 将锁以参数的形式传入进程对象
        p1 = Process(target=task1, args=('task1', mutex,))
        p2 = Process(target=task2, args=('task2', mutex,))
        p3 = Process(target=task3, args=('task3', mutex,))
    
        p1.start()
        p2.start()
        p3.start()
    

    执行结果:

    # 输出结果:三个进程开始争抢互斥锁,先抢到的先执行,执行结束后,释放掉锁,剩下的继续争抢
    task1 开始排泄
    task1 排泄结束
    task2 开始排泄
    task2 排泄结束
    task3 开始排泄
    task3 排泄结束

    1、 注意:

    • 互斥锁在函数中,lock.acquire()上锁一次就要lock.release()解锁一次,在上锁与解锁之间写需要执行的代码。
    • 如果连续上锁两次以上,就会出现死锁现象,代码将不继续执行下去。必须是锁一次解一次。

    2、 lock和join比较:

    • 共同点------都可以把并行变成串行,保证了执行顺序
    • 不同点------join是人为设定了顺序,lock是让其争抢顺序,保证了公平性

    2、利用反射,来优化上面的代码

    上面的代码虽然起到了先进先出,一进一出的效果,但是并不完美。总所周知,我们上厕所是谁先抢到谁先上,并不是说按照代码里start()顺序执行。应该由先抢占到的进程限制性才更合理。

    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import random
    import sys
    
    def task1(p, lock):
        # 上锁
        lock.acquire()
        print(f'{p} 开始打印')
        time.sleep(random.randint(1, 3))
        print(f'{p} 打印结束')
        # 解锁
        lock.release()
    
    def task2(p, lock):
        lock.acquire()
        print(f'{p} 开始打印')
        time.sleep(random.randint(1, 3))
        print(f'{p} 打印结束')
        lock.release()
    
    def task3(p, lock):
        lock.acquire()
        print(f'{p} 开始打印')
        time.sleep(random.randint(1, 3))
        print(f'{p} 打印结束')
        lock.release()
    
    
    if __name__ == '__main__':
        slock = Lock()
        for i in range(1,4):
           p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(f'task{i}', slock))
           p.start()

    输出结果:

    task2 开始打印
    task2 打印结束
    task3 开始打印
    task3 打印结束
    task1 开始打印
    task1 打印结束

    二、进程并发控制 semaphore

    semaphore(信号量):用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数
    信号量: 也是一把锁,但是不保证数据安全性,同时开启多个线程,但是规定了同时并发执行的上限,后面走多少,进多少。(用于控制并发数量)

    1.多进程控制示例(1)

    # 举例说明:一间厕所有5个坑位,最多只能同时有5个人上厕所,当前时刻有20个人想上厕所,但是只能让5个人进去,然后出来多少个,才能进去多少个上厕所
    
    # 从一个模块引用多个功能的时候,用逗号隔开
    from threading import Semaphore, Thread, currentThread
    import time
    import random
    
    sem = Semaphore(3)             # 并发执行数设置为5
    
    def task():
        sem.acquire()
        print(f'{currentThread().name}')
        time.sleep(random.randint(1,3))
        sem.release()
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task)
            t.start()

    执行结果:首次并发量是3,后面先抢到锁先执行

    Thread-1
    Thread-2
    Thread-3

    Thread-4
    Thread-5

    Thread-6
    Thread-7

    Thread-8

    Process finished with exit code 0

    2.多进程控制示例(2)

    import multiprocessing
    import time
    
    def worker(s, i):
        s.acquire()
        print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 抢占并获得锁,运行")
        time.sleep(i)
        print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 运行结束,释放锁")
        s.release()
    
    if __name__ == '__main__':
        s = multiprocessing.Semaphore(2)
        for i in range(8):
            p = multiprocessing.Process(target=worker, args=(s, 1))
            p.start()
    
    

    执行结果:

    在执行结果输出的终端,每阻塞一次,按下回车键,可以更加清晰的看出进程的并发执行。
    由下面执行结果可以看出,同一时刻,有两个进程在执行
    2021-05-18 22:50:37 Process-1 抢占并获得锁,运行
    2021-05-18 22:50:37 Process-2 抢占并获得锁,运行

    2021-05-18 22:50:38 Process-1 运行结束,释放锁
    2021-05-18 22:50:38 Process-3 抢占并获得锁,运行
    2021-05-18 22:50:38 Process-2 运行结束,释放锁
    2021-05-18 22:50:38 Process-4 抢占并获得锁,运行

    2021-05-18 22:50:39 Process-3 运行结束,释放锁
    2021-05-18 22:50:39 Process-5 抢占并获得锁,运行
    2021-05-18 22:50:39 Process-4 运行结束,释放锁
    2021-05-18 22:50:39 Process-6 抢占并获得锁,运行

    2021-05-18 22:50:40 Process-5 运行结束,释放锁
    2021-05-18 22:50:40 Process-7 抢占并获得锁,运行
    2021-05-18 22:50:40 Process-6 运行结束,释放锁
    2021-05-18 22:50:40 Process-8 抢占并获得锁,运行

    2021-05-18 22:50:41 Process-7 运行结束,释放锁
    2021-05-18 22:50:41 Process-8 运行结束,释放锁

    Process finished with exit code 0

    三、进程同步之LOCK

    多个进程并发执行,提高资源利用率,从而提高效率,但是有时候我们需要在某一时刻只能有一个进程访问某个共享资源的话,就需要使用锁LOCK

    1.不加LOCK的示例

    import multiprocessing
    import time
    
    def task1():
        n = 4
        while n > 1:
            print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task1 输出信息')
            time.sleep(1)
            n -= 1
    
    def task2():
        n = 4
        while n > 1:
            print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task2 输出信息')
            time.sleep(1)
            n -= 1
    
    def task3():
        n = 4
        while n > 1:
            print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task3 输出信息')
            time.sleep(1)
            n -= 1
    
    if __name__ == '__main__':
        p1 = multiprocessing.Process(target=task1)
        p2 = multiprocessing.Process(target=task2)
        p3 = multiprocessing.Process(target=task3)
        p1.start()
        p2.start()
        p3.start()
    
    

    执行结果:

    2021-59-18 22:59:46  task1 输出信息
    2021-59-18 22:59:46  task2 输出信息
    2021-59-18 22:59:46  task3 输出信息

    2021-59-18 22:59:47  task1 输出信息
    2021-59-18 22:59:47  task2 输出信息
    2021-59-18 22:59:47  task3 输出信息

    2021-59-18 22:59:48  task1 输出信息
    2021-59-18 22:59:48  task2 输出信息
    2021-59-18 22:59:48  task3 输出信息

    Process finished with exit code 0

    2.加上LOCK的示例

    有两种加锁方式:首先将 lock = multiprocessing.Lock() 生成锁对象lock

    1. with lock: with会在执行前启动lock,在执行结束后关闭lock
    2. lock.acquire() … lock.release() : 注意,这俩必须是一个接一个的对应关系
    import multiprocessing
    
    import time
    
    def task1(lock):
        with lock:
            n = 4
            while n > 1:
                print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task1 输出信息')
                time.sleep(1)
                n -= 1
    
    def task2(lock):
        lock.acquire()
        n = 4
        while n > 1:
            print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task2 输出信息')
            time.sleep(1)
            n -= 1
        lock.release()
    
    def task3(lock):
        lock.acquire()
        n = 4
        while n > 1:
            print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task3 输出信息')
            time.sleep(1)
            n -= 1
        lock.release()
    
    if __name__ == '__main__':
        lock = multiprocessing.Lock()
        p1 = multiprocessing.Process(target=task1, args=(lock,))
        p2 = multiprocessing.Process(target=task2, args=(lock,))
        p3 = multiprocessing.Process(target=task3, args=(lock,))
        p1.start()
        p2.start()
        p3.start()

    执行结果

    2021-11-18 23:11:37  task1 输出信息

    2021-11-18 23:11:38  task1 输出信息

    2021-11-18 23:11:39  task1 输出信息

    2021-11-18 23:11:40  task2 输出信息

    2021-11-18 23:11:41  task2 输出信息

    2021-11-18 23:11:42  task2 输出信息

    2021-11-18 23:11:43  task3 输出信息

    2021-11-18 23:11:44  task3 输出信息

    2021-11-18 23:11:45  task3 输出信息

    Process finished with exit code 0
     

    js