大多数时候,除了响应事件之外,应用程序还希望执行一些数据缓冲。例如,当我们想要写入数据时,通常的模式运行如下:

  • 决定我们要向连接写入一些数据;将该数据放入缓冲区。
  • 等待连接变为可写
  • 尽可能多地写入数据
  • 记住我们写了多少,如果我们还有更多的数据要写,等待连接再次变为可写。

这种缓冲 IO 模式很常见,以至于 Libevent 为其提供了通用机制。 bufferevent由底层传输(如套接字)、读取缓冲区和写入缓冲区组成。与在底层传输准备好读取或写入时提供回调的常规事件不同,bufferevent 在读取或写入足够的数据时调用其用户提供的回调。

有多种类型的bufferevent,它们都共享一个公共接口。在撰写本文时,存在以下类型:

  • 基于套接字的bufferevent

    它使用 event_* 接口作为后端方法,从底层流套接字发送和接收数据。

  • 异步IO的bufferevent

    使用 Windows IOCP 接口向底层流套接字发送和接收数据的bufferevent。(仅限 Windows,实验性的)

  • 过滤的bufferevent

    在将传入和传出数据传递给底层 bufferevent 对象之前处理传入和传出数据的bufferevent - 例如,压缩或转换数据。

  • 成对的bufferevent

    两个之间互相传输数据

注意:

bufferevent目前只支持面向字节流的TCP协议,在未来可能会支持面向数据报的UDP协议。

本节中的所有函数和类型都在 event2/bufferevent.h 中声明。与 evbuffer 相关的函数在 event2/buffer.h 中声明;有关这些的信息,请参阅下一章。

Bufferevents and evbuffers

每一个bufferevent有一个输入缓冲区和输出缓冲区,类型为struct evbuffer,当你对bufferevent执行写操作时,实际上是写到了输出缓冲区上,当bufferevent有数据需要你读时,你实际上是从输入缓冲区进行读取的。

我们将在后面的章节讨论evbuffer。

回调和水位

每一个bufferevent有两个相关的回调:读回调、写回调。默认情况下,只要从底层传输读取任何数据,就会调用读取回调。每当输出缓冲区中的足够数据被清空到底层传输时,就会调用写入回调。您可以通过bufferevent的读写“水位”来覆盖这些函数的行为。

每一个bufferevent有4个水位:

  • 读低水位

    当发生读取数据时,输入缓冲区大于等于此数值,则会调用读取回调。默认是0,所以每次读取数据都会调用读取回调。

  • 读高水位

    如果 bufferevent 的输入缓冲区达到此级别,则 bufferevent 将停止读取,直到从输入缓冲区中排出足够的数据使我们再次低于它。默认为无限制,因此我们永远不会因为输入缓冲区的大小而停止读取。

  • 写低水位

    当发生写数据时,输出缓冲区小于等于此数值,我们都会调用写回调。默认为 0,因此除非清空输出缓冲区,否则不会调用写入回调。

  • 写高水位

    不直接由bufferevent使用,当bufferevent用作另一个bufferevent的底层传输时,此水位可能具有特殊含义。请参阅下面有关过滤bufferevent的说明。

bufferevent 也有一个“错误”或“事件”回调,它被调用以告诉应用程序有关非面向数据的事件,例如连接关闭或发生错误时。定义了以下事件标志:

  • BEV_EVENT_READING

    读事件

  • BEV_EVENT_WRITING

    写事件

  • BEV_EVENT_ERROR

    错误事件

  • BEV_EVENT_TIMEOUT

    超时事件

  • BEV_EVENT_EOF

    结束事件

  • BEV_EVENT_CONNECTED

    连接成功事件

延迟回调

默认情况下,当相应的条件发生时,会立即执行 bufferevent 回调。 (这也适用于 evbuffer 回调;我们稍后会谈到。)当依赖关系变得复杂时,这种立即调用可能会带来麻烦。例如,假设有一个回调在 evbuffer A 变空时将数据移动到 evbuffer A 中,而另一个回调在 evbuffer A 变满时将数据从 evbuffer A 中处理出来。由于这些调用都发生在堆栈上,如果依赖关系变得足够糟糕,您可能会面临堆栈溢出的风险。

为了解决这个问题,你可以告诉一个 bufferevent(或一个 evbuffer)它的回调应该被推迟。当满足延迟回调的条件时,它不会立即调用它,而是作为 event_loop() 调用的一部分排队,并在常规事件的回调之后调用。

bufferevents 的可选标志位

在创建bufferevents的时候,你可以传入标志位来改变其默认行为,标志位如下所示:

  • BEV_OPT_CLOSE_ON_FREE

    当bufferevent被释放的时候,关闭底层传输,这会关闭底层socket传输,释放底层的bufferevent等。

  • BEV_OPT_THREADSAFE

    自动为bufferevent分配锁,可以在多线程中安全的使用

  • BEV_OPT_DEFER_CALLBACKS

    如上所述,延迟回调

  • BEV_OPT_UNLOCK_CALLBACKS

    默认情况下,当bufferevent设置为线程安全时,当调用用户的回调时,bufferevent会自动加锁,设置此选项告诉libevent在调用用户回调的时候不加锁。

基于socket的bufferevents

最简单的bufferevent类型是基于套接字的。基于套接字的 bufferevent 使用 Libevent 的底层事件机制来检测底层网络套接字何时准备好进行读取和/或写入操作,并使用底层网络调用(如 readv、writev、WSASend 或 WSARecv)来传输和接收数据。

创建基于套接字的bufferevent

可以使用bufferevent_socket_new()函数创建一个基于套接字的bufferevent:

struct bufferevent *bufferevent_socket_new(
    struct event_base *base,
    evutil_socket_t fd,
    enum bufferevent_options options);
  • base:event_base
  • options:位掩码(BEV_OPT_CLOSE_ON_FREE等)
  • fd:可选的文件描述符,可以设置为-1,代表想之后设置文件描述符。

提示:

确保传入此函数的socket为非阻塞模式,libevent提供了便利的方法来设置非阻塞evutil_make_socket_nonblocking

这个函数在成功时返回一个bufferevent,失败时返回NULL。

在基于套接字的bufferevent上启动连接

如果bufferevent的socket还未连接,你可以启动一个新连接

int bufferevent_socket_connect(struct bufferevent *bev,
    struct sockaddr *address, int addrlen);

其中参数address、addrlen和标准的connect函数的参数一样,如果bufferevent还没有设置socket,调用此函数会分配一个新的socket给它,并设置为非阻塞模式。

如果 bufferevent 已经有一个套接字,调用 bufferevent_socket_connect() 会告诉 Libevent 套接字未连接,并且在连接操作成功之前不应在套接字上进行读取或写入。

在连接没有成功建立之前往输出缓冲区写数据是允许的。

此函数在成功时返回0,失败时返回-1。

#include <event2/event.h>
#include <event2/bufferevent.h>
#include <sys/socket.h>
#include <string.h>

void eventcb(struct bufferevent *bev, short events, void *ptr)
{
    if (events & BEV_EVENT_CONNECTED) {
         /* We're connected to 127.0.0.1:8080.   Ordinarily we'd do
            something here, like start reading or writing. */
    } else if (events & BEV_EVENT_ERROR) {
         /* An error occured while connecting. */
    }
}

int main_loop(void)
{
    struct event_base *base;
    struct bufferevent *bev;
    struct sockaddr_in sin;

    base = event_base_new();

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
    sin.sin_port = htons(8080); /* Port 8080 */

    bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);

    bufferevent_setcb(bev, NULL, NULL, eventcb, NULL);

    if (bufferevent_socket_connect(bev,
        (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        /* Error starting connection */
        bufferevent_free(bev);
        return -1;
    }

    event_base_dispatch(base);
    return 0;
}

bufferevent_base_connect() 函数是在 Libevent-2.0.2-alpha 中引入的。在此之前,您必须自己在套接字上手动调用 connect(),当连接完成时,bufferevent 会将其报告为写入。

译者注:

此处原作者应该是笔误,函数的名字为bufferevent_socket_connect

请注意,如果您使用 bufferevent_socket_connect() 来建立socket连接,您只会收到 BEV_EVENT_CONNECTED 事件。如果您自己调用 connect()来建立连接,将报告写入事件。

如果您想自己调用connect(),但在连接成功时仍然收到BEV_EVENT_CONNECTED 事件,请在connect() 返回-1 且errno 等于EAGAIN 或EINPROGRESS 后调用bufferevent_socket_connect(bev, NULL, 0)。

通过主机名启动连接

很多时候,您希望将解析主机名和连接到它的操作合并为一个操作。有一个接口:

int bufferevent_socket_connect_hostname(struct bufferevent *bev,
    struct evdns_base *dns_base, int family, const char *hostname,
    int port);
int bufferevent_socket_get_dns_error(struct bufferevent *bev);

此函数解析 DNS 名称hostname,查找family类型的address。(允许的family类型是 AF_INET、AF_INET6 和 AF_UNSPEC。)如果名称解析失败,它会调用带有错误事件的事件回调。如果成功,它会像 bufferevent_socket_connect 一样启动连接尝试。

dns_base参数是可选的,如果是NULL的话,libevent会在等待名称查找完成之前阻塞,这通常不是你想要的。如果提供的话,则libevent使用它来异步查找主机名,有关DNS的更多信息,请参阅第九章。

就像bufferevent_socket_connect()函数一样,这个函数告诉libevent,bufferevent上的任何现有套接字都是未连接的,并且在解析完成且连接操作成功之前不应在套接字上进行读取或者写入操作。

如果发生错误,错误原因可能是DNS主机名查找错误,你可以调用bufferevent_socket_get_dns_error()函数来获取最近发生的错误。如果返回值为0,则代表没有错误发生。

/* Don't actually copy this code: it is a poor way to implement an
   HTTP client.  Have a look at evhttp instead.
*/
#include <event2/dns.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <event2/event.h>

#include <stdio.h>

void readcb(struct bufferevent *bev, void *ptr)
{
    char buf[1024];
    int n;
    struct evbuffer *input = bufferevent_get_input(bev);
    while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
        fwrite(buf, 1, n, stdout);
    }
}

void eventcb(struct bufferevent *bev, short events, void *ptr)
{
    if (events & BEV_EVENT_CONNECTED) {
         printf("Connect okay.\n");
    } else if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) {
         struct event_base *base = ptr;
         if (events & BEV_EVENT_ERROR) {
                 int err = bufferevent_socket_get_dns_error(bev);
                 if (err)
                         printf("DNS error: %s\n", evutil_gai_strerror(err));
         }
         printf("Closing\n");
         bufferevent_free(bev);
         event_base_loopexit(base, NULL);
    }
}

int main(int argc, char **argv)
{
    struct event_base *base;
    struct evdns_base *dns_base;
    struct bufferevent *bev;

    if (argc != 3) {
        printf("Trivial HTTP 0.x client\n"
               "Syntax: %s [hostname] [resource]\n"
               "Example: %s www.google.com /\n",argv[0],argv[0]);
        return 1;
    }

    base = event_base_new();
    dns_base = evdns_base_new(base, 1);

    bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, readcb, NULL, eventcb, base);
    bufferevent_enable(bev, EV_READ|EV_WRITE);
    evbuffer_add_printf(bufferevent_get_output(bev), "GET %s\r\n", argv[2]);
    bufferevent_socket_connect_hostname(
        bev, dns_base, AF_UNSPEC, argv[1], 80);
    event_base_dispatch(base);
    return 0;
}

通用bufferevent操作

释放bufferevent

void bufferevent_free(struct bufferevent *bev);

这个函数释放一个bufferevent,bufferevent是内部引用计数的,所以如果bufferevent在你释放它时有挂起的延迟回调,他在回调完成之前不会被删除。

然后,bufferevent_free()函数会尽快释放bufferevent,如果在bufferevent上有待写入的数据,他可能不会在bufferevent释放之前被刷新。

如果设置了BEV_OPT_CLOSE_ON_FREE 标志位,并且此bufferevent有一个与之关联的socket或底层bufferevent作为其传输,则当您释放bufferevent时,该传输将关闭。

操纵回调、水位、启用操作

typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
typedef void (*bufferevent_event_cb)(struct bufferevent *bev,
    short events, void *ctx);

void bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg);

void bufferevent_getcb(struct bufferevent *bufev,
    bufferevent_data_cb *readcb_ptr,
    bufferevent_data_cb *writecb_ptr,
    bufferevent_event_cb *eventcb_ptr,
    void **cbarg_ptr);

bufferevent_setcb()函数修改bufferevent的一个或多个回调。当足够的数据被读取时调用读回调,当足够的数据被写入时调用写回调,当有事件发生时调用事件回调。第一个参数时发生事件的bufferevent,最后一个参数是用户在bufferevent_setcb()中提供的cbarg,你可以使用此方法传递参数至自己的回调函数中去。事件回调中的events参数为事件标志位的位集合。参考上述“回调和水位”部分。

你可以用NULL参数来代替回调函数,达到禁用回调的目的。注意所有的回调函数共享同一个cbarg参数,所以如果修改此参数会影响到所有的回调。

可以给bufferevent_getcb()传递指针来获取当前的回调,会将readcb_ptr设置位读回调,writecb_ptr设置为写回调,eventcb_ptr设置位事件回调,cbarg_ptr指向当前回调参数字段。如果任一参数为NULL,则被忽略。

void bufferevent_enable(struct bufferevent *bufev, short events);
void bufferevent_disable(struct bufferevent *bufev, short events);

short bufferevent_get_enabled(struct bufferevent *bufev);

你可以在一个bufferevent上启动或禁用EV_READEV_WRITEEV_READ|EV_WRITE。当读或者写没有启用的时候,bufferevent不会尝试进行数据的读写。

输出缓冲区为空的时候没必要禁用写操作:bufferevent会自动停止写入,当有数据可写的时候会重新启动。

类似的,当输入缓冲区到达高水位的时候,没必要禁用读操作:bufferevent会自动停止读取,当有足够的空间可读的时候会自动重启。

默认的,一个新创建的bufferevent启用了写操作,但是没有启用读操作。

你可以调用bufferevent_get_enabled()函数来确定在当前bufferevent上启用了哪些事件。

void bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark);

bufferevent_setwatermark()函数调整整个缓冲区的读水位、写水位或者两者。(如果events参数设置为EV_READ,则会调整读水位,如果events参数设置为EV_WRITE,则会调整写水位)

高水位的0代表无限制。

#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>

#include <stdlib.h>
#include <errno.h>
#include <string.h>

struct info {
    const char *name;
    size_t total_drained;
};

void read_callback(struct bufferevent *bev, void *ctx)
{
    struct info *inf = ctx;
    struct evbuffer *input = bufferevent_get_input(bev);
    size_t len = evbuffer_get_length(input);
    if (len) {
        inf->total_drained += len;
        evbuffer_drain(input, len);
        printf("Drained %lu bytes from %s\n",
             (unsigned long) len, inf->name);
    }
}

void event_callback(struct bufferevent *bev, short events, void *ctx)
{
    struct info *inf = ctx;
    struct evbuffer *input = bufferevent_get_input(bev);
    int finished = 0;

    if (events & BEV_EVENT_EOF) {
        size_t len = evbuffer_get_length(input);
        printf("Got a close from %s.  We drained %lu bytes from it, "
            "and have %lu left.\n", inf->name,
            (unsigned long)inf->total_drained, (unsigned long)len);
        finished = 1;
    }
    if (events & BEV_EVENT_ERROR) {
        printf("Got an error from %s: %s\n",
            inf->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
        finished = 1;
    }
    if (finished) {
        free(ctx);
        bufferevent_free(bev);
    }
}

struct bufferevent *setup_bufferevent(void)
{
    struct bufferevent *b1 = NULL;
    struct info *info1;

    info1 = malloc(sizeof(struct info));
    info1->name = "buffer 1";
    info1->total_drained = 0;

    /* ... Here we should set up the bufferevent and make sure it gets
       connected... */

    /* Trigger the read callback only whenever there is at least 128 bytes
       of data in the buffer. */
    bufferevent_setwatermark(b1, EV_READ, 128, 0);

    bufferevent_setcb(b1, read_callback, NULL, event_callback, info1);

    bufferevent_enable(b1, EV_READ); /* Start reading. */
    return b1;
}

操纵bufferevent中的数据

如果不进行数据的读取和写入,那么进行数据的网络传输将变得毫无意义。bufferevent提供了如下的方法可以进行数据的读写。

struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);

这是两个非常基础而且强大的函数:分别各自返回了输出和输出缓冲区。有关evbuffer的更多细节,请参考下一章节。

注意:应用程序只能在输入缓冲区上移除数据(不是添加)。只能在输出缓冲区上添加数据(不是移除)。

int bufferevent_write(struct bufferevent *bufev,
    const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev,
    struct evbuffer *buf);

这两个函数向bufferevent的输出缓冲区写数据。调用bufferevent_write()函数,将以data为起始位置,长度为size的数据添加到输出缓冲区的末尾。bufferevent_write_buffer函数将移除buf的全部内容,并添加到输出缓冲区的末尾。两个函数在成功时均返回0,在失败时返回-1.

size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev,
    struct evbuffer *buf);

这两个函数从bufferevent的输入缓冲区中读数据。调用bufferevent_read()函数,将数据读取到data所在的内存中去,最多读取size字节,并返回实际读取的字节数。bufferevent_read_buffer()函数将读取输入缓冲区的全部内容,并将内容放置在buf的内存中,该函数在成功时返回0,失败时返回-1。

注意:调用bufferevent_read()函数时,必须确保data有足够的空间容纳size字节的数据。

#include <event2/bufferevent.h>
#include <event2/buffer.h>

#include <ctype.h>

void
read_callback_uppercase(struct bufferevent *bev, void *ctx)
{
        /* This callback removes the data from bev's input buffer 128
           bytes at a time, uppercases it, and starts sending it
           back.

           (Watch out!  In practice, you shouldn't use toupper to implement
           a network protocol, unless you know for a fact that the current
           locale is the one you want to be using.)
         */

        char tmp[128];
        size_t n;
        int i;
        while (1) {
                n = bufferevent_read(bev, tmp, sizeof(tmp));
                if (n <= 0)
                        break; /* No more data. */
                for (i=0; i<n; ++i)
                        tmp[i] = toupper(tmp[i]);
                bufferevent_write(bev, tmp, n);
        }
}

struct proxy_info {
        struct bufferevent *other_bev;
};
void
read_callback_proxy(struct bufferevent *bev, void *ctx)
{
        /* You might use a function like this if you're implementing
           a simple proxy: it will take data from one connection (on
           bev), and write it to another, copying as little as
           possible. */
        struct proxy_info *inf = ctx;

        bufferevent_read_buffer(bev,
            bufferevent_get_output(inf->other_bev));
}

struct count {
        unsigned long last_fib[2];
};

void
write_callback_fibonacci(struct bufferevent *bev, void *ctx)
{
        /* Here's a callback that adds some Fibonacci numbers to the
           output buffer of bev.  It stops once we have added 1k of
           data; once this data is drained, we'll add more. */
        struct count *c = ctx;

        struct evbuffer *tmp = evbuffer_new();
        while (evbuffer_get_length(tmp) < 1024) {
                 unsigned long next = c->last_fib[0] + c->last_fib[1];
                 c->last_fib[0] = c->last_fib[1];
                 c->last_fib[1] = next;

                 evbuffer_add_printf(tmp, "%lu", next);
        }

        /* Now we add the whole contents of tmp to bev. */
        bufferevent_write_buffer(bev, tmp);

        /* We don't need tmp any longer. */
        evbuffer_free(tmp);
}

读写超时

与其它事件一样,一定的时间过去之后,如果bufferevent没有任何数据成功写入或者读出,则可以调用超时。

void bufferevent_set_timeouts(struct bufferevent *bufev,
    const struct timeval *timeout_read, const struct timeval *timeout_write);

将超时设置为NULL,意味着将其删除。

当bufferevent等待了timeout_read秒后仍然没有数据可以读取时,将触发读超时;当bufferevent等待了timeout_write秒后,仍然无法写入数据,则将触发写入超时。

注意:超时仅仅在bufferevent可读、可写的时候才会计数。换句话说,当bufferevnet禁用读操作的时候或者输入缓冲区到达高水位的时候,读取超时将不会启用;类似地,写操作禁用的时候或者没有数据可写的时候,写超时不会启用。

当发生读或写超时时,相应的读或写操作在bufferevent上被禁用。然后使用 BEV_EVENT_TIMEOUT|BEV_EVENT_READINGBEV_EVENT_TIMEOUT|BEV_EVENT_WRITING 调用事件回调。

刷新

int bufferevent_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode state);

刷新bufferevent,即告诉bufferevent强制尽可能多的从底层传输多读取或者多写入数据,忽略其它限制写入的因素。详细细节取决于type字段。

iotype 参数应为 EV_READEV_WRITEEV_READ|EV_WRITE 以指示是否应处理读取、写入或两者的字节。状态参数可以是 BEV_NORMALBEV_FLUSHBEV_FINISHED 之一。 BEV_FINISHED 表示应该告诉对方不再发送数据; BEV_NORMALBEV_FLUSH 之间的区别取决于缓冲事件的类型。

该函数在失败时返回-1,没有数据可以刷新时返回0,刷新了部分数据时返回1。

特定的bufferevent操作

这些函数并不是在所有的bufferevent类型上均有效。

int bufferevent_priority_set(struct bufferevent *bufev, int pri);
int bufferevent_get_priority(struct bufferevent *bufev);

该函数将用于实现 bufev 的事件的优先级调整为 pri。有关优先级的更多信息,请参阅 event_priority_set()

该函数在成功时返回0,失败时返回-1,仅仅在基于套接字的bufferevent上有效。

int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);
evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);

这些函数设置或者获取基于fd事件的文件描述符,只有基于套接字的bufferevent支持setfd()。这两个函数在失败时返回-1,bufferevent_setfd()函数在成功时返回0。

struct event_base *bufferevent_get_base(struct bufferevent *bev);

返回bufferevent的event_base。

struct bufferevent *bufferevent_get_underlying(struct bufferevent *bufev);

如果存在的话,此函数返回bufev的底层传输的bufferevent。

手动对bufferevent进行上锁和解锁

与evbuffer一样,有时你想确保对bufferevent的操作为原子的。libevent公开了以下两个接口,可用于手动上锁和解锁。

void bufferevent_lock(struct bufferevent *bufev);
void bufferevent_unlock(struct bufferevent *bufev);

注意:如果当创建bufferevent的时候没有指定BEV_OPT_THREADSAFE,或者libevent没有启用线程支持,那么上锁将没有任何作用。

当调用此函数锁住bufferevent的时候,将同时锁住evbuffer。这些函数是递归的:对一个已经持有锁的bufferevent进行上锁是安全的,但是对应的要调用同等次数的解锁函数。

标签: libevent

添加新评论