多线程操作在涉及网络编程常常用到,学习一下。本文使用threading

常用函数

1
2
3
4
5
6
7
8
9
10
import threading

print(threading.current_thread())
print(threading.active_count())
print(threading.main_thread().name)

#<_MainThread(MainThread, started 4514921920)>
#1
#MainThread

创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time

def thread_body():
t = threading.current_thread()
for i in range(3):
print("线程{0}运行第{1}次".format(t.name, i))
time.sleep(1)
print("线程" + t.name + "运行完成")

def main():
t1 = threading.Thread(target=thread_body)
t1.start()

t2 = threading.Thread(target=thread_body)
t2.start()

if __name__ == "__main__":
main()


"""线程Thread-1运行第0次
线程Thread-2运行第0次
线程Thread-1运行第1次
线程Thread-2运行第1次
线程Thread-1运行第2次
线程Thread-2运行第2次
线程Thread-1运行完成
线程Thread-2运行完成
"""

或者修改run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import time

class MyThread(threading.Thread):
def __init__(self, name=None):
super().__init__(name=name)

def run(self):
t = threading.current_thread()
for i in range(3):
print("线程{0}运行第{1}次".format(t.name, i))
time.sleep(1)
print("线程" + t.name + "运行完成")


def main():
t1 = MyThread()
t1.start()

t2 = MyThread()
t2.start()

if __name__ == "__main__":
main()

"""线程Thread-1运行第0次
线程Thread-2运行第0次
线程Thread-1运行第1次
线程Thread-2运行第1次
线程Thread-1运行第2次
线程Thread-2运行第2次
线程Thread-1运行完成
线程Thread-2运行完成
"""

线程管理

等待线程结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# coding=utf-8

import threading
import time

value = 0

def thread_body():
global value
# 声明全局变量
t = threading.current_thread()
for i in range(3):
print("线程{0}运行第{1}次".format(t.name, i))
value += 1
# 进行加一
time.sleep(1)
print("线程" + t.name + "运行完成")


def main():
print("主线程开始...")

t1 = threading.Thread(target=thread_body)
t1.start()
t1.join()

print("主线程结束, value为 " + str(value))


if __name__ == "__main__":
main()

"""主线程开始...
线程Thread-1运行第0次
线程Thread-1运行第1次
线程Thread-1运行第2次
线程Thread-1运行完成
主线程结束, value为 3
"""

# 注释掉join输出如下
"""主线程开始...
线程Thread-1运行第0次
主线程结束, value为 1
线程Thread-1运行第1次
线程Thread-1运行第2次
线程Thread-1运行完成
"""

线程停止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

isRunning = True

def thread_body():
global isRunning
while isRunning:
print("下载中...")
time.sleep(5)
print("下载完成")


def main():
t1 = threading.Thread(target=thread_body)
t1.start()

command = input('输入y停止: ')
if command == 'y':
global isRunning
isRunning = False



if __name__ == "__main__":
main()

"""下载中...
输入y停止: 下载中...
y
下载完成
"""

线程安全

数据共享

使用lock互斥锁保证售出过程中只有一个线程运行,主要使用threading的lock对象

  • lock.acquire()锁定
  • lock.release()释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import threading
import time


class TicketDB:
def __init__(self):
# 机票的数量
self.ticket_count = 5

# 获得当前机票数量
def get_ticket_count(self):
return self.ticket_count

# 销售机票
def sell_ticket(self):
# TODO 等于用户付款
# 线程休眠,阻塞当前线程,模拟等待用户付款
time.sleep(1)
print("第{0}号票,已经售出".format(self.ticket_count))
self.ticket_count -= 1


# 创建TicketDB对象
db = TicketDB()
# 创建Lock对象
lock = threading.Lock()


# 线程体1函数
def thread1_body():
global db, lock # 声明为全局变量
while True:
lock.acquire()
curr_ticket_count = db.get_ticket_count()
# 查询是否有票
if curr_ticket_count > 0:
db.sell_ticket()
else:
lock.release()
# 无票退出
break
lock.release()
time.sleep(1)


# 线程体2函数
def thread2_body():
global db, lock # 声明为全局变量
while True:
lock.acquire()
curr_ticket_count = db.get_ticket_count()
# 查询是否有票
if curr_ticket_count > 0:
db.sell_ticket()
else:
lock.release()
# 无票退出
break
lock.release()
time.sleep(1)


# 主函数
def main():
# 创建线程对象t1
t1 = threading.Thread(target=thread1_body)
# 启动线程t1
t1.start()
# 创建线程对象t2
t2 = threading.Thread(target=thread2_body)
# 启动线程t2
t2.start()


if __name__ == '__main__':
main()

"""第5号票,已经售出
第4号票,已经售出
第3号票,已经售出
第2号票,已经售出
第1号票,已经售出
"""

线程间通信

使用Condition
  • wait(timeout=None) 阻塞等待被唤醒或者超时
  • notify() 唤醒相同条件变量的一个线程
  • notify_all() 唤醒相同条件变量的所有线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import threading
import time

# 创建条件变量对象
condition = threading.Condition()


class Stack:
def __init__(self):
# 堆栈指针初始值为0
self.pointer = 0
# 堆栈有5个数字的空间
self.data = [-1, -1, -1, -1, -1]

# 压栈方法
def push(self, c):
global condition
condition.acquire()
# 堆栈已满,不能压栈
while self.pointer == len(self.data):
# 等待其它线程把数据出栈
condition.wait()
# 通知其他线程把数据出栈
condition.notify()
# 数据压栈
self.data[self.pointer] = c
# 指针向上移动
self.pointer += 1
condition.release()

# 出栈方法
def pop(self):
global condition
condition.acquire()
# 堆栈无数据,不能出栈
while self.pointer == 0:
# 等待其他线程把数据压栈
condition.wait()
# 通知其他线程压栈
condition.notify()
# 指针向下移动
self.pointer -= 1
data = self.data[self.pointer]
condition.release()
# 数据出栈
return data


# 创建堆栈Stack对象
stack = Stack()


# 生产者线程体函数
def producer_thread_body():
global stack # 声明为全局变量
# 产生10个数字
for i in range(0, 10):
# 把数字压栈
stack.push(i)
# 打印数字
print('生产:{0}'.format(i))
# 每产生一个数字线程就睡眠
time.sleep(1)


# 消费者线程体函数
def consumer_thread_body():
global stack # 声明为全局变量
# 从堆栈中读取数字
for i in range(0, 10):
# 从堆栈中读取数字
x = stack.pop()
# 打印数字
print('消费:{0}'.format(x))
# 每消费一个数字线程就睡眠
time.sleep(1)


# 主函数
def main():
# 创建生产者线程对象producer
producer = threading.Thread(target=producer_thread_body)
# 启动生产者线程
producer.start()
# 创建消费者线程对象consumer
consumer = threading.Thread(target=consumer_thread_body)
# 启动消费者线程
consumer.start()


if __name__ == '__main__':
main()

"""生产:0
消费:0
生产:1
消费:1
生产:2
消费:2
生产:3
消费:3
生产:4
消费:4
生产:5
消费:5
生产:6
消费:6
生产:7
消费:7
生产:8
消费:8
生产:9
消费:9
"""

可见消费和生产是同步的

使用Event
  • wait(timeout=None) 阻塞进程
  • set() 使等待状态的进程恢复运行
  • 不需要互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import threading
import time

event = threading.Event()


class Stack:
def __init__(self):
# 堆栈指针初始值为0
self.pointer = 0
# 堆栈有5个数字的空间
self.data = [-1, -1, -1, -1, -1]

# 压栈方法
def push(self, c):
global event
# 堆栈已满,不能压栈
while self.pointer == len(self.data):
# 等待其它线程把数据出栈
event.wait()
# 通知其他线程把数据出栈
event.set()
# 数据压栈
self.data[self.pointer] = c
# 指针向上移动
self.pointer += 1

# 出栈方法
def pop(self):
global event
# 堆栈无数据,不能出栈
while self.pointer == 0:
# 等待其他线程把数据压栈
event.wait()
# 通知其他线程压栈
event.set()
# 指针向下移动
self.pointer -= 1
# 数据出栈
data = self.data[self.pointer]
return data


# 创建堆栈Stack对象
stack = Stack()


# 生产者线程体函数
def producer_thread_body():
global stack # 声明为全局变量
# 产生10个数字
for i in range(0, 10):
# 把数字压栈
stack.push(i)
# 打印数字
print('生产:{0}'.format(i))
# 每产生一个数字线程就睡眠
time.sleep(1)


# 消费者线程体函数
def consumer_thread_body():
global stack # 声明为全局变量
# 从堆栈中读取数字
for i in range(0, 10):
# 从堆栈中读取数字
x = stack.pop()
# 打印数字
print('消费:{0}'.format(x))
# 每消费一个数字线程就睡眠
time.sleep(1)


# 主函数
def main():
# 创建生产者线程对象producer
producer = threading.Thread(target=producer_thread_body)
# 启动生产者线程
producer.start()
# 创建消费者线程对象consumer
consumer = threading.Thread(target=consumer_thread_body)
# 启动消费者线程
consumer.start()


if __name__ == '__main__':
main()