单进程服务器
1. 完成一个简单的TCP服务器
from socket import *serSocket = socket(AF_INET, SOCK_STREAM)# 重复使用绑定的信息serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1)localAddr = ('', 7788)serSocket.bind(localAddr)serSocket.listen(5)while True: print('-----主进程,,等待新客户端的到来------') newSocket,destAddr = serSocket.accept() print('-----主进程,,接下来负责数据处理[%s]-----'%str(destAddr)) try: while True: recvData = newSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(destAddr), recvData)) else: print('[%s]客户端已经关闭'%str(destAddr)) break finally: newSocket.close()serSocket.close()
2. 总结
- 同一时刻只能为一个客户进行服务,不能同时为多个客户服务
- 类似于找一个“明星”签字一样,客户需要耐心等待才可以获取到服务
当服务器为一个客户端服务时,而另外的客户端发起了connect,只要服务器listen的队列有空闲的位置,就会为这个新客户端进行连接,并且客户端可以发送数据,但当服务器为这个新客户端服务时,可能一次性把所有数据接收完毕
- 当recv接收数据时,返回值为空,即没有返回数据,那么意味着客户端已经调用了close关闭了;因此服务器通过判断recv接收数据是否为空 来判断客户端是否已经下线
多进程服务器
1. 多进程服务器
from socket import *from multiprocessing import *from time import sleep# 处理客户端的请求并为其服务def dealWithClient(newSocket,destAddr): while True: recvData = newSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(destAddr), recvData)) else: print('[%s]客户端已经关闭'%str(destAddr)) break newSocket.close()def main(): serSocket = socket(AF_INET, SOCK_STREAM) serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) serSocket.listen(5) try: while True: print('-----主进程,,等待新客户端的到来------') newSocket,destAddr = serSocket.accept() print('-----主进程,,接下来创建一个新的进程负责数据处理[%s]-----'%str(destAddr)) client = Process(target=dealWithClient, args=(newSocket,destAddr)) client.start() #因为已经向子进程中copy了一份(引用),并且父进程中这个套接字也没有用处了 #所以关闭 newSocket.close() finally: #当为所有的客户端服务完之后再进行关闭,表示不再接收新的客户端的链接 serSocket.close()if __name__ == '__main__': main()
2. 总结
- 通过为每个客户端创建一个进程的方式,能够同时为多个客户端进行服务
- 当客户端不是特别多的时候,这种方式还行,如果有几百上千个,就不可取了,因为每次创建进程等过程需要好较大的资源
多线程服务器
#coding=utf-8from socket import *from threading import Threadfrom time import sleep# 处理客户端的请求并执行事情def dealWithClient(newSocket,destAddr): while True: recvData = newSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(destAddr), recvData)) else: print('[%s]客户端已经关闭'%str(destAddr)) break newSocket.close()def main(): serSocket = socket(AF_INET, SOCK_STREAM) serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) serSocket.listen(5) try: while True: print('-----主进程,,等待新客户端的到来------') newSocket,destAddr = serSocket.accept() print('-----主进程,,接下来创建一个新的进程负责数据处理[%s]-----'%str(destAddr)) client = Thread(target=dealWithClient, args=(newSocket,destAddr)) client.start() #因为线程中共享这个套接字,如果关闭了会导致这个套接字不可用, #但是此时在线程中这个套接字可能还在收数据,因此不能关闭 #newSocket.close() finally: serSocket.close()if __name__ == '__main__': main()
单进程服务器-非堵塞模式
服务器
#coding=utf-8from socket import *import time# 用来存储所有的新链接的socketg_socketList = []def main(): serSocket = socket(AF_INET, SOCK_STREAM) serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) #可以适当修改listen中的值来看看不同的现象 serSocket.listen(1000) #将套接字设置为非堵塞 #设置为非堵塞后,如果accept时,恰巧没有客户端connect,那么accept会 #产生一个异常,所以需要try来进行处理 serSocket.setblocking(False) while True: #用来测试 #time.sleep(0.5) try: newClientInfo = serSocket.accept() except Exception as result: pass else: print("一个新的客户端到来:%s"%str(newClientInfo)) newClientInfo[0].setblocking(False) g_socketList.append(newClientInfo) # 用来存储需要删除的客户端信息 needDelClientInfoList = [] for clientSocket,clientAddr in g_socketList: try: recvData = clientSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(clientAddr), recvData)) else: print('[%s]客户端已经关闭'%str(clientAddr)) clientSocket.close() g_needDelClientInfoList.append((clientSocket,clientAddr)) except Exception as result: pass for needDelClientInfo in needDelClientInfoList: g_socketList.remove(needDelClientInfo)if __name__ == '__main__': main()
#coding=utf-8from socket import *import time# 用来存储所有的新链接的socketg_socketList = []def main(): serSocket = socket(AF_INET, SOCK_STREAM) serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) #可以适当修改listen中的值来看看不同的现象 serSocket.listen(1000) #将套接字设置为非堵塞 #设置为非堵塞后,如果accept时,恰巧没有客户端connect,那么accept会 #产生一个异常,所以需要try来进行处理 serSocket.setblocking(False) while True: #用来测试 #time.sleep(0.5) try: newClientInfo = serSocket.accept() except Exception as result: pass else: print("一个新的客户端到来:%s"%str(newClientInfo)) newClientInfo[0].setblocking(False) g_socketList.append(newClientInfo) # 用来存储需要删除的客户端信息 needDelClientInfoList = [] for clientSocket,clientAddr in g_socketList: try: recvData = clientSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(clientAddr), recvData)) else: print('[%s]客户端已经关闭'%str(clientAddr)) clientSocket.close() g_needDelClientInfoList.append((clientSocket,clientAddr)) except Exception as result: pass for needDelClientInfo in needDelClientInfoList: g_socketList.remove(needDelClientInfo)if __name__ == '__main__': main()
客户端
#coding=utf-8from socket import *import randomimport timeserverIp = raw_input("请输入服务器的ip:")connNum = raw_input("请输入要链接服务器的次数(例如1000):")g_socketList = []for i in range(int(connNum)): s = socket(AF_INET, SOCK_STREAM) s.connect((serverIp, 7788)) g_socketList.append(s) print(i)while True: for s in g_socketList: s.send(str(random.randint(0,100))) # 用来测试用 #time.sleep(1)
select版-TCP服务器
1. select 原理
在多路复用的模型中,比较常用的有select模型和epoll模型。这两个都是系统接口,由操作系统提供。当然,Python的select模块进行了更高级的封装。
网络通信被Unix系统抽象为文件的读写,通常是一个设备,由设备驱动程序提供,驱动可以知道自身的数据是否可用。支持阻塞操作的设备驱动通常会实现一组自身的等待队列,如读/写等待队列用于支持上层(用户层)所需的block或non-block操作。设备的文件的资源如果可用(可读或者可写)则会通知进程,反之则会让进程睡眠,等到数据到来可用的时候,再唤醒进程。
这些设备的文件描述符被放在一个数组中,然后select调用的时候遍历这个数组,如果对于的文件描述符可读则会返回改文件描述符。当遍历结束之后,如果仍然没有一个可用设备文件描述符,select让用户进程则会睡眠,直到等待资源可用的时候在唤醒,遍历之前那个监视的数组。每次遍历都是依次进行判断的。
2. select 回显服务器
使用python的select模块很容易写出下面一个echo(回显)服务器:
import selectimport socketimport sysserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.bind(('', 7788))server.listen(5)inputs = [server, sys.stdin]running = Truewhile True: # 调用 select 函数,阻塞等待 readable, writeable, exceptional = select.select(inputs, [], []) # 数据抵达,循环 for sock in readable: # 监听到有新的连接 if sock == server: conn, addr = server.accept() # select 监听的socket inputs.append(conn) # 监听到键盘有输入 elif sock == sys.stdin: cmd = sys.stdin.readline() running = False break # 有数据到达 else: # 读取客户端连接发送的数据 data = sock.recv(1024) if data: sock.send(data) else: # 移除select监听的socket inputs.remove(sock) sock.close() # 如果检测到用户输入敲击键盘,那么就退出 if not running: breakserver.close()
在windows中,使用‘网络调试助手’,进行连接服务器即可测试
另外一个服务器(包含writeList):
#coding=utf-8import socket import Queuefrom select import select SERVER_IP = ('', 9999) # 保存客户端发送过来的消息,将消息放入队列中 message_queue = {} input_list = [] output_list = [] if __name__ == "__main__": server = socket.socket() server.bind(SERVER_IP) server.listen(10) # 设置为非阻塞 server.setblocking(False) # 初始化将服务端加入监听列表 input_list.append(server) while True: # 开始 select 监听,对input_list中的服务端server进行监听 stdinput, stdoutput, stderr = select(input_list, output_list, input_list) # 循环判断是否有客户端连接进来,当有客户端连接进来时select将触发 for obj in stdinput: # 判断当前触发的是不是服务端对象, 当触发的对象是服务端对象时,说明有新客户端连接进来了 if obj == server: # 接收客户端的连接, 获取客户端对象和客户端地址信息 conn, addr = server.accept() print("Client %s connected! "%str(addr)) # 将客户端对象也加入到监听的列表中, 当客户端发送消息时 select 将触发 input_list.append(conn) # 为连接的客户端单独创建一个消息队列,用来保存客户端发送的消息 message_queue[conn] = Queue.Queue() else: # 由于客户端连接进来时服务端接收客户端连接请求,将客户端加入到了监听列表中(input_list),客户端发送消息将触发 # 所以判断是否是客户端对象触发 try: recv_data = obj.recv(1024) # 客户端未断开 if recv_data: print("received %s from client %s"%(recv_data, str(addr))) # 将收到的消息放入到各客户端的消息队列中 message_queue[obj].put(recv_data) # 将回复操作放到output列表中,让select监听 if obj not in output_list: output_list.append(obj) except ConnectionResetError: # 客户端断开连接了,将客户端的监听从input列表中移除 input_list.remove(obj) # 移除客户端对象的消息队列 del message_queue[obj] print("\n[input] Client %s disconnected"%str(addr)) # 如果现在没有客户端请求,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息 for sendobj in output_list: try: # 如果消息队列中有消息,从消息队列中获取要发送的消息 if not message_queue[sendobj].empty(): # 从该客户端对象的消息队列中获取要发送的消息 send_data = message_queue[sendobj].get() sendobj.send(send_data) else: # 将监听移除等待下一次客户端发送消息 output_list.remove(sendobj) except ConnectionResetError: # 客户端连接断开了 del message_queue[sendobj] output_list.remove(sendobj) print("\n[output] Client %s disconnected"%str(addr))
3. 总结
优点
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
缺点
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.
对socket进行扫描时是依次扫描的,即采用轮询的方法,效率较低。
当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。
epoll版-TCP服务器
1. epoll的优点:
- 没有最大并发连接的限制,能打开的FD(指的是文件描述符,通俗的理解就是套接字对应的数字编号)的上限远大于1024
- 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,epoll的效率就会远远高于select和poll。
2. epoll使用参考代码
import socketimport select# 创建套接字s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 设置可以重复使用绑定的信息s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)# 绑定本机信息s.bind(("",7788))# 变为被动s.listen(10)# 创建一个epoll对象epoll=select.epoll()# 测试,用来打印套接字对应的文件描述符# print s.fileno()# print select.EPOLLIN|select.EPOLLET# 注册事件到epoll中# epoll.register(fd[, eventmask])# 注意,如果fd已经注册过,则会发生异常# 将创建的套接字添加到epoll的事件监听中epoll.register(s.fileno(),select.EPOLLIN|select.EPOLLET)connections = {}addresses = {}# 循环等待客户端的到来或者对方发送数据while True: # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待 epoll_list=epoll.poll() # 对事件进行判断 for fd,events in epoll_list: # print fd # print events # 如果是socket创建的套接字被激活 if fd == s.fileno(): conn,addr=s.accept() print('有新的客户端到来%s'%str(addr)) # 将 conn 和 addr 信息分别保存起来 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr # 向 epoll 中注册 连接 socket 的 可读 事件 epoll.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) elif events == select.EPOLLIN: # 从激活 fd 上接收 recvData = connections[fd].recv(1024) if len(recvData)>0: print('recv:%s'%recvData) else: # 从 epoll 中移除该 连接 fd epoll.unregister(fd) # server 侧主动关闭该 连接 fd connections[fd].close() print("%s---offline---"%str(addresses[fd]))
2. 说明
- EPOLLIN (可读)
- EPOLLOUT (可写)
- EPOLLET (ET模式)
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。
协程
协程,又称微线程,纤程。英文名Coroutine。
协程是啥
首先我们得知道协程是啥?协程其实可以认为是比线程更小的执行单元。 为啥说他是一个执行单元,因为他自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。
通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定
协程和线程差异
那么这个过程看起来比线程差不多。其实不然, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。
协程的问题
但是协程有一个问题,就是系统并不感知,所以操作系统不会帮你做切换。 那么谁来帮你做切换?让需要执行的协程更多的获得CPU时间才是问题的关键。
例子
目前的协程框架一般都是设计成 1:N 模式。所谓 1:N 就是一个线程作为一个容器里面放置多个协程。 那么谁来适时的切换这些协程?答案是有协程自己主动让出CPU,也就是每个协程池里面有一个调度器, 这个调度器是被动调度的。意思就是他不会主动调度。而且当一个协程发现自己执行不下去了(比如异步等待网络的数据回来,但是当前还没有数据到), 这个时候就可以由这个协程通知调度器,这个时候执行到调度器的代码,调度器根据事先设计好的调度算法找到当前最需要CPU的协程。 切换这个协程的CPU上下文把CPU的运行权交个这个协程,直到这个协程出现执行不下去需要等等的情况,或者它调用主动让出CPU的API之类,触发下一次调度。
那么这个实现有没有问题?
其实是有问题的,假设这个线程中有一个协程是CPU密集型的他没有IO操作, 也就是自己不会主动触发调度器调度的过程,那么就会出现其他协程得不到执行的情况, 所以这种情况下需要程序员自己避免。这是一个问题,假设业务开发的人员并不懂这个原理的话就可能会出现问题。
协程的好处
在IO密集型的程序中由于IO操作远远慢于CPU的操作,所以往往需要CPU去等IO操作。 同步IO下系统需要切换线程,让操作系统可以在IO过程中执行其他的东西。 这样虽然代码是符合人类的思维习惯但是由于大量的线程切换带来了大量的性能的浪费,尤其是IO密集型的程序。
所以人们发明了异步IO。就是当数据到达的时候触发我的回调。来减少线程切换带来性能损失。 但是这样的坏处也是很大的,主要的坏处就是操作被 “分片” 了,代码写的不是 “一气呵成” 这种。 而是每次来段数据就要判断 数据够不够处理哇,够处理就处理吧,不够处理就在等等吧。这样代码的可读性很低,其实也不符合人类的习惯。
但是协程可以很好解决这个问题。比如 把一个IO操作 写成一个协程。当触发IO操作的时候就自动让出CPU给其他协程。要知道协程的切换很轻的。 协程通过这种对异步IO的封装 既保留了性能也保证了代码的容易编写和可读性。在高IO密集型的程序下很好。但是高CPU密集型的程序下没啥好处。
协程一个简单实现
import timedef A(): while True: print("----A---") yield time.sleep(0.5)def B(c): while True: print("----B---") c.next() time.sleep(0.5)if __name__=='__main__': a = A() B(a)运行结果:--B----A----B----A----B----A----B----A----B----A----B----A--...省略...
协程-greenlet版
为了更好使用协程来完成多任务,python中的greenlet模块对其封装,从而使得切换任务变的更加简单
安装方式
使用如下命令安装greenlet模块:
sudo pip install greenlet
#coding=utf-8from greenlet import greenletimport timedef test1(): while True: print "---A--" gr2.switch() time.sleep(0.5)def test2(): while True: print "---B--" gr1.switch() time.sleep(0.5)gr1 = greenlet(test1)gr2 = greenlet(test2)#切换到gr1中运行gr1.switch()
运行效果
---A-----B-----A-----B-----A-----B-----A-----B--...省略...
gevent
greenlet已经实现了协程,但是这个还的人工切换,是不是觉得太麻烦了,不要捉急,python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent
其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。
由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO
1. gevent的使用
#coding=utf-8#请使用python 2 来执行此程序import geventdef f(n): for i in range(n): print gevent.getcurrent(), ig1 = gevent.spawn(f, 5)g2 = gevent.spawn(f, 5)g3 = gevent.spawn(f, 5)g1.join()g2.join()g3.join()
运行结果
0 1 2 3 4 0 1 2 3 4 0 1 2 3 4
可以看到,3个greenlet是依次运行而不是交替运行
2. gevent切换执行
import geventdef f(n): for i in range(n): print gevent.getcurrent(), i #用来模拟一个耗时操作,注意不是time模块中的sleep gevent.sleep(1)g1 = gevent.spawn(f, 5)g2 = gevent.spawn(f, 5)g3 = gevent.spawn(f, 5)g1.join()g2.join()g3.join()
运行结果
0 0 0 1 1 1 2 2 2 3 3 3 4 4 4
3个greenlet交替运行
3. gevent并发下载器
当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下
#coding=utf-8from gevent import monkey; import geventimport urllib2#有IO才做时需要这一句monkey.patch_all()def myDownLoad(url): print('GET: %s' % url) resp = urllib2.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url))gevent.joinall([ gevent.spawn(myDownLoad, 'http://www.baidu.com/'), gevent.spawn(myDownLoad, 'http://www.itcast.cn/'), gevent.spawn(myDownLoad, 'http://www.itheima.com/'),])
运行结果
GET: http://www.baidu.com/GET: http://www.itcast.cn/GET: http://www.itheima.com/102247 bytes received from http://www.baidu.com/.166903 bytes received from http://www.itheima.com/.162294 bytes received from http://www.itcast.cn/.
从上能够看到是先发送的获取baidu的相关信息,然后依次是itcast、itheima,但是收到数据的先后顺序不一定与发送顺序相同,这也就体现出了异步,即不确定什么时候会收到数据,顺序不一定
gevent版-TCP服务器
import sysimport timeimport geventfrom gevent import socket,monkeymonkey.patch_all()def handle_request(conn): while True: data = conn.recv(1024) if not data: conn.close() break print("recv:", data) conn.send(data)def server(port): s = socket.socket() s.bind(('', port)) s.listen(5) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli)if __name__ == '__main__': server(7788)