前言

前两篇分别介绍了IO多路复用的select()poll()

  • select()解决了传统的网络编程中,一个线程只能服务一个客户端连接的问题,但是最多只能同时监听1024个文件描述符,并且效率比较低。
  • poll()解决了select()监听文件描述符个数限制的问题,但是并没有解决效率的问题。

本篇介绍epoll(),既解决了描述符个数限制的问题,又解决了效率问题。

epoll()poll()有相同的语义,是poll()的改良版,首字母e代表event,即事件的意思。为IO多路复用的一种,在Linux下,当监控大量描述符的时候,epoll()的性能要优于select()poll()

在内核态epoll()主要维护两个列表:

  • interest list:感兴趣的列表,即用户向内核注册文件描述符,内核将文件描述符加入此列表,并感知列表中所有文件描述符的I/O事件(可读、可写等)。
  • ready list:准备好的列表,即该列表中的文件描述符已准备好进行I/O。这个列表是interest list的一个子集。

主要使用如下三个系统调用,后面将详细介绍这三个系统调用。

  • epoll_create()
  • epoll_ctl()
  • epoll_wait()

系统调用

1. epoll_create

描述:

#include <sys/epoll.h>

int epoll_create(int size);

epoll_create()创建一个epoll实例,后续的所有系统调用都需要该实例,当不需要该实例的时候,需要调用close()函数来关闭,系统内核将销毁该实例及其相关的资源。

参数:

  • size:该参数原本是告知内核,需要监控的文件描述符的个数,内核根据此参数来分配初始的内存空间,如果监控的描述符的个数超过此数字,内核再额外分配内存。但是目前,内核会根据监听描述符的个数,动态的分配,不再需要此size值作为参考,但是为了保持兼容性,此值要求大于0即可。

返回值:

  • 成功的话,返回一个文件描述符,即代表epoll实例。该值为非负整数。
  • 发生错误的话,返回-1,并且设置errno的值,可以通过打印errno来判断发生了什么错误。

2. epoll_ctl

描述:

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

该函数是用来操作前面提到的interest list的,可以从interest list中添加、修改、删除描述符fd

参数:

  • epfd

    • epoll_ctl()创建的epoll实例。
  • op

    • EPOLL_CTL_ADD

      fd添加到interestl list中。event为要内核监控的事件,见下面event的描述。

    • EPOLL_CTL_MOD

      修改fd的事件,即event参数。用来告知内核,对fd的事件做变更。

    • EPOLL_CTL_DEL

      fdinterest list中移除。即告知内核,不再对fd的任何事件进行监控,内核将忽略此处的event参数。

  • fd

    要监控的文件描述符,比如已经调用listen()函数的监听描述符,或者已经调用accept()函数的客户端的描述符。

  • event

    event的类型为epoll_event,该类型的定义如下:

    typedef union epoll_data {
        void        *ptr;
        int          fd;
        uint32_t     u32;
        uint64_t     u64;
    } epoll_data_t;
    
    struct epoll_event {
        uint32_t     events;      /* Epoll events */
        epoll_data_t data;        /* User data variable */
    };

    其中data成员为当epoll_wait()返回之后,内核的传出值。

    events成员是一个位掩码,代表需要内核监控的事件,该值可以由以下多个通过运算符组合起来:

    • EPOLLIN

      文件描述符可读。

    • EPOLLOUT

      文件描述符可写。

    • EPOLLRDHUP

      流描述符关闭连接,或者半关闭状态(关闭写)的时候。

    • EPOLLPRI

      带外数据(紧急数据)。

    • EPOLLERR

      当发生错误时。当读端关闭的时候,写端会触发事件。

    • EPOLLHUP

      挂起。

    • EPOLLET

      边缘触发模式。如果读端没有从socket缓冲区中把数据全部读出来,则不会触发第二次。

      默认为水平触发模式。具体的区别见下面的章节水平触发和边缘触发

    • EPOLLONESHOT

      告知内核对该文件描述符仅仅监控一次。

    • EPOLLWAKEUP
    • EPOLLEXCLUSIVE

返回值:

  • 成功时:返回0
  • 失败时:返回-1,并且设置errno的值,可以通过打印errno来判断发生了什么错误。

3. epoll_wait

描述:

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events,
                int maxevents, int timeout);

该函数在指定的epoll实例上等待使用epoll_ctl注册的事件,当发生注册的事件时,通过events参数返回发生的事件以及文件描述符fd。当发生感兴趣的事件或者发生超时,该函数才返回。

参数:

  • epfd

    通过epoll_create()创建的epoll实例。

  • events

    events的类型为epoll_event的指针,该类型的见上面章节。其中:

    • data字段代表发生事件的文件描述符。
    • events字段代表该描述符发生了哪些事件。
  • maxevents

    events参数的个数,epoll_wait()每次最多返回maxevents个。

  • timeout

    • -1:永远阻塞,除非发生感兴趣的事件。
    • 0:立即返回,即使没有感兴趣的事件发生。
    • 正数:等待的毫秒数。

返回值:

  • 大于0:代表可以进行I/O的文件描述符的个数。
  • 等于0:代表等待了timeout毫秒后,超时返回。
  • -1:代表发生了错误,并且内核会设置errno的值。

注意事项:

  1. 当一个线程阻塞于epoll_wait()调用时,可以通过另一个线程向epoll实例添加文件描述符,当新的文件描述符可I/O的时候,阻塞的线程会返回。
  2. 当在多线程(多进程)中,同时在等同一个文件描述符,则只有一个线程被唤醒,避免了惊群效应。

水平触发和边缘触发

通俗的来讲,两者的区别为:

  • 水平触发:可以触发多次,当缓冲区的数据没有读完时,下次调用epoll_wait()仍然会触发。
  • 边缘触发:只能触发一次,当缓冲区的数据没有读完时,下次调用epoll_wait()不会触发。所以编程时若使用边缘触发模式,要求将socket设置为非阻塞模式,同时当socket可读时,循环读取,直至返回EAGAIN错误,代表已经将缓冲区读空,然后继续调用epoll_wait()

代码示例

epoll的边缘触发模式,实现一个echo服务端。代码如下所示:

#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <limits.h>

#define MAX_EVENTS 10
#define MAXLINE 4096
char buf[MAXLINE];
int read_count;

void setnonblocking(int sock);

int main(int argc, char *argv[])
{
    int listenport = 6666;
    socklen_t clientlen;
    struct sockaddr_in servaddr, clientaddr;
    struct epoll_event ev, events[MAX_EVENTS];
    int listen_sock, conn_sock, nfds, epollfd, n;

    if (argc != 2)
    {
        printf("Usage: ./a.out <ListenPort>\n");
        exit(EXIT_FAILURE);
    }

    listenport = atoi(argv[1]);

    listen_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_sock < 0)
    {
        perror("socket error");
        exit(EXIT_FAILURE);
    }

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(listenport);
    if (bind(listen_sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
    {
        perror("bind error");
        exit(EXIT_FAILURE);
    }

    if (listen(listen_sock, SOMAXCONN) < 0)
    {
        perror("listen error");
        exit(EXIT_FAILURE);
    }

    epollfd = epoll_create(5);
    if (epollfd == -1)
    {
        perror("epoll_create");
        exit(EXIT_FAILURE);
    }

    ev.events = EPOLLIN;
    ev.data.fd = listen_sock;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1)
    {
        perror("epoll_ctl: listen_sock");
        exit(EXIT_FAILURE);
    }

    for (;;)
    {
        nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (nfds == -1)
        {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }

        for (n = 0; n < nfds; ++n)
        {
            if (events[n].data.fd == listen_sock)
            {
                clientlen = sizeof(clientaddr);

                conn_sock = accept(listen_sock,
                                   (struct sockaddr *)&clientaddr, &clientlen);
                if (conn_sock == -1)
                {
                    perror("accept");
                    exit(EXIT_FAILURE);
                }
                setnonblocking(conn_sock);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = conn_sock;
                if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                              &ev) == -1)
                {
                    perror("epoll_ctl: conn_sock");
                    exit(EXIT_FAILURE);
                }
            }
            else
            {
                int fd = events[n].data.fd;

                // 循环读取并写回到对方
                while ((read_count = read(fd, buf, MAXLINE)) > 0)
                {
                    write(fd, buf, read_count);
                }

                // 对方断开连接
                if (read_count == 0)
                {
                    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
                    close(fd);
                }
                else if (read_count < 0)
                {
                    if (errno == EAGAIN)
                    {
                        // 正常读取结束
                        continue;
                    }
                    else
                    {
                        perror("error");
                        epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
                        close(fd);
                    }
                }
            }
        }
    }

    close(epollfd);
}

// 将socket设置为非阻塞模式
void setnonblocking(int sock)
{
    int opts;
    opts = fcntl(sock, F_GETFL);
    if (opts < 0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts | O_NONBLOCK;
    if (fcntl(sock, F_SETFL, opts) < 0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

要求

  • 内核版本:≥ 2.6
  • glibc版本:≥ 2.3.2

标签: none

添加新评论