python中的线程小结

单线程

1
2
3
4
5
6
7
8
def say_hello(name):
print("hello world! %d" % name)
time.sleep(3)


if __name__ == '__main__':
for i in range(5):
say_hello(i)

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time


def say_hello(name):
print("hello world! %d" % name)
time.sleep(3)


if __name__ == '__main__':
start_time = time.time()
t_list = []
for i in range(1000):
t = threading.Thread(target=say_hello, args=(i,))
t_list.append(t)

for t_thread in t_list:
t_thread.start()

for t_thread in t_list:
t_thread.join()

end_time = time.time()
print("time used %0.2f" % (end_time - start_time))

查看线程数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
from time import sleep, ctime


def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)


def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)


if __name__ == '__main__':
print('---开始---:', ctime())

t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)

t1.start()
t2.start()

while True:
length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)
if length <= 1:
break

sleep(0.5)

线程执行代码的封装

通过使用threading模块能完成多任务的程序开发,为了让每个线程的封装性更完美,所以使用threading模块时,往往会定义一个新的子类class,只要继承threading.Thread就可以了,然后重写run方法.

1
2
3
4
5
6
7
8
9
10
11
class MyThread(threading.Thread):

def run(self):
for i in range(5):
print("hello my custom thread %d" % i)
sleep(5)


if __name__ == '__main__':
t = MyThread()
t.start()

python的threading.Thread类有一个run方法,用于定义线程的功能函数,可以在自己的线程类中覆盖该方法。而创建自己的线程实例后,通过Thread类的start方法,可以启动该线程,交给python虚拟机进行调度,当该线程获得执行的机会时,就会调用run方法执行线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyThread(threading.Thread):

def run(self):
for i in range(3):
msg = "I'm " + self.name + ' @ ' + str(i)
print(msg)
sleep(2)


if __name__ == '__main__':
for i in range(5):
t = MyThread()
t.start()

从代码和执行结果我们可以看出,多线程程序的执行顺序是不确定的。当执行到sleep语句时,线程将被阻塞(Blocked),到sleep结束后,线程进入就绪(Runnable)状态,等待调度。而线程调度将自行选择一个线程执行。上面的代码中只能保证每个线程都运行完整个run函数,但是线程的启动顺序、run函数中每次循环的执行顺序都不能确定。

  • 每个线程一定会有一个名字,尽管上面的例子中没有指定线程对象的name,但是python会自动为线程指定一个名字。
  • 当线程的run()方法结束时该线程完成。
  • 无法控制线程调度程序,但可以通过别的方式来影响线程调度的方式。
  • 线程的几种状态

avatar

多线程-共享全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
num = 1


class MyThread(threading.Thread):

def run(self):
global num
num += 1
print("I'm " + self.name + ' @ ' + str(i) + 'num=' + str(num))


if __name__ == '__main__':
for i in range(5):
t = MyThread()
t.start()

同步

多线程开发可能遇到的问题

假设两个线程t1和t2都要对num=0进行增1运算,t1和t2都各对num修改10次,num的最终的结果应该为20。

但是由于是多线程访问,有可能出现下面情况:

在num=0时,t1取得num=0。此时系统把t1调度为”sleeping”状态,把t2转换为”running”状态,t2也获得num=0。然后t2对得到的值进行加1并赋给num,使得num=1。然后系统又把t2调度为”sleeping”,把t1转为”running”。线程t1又把它之前得到的0加1后赋值给num。这样,明明t1和t2都完成了1次加1工作,但结果仍然是num=1。

什么是同步

如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。

解决问题的思路

  1. 系统调用t1,然后获取到num的值为0,此时上一把锁,即不允许其他现在操作num
  2. 对num的值进行+1
  3. 解锁,此时num的值为1,其他的线程就可以使用num了,而且是num的值不是0而是1
  4. 同理其他线程在对num进行修改时,都要先上锁,处理完后再解锁,在上锁的整个过程中不允许其他线程访问,就保证了数据的正确性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
import time

num = 0


class MyThread(threading.Thread):
def run(self):
global num

num = num + 1
time.sleep(0.5) # 用来模拟适当的数据处理
print(self.name + ' set num to ' + str(num))


def test():
for i in range(5):
t = MyThread()
t.start()


if __name__ == '__main__':
test()

问题产生的原因就是没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。

互斥锁

当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制

线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。

互斥锁为资源引入一个状态:锁定/非锁定。

某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

1
2
3
4
5
6
#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([blocking])
#释放
mutex.release()

其中,锁定方法acquire可以有一个blocking参数。

  • 如果设定blocking为True,则当前线程会堵塞,直到获取到这个锁为止(如果没有指定,那么默认为True)
  • 如果设定blocking为False,则当前线程不会堵塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time

num = 0

lock = threading.Lock()


class MyThread(threading.Thread):
def run(self):
global num
mutex_flag = lock.acquire(True) # True表示堵塞
print('---线程(%s)的锁状态为%d---' % (self.name, mutex_flag))
if mutex_flag:
num = num + 1
time.sleep(0.5) # 用来模拟适当的数据处理
print(self.name + ' set num to ' + str(num))
lock.release()


def test():
for i in range(5):
t = MyThread()
t.start()


if __name__ == '__main__':
test()

上锁解锁过程

当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。

每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。

线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

优势和劣势对比

锁的好处:

确保了某段关键代码只能由一个线程从头到尾完整地执行

锁的坏处:

阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁

多线程-非共享数据

在多线程开发中,全局变量是多个线程都共享的数据,而局部变量等是各自线程的,是非共享的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

class MyThread(threading.Thread):

def __init__(self, num, sleep_time):
threading.Thread.__init__(self)
self.num = num
self.sleep_time = sleep_time

def run(self):
self.num += 1
time.sleep(self.sleep_time)
print('线程(%s),num=%d' % (self.name, self.num))


if __name__ == '__main__':
mutex = threading.Lock()
t1 = MyThread(100, 5)
t1.start()
t2 = MyThread(200, 1)
t2.start()

死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()


class MyThread1(threading.Thread):

def run(self):
with lock_a:
print(self.name + '----do1---up----')
time.sleep(1)
with lock_b:
print(self.name + '----do1---down----')


class MyThread2(threading.Thread):

def run(self):
with lock_b:
print(self.name + '----do2---up----')
time.sleep(1)
with lock_a:
print(self.name + '----do2---down----')


if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()

同步应用

可以使用互斥锁完成多个任务,有序的进程工作,这就是线程的同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()
lock_b.acquire()
lock_c = threading.Lock()
lock_c.acquire()


class TaskA(threading.Thread):

def run(self):
while True:
if lock_a.acquire():
print("-----Task A------")
time.sleep(1)
lock_b.release()


class TaskB(threading.Thread):

def run(self):
while True:
if lock_b.acquire():
print("-----Task B------")
time.sleep(1)
lock_c.release()


class TaskC(threading.Thread):

def run(self):
while True:
if lock_c.acquire():
print("-----Task C------")
time.sleep(1)
lock_a.release()


a = TaskA()
b = TaskB()
c = TaskC()

a.start()
b.start()
c.start()

线程同步之条件变量

互斥锁是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持。

Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。

线程首先acquire一个条件变量,然后判断一些条件。

  • 如果条件不满足则wait;
  • 如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from click._compat import raw_input

con = threading.Condition()


class A(threading.Thread):
def run(self):
while True:
if con.acquire():
print('---A---1---')
con.wait()
print('---A---2---')
con.release()
time.sleep(1)


class B(threading.Thread):
def run(self):
while True:
if con.acquire():
raw_input('输入任意字符:')
con.notify()
con.release()
time.sleep(1)


if __name__ == '__main__':
a = A()
a.start()

b = B()
b.start()

可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。

生产者与消费者-条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import threading
import time


class MyThread(threading.Thread):

def __init__(self, func, args=()):
super(threading.Thread.__init__(self))
self.func = func
self.args = args

def run(self):
self.result = self.func(*self.args)

def get_result(self):
try:
return self.result
except:
return None


class Producer(threading.Thread):

def run(self):
global count

while True:
if con.acquire():
if count > 1000:
con.wait()
else:
count += 100
print(self.name + ' produce 100, count=' + str(count))
con.notify()
con.release()
time.sleep(1)


class Consumer(threading.Thread):

def run(self):
global count

while True:
if con.acquire():
if count < 100:
con.wait()
else:
count -= 50
print(self.name + ' consume 50, count=' + str(count))
con.notify()
con.release()
time.sleep(1)


if __name__ == '__main__':
count = 500
con = threading.Condition()

for i in range(2):
p = Producer()
p.start()

for i in range(5):
c = Consumer()
c.start()

ThreadLocal

全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。

可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等等。

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading

local_test = threading.local()


def show():
print("Hello world %s in thread %s " % (local_test.name, threading.current_thread().name))


def process_param(name):
local_test.name = name
show()


if __name__ == '__main__':
t1 = threading.Thread(target=process_param, args=("A name",), name="ThreadA")
t2 = threading.Thread(target=process_param, args=("B name",), name="ThreadB")
t1.start()
t2.start()

线程池

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Executor 提供了如下常用方法:

  • submit(fn, args, **kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 提供了如下方法:

1
2
3
4
5
6
7
cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
cancelled():返回 Future 代表的线程任务是否被成功取消。
running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

使用线程池来执行线程任务的步骤如下:

  1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  2. 定义一个普通函数作为线程任务。
  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()

如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
print('--------------')

此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1) 方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
# 使用线程执行map计算
# 后面元组有3个元素,因此程序启动3条线程来执行action函数
results = pool.map(action, (50, 100, 150))
print('--------------')
for r in results:
print(r)

参考文章

  1. http://c.biancheng.net/view/2630.html