博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】...
阅读量:5905 次
发布时间:2019-06-19

本文共 6390 字,大约阅读时间需要 21 分钟。

下面这篇,原理理解了,

再结合 这一周来的心得体会,整个框架就差不多了。。。

http://www.haiyun.me/archives/1056.html

 

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。

下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/python
# -*- coding: utf-8 -*-
import
select
import
socket
import
Queue
  
server
=
socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(
False
)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  ,
1
)
server_address
=
(
'192.168.1.5'
,
8080
)
server.bind(server_address)
server.listen(
10
)
  
#select轮询等待读socket集合
inputs
=
[server]
#select轮询等待写socket集合
outputs
=
[]
message_queues
=
{}
#select超时时间
timeout
=
20
  
while
True
:
    
print
"等待活动连接......"
    
readable , writable , exceptional
=
select.select(inputs, outputs, inputs, timeout)
  
    
if
not
(readable
or
writable
or
exceptional) :
        
print
"select超时无活动连接,重新select...... "
        
continue
;  
    
#循环可读事件
    
for
s
in
readable :
        
#如果是server监听的socket
        
if
s
is
server:
            
#同意连接
            
connection, client_address
=
s.accept()
            
print
"新连接: "
, client_address
            
connection.setblocking(
0
)
            
#将连接加入到select可读事件队列
            
inputs.append(connection)
            
#新建连接为key的字典,写回读取到的消息
            
message_queues[connection]
=
Queue.Queue()
        
else
:
            
#不是本机监听就是客户端发来的消息
            
data
=
s.recv(
1024
)
            
if
data :
                
print
"收到数据:"
, data ,
"客户端:"
,s.getpeername()
                
message_queues[s].put(data)
                
if
s
not
in
outputs:
                    
#将读取到的socket加入到可写事件队列
                    
outputs.append(s)
            
else
:
                
#空白消息,关闭连接
                
print
"关闭连接:"
, client_address
                
if
s
in
outputs :
                    
outputs.remove(s)
                
inputs.remove(s)
                
s.close()
                
del
message_queues[s]
    
for
s
in
writable:
        
try
:
            
msg
=
message_queues[s].get_nowait()
        
except
Queue.Empty:
            
print
"连接:"
, s.getpeername() ,
'消息队列为空'
            
outputs.remove(s)
        
else
:
            
print
"发送数据:"
, msg ,
"到"
, s.getpeername()
            
s.send(msg)
      
    
for
s
in
exceptional:
        
print
"异常连接:"
, s.getpeername()
        
inputs.remove(s)
        
if
s
in
outputs:
            
outputs.remove(s)
        
s.close()
        
del
message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/python
# -*- coding: utf-8 -*-
import
socket
import
select
import
Queue
  
server
=
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(
False
)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
1
)
server_address
=
(
"192.168.1.5"
,
8080
)
server.bind(server_address)
server.listen(
5
)
print 
"服务器启动成功,监听IP:"
, server_address
message_queues
=
{}
#超时,毫秒
timeout
=
5000
#监听哪些事件
READ_ONLY
=
( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE
=
(READ_ONLY|select.POLLOUT)
#新建轮询事件对象
poller
=
select.poll()
#注册本机监听socket到等待可读事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket
=
{server.fileno():server,}
while
True
:
    
print
"等待活动连接......"
    
#轮询注册的事件集合
    
events
=
poller.poll(timeout)
    
if
not
events:
      
print
"poll超时,无活动连接,重新poll......"
      
continue
    
print
"有"
,
len
(events),
"个新事件,开始处理......"
    
for
fd ,flag
in
events:
        
s
=
fd_to_socket[fd]
        
#可读事件
        
if
flag & (select.POLLIN | select.POLLPRI) :
            
if
s
is
server :
                
#如果socket是监听的server代表有新连接
                
connection , client_address
=
s.accept()
                
print
"新连接:"
, client_address
                
connection.setblocking(
False
)
                  
                
fd_to_socket[connection.fileno()]
=
connection
                
#加入到等待读事件集合
                
poller.register(connection,READ_ONLY)
                
message_queues[connection] 
=
Queue.Queue()
            
else
:
                
#接收客户端发送的数据
                
data
=
s.recv(
1024
)
                
if
data:
                    
print
"收到数据:"
, data ,
"客户端:"
, s.getpeername()
                    
message_queues[s].put(data)
                    
#修改读取到消息的连接到等待写事件集合
                    
poller.modify(s,READ_WRITE)
                
else
:
                    
# Close the connection
                    
print
"  closing"
, s.getpeername()
                    
# Stop listening for input on the connection
                    
poller.unregister(s)
                    
s.close()
                    
del
message_queues[s]
        
#连接关闭事件
        
elif
flag & select.POLLHUP :
            
print
" Closing "
, s.getpeername() ,
"(HUP)"
            
poller.unregister(s)
            
s.close()
        
#可写事件
        
elif
flag & select.POLLOUT :
            
try
:
                
msg
=
message_queues[s].get_nowait()
            
except
Queue.Empty:
                
print
s.getpeername() ,
" queue empty"
                
poller.modify(s,READ_ONLY)
            
else
:
                
print
"发送数据:"
, data ,
"客户端:"
, s.getpeername()
                
s.send(msg)
        
#异常事件
        
elif
flag & select.POLLERR:
            
print
"  exception on"
, s.getpeername()
            
poller.unregister(s)
            
s.close()
            
del
message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/python
# -*- coding: utf-8 -*-
import
socket, select
import
Queue
 
serversocket
=
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
1
)
server_address
=
(
"192.168.1.5"
,
8080
)
serversocket.bind(server_address)
serversocket.listen(
1
)
print 
"服务器启动成功,监听IP:"
, server_address
serversocket.setblocking(
0
)
timeout
=
10
#新建epoll事件对象,后续要监控的事件添加到其中
epoll
=
select.epoll()
#添加服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues
=
{}
 
fd_to_socket
=
{serversocket.fileno():serversocket,}
while
True
:
  
print
"等待活动连接......"
  
#轮询注册的事件集合
  
events
=
epoll.poll(timeout)
  
if
not
events:
     
print
"epoll超时无活动连接,重新轮询......"
     
continue
  
print
"有"
,
len
(events),
"个新事件,开始处理......"
  
for
fd, event
in
events:
     
socket
=
fd_to_socket[fd]
     
#可读事件
     
if
event & select.EPOLLIN:
         
#如果活动socket为服务器所监听,有新连接
         
if
socket
=
=
serversocket:
            
connection, address
=
serversocket.accept()
            
print
"新连接:"
, address
            
connection.setblocking(
0
)
            
#注册新连接fd到待读事件集合
            
epoll.register(connection.fileno(), select.EPOLLIN)
            
fd_to_socket[connection.fileno()]
=
connection
            
message_queues[connection] 
=
Queue.Queue()
         
#否则为客户端发送的数据
         
else
:
            
data
=
socket.recv(
1024
)
            
if
data:
               
print
"收到数据:"
, data ,
"客户端:"
, socket.getpeername()
               
message_queues[socket].put(data)
               
#修改读取到消息的连接到等待写事件集合
               
epoll.modify(fd, select.EPOLLOUT)
     
#可写事件
     
elif
event & select.EPOLLOUT:
        
try
:
           
msg
=
message_queues[socket].get_nowait()
        
except
Queue.Empty:
           
print
socket.getpeername() ,
" queue empty"
           
epoll.modify(fd, select.EPOLLIN)
        
else
:
           
print
"发送数据:"
, data ,
"客户端:"
, socket.getpeername()
           
socket.send(msg)
     
#关闭事件
     
elif
event & select.EPOLLHUP:
        
epoll.unregister(fd)
        
fd_to_socket[fd].close()
        
del
fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

转载地址:http://gzcpx.baihongyu.com/

你可能感兴趣的文章
使用 PowerShell 创建和修改 ExpressRoute 线路
查看>>
在C#中获取如PHP函数time()一样的时间戳
查看>>
Redis List数据类型
查看>>
大数据项目实践(四)——之Hive配置
查看>>
初学vue2.0-组件-文档理解笔记v1.0
查看>>
Centos7安装Gitlab10.0
查看>>
上传图片预览
查看>>
lagp,lacp详解
查看>>
LVS之DR模式原理与实践
查看>>
Docker的系统资源限制及验证
查看>>
c++ ios_base register_callback方法使用
查看>>
Java中为什么需要Object类,Object类为什么是所有类的父类
查看>>
angularjs-paste-upload
查看>>
linux基础命令 head
查看>>
objective c:import和include的区别, ""和<>区别
查看>>
The Shared folder with you
查看>>
sax方式解析XML学习笔记
查看>>
Springboot配置(上)
查看>>
java--Eclipse for mac 代码提示(代码助手,代码联想)快捷键修改
查看>>
left join on/right join on/inner join on/full join on连接
查看>>