当前位置 博文首页 > Python中threading库实现线程锁与释放锁

    Python中threading库实现线程锁与释放锁

    作者:李元静 时间:2021-06-05 18:20

    控制资源访问

    前文提到threading库在多线程时,对同一资源的访问容易导致破坏与丢失数据。为了保证安全的访问一个资源对象,我们需要创建锁。

    示例如下:

    import threading
    import time
    
    class AddThread():
        def __init__(self, start=0):
            self.lock = threading.Lock()
            self.value = start
    
        def increment(self):
            print("Wait Lock")
            self.lock.acquire()
            try:
                print("Acquire Lock")
                self.value += 1
                print(self.value)
            finally:
                self.lock.release()
    
    def worker(a):
        time.sleep(1)
        a.increment()
    
    addThread = AddThread()
    for i in range(3):
        t = threading.Thread(target=worker, args=(addThread,))
        t.start()

    运行之后,效果如下:

    解锁

    acquire()会通过锁进行阻塞其他线程执行中间段,release()释放锁,可以看到,基本都是获得锁之后才执行。避免了多个线程同时改变其资源对象,不会造成混乱。

    判断是否有另一个线程请求锁

    要确定是否有另一个线程请求锁而不影响当前的线程,可以设置acquire()的参数blocking=False。

    示例如下:

    import threading
    import time
    
    def worker2(lock):
        print("worker2 Wait Lock")
        while True:
            lock.acquire()
            try:
                print("Holding")
                time.sleep(0.5)
            finally:
                print("not Holding")
                lock.release()
            time.sleep(0.5)
    
    def worker1(lock):
        print("worker1 Wait Lock")
        num_acquire = 0
        value = 0
        while num_acquire < 3:
            time.sleep(0.5)
            have_it = lock.acquire(blocking=False)
            try:
                value += 1
                print(value)
                print("Acquire Lock")
                if have_it:
                    num_acquire += 1
            finally:
                print("release Lock")
                if have_it:
                    lock.release()
    
    lock = threading.Lock()
    word2Thread = threading.Thread(
        target=worker2,
        name='work2',
        args=(lock,)
    )
    word2Thread.start()
    word1Thread = threading.Thread(
        target=worker1,
        name='work1',
        args=(lock,)
    )
    word1Thread.start()
    

    运行之后,效果如下:

    8次

    这里,我们需要迭代很多次,work1才能获取3次锁。但是尝试了很8次。

    with lock

    前文,我们通过lock.acquire()与lock.release()实现了锁的获取与释放,但其实我们Python还给我们提供了一个更简单的语法,通过with lock来获取与释放锁。

    示例如下:

    import threading
    import time
    
    class AddThread():
        def __init__(self, start=0):
            self.lock = threading.Lock()
            self.value = start
    
        def increment(self):
            print("Wait Lock")
            with self.lock:
                print("lock acquire")
                self.value += 1
                print(self.value)
            print("lock release")
    
    def worker(a):
        time.sleep(1)
        a.increment()
    
    addThread = AddThread()
    for i in range(3):
        t = threading.Thread(target=worker, args=(addThread,))
        t.start()
    

    这里,我们只是将最上面的例子改变了一下。效果如下:

    效果

    需要注意的是,正常的Lock对象不能请求多次,即使是由同一个线程请求也不例外。如果同一个调用链中的多个函数访问一个锁,则会发生意外。如果期望在同一个线程的不同代码需要重新获得锁,那么这种情况下使用RLock。

    同步线程

    Condition

    在实际的操作中,我们还可以使用Condition对象来同步线程。由于Condition使用了一个Lock,所以它可以绑定到一个共享资源,允许多个线程等待资源的更新。

    示例如下:

    import threading
    import time
    
    def consumer(cond):
        print("waitCon")
        with cond:
            cond.wait()
            print('获取更新的资源')
    
    def producer(cond):
        print("worker")
        with cond:
            print('更新资源')
            cond.notifyAll()
    
    cond = threading.Condition()
    t1 = threading.Thread(name='t1', target=consumer, args=(cond,))
    t2 = threading.Thread(name='t2', target=consumer, args=(cond,))
    t3 = threading.Thread(name='t3', target=producer, args=(cond,))
    t1.start()
    time.sleep(0.2)
    t2.start()
    time.sleep(0.2)
    t3.start()
    

    运行之后,效果如下:

    资源

    这里,我们通过producer线程处理完成之后调用notifyAll(),consumer等线程等到了它的更新,可以类比为观察者模式。这里是,当一个线程用完资源之后时,则会自动通知依赖它的所有线程。

    屏障(barrier)

    屏障是另一种线程的同步机制。barrier会建立一个控制点,所有参与的线程会在这里阻塞,直到所有这些参与方都到达这一点。采用这种方法,线程可以单独启动然后暂停,直到所有线程都准备好了才可以继续。

    示例如下:

    import threading
    import time
    
    def worker(barrier):
        print(threading.current_thread().getName(), "worker")
        worker_id = barrier.wait()
        print(threading.current_thread().getName(), worker_id)
    
    threads = []
    barrier = threading.Barrier(3)
    for i in range(3):
        threads.append(
            threading.Thread(
                name="t" + str(i),
                target=worker,
                args=(barrier,)
            )
        )
    for t in threads:
        print(t.name, 'starting')
        t.start()
        time.sleep(0.1)
    
    for t in threads:
        t.join()
    

    运行之后,效果如下:

    屏障

    从控制台的输出会发发现,barrier.wait()会阻塞线程,直到所有线程被创建后,才同时释放越过这个控制点继续执行。wait()的返回值指示了释放的参与线程数,可以用来限制一些线程做清理资源等动作。

    当然屏障Barrier还有一个abort()方法,该方法可以使所有等待线程接收一个BroKenBarrierError。如果线程在wait()上被阻塞而停止处理,会产生这个异常,通过except可以完成清理工作。

    有限资源的并发访问

    除了多线程可能访问同一个资源之外,有时候为了性能,我们也会限制多线程访问同一个资源的数量。例如,线程池支持同时连接,但数据可能是固定的,或者一个网络APP提供的并发下载数支持固定数目。这些连接就可以使用Semaphore来管理。

    示例如下:

    import threading
    import time
    
    class WorkerThread(threading.Thread):
        def __init__(self):
            super(WorkerThread, self).__init__()
            self.lock = threading.Lock()
            self.value = 0
    
        def increment(self):
            with self.lock:
                self.value += 1
                print(self.value)
    
    def worker(s, pool):
        with s:
            print(threading.current_thread().getName())
            pool.increment()
            time.sleep(1)
            pool.increment()
    
    pool = WorkerThread()
    s = threading.Semaphore(2)
    for i in range(5):
        t = threading.Thread(
            name="t" + str(i),
            target=worker,
            args=(s, pool,)
        )
        t.start()
    

    运行之后,效果如下:

    控制台

    从图片虽然能看所有输出,但无法看到其停顿的事件。读者自己运行会发现,每次顶多只有两个线程在工作,是因为我们设置了threading.Semaphore(2)。

    隐藏资源

    在实际的项目中,有些资源需要锁定以便于多个线程使用,而另外一些资源则需要保护,以使它们对并非使这些资源的所有者的线程隐藏。

    local()函数会创建一个对象,它能够隐藏值,使其在不同的线程中无法被看到。示例如下:

    import threading
    import random
    
    def show_data(data):
        try:
            result = data.value
        except AttributeError:
            print(threading.current_thread().getName(), "No value")
        else:
            print(threading.current_thread().getName(), "value=", result)
    
    def worker(data):
        show_data(data)
        data.value = random.randint(1, 100)
        show_data(data)
    
    local_data = threading.local()
    show_data(local_data)
    local_data.value = 1000
    show_data(local_data)
    
    for i in range(2):
        t = threading.Thread(
            name="t" + str(i),
            target=worker,
            args=(local_data,)
        )
        t.start()
    

    运行之后,效果如下:

    输出

    这里local_data.value对所有线程都不可见,除非在某个线程中设置了这个属性,这个线程才能看到它。

    js
    下一篇:没有了