Socket学习系列之IO复用select、poll、epoll的区别

[TOC]

描述符就绪条件

可读(任意一个满足)

  • 该套接字接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记的当前大小。对这样的套接字执行读操作不会阻塞并将返回一个大于0的值(也就是返回准备好读入的数据)。我们可以使用SO_RCVLOWAT套接字选项设置该套接字的低水位标记。对于TCP和UDP套接字而言,其默认值是1。
  • 该链接的读半部关闭(也就是接收了FIN的TCP链接)对这样的套接字的读操作将不再阻塞并返回0(也就是返回EOF)
  • 该套接字是一个监听套接字且已完成的连接数不为0.对这样的套接字的accept通常不会阻塞。
  • 其上有一个套接字错误待处理。对这样的套接字的读操作将不会阻塞并返回-1(也就是返回一个错误),同时把errno设置成确切的错误条件。这些待处理错误也可以通过制定SO_ERROR套接字选项调用getsockopt获取并清除。

可写(任意一个满足)

  • 该套接字发送缓冲区中的可用空间字节数大于等于套接字发送缓冲区低水位标记的当前大小,并且或者该套接字已连接,或者该套接字不需要连接(如UDP套接字)。这意味着如果我们把这样的套接字设置成非阻塞,写操作将不阻塞并返回一个正值(如由传输层接收的字节数)。我们可以使用SO_SNDLOWAT套接字选项来设置该套接字的低水位标记。对于TCP和UDP套接字而言,其默认值通常是2048.
  • 该套接字的写半部关闭。对这样的套接字的写操作将产生SIGPIPE信号。
  • 使用非阻塞connect的套接字已建立连接,或者connect已经以失败告终。
  • 其上有一个套接字错误待处理。对这样的套接字的写操作将不会阻塞并返回-1(也就是返回一个错误),同时把errno设置成确切的错误条件。这些待处理错误也可以通过制定SO_ERROR套接字选项调用getsockopt获取并清除。

附加说明

接收低水位标记和发送低水位标记的目的在于:允许应用程序控制在select返回可读或可写条件之前有多少数据可读或有多大的空间可用于写。举例来说,如果我们知道除非至少存在64个字节的数据,否则我们的应用进程没有任何有效工作可做,那么可以把接收低水位标记设置为64,以防止少于64个直接的数据准备好读时select唤醒我们。

条件 可读吗? 可写吗? 异常吗?
有数据可读
关闭连接的读一半
给监听套接字准备好新连接


有可用于写的空间
关闭连接的写一半

待处理错误
TCP带

select、poll、epoll简介

epoll跟select都能提供多路I/O复用的解决方案。在现在的Linux内核里有都能够支持,其中epoll是Linux所特有,而select则应该是POSIX所规定,一般操作系统均有实现。

select:

select函数运行进程指示内核等待多个事件中的任何一个发生,并只在一个或多个事件发生或者经历一段指定的时间后才唤醒它。

select 函数监视的描述符分3类,分别是writefdsreadfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符

例如:

我们可以调用select,告知内核仅在下列情况发生时才返回:
– 集合{1,4,5}中的任意描述符准备好读
– 集合{3,6}中任意描述符准备好写
– 集合{7,9}中任意描述符有异常情况待处理
– 已经历了10.3秒

select本质上是通过设置(FD_SET)、检查(FD_ISSET)存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点是:

  1. 单个进程可监视的fd数量被限制,即能监听端口的大小有限。

    一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.

  2. 对socket进行扫描时是线性扫描(通过委托内核监听fd描述符事件,内核完成任务返回后,进程要判断那些socket fd有指定事件产生时,需要遍历对比),即采用轮询的方法,效率较低:

    当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。如果能给套接字注册某个回调函数,当他们活跃时,自动完成相关操作,那就避免了轮询,这正是epoll与kqueue做的。

  3. 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大(每次使用select都需要将文件描述符从用户态复制到内核)

// 函数原型
int select(int n,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);
// struct timeval结构体定义
//     tv_sec  单位是秒
//     tv_usec 单位是微秒,不是毫秒!毫秒的英文单词是millisecond。   
struct timeval {
    __kernel_time_t        tv_sec;      /* seconds */
    __kernel_suseconds_t    tv_usec;    /* microseconds */
};

// 函数说明
// select()用来等待文件描述词状态的改变。参数n代表最大的文件描述词加1,参数readfds、writefds和exceptfds 称为描述词组,是用来回传该描述词的读,写或例外的状况。底下的宏提供了处理这三种描述词组的方式:
FD_CLR(inr fd,fd_set *set)    // 用来清除描述词组set中相关fd的位
FD_ISSET(int fd,fd_set *set)  // 用来测试描述词组set中相关fd的位是否为真
FD_SET(int fd,fd_set *set)   // 用来设置描述词组set中相关fd的位
FD_ZERO(fd_set *set)        //用来清除描述词组set的全部位

// 思路流程
// 1.将待检测的fd初始化到原始fd_set集合中
while(1) // 2.循环委托内核检测
{
    // 每次都需要将文件描述符复制到内核
    retval = select(max_fd+1, &rfds, &wfds, &efds, tv_p);
    for()
    {
        // 3.处理可读可写等文件描述符
        if (fd_isset(i,&rfds))
        {
            // 处理accept或者是读取客户端数据
            // 判断可读fd是否是监听的
            // 如果是已经连接客户端,则处理客户端发送的数据
        }
    }
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>


#define PORT 8888
#define MAX_LINE 2048
#define LISTENQ 20


int main(int argc , char **argv)
{
    int i, maxi, maxfd, listenfd, connfd, sockfd;

    int nready , client[FD_SETSIZE];

    ssize_t n, ret;

    fd_set rset , allset;

    char buf[MAX_LINE];

    socklen_t clilen;

    struct sockaddr_in servaddr , cliaddr;

    /*(1) 得到监听描述符*/
    listenfd = socket(AF_INET , SOCK_STREAM , 0);

    /*(2) 绑定套接字*/
    bzero(&servaddr , sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);

    bind(listenfd , (struct sockaddr *)&servaddr , sizeof(servaddr));

    /*(3) 监听*/
    listen(listenfd , LISTENQ);

    /*(4) 设置select*/
    maxfd = listenfd;
    maxi = -1;
    for(i=0 ; i<FD_SETSIZE ; ++i)
    {
        client[i] = -1;
    }//for
    FD_ZERO(&allset);
    FD_SET(listenfd , &allset);

    /*(5) 进入服务器接收请求死循环*/
    while(1)
    {
        rset = allset;
        // nready这里是指定可读fd数量
        // rset在select返回后是内核给到的可读fd集合位
        // 我们通过遍历所有fd跟rset做对比确定(FD_ISSET)是否可处理
        nready = select(maxfd+1 , &rset , NULL , NULL , NULL);

        if(FD_ISSET(listenfd , &rset))
        {
            /*接收客户端的请求*/
            clilen = sizeof(cliaddr);

            printf("\naccpet connection~\n");

            if((connfd = accept(listenfd , (struct sockaddr *)&cliaddr , &clilen)) < 0)
            {
                perror("accept error.\n");
                exit(1);
            }//if       

            printf("accpet a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr) , cliaddr.sin_port);

            /*将客户链接套接字描述符添加到数组*/
            for(i=0 ; i<FD_SETSIZE ; ++i)
            {
                if(client[i] < 0)
                {
                    client[i] = connfd;
                    break;
                }//if
            }//for

            if(FD_SETSIZE == i)
            {
                perror("too many connection.\n");
                exit(1);
            }//if

            FD_SET(connfd , &allset);
            if(connfd > maxfd)
                maxfd = connfd;
            if(i > maxi)
                maxi = i;

            if(--nready < 0)
                continue;
        }//if

        // 遍历所有连接套接字
        for(i=0; i<=maxi ; ++i)
        {
            if((sockfd = client[i]) < 0)
                continue;
            // 判断套接字是否在select返回的可读rset中
            // 如果在则进行处理
            if(FD_ISSET(sockfd , &rset))
            {
                /*处理客户请求*/
                printf("\nreading the socket~~~ \n");

                bzero(buf , MAX_LINE);
                if((n = read(sockfd , buf , MAX_LINE)) <= 0)
                {
                    close(sockfd);
                    FD_CLR(sockfd , &allset);
                    client[i] = -1;
                }//if
                else{
                    printf("clint[%d] send message: %s\n", i , buf);
                    if((ret = write(sockfd , buf , n)) != n)

                    {
                        printf("error writing to the sockfd!\n");
                        break;
                    }//if
                }//else
                if(--nready <= 0)
                    break;
            }//if
        }//for
    }//while
}

poll:

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:

  1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
  2. poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

epoll:

epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次

epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就需态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知

epoll的优点:

  1. 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口);

  2. 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;

    即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

  3. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

函数详解

  1. epoll_create

    int epoll_create(int size)

    创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。

    这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。自从Linux 2.6.8开始,size参数被忽略,但是依然要大于0。

  2. epoll_ctl

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

    第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:

  • EPOLL_CTL_ADD:注册新的fd到epfd中;
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL:从epfd中删除一个fd;

    第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

    struct epoll_event {
    __uint32_t events; /* Epoll events */
    
    epoll_data_t data; /* User data variable */
    };
    
    //union
    typedef union epoll_data
    {
     void *ptr;
     int fd;
     uint32_t u32;
     uint64_t u64;
    } epoll_data_t;
    

    events可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

  • EPOLLOUT:表示对应的文件描述符可以写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
  1. epoll_wait

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

    等待事件的产生,类似于select()调用。

  • 参数epfd:由epoll_create创建的epoll句柄
  • 参数events用来从内核得到事件的集合
  • maxevents表示每次能处理的最大事件数,告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size
  • 参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。

    该函数返回需要处理的事件数目,如返回0表示已超时。

实例代码

#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>

using namespace std;

#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000

void setnonblocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts<0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts|O_NONBLOCK;
    if(fcntl(sock,F_SETFL,opts)<0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

int main(int argc, char* argv[])
{
    int i, maxi, listenfd, connfd, sockfd,epfd,nfds, portnumber;
    ssize_t n;
    char line[MAXLINE];
    socklen_t clilen;


    if ( 2 == argc )
    {
        if( (portnumber = atoi(argv[1])) < 0 )
        {
            fprintf(stderr,"Usage:%s portnumber/a/n",argv[0]);
            return 1;
        }
    }
    else
    {
        fprintf(stderr,"Usage:%s portnumber/a/n",argv[0]);
        return 1;
    }



    //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件

    struct epoll_event ev,events[20];
    //生成用于处理accept的epoll专用的文件描述符

    epfd=epoll_create(256);
    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    //把socket设置为非阻塞方式

    //setnonblocking(listenfd);

    //设置与要处理的事件相关的文件描述符

    ev.data.fd=listenfd;
    //设置要处理的事件类型

    ev.events=EPOLLIN|EPOLLET;
    //ev.events=EPOLLIN;

    //注册epoll事件

    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    char *local_addr="127.0.0.1";
    inet_aton(local_addr,&(serveraddr.sin_addr));//htons(portnumber);

    serveraddr.sin_port=htons(portnumber);
    bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    maxi = 0;
    for ( ; ; ) {
        //等待epoll事件的发生

        nfds=epoll_wait(epfd,events,20,500);
        //处理所发生的所有事件

        for(i=0;i<nfds;++i)
        {
            if(events[i].data.fd==listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。

            {
                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
                if(connfd<0){
                    perror("connfd<0");
                    exit(1);
                }
                //setnonblocking(connfd);

                char *str = inet_ntoa(clientaddr.sin_addr);
                cout << "accapt a connection from " << str << endl;
                //设置用于读操作的文件描述符

                ev.data.fd=connfd;
                //设置用于注测的读操作事件

                ev.events=EPOLLIN|EPOLLET;
                //ev.events=EPOLLIN;

                //注册ev

                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
            }
            else if(events[i].events&EPOLLIN)//如果是已经连接的用户,并且收到数据,那么进行读入。

            {
                cout << "EPOLLIN" << endl;
                if ( (sockfd = events[i].data.fd) < 0)
                    continue;
                if ( (n = read(sockfd, line, MAXLINE)) < 0) {
                    if (errno == ECONNRESET) {
                        close(sockfd);
                        events[i].data.fd = -1;
                    } else
                        std::cout<<"readline error"<<std::endl;
                } else if (n == 0) {
                    close(sockfd);
                    events[i].data.fd = -1;
                }
                line[n] = '/0';
                cout << "read " << line << endl;
                //设置用于写操作的文件描述符

                ev.data.fd=sockfd;
                //设置用于注测的写操作事件

                ev.events=EPOLLOUT|EPOLLET;
                //修改sockfd上要处理的事件为EPOLLOUT

                //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);

            }
            else if(events[i].events&EPOLLOUT) // 如果有数据发送

            {
                sockfd = events[i].data.fd;
                write(sockfd, line, n);
                //设置用于读操作的文件描述符

                ev.data.fd=sockfd;
                //设置用于注测的读操作事件

                ev.events=EPOLLIN|EPOLLET;
                //修改sockfd上要处理的事件为EPOLIN

                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
            }
        }
    }
    return 0;
}
// 推荐阅读: http://blog.chinaunix.net/uid-24459558-id-4713055.html

select、poll、epoll 区别总结:

  • 支持一个进程所能打开的最大连接数
select 单个进程所能打开的最大连接数有FD_SETSIZE宏定义,其大小是32个整数的大小(在32位的机器上,大小就是3232,同理64位机器上FD_SETSIZE为3264),当然我们可以对进行修改,然后重新编译内核,但是性能可能会受到影响,这需要进一步的测试。
poll poll本质上和select没有区别,但是它没有最大连接数的限制,原因是它是基于链表来存储的
epoll 虽然连接数有上限,但是很大,1G内存的机器上可以打开10万左右的连接,2G内存的机器可以打开20万左右的连接
  • FD剧增后带来的IO效率问题
select 因为每次调用时都会对连接进行线性遍历,所以随着FD的增加会造成遍历速度慢的“线性下降性能问题”。
poll 同上
epoll 因为epoll内核中实现是根据每个fd上的callback函数来实现的,只有活跃的socket才会主动调用callback,所以在活跃socket较少的情况下,使用epoll没有前面两者的线性下降的性能问题,但是所有socket都很活跃的情况下,可能会有性能问题。
  • 消息传递方式
select 内核需要将消息传递到用户空间,都需要内核拷贝动作
poll 同上
epoll epoll通过内核和用户空间共享一块内存来实现的。

总结:

综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点。

1、表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

2、select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善

select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的(操作系统需要将数据从内核态复制到用户态),而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

为什么 IO 多路复用要搭配非阻塞 IO?

select返回可读,和 read 去读,这是两个独立的系统调用,两个操作之间是有窗口的,也就是说 select 返回可读,紧接着去 read,不能保证一定可读。

惊群现象,就是一个典型场景,多个进程或者线程通过 select 或者 epoll 监听一个 listen socket,当有一个新连接完成三次握手之后,所有进程都会通过 select 或者 epoll 被唤醒,但是最终只有一个进程或者线程 accept 到这个新连接,若是采用了阻塞 I/O,没有accept 到连接的进程或者线程就 block 住了。

推荐阅读

发表评论