03Linux网络编程基础 ---- IO复用

2021/4/18 7:25:40

本文主要是介绍03Linux网络编程基础 ---- IO复用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

select系统调用

#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);

nfds:是指集合中所有文件描述符的范围,即所有文件描述符的最大值加1

readfds:对应可读的文件符集合,是我们关心的,是否可以从这些文件中读取数据的集合,若有大于等于一个可读文件,则select会返回大于0的值。若无,则根据timeout判断。

writefds: 对应可写的文件符集合。

exceptfds:对应异常的文件符集合。

 fd_set结构如下:可以看出容量是有限的,最大1024,一般通过以下来操作

  • FD_CLR(inr fd,fd_set* set):用来清除文件描述符集合set中相关fd的位
  • FD_ISSET(int fd,fd_set *set):用来测试文件描述符集合set中相关fd的位是否为真
  • FD_SET(int fd,fd_set*set):用来设置文件描述符集合set中相关fd的位
  • FD_ZERO(fd_set *set):用来清除文件描述符集合set的全部位
#define __FD_SETSIZE 1024
/* fd_set for select and pselect.  */
typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif

timeout:超时时间,主要分为三种

  1. NULL:永远等待下去,仅在有一个描述字准备好I/O时才返回;
  2. 0:立即返回,仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生;
  3. 特定的时间值: 如果在指定的时间段里没有事件发生,select将超时返回;
struct timeval
{
__time_t tv_sec;        /* Seconds. */
__suseconds_t tv_usec;  /* Microseconds. */
};

return: 有三种情况

  1. 返回0表示超时了;
  2. 返回-1,表示出错了;
  3. 返回一个大于0的数,表示文件描述符状态改变的个数;

demo:

 1 #include <sys/types.h>
 2 #include <sys/socket.h>
 3 #include <netinet/in.h>
 4 #include <arpa/inet.h>
 5 #include <assert.h>
 6 #include <stdio.h>
 7 #include <unistd.h>
 8 #include <errno.h>
 9 #include <string.h>
10 #include <fcntl.h>
11 #include <stdlib.h>
12 
13 int main( int argc, char* argv[] )
14 {
15     if( argc <= 2 )
16     {
17         printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
18         return 1;
19     }
20     const char* ip = argv[1];
21     int port = atoi( argv[2] );
22     printf( "ip is %s and port is %d\n", ip, port );
23 
24     int ret = 0;
25     struct sockaddr_in address;
26     bzero( &address, sizeof( address ) );
27     address.sin_family = AF_INET;
28     inet_pton( AF_INET, ip, &address.sin_addr );
29     address.sin_port = htons( port );
30 
31     int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
32     assert( listenfd >= 0 );
33 
34     ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
35     assert( ret != -1 );
36 
37     ret = listen( listenfd, 5 );
38     assert( ret != -1 );
39 
40     struct sockaddr_in client_address;
41         socklen_t client_addrlength = sizeof( client_address );
42     int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
43     if ( connfd < 0 )
44     {
45         printf( "errno is: %d\n", errno );
46         close( listenfd );
47     }
48 
49     char remote_addr[INET_ADDRSTRLEN];
50     printf( "connected with ip: %s and port: %d\n", inet_ntop( AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN ), ntohs( client_address.sin_port ) );
51 
52     char buf[1024];
53     fd_set read_fds;
54     fd_set exception_fds;
55 
56     FD_ZERO( &read_fds );
57     FD_ZERO( &exception_fds );
58 
59     int nReuseAddr = 1;
60     setsockopt( connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof( nReuseAddr ) );
61     while( 1 )
62     {
63         memset( buf, '\0', sizeof( buf ) );
64         FD_SET( connfd, &read_fds );
65         FD_SET( connfd, &exception_fds );
66 
67         ret = select( connfd + 1, &read_fds, NULL, &exception_fds, NULL );
68         printf( "select one\n" );
69         if ( ret < 0 )
70         {
71                 printf( "selection failure\n" );
72                 break;
73         }
74 
75         if ( FD_ISSET( connfd, &read_fds ) )
76         {
77             ret = recv( connfd, buf, sizeof( buf )-1, 0 );
78             if( ret <= 0 )
79             {
80                 break;
81             }
82             printf( "get %d bytes of normal data: %s\n", ret, buf );
83         }
84         else if( FD_ISSET( connfd, &exception_fds ) )
85         {
86             ret = recv( connfd, buf, sizeof( buf )-1, MSG_OOB );
87             if( ret <= 0 )
88             {
89                 break;
90             }
91             printf( "get %d bytes of oob data: %s\n", ret, buf );
92         }
93 
94     }
95 
96     close( connfd );
97     close( listenfd );
98     return 0;
99 }
View Code

out:

ip is 127.0.0.1 and port is 1233
connected with ip: 127.0.0.1 and port: 33524
select one
get 13 bytes of normal data: 12345678901

poll系统调用

# include <poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);

fds:需要被监视的文件描述符集合;
nfds:被监视的文件描述符数量;
timeout:超时时间,有三种取值:

  1. 负数:无限超时,一直等到一个指定事件发生;
  2. 0:立即返回,并列出准备好的文件描述符;
  3. 正数:等待指定的时间,单位为毫秒;

poll函数与select函数的最大不同之处在于:select函数有最大文件描述符的限制,一般1024个,而poll函数对文件描述符的数量没有限制。但select和poll函数都是通过轮询的方式来查询某个文件描述符状态是否发生了变化,并且需要将整个文件描述符集合在用户空间和内核空间之间来回拷贝,这样随着文件描述符的数量增加,相应的开销也随之增加。

struct pollfd :

struct pollfd {
    int fd;         /* 文件描述符 */
    short events;   /* 等待的事件 */
    short revents;  /* 实际发生了的事件 */
} ;

fd:文件描述符

events:告诉poll监听fd上的哪些事件,它是事件的按位或。

  • POLLIN:有数据可读。
  • POLLRDNORM:有普通数据可读。
  • POLLRDBAND:有优先数据可读。
  • POLLPRI:有紧迫数据可读。
  • POLLOUT:写数据不会导致阻塞。
  • POLLWRNORM:写普通数据不会导致阻塞。
  • POLLWRBAND:写优先数据不会导致阻塞。
  • POLLMSGSIGPOLL:消息可用。

revents:由内核修改,通知应用程序fd上实际发生了哪些事件。除了event这些,还包括以下:

  • POLLER:指定的文件描述符发生错误。
  • POLLHUP:指定的文件描述符挂起事件。
  • POLLNVAL:指定的文件描述符非法。

epoll系统调用

epoll操作是包含有三个接口的:

epoll_create函数:

#include <sys/epoll.h>
int epoll_create(int size);

size:表示监听的数目,并不起作用,只是给内核一个提示。

return:返回一个epoll句柄。

epoll_ctl函数:

#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epfd:epoll_create()的返回值;

op:动作,有三种取值:

  • EPOLL_CTL_ADD:注册新的fd到epfd中;
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL:从epfd中删除一个fd;

fd:需要监听的fd;

event: 告诉内核需要监听什么事件,取值有:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  • EPOLLOUT:表示对应的文件描述符可以写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列;
typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;//保存触发事件的某个文件描述符相关的数据

struct epoll_event {
    __uint32_t events;      /* epoll event */
    epoll_data_t data;      /* User data variable */
};
enum EPOLL_EVENTS
  {
    EPOLLIN = 0x001,
#define EPOLLIN EPOLLIN
    EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI
    EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT
    EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM
    EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND
    EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM
    EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND
    EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG
    EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR
    EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP
    EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP
    EPOLLWAKEUP = 1u << 29,
#define EPOLLWAKEUP EPOLLWAKEUP
    EPOLLONESHOT = 1u << 30,
#define EPOLLONESHOT EPOLLONESHOT
    EPOLLET = 1u << 31
#define EPOLLET EPOLLET
  };

epoll_wait函数:

#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

events:从内核得到事件的集合;

maxevents:事件集合的大小;

timeout:超时时间,0会立即返回,-1表示永久阻塞,正数表示一个指定的值;

return:成功时返回就绪的文件描述符的个数,失败时返回-1并设置error;

LT和ET模式

epoll对文件描述符的操作由两种模式:水平触发LT(level trigger)和边沿触发ET(edge trigger)。默认的情况下为LT模式。LT模式与ET模式的区别在于:

  • LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
  • ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

我们来看看一个demo看一下区别:

  1 #include <sys/types.h>
  2 #include <sys/socket.h>
  3 #include <netinet/in.h>
  4 #include <arpa/inet.h>
  5 #include <assert.h>
  6 #include <stdio.h>
  7 #include <unistd.h>
  8 #include <errno.h>
  9 #include <string.h>
 10 #include <fcntl.h>
 11 #include <stdlib.h>
 12 #include <sys/epoll.h>
 13 #include <pthread.h>
 14 
 15 #define MAX_EVENT_NUMBER 1024
 16 #define BUFFER_SIZE 10
 17 
 18 int setnonblocking( int fd )
 19 {
 20     int old_option = fcntl( fd, F_GETFL );
 21     int new_option = old_option | O_NONBLOCK;
 22     fcntl( fd, F_SETFL, new_option );
 23     return old_option;
 24 }
 25 
 26 void addfd( int epollfd, int fd, bool enable_et )
 27 {
 28     epoll_event event;
 29     event.data.fd = fd;
 30     event.events = EPOLLIN;
 31     if( enable_et )
 32     {
 33         event.events |= EPOLLET;
 34     }
 35     epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
 36     setnonblocking( fd );
 37 }
 38 
 39 void lt( epoll_event* events, int number, int epollfd, int listenfd )
 40 {
 41     char buf[ BUFFER_SIZE ];
 42     for ( int i = 0; i < number; i++ )
 43     {
 44         printf( "lt events:%d fd:%d\n", events[i].events, events[i].data.fd);
 45         int sockfd = events[i].data.fd;
 46         if ( sockfd == listenfd )
 47         {
 48             struct sockaddr_in client_address;
 49             socklen_t client_addrlength = sizeof( client_address );
 50             int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
 51             addfd( epollfd, connfd, false );
 52             printf( "accept new client:%d\n", connfd);
 53         }
 54         else if ( events[i].events & EPOLLIN )
 55         {
 56             // printf( "event trigger once\n" );
 57             memset( buf, '\0', BUFFER_SIZE );
 58             int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
 59             if( ret <= 0 )
 60             {
 61                 close( sockfd );
 62                 continue;
 63             }
 64             printf( "get %d bytes of content: %s\n", ret, buf );
 65         }
 66         else
 67         {
 68             printf( "something else happened \n" );
 69         }
 70     }
 71 }
 72 
 73 void et( epoll_event* events, int number, int epollfd, int listenfd )
 74 {
 75     char buf[ BUFFER_SIZE ];
 76     for ( int i = 0; i < number; i++ )
 77     {
 78         printf( "et events:%d fd:%d\n", events[i].events, events[i].data.fd);
 79         int sockfd = events[i].data.fd;
 80         if ( sockfd == listenfd )
 81         {
 82             struct sockaddr_in client_address;
 83             socklen_t client_addrlength = sizeof( client_address );
 84             int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
 85             addfd( epollfd, connfd, true );
 86         }
 87         else if ( events[i].events & EPOLLIN )
 88         {
 89             // printf( "event trigger once\n" );
 90             while( 1 )
 91             {
 92                 memset( buf, '\0', BUFFER_SIZE );
 93                 int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
 94                 if( ret < 0 )
 95                 {
 96                     if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
 97                     {
 98                         printf( "read later\n" );
 99                         break;
100                     }
101                     close( sockfd );
102                     break;
103                 }
104                 else if( ret == 0 )
105                 {
106                     close( sockfd );
107                 }
108                 else
109                 {
110                     printf( "get %d bytes of content: %s\n", ret, buf );
111                 }
112             }
113         }
114         else
115         {
116             printf( "something else happened \n" );
117         }
118     }
119 }
120 
121 int main( int argc, char* argv[] )
122 {
123     if( argc <= 2 )
124     {
125         printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
126         return 1;
127     }
128     const char* ip = argv[1];
129     int port = atoi( argv[2] );
130 
131     int ret = 0;
132     struct sockaddr_in address;
133     bzero( &address, sizeof( address ) );
134     address.sin_family = AF_INET;
135     inet_pton( AF_INET, ip, &address.sin_addr );
136     address.sin_port = htons( port );
137 
138     int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
139     assert( listenfd >= 0 );
140 
141     ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
142     assert( ret != -1 );
143 
144     ret = listen( listenfd, 5 );
145     assert( ret != -1 );
146 
147     epoll_event events[ MAX_EVENT_NUMBER ];
148     int epollfd = epoll_create( 5 );
149     assert( epollfd != -1 );
150     addfd( epollfd, listenfd, true );
151 
152     while( 1 )
153     {
154         int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
155         if ( ret < 0 )
156         {
157             printf( "epoll failure\n" );
158             break;
159         }
160     
161         // lt( events, ret, epollfd, listenfd );
162         et( events, ret, epollfd, listenfd );
163     }
164 
165     close( listenfd );
166     return 0;
167 }
View Code

lt out:

lt events:1 fd:3
accept new client:5
lt events:1 fd:5
get 9 bytes of content: 123456789
lt events:1 fd:5
get 4 bytes of content: 01

et out:

et events:1 fd:3
et events:1 fd:5
get 9 bytes of content: 123456789
get 4 bytes of content: 01

可以看到et的效率更高。·

EPOLLONESHOT事件

这个可以保证事件只触发一次,主要用来多线程中防止对同一socket同时操作。比如:

  1 #include <sys/types.h>
  2 #include <sys/socket.h>
  3 #include <netinet/in.h>
  4 #include <arpa/inet.h>
  5 #include <assert.h>
  6 #include <stdio.h>
  7 #include <unistd.h>
  8 #include <errno.h>
  9 #include <string.h>
 10 #include <fcntl.h>
 11 #include <stdlib.h>
 12 #include <sys/epoll.h>
 13 #include <pthread.h>
 14 
 15 #define MAX_EVENT_NUMBER 1024
 16 #define BUFFER_SIZE 1024
 17 struct fds
 18 {
 19    int epollfd;
 20    int sockfd;
 21 };
 22 
 23 int setnonblocking( int fd )
 24 {
 25     int old_option = fcntl( fd, F_GETFL );
 26     int new_option = old_option | O_NONBLOCK;
 27     fcntl( fd, F_SETFL, new_option );
 28     return old_option;
 29 }
 30 
 31 void addfd( int epollfd, int fd, bool oneshot )
 32 {
 33     epoll_event event;
 34     event.data.fd = fd;
 35     event.events = EPOLLIN | EPOLLET;
 36     if( oneshot )
 37     {
 38         event.events |= EPOLLONESHOT;
 39     }
 40     epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
 41     setnonblocking( fd );
 42 }
 43 
 44 void reset_oneshot( int epollfd, int fd )
 45 {
 46     epoll_event event;
 47     event.data.fd = fd;
 48     event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
 49     epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
 50 }
 51 
 52 void* worker( void* arg )
 53 {
 54     int sockfd = ( (fds*)arg )->sockfd;
 55     int epollfd = ( (fds*)arg )->epollfd;
 56     printf( "start new thread to receive data on fd: %d\n", sockfd );
 57     char buf[ BUFFER_SIZE ];
 58     memset( buf, '\0', BUFFER_SIZE );
 59     while( 1 )
 60     {
 61         int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
 62         if( ret == 0 )
 63         {
 64             close( sockfd );
 65             printf( "foreiner closed the connection\n" );
 66             break;
 67         }
 68         else if( ret < 0 )
 69         {
 70             if( errno == EAGAIN )
 71             {
 72                 reset_oneshot( epollfd, sockfd );
 73                 printf( "read later\n" );
 74                 break;
 75             }
 76         }
 77         else
 78         {
 79             printf( "get content: %s\n", buf );
 80             sleep( 5 );
 81         }
 82     }
 83     printf( "end thread receiving data on fd: %d\n", sockfd );
 84 }
 85 
 86 int main( int argc, char* argv[] )
 87 {
 88     if( argc <= 2 )
 89     {
 90         printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
 91         return 1;
 92     }
 93     const char* ip = argv[1];
 94     int port = atoi( argv[2] );
 95 
 96     int ret = 0;
 97     struct sockaddr_in address;
 98     bzero( &address, sizeof( address ) );
 99     address.sin_family = AF_INET;
100     inet_pton( AF_INET, ip, &address.sin_addr );
101     address.sin_port = htons( port );
102 
103     int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
104     assert( listenfd >= 0 );
105 
106     ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
107     assert( ret != -1 );
108 
109     ret = listen( listenfd, 5 );
110     assert( ret != -1 );
111 
112     epoll_event events[ MAX_EVENT_NUMBER ];
113     int epollfd = epoll_create( 5 );
114     assert( epollfd != -1 );
115     addfd( epollfd, listenfd, false );
116 
117     while( 1 )
118     {
119         int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
120         if ( ret < 0 )
121         {
122             printf( "epoll failure\n" );
123             break;
124         }
125     
126         for ( int i = 0; i < ret; i++ )
127         {
128             int sockfd = events[i].data.fd;
129             if ( sockfd == listenfd )
130             {
131                 struct sockaddr_in client_address;
132                 socklen_t client_addrlength = sizeof( client_address );
133                 int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
134                 addfd( epollfd, connfd, true );
135             }
136             else if ( events[i].events & EPOLLIN )
137             {
138                 pthread_t thread;
139                 fds fds_for_new_worker;
140                 fds_for_new_worker.epollfd = epollfd;
141                 fds_for_new_worker.sockfd = sockfd;
142                 pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
143             }
144             else
145             {
146                 printf( "something else happened \n" );
147             }
148         }
149     }
150 
151     close( listenfd );
152     return 0;
153 }
View Code

out:

start new thread to receive data on fd: 5
get content: 12345678901

read later
end thread receiving data on fd: 5

三组IO复用区别:

区别:

1、select和poll只能在LT模式下;
2、epoll可以在工作高效的ET模式下。epoll支持EPOLLONESHOT事件,该事件可以进一步减少可读、可写和异常事件被触发的次数。
实现原理:
select和poll,采用轮询的方式。每次调用都要扫描整个注册文件描述符集合,并将就绪的文件描述符返回给用户程序,故检测就绪事件O(n);
epoll_wait,采用回调函数的方法。内核检测到就绪文件描述符时将触发回调函数,回调函数将该文件描述符上对应的事件插入就绪事件队列,内核最后在适应的时机将该就绪事件队列中的内容拷贝到用户空间->epoll_wait无需轮询整个文件描述符的集合,来检测哪些事件已经就绪了。O(1)
适用情况:
当活动连接比较多的时候(回调函数被触发的过于频繁),epoll_wait的效率未必有select和poll高。
epoll_wait适用于连接数量较多,但活动连接较少的情况

 



这篇关于03Linux网络编程基础 ---- IO复用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程