完全搞懂Python的线程机制

前言

总觉得用了Python那么久,感觉隔一层面纱似的,只是会用而已。

现在Python更新了3.9,越来越厉害了,多了很多新的语法特性。

想想自己,也不能总是写一些Golang的东西,Python也要持续跟上,所以打算开一个Python三巨头的坑,把Python的线程,进程,协程都讲明白,这样第一个是让我加深理解,第二个是让我彻底搞懂这三者的机制,我认为这样我才可以走的更远。

在开发的过程当中,我们总是需要去提高效率,一般在Python里面,我们经常会听到所谓多线程,多进程,加上现在的协程的诸多方法去解决性能问题,那么,到底这些术语到底是个什么意思,我们什么时候用什么方法,才是最合适的呢?这才是促使我写这篇文章的原因吧。

因为这些东西,应该是Python最核心的了,搞懂这个,Python也就没什么特别难的问题了。所以,我们开始吧

1
2
3
4
5
6
7
从来不畏惧到底明天会是什么模样

活在今天告诉你生命不只一种颜色

咬牙挺住却坚信黑暗之后即是黎明

解剖生活才能看到未来的光

Python的多线程是什么

首先,线程是什么,说的简单一点,线程就是进程的儿子,一个进程可以搞出多个线程,但是一个线程只能有一个爹(进程),不允许随便乱认爹(进程)。

这就好比你看电影,电影是一个进程,里面的画面是一个线程,声音是一个线程,字幕又是一个线程,这就是线程的一种表现。

如果是系统执行进程,首先需要划分独立的进程空间/内存,但是线程就不会这么麻烦。

线程是独立执行的,并且也是并发的,占用资源也小,一个线程可以去创建/杀死另一个线程,相当于“子子孙孙无穷匮也”,并且线程之间还支持通信,可以共享其中的数据/内存等等。

上面这些都是线程的优点,但是!但是!但是!Python的线程和这个不一样

Python的线程其实是个假的,为啥呢,因为Python有GIL全局锁

不知道你们看过《无限恐怖》这本小说没有,里面有个东西叫做基因锁,因为有基因锁的存在,导致人类的各项机能被限制了,同理,GIL全局锁也是一样,它的逻辑就是,只要你用了带GIL锁的解释器,任何时候,你都只能,也只可以在同一时间执行一个线程。

带GIL锁的解释器有哪些呢?当然是我现在说的CPython啦!

CPython用C实现的,用的人也最多,所以为什么市面上的教材都说Python的线程不好使,是因为大家全都是用的CPython。

有点儿绕吧,其实意思就是,Python的多线程实际上是个伪多线程,并不是真正的多线程,实际上,无论如何,它只能保证一个线程执行。

我来描述一下Python多线程执行的逻辑

1
2
3
4
5
6
7
8
线程1获取到GIL锁
执行业务代码
线程1释放GIL锁

线程2获取到GIL锁
执行业务代码
线程2释放GIL锁
......

这个是不是贼TM绕,我写个代码给大家伙儿瞅瞅到底这个是怎么表现出来的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time

exitFlag = 0

# 继承的方法写一个多线程逻辑
class myThread (threading.Thread):

def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter

def run(self):
print ("开始线程:" + self.name)
print_time(self.name, self.counter, 5)
print ("退出线程:" + self.name)

def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
# 这里模拟一个阻塞
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1

# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 开启新线程
thread1.start()
thread2.start()

# join等待
thread1.join()
thread2.join()
print ("退出主线程")

这里用了一个继承的方法,写了一个多线程逻辑

这里输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
开始线程:Thread-1
开始线程:Thread-2
Thread-1: Tue Dec 1 09:58:21 2020
Thread-2: Tue Dec 1 09:58:22 2020
Thread-1: Tue Dec 1 09:58:22 2020
Thread-1: Tue Dec 1 09:58:23 2020
Thread-2: Tue Dec 1 09:58:24 2020
Thread-1: Tue Dec 1 09:58:24 2020
Thread-1: Tue Dec 1 09:58:25 2020
退出线程:Thread-1
Thread-2: Tue Dec 1 09:58:26 2020
Thread-2: Tue Dec 1 09:58:28 2020
Thread-2: Tue Dec 1 09:58:30 2020
退出线程:Thread-2
退出主线程

发现了没,这个,有个时间差,这两根本不是同时去运行的,这就是GIL的石锤

再看个图,这就是GIL的运行流程

整明白了嘛?

但是你会说,哎呀,我没感觉到缓慢阿,害挺快的,是,这就是我下面要聊的,它到底用了什么技术,才能让我们感觉到挺快的。

Python的多线程在代码部分是怎么实现的?

又涉及到看源码的东西了,这里首先追踪一把,threading的代码放置在

1
https://github.com/python/cpython/blob/3.9/Lib/threading.py

先从入口的Start追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def start(self):
"""Start the thread's activity.

It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the
same thread object.

"""
# 线程是否初始化(这里就是刚那个init函数
if not self._initialized:
raise RuntimeError("thread.__init__() not called")

# 设置线程的开始状态(把线程状态置为start
if self._started.is_set():
raise RuntimeError("threads can only be started once")

# 加锁,调用GIL
with _active_limbo_lock:
_limbo[self] = self

# 去启动新的线程
try:
_start_new_thread(self._bootstrap, ())
# 如果启动出了问题
except Exception:
# 把锁给del了
with _active_limbo_lock:
del _limbo[self]
raise
# 释放锁,阻塞。直到被唤醒/超时
self._started.wait()

其实threading的代码写的很好,已经大概把流程和步骤都说明白了,涉及到lock的部分是C写的, 首先追踪溯源,看一下启动线程的C代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void PyEval_InitThreads(void)
{
if (gil_created())
return;
// 创建GIL锁
create_gil();
// 申请GIL锁
take_gil(PyThreadState_GET());
// 主线程
main_thread = PyThread_get_thread_ident();
// 如果没有等待锁
if (!pending_lock)
// 创建等待锁
pending_lock = PyThread_allocate_lock();
}

可以看到
核心的GIL代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static void take_gil(PyThreadState *tstate)
{
MUTEX_LOCK(gil_mutex); // 加锁

if (!_Py_atomic_load_relaxed(&gil_locked))
// 获取 GIL,若已释放,直接获取,跳转
goto _ready;

while (_Py_atomic_load_relaxed(&gil_locked)) {
// GIL 未释放
int timed_out = 0;
unsigned long saved_switchnum;
// 记录切换次数
saved_switchnum = gil_switch_number;
// 利用 pthread_cond_tim 阻塞等待超时
COND_TIMED_WAIT(gil_cond, gil_mutex, INTERVAL, timed_out);
/* 等待超时,仍未释放, 发送释放请求信号 */
if (timed_out &&
_Py_atomic_load_relaxed(&gil_locked) &&
gil_switch_number == saved_switchnum) {
// 设置 gil_drop_request=1,eval_breaker=1
// 释放信号
SET_GIL_DROP_REQUEST();
}
}
_ready:
/* We now hold the GIL */
_Py_atomic_store_relaxed(&gil_locked, 1); // 设置 GIL 占用
_Py_ANNOTATE_RWLOCK_ACQUIRED(&gil_locked, /*is_write=*/1);

if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(&gil_last_holder)) {
_Py_atomic_store_relaxed(&gil_last_holder, (uintptr_t)tstate);
++gil_switch_number;
}

if (_Py_atomic_load_relaxed(&gil_drop_request)) {
// 重置 gil_drop_request=0
RESET_GIL_DROP_REQUEST();
}
if (tstate->async_exc != NULL) {
_PyEval_SignalAsyncExc();
}

MUTEX_UNLOCK(gil_mutex); // 解锁
}

这里实现了互斥锁gil_mutex,如果GIL被占用,那么将持续等待,超时后将修改重置变量。

这点代码给我看的累死了。

回到本段开始的问题,为什么GIL有时候感觉不到慢?

首先,GIL类似一个信号锁,意思就像是尚方宝剑,持有它的线程告诉其他线程,都不许动阿,我现在正用着呢,你们要么等着我主动释放,要么等我超时了自己释放,然后继续竞争切换持有GIL的线程。

核心的意思就是,多线程的好处在于,阻塞并不会影响其他的线程,因为阻塞的线程持有的GIL马上就被释放了,其他线程可以接力马上干活儿,不会出现阻塞的情况。

这里可以看一下切换线程的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
PyObject *
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag){
...
for (;;) {
if (_Py_atomic_load_relaxed(&eval_breaker)) {
if (_Py_OPCODE(*next_instr) == SETUP_FINALLY ||
_Py_OPCODE(*next_instr) == YIELD_FROM) {
goto fast_next_opcode;
}
if (_Py_atomic_load_relaxed(&pendingcalls_to_do)) {
if (Py_MakePendingCalls() < 0)
goto error;
}

// 如果检测到gil_drop_request,释放GIL
if (_Py_atomic_load_relaxed(&gil_drop_request)) {
/* Give another thread a chance */
if (PyThreadState_Swap(NULL) != tstate)
Py_FatalError("ceval: tstate mix-up");
drop_gil(tstate);

// 继续去调用GIL抢占
take_gil(tstate);

// 检查是否退出线程
if (_Py_Finalizing && _Py_Finalizing != tstate) {
drop_gil(tstate);
PyThread_exit_thread();
}

if (PyThreadState_Swap(tstate) != NULL)
Py_FatalError("ceval: orphan tstate");
}
}
...
}
}

当检测到 eval_breaker、gil_drop_request 时,会被动的释放 GIL,跟其他线程一起再次竞争 GIL,所以它几乎没有阻塞,虽然这狗GIL把你给锁住了,但是也保证了效率,当某个线程执行时间过长,可以迅速切换下一个线程调用。

类似这样

如何绕过GIL锁?

如何绕过GIL锁,这个也是老生常谈的问题了。

在我这里的话,如果是我,涉及到需要绕过的地方

  1. 我会直接写C代码,用CPython去调用
  2. 我会用JPython
  3. 我会用Golang

大概就这么几个方法,没别的方法了,再也不用看了

Python多线程适用于哪些场景?

前面说了,Python多线程并不是真正意义的多线程,其实是个假的,但是还是有很多老哥不遗余力的推荐它,到底为啥,上一节也说得很清楚了,GIL虽然恶心,但是人家还是支持自动切换比较慢的阻塞线程的,不会影响其他线程运行,所以,你品一下,在咱们的开发岁月中,多线程,到底适用于哪些场景?

首先,发生阻塞的是什么情况,网络的连接时间过长,或者是处理多个业务,读取多个文件,访问多个网页等等。

网上很多文章都说: I/O 密集场景,多线程最合适

你抄我,我抄你,抄到最后就这么一句话,讲真,我看了那么多文章,真的没几个说清楚的,举个例子的都没。

那行嘛,那我来举例子吧。

首先,I/O密集型,指的是涉及到网络、磁盘IO的任务,现在互联网的web大部分都是IO密集的场景

举几个常用的自用例子吧

  1. 网络爬虫,这个都写烂了
  2. web应用,例如多用户访问,多用户登录,多用户下载
  3. 数据库写入,Mysql/Es等等
  4. RPC框架

大概就这些常用的,Python多线程,大概就这些比较合适。

Python多线程的几个基础用例

说了那么多,最后还是加几个基础的用例,拿去举一反三,你们都是聪明人

基础的多线程框架

这个基础的多线程框架包含的功能

  1. 启动/停止线程
  2. 展示线程的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time

exitFlag = 0

# 利用继承的方法
class myThread (threading.Thread):

def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter

def run(self):
print ("开始线程:" + self.name)
print_time(self.name, self.counter, 5)
print ("退出线程:" + self.name)

def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1

# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 开启新线程
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print ("退出主线程")

多线程的线程通信框架(利用队列)

有些时候我们需要在线程之间交换信息和数据,所以多线程之间需要互相通信

多线程的线程通信框架包含的功能

  1. 利用队列共享数据(queue)
  2. 创建一个生产者消费者模型,启动生产者,消费者线程

这种框架同样适用于爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading
import time
from queue import Queue


queue = Queue(20)


# 生产者
def Producer():
i = 0
print("开始线程")
while True:
i = i + 1
print("生产数据", i, "现有数据", queue.qsize())
time.sleep(1)
queue.put(i)

# 消费者
def Consumer():
while True:
i = queue.get()
time.sleep(0.5)
print("消费数据", i)


if __name__ == "__main__":
Th1 = threading.Thread(target=Producer, )

Th2 = threading.Thread(target=Consumer, )

Th2.start()
Th1.start()

Th1.join()
Th2.join()

大概就是这样,这个害挺简单的。

线程锁框架

当需要修改共享数据的时候,多个线程会造成冲突,所以对执行的部分进行加锁很有必要,这里采用互斥锁逻辑

线程池加锁框架大概实现的功能

  1. 在程序中加锁,避免竞争冲突

这部分代码我懒得写了,直接用了
https://www.cnblogs.com/tashanzhishi/p/10775641.html

这位老兄的代码写的很好,看一遍就懂了,感恩!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time

count = 0


# 做一个累加的函数
def add_num():
global count
if lock.acquire(): # 获得锁,并返回True
tmp = count
time.sleep(0.001)
count = tmp + 1
lock.release() # 执行完释放锁


def run(add_fun):
global count
thread_list = []

# 累加100次
for i in range(100):
t = threading.Thread(target=add_fun)
t.start()
thread_list.append(t)

# 等待线程执行完
for j in thread_list:
j.join()

print(count)


if __name__ == '__main__':
# 添加全局锁
lock = threading.Lock()
# 执行函数
run(add_num)

线程池框架

线程池相当于冰箱里的啤酒

你只要想喝打开冰箱拿就行

你不喝人家也不会跑

就在冰箱里,不吵不闹

等待你的下一次临幸

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor


# 假装跑一个服务器client
def echo_client(sock, client_addr):
'''
Handle a client connection
'''
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()


# 假装跑一个server
def echo_server(addr):
# 定义一个线程池
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
# 从池子里拿线程消费
pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

结尾

线程这块,基本核心的东西都弄完了,其实梳理完,觉得还好,最起码我弄明白人家是干嘛的了,以后碰到任何线程的问题,我也不害怕了,这是12月第一篇blog,我要坚持三四个月,直到黎明的到来。

end。

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • © 2019-2024 Yemilice lau
  • Powered by Hexo Theme Ayer
  • PV: UV:

觉得帮到你了么?赏我点儿~

支付宝
微信