博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python:信号量semaphore
阅读量:4111 次
发布时间:2019-05-25

本文共 6598 字,大约阅读时间需要 21 分钟。

目录

semaphone信号量

和Lock很像,信号量对象内部维护一个倒数计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求的线程,直到其他的线程对信号量release后,计数大于0,恢复阻塞的线程

名称         含义
Semaphore(value=1) 构造方法。value小于0,抛ValueError异常
acquire(blocking=True,timeout=None) 获取信号量,计数器减1,获取成功返回True
release()         释放信号量,计数器加1

计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞

import threadingimport loggingimport timeFORMAT = '%(asctime)-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)def worker(s:threading.Semaphore):    logging.info("in sub thread")    logging.info(s.acquire())  #阻塞    logging.info("sub thread over")#信号量s = threading.Semaphore(3)logging.info(s.acquire())print(s._value)logging.info(s.acquire())print(s._value)logging.info(s.acquire())print(s._value)threading.Thread(target=worker,args=(s,)).start()time.sleep(2)logging.info(s.acquire(False))logging.info(s.acquire(timeout=3))#释放logging.info("released")s.release()结果:2021-06-24 23:33:30,508	 [MainThread,   17360] True2021-06-24 23:33:30,508	 [MainThread,   17360] True2021-06-24 23:33:30,508	 [MainThread,   17360] True2021-06-24 23:33:30,509	 [Thread-1,   12680] in sub thread2102021-06-24 23:33:32,511	 [MainThread,   17360] False2021-06-24 23:33:35,512	 [MainThread,   17360] False2021-06-24 23:33:35,512	 [MainThread,   17360] released2021-06-24 23:33:35,512	 [Thread-1,   12680] True2021-06-24 23:33:35,512	 [Thread-1,   12680] sub thread over

信号量的应用举例

连接池:因为资源有限,且开启一个连接成本高,所以,使用连接池

一个简单的连接池:连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用

class Conn:    def __init__(self,name):        self.name = nameclass Pool:    def __init__(self,count):        self.count = count        #池中是连接对象的列表        self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]            def _connect(self,conn_name):        #创建连接的方法,返回一个名称        return Conn(conn_name)        def get_conn(self):        #从池中拿走一个连接        if len(self.pool) > 0 :            return self.pool.pop()            def return_pool(self,conn:Conn):        #向池中添加一个连接        self.pool.append(conn)

真正的连接池的实现比上面的例子要复杂的多,这里只是一个简单的功能的事项

本例中,get_conn()方法,在多线程的时候,有线程安全问题:

假设池中有一个连接,有可能多个线程判断池的长度是大于0的,当一个线程拿走了连接对象,其他线程再来pop就会抛出异常,如何解决?

1、加锁,在读写的地方加锁

2、使用信号量Semapore

 

1、使用信号量对上例进行修改

import threadingimport randomimport loggingFORMAT = '%(asctime)-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)class Conn:    def __init__(self,name):        self.name = nameclass Pool:    def __init__(self,count):        self.count = count        #池中是连接对象的列表        self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]        self.semaphore = threading.Semaphore(count)    def _connect(self,conn_name):        #创建连接的方法,返回一个名称        return Conn(conn_name)    def get_conn(self):        #从池中拿走一个连接        print("~"*20)        self.semaphore.acquire()        print("="*20)        conn = self.pool.pop()        return conn    def return_pool(self,conn:Conn):        #向池中添加一个连接        self.pool.append(conn)        self.semaphore.release()pool = Pool(3)def worker(pool:Pool):    conn = pool.get_conn()    logging.info(conn)    #模拟使用了一段事件    threading.Event().wait(random.randint(1,4))    pool.return_pool(conn)for i in range(6):    threading.Thread(target=worker,name="worek-{}".format(i),args=(pool,)).start()结果:2021-06-25 00:20:59,477	 [worek-0,   23612] <__main__.Conn object at 0x02167030>2021-06-25 00:20:59,477	 [worek-1,   23128] <__main__.Conn object at 0x02164F50>2021-06-25 00:20:59,477	 [worek-2,   21476] <__main__.Conn object at 0x02164CF0>~~~~~~~~~~~~~~~~~~~~====================~~~~~~~~~~~~~~~~~~~~====================~~~~~~~~~~~~~~~~~~~~====================~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~2021-06-25 00:21:00,478	 [worek-3,   23684] <__main__.Conn object at 0x02167030>====================2021-06-25 00:21:01,478	 [worek-4,   12960] <__main__.Conn object at 0x02164F50>====================2021-06-25 00:21:02,478	 [worek-5,    4208] <__main__.Conn object at 0x02164CF0>====================

上列中,使用信号量解决资源有限的问题。

如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超时资源数,请求者只能等待。当使用者 用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源

注意:这个例子不能用于生成环境,只时为了说明信号量使用的例子,还有很多未完成功能

self.pool.append(conn)这一句要不要加锁

1、从逻辑上分析

1.1、假设如果还没有使用信号量,就release,会怎样?

import threadingsema = threading.Semaphore(3)sema.release()sema.release()sema.release()print(sema.__dict__)结果:{'_cond': 
, 0)>, '_value': 6}

对上例输出结果看出,竟然内置计数器达到了4,这样实际上超出我们最大值,需要解决这个问题

BoundedSemaphore类

有界的信号量,不允许使用release超出初始值的范围,否则,抛出ValueError异常

这样用有界信号量修改源代码,保证如果多return_pool就会抛出异常

保证了多归还连接抛出异常

如果归还了同一个连接多次怎么办,去重很容易判断出来

1.2如果使用了信号量,但是还没有用完

self.pool.append(conn)self.semphore.release()

假设一种极端情况,计数器还差1就满了,有三个线程A、B、C都执行了第一句,都没有来得及release,这时候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常

因此信号量,可以保证,一定不能多归还

1.3很多线程用完了信号量

没有获得信号量的线程都阻塞,没有线程和归还线程争抢,当append后才release,这时候才能唤醒等待的线程,才能pop,也就是没有获取信号量就不能pop,这是安全的。

经过上面的分析,信号量列表长度好,线程安全

信号量和锁

锁,只允许同一个事件一个线程独占资源,它是特殊的信号量,即信号量计数器初值为1.信号量,可以多个线程访问共享资源,但这个共享资源数量有限。

锁,可以看做特殊的信号量

数据结构和GIL

Queue

标准库queue模块,提供FIFO的Queue、LIFO的队列,优先队列。

Queue类时线程安全的,适用于多线程间安全的交换数据。内部适用了Lock和Condition。

为什么将魔术方法时,说实话容器的大小,不准确?

如果不加锁,是不可能获得准确的大小的,因为你刚读取了一个大小,还没有取走,就有可能被其他线程改了。

Queue类的size虽然加了锁,但是,依然不能立即保证立即get、put就能成功,英文读取大小和get、put方法是分开的

import queueq = queue.Queue(8)if q.qsize() ==7:    q.put() #上下两句可能被打断if q.qsize() ==1:    q.get() #未必会成功

GIL全局解释器锁

CPython在解释器进程级别有一把锁,叫做GIL全局解释器锁。

GIL保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也是如此

CPython中

IO密集型,由于线程阻塞,就会调度其他线程;

CPU密集型,当前线程可能连续的获得GIL,导致其他线程几乎无法适用CPU

在CPython中由于有GIL存在,IO密集型,适用多线程;CPU密集型,适用多线程,绕开GIL。

新版CPython正在努力优化GIL的问题,但不是移除。

如果非要适用多线程的效率问题,请绕行其他语言erlang,go等。

Python中绝大多数内置结构的读写都是原子操作。

由于GIL的存在,Python的内置数据类型在多线程编程的时候,就变成安全的了,但是实际上他们本身不是线程安全类型的。

看下面两个程序

1、单线程
 

import loggingimport datetimelogging.basicConfig(level=logging.INFO,format='%(thread)s %(message)s')start = datetime.datetime.now()#计算def calc():    sum = 0    for _ in range(1000000000):        sum +=1calc()calc()calc()calc()calc()delta = (datetime.datetime.now() -start).total_seconds()logging.info(delta)结果14760 267.097923

2、多线程

import loggingimport datetimeimport threadinglogging.basicConfig(level=logging.INFO,format='%(thread)s %(message)s')start = datetime.datetime.now()#计算def calc():    sum = 0    for _ in range(1000000000):        sum +=1t1 = threading.Thread(target=calc)t2 = threading.Thread(target=calc)t3 = threading.Thread(target=calc)t4 = threading.Thread(target=calc)t1.start()t2.start()t3.start()t4.start()t1.join()t2.join()t3.join()t4.join()delta = (datetime.datetime.now() -start).total_seconds()logging.info(delta)结果:24616 215.609224

从两端程序测试的结果来看,CPython中多线程根本没有任何优势,和一个线程执行时间相当。因为GIL的存在,尤其是像上面的计算密集型程序

转载地址:http://zoesi.baihongyu.com/

你可能感兴趣的文章
Mysql复制表以及复制数据库
查看>>
Kafka
查看>>
9.1 为我们的角色划分权限
查看>>
维吉尼亚之加解密及破解
查看>>
TCP/IP协议三次握手与四次握手流程解析
查看>>
PHP 扩展开发 : 编写一个hello world !
查看>>
inet_ntoa、 inet_aton、inet_addr
查看>>
用模板写单链表
查看>>
链表各类操作详解
查看>>
C++实现 简单 单链表
查看>>
Linux的SOCKET编程 简单演示
查看>>
Linux并发服务器编程之多线程并发服务器
查看>>
C语言内存检测
查看>>
Linux epoll模型
查看>>
Linux系统编程——线程池
查看>>
Linux C++线程池实例
查看>>
shared_ptr的一些尴尬
查看>>
C++总结8——shared_ptr和weak_ptr智能指针
查看>>
c++写时拷贝1
查看>>
Linux网络编程---I/O复用模型之poll
查看>>