python服务器模型

单进程服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import socket

server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)


address = ("", 8000)
server_sock.bind(address)
server_sock.listen(128)


while True:
print('-----主进程,,等待新客户端的到来------')
client_sock, client_addr = server_sock.accept()
print("客户端%s:%s进行了连接!" % client_addr)
while True:
recv_data = client_sock.recv(1024)
if len(recv_data) > 0:
print("接收到的数据为:", recv_data.decode())
else:
print('[%s]客户端已经关闭' % str(client_addr))
break
client_sock.close()
server_sock.close()
  • 同一时刻只能为一个客户进行服务,不能同时为多个客户服务
  • 类似于找一个“明星”签字一样,客户需要耐心等待才可以获取到服务
  • 当服务器为一个客户端服务时,而另外的客户端发起了connect,只要服务器listen的队列有空闲的位置,就会为这个新客户端进行连接,并且客户端可以发送数据,但当服务器为这个新客户端服务时,可能一次性把所有数据接收完毕
  • 当recv接收数据时,返回值为空,即没有返回数据,那么意味着客户端已经调用了close关闭了;因此服务器通过判断recv接收数据是否为空 来判断客户端是否已经下线

多进程服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

import socket
from multiprocessing import Process


def handle_client(client_sock, client_addr):
while True:
recv_data = client_sock.recv(1024)
if len(recv_data) > 0:
print('recv[%s]:%s' % (str(client_addr), recv_data.decode()))
else:
print('[%s]客户端已经关闭' % str(client_addr))
break

client_sock.close()


def main():
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = ("", 8000)
server_sock.bind(address)
server_sock.listen(128)
while True:
print('-----主进程,,等待新客户端的到来------')
client_sock, client_addr = server_sock.accept()
print('-----主进程,,接下来创建一个新的进程负责数据处理[%s]-----' % str(client_addr))
process = Process(target=handle_client, args=(client_sock, client_addr))
process.start()
# 因为已经向子进程中copy了一份(引用),并且父进程中这个套接字也没有用处了
# 所以关闭
client_sock.close()
# 当为所有的客户端服务完之后再进行关闭,表示不再接收新的客户端的链接
server_sock.close()


if __name__ == '__main__':
main()
  • 通过为每个客户端创建一个进程的方式,能够同时为多个客户端进行服务
  • 当客户端不是特别多的时候,这种方式还行,如果有几百上千个,就不可取了,因为每次创建进程等过程需要好较大的资源

多线程服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import socket
from threading import Thread


def handle_client(client_sock, client_addr):
while True:
recv_data = client_sock.recv(1024)
if len(recv_data) > 0:
print('recv[%s]:%s' % (str(client_addr), recv_data.decode()))
else:
print('[%s]客户端已经关闭' % str(client_addr))
break

client_sock.close()


def main():
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = ("", 8001)
server_sock.bind(address)
server_sock.listen(128)
while True:
print('-----主线程,,等待新客户端的到来------')
client_sock, client_addr = server_sock.accept()
print('-----主线程,,接下来创建一个新的线程负责数据处理[%s]-----' % str(client_addr))
thread = Thread(target=handle_client, args=(client_sock, client_addr))
thread.start()
# 因为线程中共享这个套接字,如果关闭了会导致这个套接字不可用,
# 但是此时在线程中这个套接字可能还在收数据,因此不能关闭
# client_sock.close()
server_sock.close()


if __name__ == '__main__':
main()

单进程服务器-非堵塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import socket

# 用来存储所有的新链接的客户端
client_list = []


def main():
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = ("", 8000)
server_sock.bind(address)
server_sock.listen(128)

# 将套接字设置为非堵塞
# 设置为非堵塞后,如果accept时,恰巧没有客户端connect,那么accept会
# 产生一个异常,所以需要try来进行处理
server_sock.setblocking(False)

while True:

try:
client_info = server_sock.accept()
except BlockingIOError:
pass
else:
print("一个新的客户端到来%s:" % str(client_info))
# 将套接字设置为非堵塞
client_info[0].setblocking(False)
client_list.append(client_info)

# 用来存储需要删除的客户端信息
need_del_client_info_list = []

for client_socket, client_addr in client_list:
try:
recv_data = client_socket.recv(1024)
if len(recv_data) > 0:
print('recv[%s]:%s' % (str(client_addr), recv_data.decode()))
else:
print('[%s]客户端已经关闭' % str(client_addr))
client_socket.close()
need_del_client_info_list.append((client_socket, client_addr))
except BlockingIOError:
pass

for client in need_del_client_info_list:
client_list.remove(client)


if __name__ == '__main__':
main()

select版-TCP服务器

在多路复用的模型中,比较常用的有select模型和epoll模型。这两个都是系统接口,由操作系统提供。当然,Python的select模块进行了更高级的封装。

网络通信被Unix系统抽象为文件的读写,通常是一个设备,由设备驱动程序提供,驱动可以知道自身的数据是否可用。支持阻塞操作的设备驱动通常会实现一组自身的等待队列,如读/写等待队列用于支持上层(用户层)所需的block或non-block操作。设备的文件的资源如果可用(可读或者可写)则会通知进程,反之则会让进程睡眠,等到数据到来可用的时候,再唤醒进程。

这些设备的文件描述符被放在一个数组中,然后select调用的时候遍历这个数组,如果对于的文件描述符可读则会返回改文件描述符。当遍历结束之后,如果仍然没有一个可用设备文件描述符,select让用户进程则会睡眠,直到等待资源可用的时候在唤醒,遍历之前那个监视的数组。每次遍历都是依次进行判断的。

select 回显服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import select
import socket
import sys

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('', 7788))
server.listen(5)

inputs = [server, sys.stdin]

running = True

while True:
# 调用 select 函数,阻塞等待
readable, writeable, exceptional = select.select(inputs, [], [])

# 数据抵达,循环
for sock in readable:
# 监听到有新的连接
if sock == server:
conn, addr = server.accept()
# select 监听的socket
inputs.append(conn)
# 监听到键盘有输入
elif sock == sys.stdin:
cmd = sys.stdin.readline()
running = False
break
# 有数据到达
else:
# 读取客户端连接发送的数据
data = sock.recv(1024)
if data:
sock.send(data)
else:
# 移除select监听的socket
inputs.remove(sock)
sock.close()
# 如果检测到用户输入敲击键盘,那么就退出
if not running:
break
server.close()

另外一个服务器(包含writeList):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import socket
import select
import Queue

SERVER_IP = ('', 9999)

# 保存客户端发送过来的消息,将消息放入队列中
message_queue = {}
input_list = []
output_list = []

if __name__ == '__main__':
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(SERVER_IP)
server.listen(8)
# 设置为非阻塞
server.setblocking(False)

# 初始化将服务端加入监听列表
input_list.append(server)

while True:
# 开始 select 监听,对input_list中的服务端server进行监听
stdinput, stdoutput, stderr = select(input_list, output_list, input_list)

for obj in stdinput:
# 判断当前触发的是不是服务端对象, 当触发的对象是服务端对象时,说明有新客户端连接进来了
if obj == server:
# 接收客户端的连接, 获取客户端对象和客户端地址信息
conn, addr = server.accept()
print("Client %s connected! " % str(addr))
# 将客户端对象也加入到监听的列表中, 当客户端发送消息时 select 将触发
input_list.append(conn)
# 为连接的客户端单独创建一个消息队列,用来保存客户端发送的消息
message_queue[conn] = Queue.Queue()
else:
# 由于客户端连接进来时服务端接收客户端连接请求,将客户端加入到了监听列表中(input_list),客户端发送消息将触发
# 所以判断是否是客户端对象触发
try:
recv_data = obj.recv(1024)
# 客户端未断开
if recv_data:
print("received %s from client %s" % (recv_data, str(addr)))
# 将收到的消息放入到各客户端的消息队列中
message_queue[obj].put(recv_data)

# 将回复操作放到output列表中,让select监听
if obj not in output_list:
output_list.append(obj)
except ConnectionResetError:
# 客户端断开连接了,将客户端的监听从input列表中移除
input_list.remove(obj)
# 移除客户端对象的消息队列
del message_queue[obj]
print("\n[input] Client %s disconnected" % str(addr))
# 如果现在没有客户端请求,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息
for sendobj in output_list:
try:
# 如果消息队列中有消息,从消息队列中获取要发送的消息
if not message_queue[sendobj].empty():
# 从该客户端对象的消息队列中获取要发送的消息
send_data = message_queue[sendobj].get()
sendobj.send(send_data)
else:
# 将监听移除等待下一次客户端发送消息
output_list.remove(sendobj)

except ConnectionResetError:
# 客户端连接断开了
del message_queue[sendobj]
output_list.remove(sendobj)
print("\n[output] Client %s disconnected" % str(addr))

总结

优点:select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
缺点:

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.

对socket进行扫描时是依次扫描的,即采用轮询的方法,效率较低。

当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。

epoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import socket
import select

# 创建套接字
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

# 设置可以重复使用绑定的信息
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

# 绑定本机信息
s.bind(("",7788))

# 变为被动
s.listen(10)

# 创建一个epoll对象
epoll=select.epoll()

# 测试,用来打印套接字对应的文件描述符
# print s.fileno()
# print select.EPOLLIN|select.EPOLLET

# 注册事件到epoll中
# epoll.register(fd[, eventmask])
# 注意,如果fd已经注册过,则会发生异常
# 将创建的套接字添加到epoll的事件监听中
epoll.register(s.fileno(),select.EPOLLIN|select.EPOLLET)


connections = {}
addresses = {}

# 循环等待客户端的到来或者对方发送数据
while True:

# epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待
epoll_list=epoll.poll()

# 对事件进行判断
for fd,events in epoll_list:

# print fd
# print events

# 如果是socket创建的套接字被激活
if fd == s.fileno():
conn,addr=s.accept()

print('有新的客户端到来%s'%str(addr))

# 将 conn 和 addr 信息分别保存起来
connections[conn.fileno()] = conn
addresses[conn.fileno()] = addr

# 向 epoll 中注册 连接 socket 的 可读 事件
epoll.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)


elif events == select.EPOLLIN:
# 从激活 fd 上接收
recvData = connections[fd].recv(1024)

if len(recvData)>0:
print('recv:%s'%recvData)
else:
# 从 epoll 中移除该 连接 fd
epoll.unregister(fd)

# server 侧主动关闭该 连接 fd
connections[fd].close()

print("%s---offline---"%str(addresses[fd]))

EPOLLIN (可读)
EPOLLOUT (可写)
EPOLLET (ET模式)
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。

ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。