前言 总觉得用了Python那么久,感觉隔一层面纱似的,只是会用而已。
现在Python更新了3.9,越来越厉害了,多了很多新的语法特性。
想想自己,也不能总是写一些Golang的东西,Python也要持续跟上,所以打算开一个Python三巨头的坑,把Python的线程,进程,协程都讲明白,这样第一个是让我加深理解,第二个是让我彻底搞懂这三者的机制,我认为这样我才可以走的更远。
在开发的过程当中,我们总是需要去提高效率,一般在Python里面,我们经常会听到所谓多线程,多进程,加上现在的协程的诸多方法去解决性能问题,那么,到底这些术语到底是个什么意思,我们什么时候用什么方法,才是最合适的呢?这才是促使我写这篇文章的原因吧。
因为这些东西,应该是Python最核心的了,搞懂这个,Python也就没什么特别难的问题了。所以,我们开始吧
1 2 3 4 5 6 7 COPY 从来不畏惧到底明天会是什么模样 活在今天告诉你生命不只一种颜色 咬牙挺住却坚信黑暗之后即是黎明 解剖生活才能看到未来的光
Python的多线程是什么 首先,线程是什么,说的简单一点,线程就是进程的儿子,一个进程可以搞出多个线程,但是一个线程只能有一个爹(进程),不允许随便乱认爹(进程)。
这就好比你看电影,电影是一个进程,里面的画面是一个线程,声音是一个线程,字幕又是一个线程,这就是线程的一种表现。
如果是系统执行进程,首先需要划分独立的进程空间/内存,但是线程就不会这么麻烦。
线程是独立执行的,并且也是并发的,占用资源也小,一个线程可以去创建/杀死另一个线程,相当于“子子孙孙无穷匮也”,并且线程之间还支持通信,可以共享其中的数据/内存等等。
上面这些都是线程的优点,但是!但是!但是!Python的线程和这个不一样 !
Python的线程其实是个假的,为啥呢,因为Python有GIL全局锁 。
不知道你们看过《无限恐怖》这本小说没有,里面有个东西叫做基因锁,因为有基因锁的存在,导致人类的各项机能被限制了,同理,GIL全局锁也是一样,它的逻辑就是,只要你用了带GIL锁的解释器,任何时候,你都只能,也只可以在同一时间执行一个线程。
带GIL锁的解释器有哪些呢?当然是我现在说的CPython啦!
CPython用C实现的,用的人也最多,所以为什么市面上的教材都说Python的线程不好使,是因为大家全都是用的CPython。
有点儿绕吧,其实意思就是,Python的多线程实际上是个伪多线程,并不是真正的多线程,实际上,无论如何,它只能保证一个线程执行。
我来描述一下Python多线程执行的逻辑
1 2 3 4 5 6 7 8 COPY 线程1获取到GIL锁 执行业务代码 线程1释放GIL锁 线程2获取到GIL锁 执行业务代码 线程2释放GIL锁 ......
这个是不是贼TM绕,我写个代码给大家伙儿瞅瞅到底这个是怎么表现出来的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 COPY import threadingimport timeexitFlag = 0 class myThread (threading.Thread) : def __init__ (self, threadID, name, counter) : threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run (self) : print ("开始线程:" + self.name) print_time(self.name, self.counter, 5 ) print ("退出线程:" + self.name) def print_time (threadName, delay, counter) : while counter: if exitFlag: threadName.exit() time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 thread1 = myThread(1 , "Thread-1" , 1 ) thread2 = myThread(2 , "Thread-2" , 2 ) thread1.start() thread2.start() thread1.join() thread2.join() print ("退出主线程" )
这里用了一个继承的方法,写了一个多线程逻辑
这里输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 COPY 开始线程:Thread-1 开始线程:Thread-2 Thread-1: Tue Dec 1 09:58:21 2020 Thread-2: Tue Dec 1 09:58:22 2020 Thread-1: Tue Dec 1 09:58:22 2020 Thread-1: Tue Dec 1 09:58:23 2020 Thread-2: Tue Dec 1 09:58:24 2020 Thread-1: Tue Dec 1 09:58:24 2020 Thread-1: Tue Dec 1 09:58:25 2020 退出线程:Thread-1 Thread-2: Tue Dec 1 09:58:26 2020 Thread-2: Tue Dec 1 09:58:28 2020 Thread-2: Tue Dec 1 09:58:30 2020 退出线程:Thread-2 退出主线程
发现了没,这个,有个时间差,这两根本不是同时去运行的,这就是GIL的石锤
再看个图,这就是GIL的运行流程
整明白了嘛?
但是你会说,哎呀,我没感觉到缓慢阿,害挺快的,是,这就是我下面要聊的,它到底用了什么技术,才能让我们感觉到挺快的。
Python的多线程在代码部分是怎么实现的? 又涉及到看源码的东西了,这里首先追踪一把,threading的代码放置在
1 COPY https://github.com/python/cpython/blob/3.9/Lib/threading.py
先从入口的Start追踪
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 COPY def start (self) : """Start the thread's activity. It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control. This method will raise a RuntimeError if called more than once on the same thread object. """ if not self._initialized: raise RuntimeError("thread.__init__() not called" ) if self._started.is_set(): raise RuntimeError("threads can only be started once" ) with _active_limbo_lock: _limbo[self] = self try : _start_new_thread(self._bootstrap, ()) except Exception: with _active_limbo_lock: del _limbo[self] raise self._started.wait()
其实threading的代码写的很好,已经大概把流程和步骤都说明白了,涉及到lock的部分是C写的, 首先追踪溯源,看一下启动线程的C代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 COPY void PyEval_InitThreads (void ) { if (gil_created()) return ; create_gil(); take_gil(PyThreadState_GET()); main_thread = PyThread_get_thread_ident(); if (!pending_lock) pending_lock = PyThread_allocate_lock(); }
可以看到 核心的GIL代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 COPY static void take_gil (PyThreadState *tstate) { MUTEX_LOCK(gil_mutex); if (!_Py_atomic_load_relaxed(&gil_locked)) goto _ready; while (_Py_atomic_load_relaxed(&gil_locked)) { int timed_out = 0 ; unsigned long saved_switchnum; saved_switchnum = gil_switch_number; COND_TIMED_WAIT(gil_cond, gil_mutex, INTERVAL, timed_out); if (timed_out && _Py_atomic_load_relaxed(&gil_locked) && gil_switch_number == saved_switchnum) { SET_GIL_DROP_REQUEST(); } } _ready: _Py_atomic_store_relaxed(&gil_locked, 1 ); _Py_ANNOTATE_RWLOCK_ACQUIRED(&gil_locked, 1 ); if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(&gil_last_holder)) { _Py_atomic_store_relaxed(&gil_last_holder, (uintptr_t )tstate); ++gil_switch_number; } if (_Py_atomic_load_relaxed(&gil_drop_request)) { RESET_GIL_DROP_REQUEST(); } if (tstate->async_exc != NULL ) { _PyEval_SignalAsyncExc(); } MUTEX_UNLOCK(gil_mutex); }
这里实现了互斥锁gil_mutex,如果GIL被占用,那么将持续等待,超时后将修改重置变量。
这点代码给我看的累死了。
回到本段开始的问题,为什么GIL有时候感觉不到慢?
首先,GIL类似一个信号锁,意思就像是尚方宝剑,持有它的线程告诉其他线程,都不许动阿,我现在正用着呢,你们要么等着我主动释放,要么等我超时了自己释放,然后继续竞争切换持有GIL的线程。
核心的意思就是,多线程的好处在于,阻塞并不会影响其他的线程,因为阻塞的线程持有的GIL马上就被释放了,其他线程可以接力马上干活儿,不会出现阻塞的情况。
这里可以看一下切换线程的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 COPY PyObject * _PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag){ ... for (;;) { if (_Py_atomic_load_relaxed(&eval_breaker)) { if (_Py_OPCODE(*next_instr) == SETUP_FINALLY || _Py_OPCODE(*next_instr) == YIELD_FROM) { goto fast_next_opcode; } if (_Py_atomic_load_relaxed(&pendingcalls_to_do)) { if (Py_MakePendingCalls() < 0 ) goto error; } if (_Py_atomic_load_relaxed(&gil_drop_request)) { if (PyThreadState_Swap(NULL ) != tstate) Py_FatalError("ceval: tstate mix-up" ); drop_gil(tstate); take_gil(tstate); if (_Py_Finalizing && _Py_Finalizing != tstate) { drop_gil(tstate); PyThread_exit_thread(); } if (PyThreadState_Swap(tstate) != NULL ) Py_FatalError("ceval: orphan tstate" ); } } ... } }
当检测到 eval_breaker、gil_drop_request 时,会被动的释放 GIL,跟其他线程一起再次竞争 GIL,所以它几乎没有阻塞,虽然这狗GIL把你给锁住了,但是也保证了效率,当某个线程执行时间过长,可以迅速切换下一个线程调用。
类似这样
如何绕过GIL锁? 如何绕过GIL锁,这个也是老生常谈的问题了。
在我这里的话,如果是我,涉及到需要绕过的地方
我会直接写C代码,用CPython去调用
我会用JPython
我会用Golang
大概就这么几个方法,没别的方法了,再也不用看了
Python多线程适用于哪些场景? 前面说了,Python多线程并不是真正意义的多线程,其实是个假的,但是还是有很多老哥不遗余力的推荐它,到底为啥,上一节也说得很清楚了,GIL虽然恶心,但是人家还是支持自动切换比较慢的阻塞线程的,不会影响其他线程运行,所以,你品一下,在咱们的开发岁月中,多线程,到底适用于哪些场景?
首先,发生阻塞的是什么情况,网络的连接时间过长,或者是处理多个业务,读取多个文件,访问多个网页等等。
网上很多文章都说: I/O 密集场景,多线程最合适
你抄我,我抄你,抄到最后就这么一句话,讲真,我看了那么多文章,真的没几个说清楚的,举个例子的都没。
那行嘛,那我来举例子吧。
首先,I/O密集型,指的是涉及到网络、磁盘IO的任务,现在互联网的web大部分都是IO密集的场景
举几个常用的自用例子吧
网络爬虫,这个都写烂了
web应用,例如多用户访问,多用户登录,多用户下载
数据库写入,Mysql/Es等等
RPC框架
大概就这些常用的,Python多线程,大概就这些比较合适。
Python多线程的几个基础用例 说了那么多,最后还是加几个基础的用例,拿去举一反三,你们都是聪明人
基础的多线程框架 这个基础的多线程框架包含的功能
启动/停止线程
展示线程的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 COPY import threadingimport timeexitFlag = 0 class myThread (threading.Thread) : def __init__ (self, threadID, name, counter) : threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run (self) : print ("开始线程:" + self.name) print_time(self.name, self.counter, 5 ) print ("退出线程:" + self.name) def print_time (threadName, delay, counter) : while counter: if exitFlag: threadName.exit() time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 thread1 = myThread(1 , "Thread-1" , 1 ) thread2 = myThread(2 , "Thread-2" , 2 ) thread1.start() thread2.start() thread1.join() thread2.join() print ("退出主线程" )
多线程的线程通信框架(利用队列) 有些时候我们需要在线程之间交换信息和数据,所以多线程之间需要互相通信
多线程的线程通信框架包含的功能
利用队列共享数据(queue)
创建一个生产者消费者模型,启动生产者,消费者线程
这种框架同样适用于爬虫
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 COPY import threadingimport timefrom queue import Queuequeue = Queue(20 ) def Producer () : i = 0 print("开始线程" ) while True : i = i + 1 print("生产数据" , i, "现有数据" , queue.qsize()) time.sleep(1 ) queue.put(i) def Consumer () : while True : i = queue.get() time.sleep(0.5 ) print("消费数据" , i) if __name__ == "__main__" : Th1 = threading.Thread(target=Producer, ) Th2 = threading.Thread(target=Consumer, ) Th2.start() Th1.start() Th1.join() Th2.join()
大概就是这样,这个害挺简单的。
线程锁框架 当需要修改共享数据的时候,多个线程会造成冲突,所以对执行的部分进行加锁很有必要,这里采用互斥锁逻辑
线程池加锁框架大概实现的功能
在程序中加锁,避免竞争冲突
这部分代码我懒得写了,直接用了https://www.cnblogs.com/tashanzhishi/p/10775641.html
这位老兄的代码写的很好,看一遍就懂了,感恩!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 COPY import threadingimport timecount = 0 def add_num () : global count if lock.acquire(): tmp = count time.sleep(0.001 ) count = tmp + 1 lock.release() def run (add_fun) : global count thread_list = [] for i in range(100 ): t = threading.Thread(target=add_fun) t.start() thread_list.append(t) for j in thread_list: j.join() print(count) if __name__ == '__main__' : lock = threading.Lock() run(add_num)
线程池框架 线程池相当于冰箱里的啤酒
你只要想喝打开冰箱拿就行
你不喝人家也不会跑
就在冰箱里,不吵不闹
等待你的下一次临幸
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 COPY from socket import AF_INET, SOCK_STREAM, socketfrom concurrent.futures import ThreadPoolExecutordef echo_client (sock, client_addr) : ''' Handle a client connection ''' print('Got connection from' , client_addr) while True : msg = sock.recv(65536 ) if not msg: break sock.sendall(msg) print('Client closed connection' ) sock.close() def echo_server (addr) : pool = ThreadPoolExecutor(128 ) sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5 ) while True : client_sock, client_addr = sock.accept() pool.submit(echo_client, client_sock, client_addr) echo_server(('' ,15000 ))
结尾 线程这块,基本核心的东西都弄完了,其实梳理完,觉得还好,最起码我弄明白人家是干嘛的了,以后碰到任何线程的问题,我也不害怕了,这是12月第一篇blog,我要坚持三四个月,直到黎明的到来。
end。