当前位置 博文首页 > modao:网络编程-I/O复用

    modao:网络编程-I/O复用

    作者:modao 时间:2021-01-28 01:23

    I/O模型

    Unix下可用的I/O模型有五种:

    • 阻塞式I/O
    • 非阻塞式I/O
    • I/O复用(select和poll、epoll)
    • 信号驱动式I/O(SIGIO)
    • 异步I/O(POSIX的aio_系列函数)

    详见Unix网络编程卷一第六章

    select()和poll()在Unix系统中存在时间长,主要优势在于可移植性,主要缺点在于当同时检查大量的文件描述符时性能拓展性不佳。

    epoll API的关键优势在于能让应用高效地检查大量的文件描述符,主要缺点是专属于Linux系统的API。

    I/O复用-select

    select()首次出现在BSD系统的套接字API中。

    select()系统调用的用途:在一段指定的时间内,监听用户感兴趣的文件描述符上的可读、可写和异常事件。

    系统调用select()会一直阻塞,直到一个或多个文件描述符集合成为就绪态。

    #include <sys/select.h>
    #include <sys/time.h>
    
    //若有就绪描述符则返回其数目,若超时则返回0,若出错则返回-1
    int select(int maxfdp1, fd_set *readset, fd_set *writeset, 
               fd_set *exceptset, const struct timeval *timeout);
    

    探究下fd_set的结构

    /*typesizes.h*/
    #define __FD_SETSIZE		1024
    
    /*select.h*/
    typedef long int __fd_mask;
    
    //long int类型共有多少bits
    #define __NFDBITS	(8 * (int) sizeof (__fd_mask))
    
    typedef struct
    {
        //long int型数组,数组大小 = 描述符最大数 / long int的位数
        //数组大小为 __FD_SETSIZE bits
        __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
    } fd_set;
    

    select()程序示例:

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <stdarg.h>
    
    static void usageError(const char* progName){
        fprintf(stderr, "Usage: %s {timeout | -} fd-num[rw]...\n", progName);
        fprintf(stderr, "    - means infinite timeout; \n");
        fprintf(stderr, "    r = monitor for read\n");
        fprintf(stderr, "    w = monitor for wirite\n\n");
        fprintf(stderr, "    e.g.: %s - 0rw 1w\n", progName);
        exit(1);
    }
    void cmdLineErr(const char *format, ...)
    {
        va_list argList;
    
        fflush(stdout);           /* Flush any pending stdout */
    
        fprintf(stderr, "Command-line usage error: ");
        va_start(argList, format);
        vfprintf(stderr, format, argList);
        va_end(argList);
    
        fflush(stderr);           /* In case stderr is not line-buffered */
        exit(EXIT_FAILURE);
    }
    
    int main(int argc, char* argv[]){
        fd_set readfds, writefds;
        int ready, nfds, fd, numRead, j;
        struct timeval timeout;
        struct timeval *pto;
        char buf[10];
    
        if(argc < 2 || strcmp(argv[1], "--help") == 0){
            usageError(argv[0]);
        }
    
        if(strcmp(argv[1], "-") == 0){
            pto = NULL;
        }
        else{
            pto = &timeout;
            timeout.tv_sec = strtol(argv[1], NULL, 0);
            timeout.tv_usec = 0;
        }
    
        nfds = 0;
        FD_ZERO(&readfds);
        FD_ZERO(&writefds);
    
        for(j = 2; j < argc; j++){
            numRead = sscanf(argv[j], "%d%2[rw]", &fd, buf);
            if(numRead != 2){
                usageError(argv[0]);
            }
            if(fd >= FD_SETSIZE){
                cmdLineErr("file descriptor exceeds limit (%d)\n", FD_SETSIZE);
            }
            if(fd >= nfds){
                nfds = fd + 1;
            }
            if(strchr(buf, 'r') != NULL){
                FD_SET(fd, &readfds);
            }
            if(strchr(buf, 'w') != NULL){
                FD_SET(fd, &writefds);
            }
        }
        ready = select(nfds, &readfds, &writefds, NULL, pto);
        if(ready == -1){
            printf("errExit(select)");
            exit(1);
        }
        printf("ready = %d\n", ready);
        for(fd = 0; fd < nfds; fd++){
            printf("%d: %s%s\n",fd, FD_ISSET(fd, &readfds) ? "r" : "", 
            FD_ISSET(fd, &writefds) ? "w" : "");
        }
        if(pto != NULL){
            printf("timeout after select(): %ld.%03ld\n",
                   (long) timeout.tv_sec, (long) timeout.tv_usec / 1000);
        }
        exit(0);
    }
    

    select处理正常数据和带外数据:

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    
    int main(int argc, char* argv[]){
        if(argc <= 2){
            printf("usage: %s ip_adress port_number\n", basename(argv[0]));
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi(argv[2]);
    
        int ret = 0;
        struct sockaddr_in address;
        bzero(&address, sizeof(address));
        address.sin_family = AF_INET;
        inet_pton(AF_INET, ip, &address.sin_addr);
        address.sin_port = htons(port);
    
        int listenfd = socket(PF_INET, SOCK_STREAM, 0);
        assert(listenfd >= 0);
        ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
        assert(ret != -1);
        ret = listen(listenfd, 5);
        assert(ret != -1);
    
        struct sockaddr_in client_address;
        socklen_t client_addrlength = sizeof(client_address);
        int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
        if(connfd < 0){
            printf("error is: %d\n", errno);
            close(listenfd);
        }
    
        char buf[1024];
        fd_set read_fds;
        fd_set exception_fds;
        FD_ZERO(&read_fds);
        FD_ZERO(&exception_fds);
    
        while(true){
            memset(buf, '\0', sizeof(buf));
            FD_SET(connfd, &read_fds);
            FD_SET(connfd, &exception_fds);
            ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
            if(ret < 0){
                printf("selection failure\n");
                break;
            }
            if(FD_ISSET(connfd, &read_fds)){
                ret = recv(connfd, buf, sizeof(buf)-1, 0);
                if(ret <= 0){
                    break;
                }
                printf("get %d bytes of normal data: %s\n", ret, buf);
            }
            else if(FD_ISSET(connfd, &exception_fds)){
                ret = recv(connfd, buf, sizeof(buf)-1, MSG_OOB);
                if(ret <= 0){
                    break;
                }
                printf("get %d bytes of oob data: %s\n", ret, buf);
            }
        }
        close(connfd);
        close(listenfd);
        return 0;
    }
    

    I/O复用-poll

    poll函数起源于SVR3,最初局限于流设备,SVR4取消了这种限制,允许poll工作在任何描述符上。

    poll提供的功能与select类似,不过在处理流设备时,它能够提供额外的信息。

    #include <poll.h>
    
    struct pollfd{
        int		fd;
        short	events;	//指定要测试的条件
        short	revents;//返回描述符的状态
    }
    //若有就绪描述符返回其数目,超时返回0,出错返回-1
    int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);
    

    select()同poll()返回正整数的区别:如果一个文件描述符在返回的集合中出现了不止一次,系统调用select()会将同一个文件描述符计数多次。而系统调用poll()返回的是就绪态文件描述符个数,且一个文件描述符只会统计一次,就算在相应的revents字段中设定了多个位掩码也是如此。

    poll示例程序:

    #include <time.h>
    #include <poll.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <stdarg.h>
    
    static void usageError(const char* progName){
        fprintf(stderr, "Usage: %s {timeout | -} fd-num[rw]...\n", progName);
        fprintf(stderr, "    - means infinite timeout; \n");
        fprintf(stderr, "    r = monitor for read\n");
        fprintf(stderr, "    w = monitor for wirite\n\n");
        fprintf(stderr, "    e.g.: %s - 0rw 1w\n", progName);
        exit(1);
    }
    
    int main(int argc, char* argv[]){
        int numPipes, j, ready, randPipe, numWrites;
        int (*pfds)[2];//指向数组的指针
        struct pollfd *pollFd;
    
        if(argc < 2 || strcmp(argv[1], "--help") == 0){
            printf("%s num-pipes [num-writes]\n", argv[0]);
            exit(1);
        }
    
        numPipes = strtol(argv[1], NULL, 10);
    
        pfds = (int (*)[2])calloc(numPipes, sizeof(int [2]));
        if(pfds == NULL){
            printf("error malloc");
            exit(1);
        }
        pollFd = (pollfd*)calloc(numPipes, sizeof(struct pollfd));
        if(pollFd == NULL){
            printf("error malloc");
            exit(1);
        }
    
        for(j = 0; j < numPipes; j++){
            if(pipe(pfds[j]) == -1){
                printf("error pipe %d", j);
                exit(1);
            }
        }
    
        numWrites = (argc > 2) ? strtol(argv[2], NULL, 10) : 1;
        srandom((int)time(NULL));
        for(j = 0; j < numWrites; j++){
            randPipe = random() % numPipes;
            printf("Writing to fd: %3d (read fd: %3d)\n",
                    pfds[randPipe][1], pfds[randPipe][0]);
            if (write(pfds[randPipe][1], "a", 1) == -1){
                printf("write %d", pfds[randPipe][1]);
                exit(1);
            }
        }
    
        for(j = 0; j < numPipes; j++){
            pollFd[j].fd = pfds[j][0];
            pollFd[j].events = POLLIN;
        }
    
        ready = poll(pollFd, numPipes, -1);
        if(ready == -1){
            printf("poll error");
            exit(1);
        }
    
        printf("poll() returned: %d\n", ready);
    
        for(j = 0; j < numPipes; j++){
            if(pollFd[j].revents & POLLIN){
                printf("Readable: %d %3d\n", j, pollFd[j].fd);
            }
        }
        return 0;
    }
    

    I/O复用-epoll

    epoll API由三组系统调用组成;

    • epoll_create()创建一个epoll实例
    • epoll_ctl()操作同epoll实例相关联的兴趣列表
    • epoll_wait()返回与epoll相关联的就绪列表中的成员

    epoll实例:epoll API的核心数据结构,和一个打开的文件描述符相关联。这个文件描述符不用来做IO操作,相反它是内核数据结构的句柄,这些内核数据结构实现了两个目的:

    • 记录兴趣列表
    • 维护就绪列表

    epoll_create

    #include <sys/epoll.h>
    int epoll_create(int size);
    

    参数size指定我们想要通过epoll实例来检查的描述符个数,不是上限,只是告知内核应该如何为内部数据结构划分初始大小。

    函数返回epoll实例的文件描述符,该文件描述符不需要时需要close()。

    当所有与epoll实例相关的文件描述符都被关闭时,实例被销毁,相关资源释放。(多个文件描述符可能引用到相同的epoll实例,这是由于调用了fork()或dup()这样的类似函数所致)。

    linux2.6.8版以来,size参数被忽略不用。

    linux2.6.27以来,Linux支持一个新的系统调用epoll_create1():

    • 去掉了无用的参数size
    • 增加了一个可用来修改系统调用行为的flags参数
      • flag目前只支持一个标志:EPOLL_CLOEXEC,使内核在新的文件描述符上启动了执行即关闭(close-on-exec)标志(FD_CLOEXEC)

    epoll_ctl

    #include <sys/epoll.h>
    int epoll_ctl(int epfd, int op, int fd, struct epoll *ev);
    

    成功返回0,失败返回-1并设置errno。

    参数fd:指明修改兴趣列表中哪一个文件描述符的设定

    参数op:指定需要执行的操作

    • EPOLL_CTL_ADD:添加
    • EPOLL_CTL_MOD:修改
    • EPOLL_CTL_DEL:删除

    参数ev:

    struct epoll_event{
        uint32_t	 events;//epoll事件,位掩码
        epoll_data_t data;	//用户数据 
    }
    
    typedef union epoll_data{
        void		*ptr;
        int			fd;
        uint32_t	u32;
        uint64_t	u64;
    }epoll_data_t;
    
    • 结构体epoll_event在的events字段是一个位掩码,指定待检查的描述符fd上感兴趣的事件集合
    • data字段是一个联合体,当描述符fd成为就绪态时,联合体的成员可用来指定传回给调用进程的信息
      • 联合体成员不能一起使用,常用fd
      • 想要将文件描述符和用户数据关联起来,以实现快速的数据访问,只能使用其它手段,比如放弃使用fd,而在ptr指向的用户数据中包含fd

    max_user_watches上限

    每个注册到epoll实例上的文件描述符需要占用一小段不能被交换的内核内存空间,因此内核提供了一个接口用来定义每个用户可以注册到epoll实例上的文件描述符总数。

    这个上限值可以通过max_user_watches来查看和修改,max_user_watches是专属于Linux系统的/proc/sys/fd/epoll目录下的一个文件。默认上限值根据可用系统内存计算得出。

    epoll_wait

    #include <sys/epoll.h>
    int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
    

    成功返回就绪态的文件描述符的个数,失败返回-1并设置errno

    参数evlist指向的结构体数组中返回的是有关就绪态文件描述符的信息。数组evlist的空间由调用者负责申请,所包含的元素个数在参数maxevents中指定。

    在数组evlist中每个元素返回的都是单个就绪态文件描述符的信息:

    • events字段返回在该描述符上已经发生的事件掩码
    • data字段返回的是适用epoll_ctl()注册监听事件时在ev.data中所指定的值。data字段是唯一可获知同这个事件相关的文件描述符号的途径,因此,在调用epoll_ctl()时要么将ev.data.fd设为文件描述符号,要么将ev.data.ptr设为指向包含文件描述符号的结构体

    参数timeout用来确定epoll_wait()的阻塞行为:

    • timeout为-1,调用将一直阻塞,直到兴趣列表中的文件描述符上有事件发生,或者直到捕获到一个信号为止
    • timeout为0,执行一次非阻塞式的检查
    • timeout大于0,调用将阻塞至多timeout毫秒,直到文件描述符上有事件发生,或者直到捕获到一个信号为止

    在多线程程序中,可以在一个线程中使用epoll_ctl()将文件描述符添加到另一个线程中由epoll_wait()所监视的epoll实例的兴趣列表中去。这些对兴趣列表的修改将立刻得到处理,而epoll_wait()调用将返回有关新添加的文件描述符的就绪信息。

    epoll事件:除了有一个额外的前缀E外,大多数位掩码的名称同poll中对应的事件掩码名称相同。例外情况:

    • EPOLLET:epoll支持边缘触发
    • EPOLLONESHOT:只触发一次,触发完标记为非激活状态,需要使用EPOLL_CTL_MOD操作重新激活对这个文件描述符的检查

    epoll程序示例:

    #include <sys/epoll.h>
    #include <fcntl.h>
    #include <string.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <errno.h>
    
    
    #define MAX_BUF     1000
    #define MAX_EVENTS  5
    
    int main(int argc, char* argv[]){
        int epfd, ready, fd, s, j, numOpenFds;
        struct epoll_event ev;
        struct epoll_event evlist[MAX_EVENTS];
        char buf[MAX_BUF];
    
        if(argc < 2 || strcmp(argv[1], "--help")==0){
            printf("usage: %s file...\n", argv[0]);
            exit(1);
        }
    
        epfd = epoll_create(argc - 1);
        if(epfd == -1){
            printf("error epoll_create");
            exit(1);
        }
    
        for(j = 1; j < argc; j++){
            fd = open(argv[j], O_RDONLY);
            if(fd == -1){
                printf("error open");
                exit(1);
            }
            printf("Opened \"%s\" on fd %d\n", argv[j], fd);
    
            ev.events = EPOLLIN;
            ev.data.fd = fd;
            if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1){
                printf("error epoll_ctl");
                exit(1);
            }
        }
    
        numOpenFds = argc - 1;
    
        while(numOpenFds > 0){
            printf("About to epoll_wait()\n");
            ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
            if(ready == -1){
                if(errno == EINTR)continue;
                else{
                    printf("error epoll_wait");
                    exit(1);
                }
            }
            printf("Ready: %d\n", ready);
    
            for(j = 0; j < ready; j++){
                printf("  fd = %d; events: %s%s%s\n", evlist[j].data.fd,
                    (evlist[j].events & EPOLLIN)  ? "EPOLLIN ":"",
                    (evlist[j].events & EPOLLHUP) ? "EPOLLHUP":"",
                    (evlist[j].events & EPOLLERR) ? "EPOLLERR":"");
                if(evlist[j].events & EPOLLIN){
                    s = read(evlist[j].data.fd, buf, MAX_BUF);
                    if(s == -1){
                        printf("error read");
                    }
                    printf("    read %d bytes : %.*s",s,s,buf);
                }
                else if(evlist[j].events & (EPOLLHUP | EPOLLERR)){
                    printf("    closing fd %d\n", evlist[j].data.fd);
                    if(close(evlist[j].data.fd) == -1){
                        printf("error close");
                        exit(1);
                    }
                    numOpenFds--;
                }
            }
        }
        printf("All file descriptors closed; bye\n");
        exit(0);
    }
    

    ET模式比LT模式触发事件的次数更少:

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 10
    
    int setnonblocking(int fd){
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
    }
    
    void addfd(int epollfd, int fd, bool enable_et){
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN;
        if(enable_et){
            event.events |= EPOLLET;
        }
        epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
        setnonblocking(fd);
    }
    
    void lt(epoll_event *events, int number, int epollfd, int listenfd){
        char buf[BUFFER_SIZE];
        for(int i = 0; i < number; i++){
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd){
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                addfd(epollfd, connfd, false);
            }
            else if(events[i].events & EPOLLIN){
                printf("event trigger once\n");
                memset(buf, '\0', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE-1,0);
                if(ret <= 0){
                    close(sockfd);
                    continue;
                }
                printf("get %d bytes of content: %s\n", ret, buf);
            }
            else{
                printf("something else happened \n");
            }
        }
    }
    
    void et(epoll_event* events, int number, int epollfd, int listenfd){
        char buf[BUFFER_SIZE];
        for(int i = 0; i < number; i++){
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd){
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                addfd(epollfd, connfd, true);
            }
            else if(events[i].events & EPOLLIN){
                printf("event trigger once\n");
                while(true){
                    memset(buf, '\0',BUFFER_SIZE);
                    int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
                    if(ret < 0){
                        if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
                            printf("read later\n");
                            break;
                        }
                        close(sockfd);
                        break;
                    }
                    else if(ret == 0){
                        close(sockfd);
                    }
                    else{
                        printf("get %d bytes of content: %s\n",ret, buf);
                    }
                }
            }
            else{
                printf("something else happend \n");
            }
        }
    }
    
    int main(int argc, char* argv[]){
        if(argc <= 2){
            printf("usage: %s ip_address port_number\n", basename(argv[0]));
            return 1;
        }
        const char *ip = argv[1];
        int port  = atoi(argv[2]);
    
        int ret = 0;
        struct sockaddr_in address;
        bzero(&address, sizeof(address));
        address.sin_family = AF_INET;
        inet_pton(AF_INET, ip, &address.sin_addr);
        address.sin_port = htons(port);
    
        int listenfd = socket(PF_INET, SOCK_STREAM, 0);
        assert(listenfd >= 0);
    
        ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
        assert(ret != -1);
    
        ret = listen(listenfd, 5);
        assert(ret != -1);
    
        epoll_event events[MAX_EVENT_NUMBER];
        int epollfd = epoll_create(5);
        assert(epollfd != -1);
        addfd(epollfd, listenfd, true);
    
        while(true){
            int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
            if(ret < 0){
                printf("epoll failure\n");
                break;
            }
            lt(events, ret, epollfd, listenfd);
            //et(events, ret, epollfd, listenfd);
        }
        close(listenfd);
        return 0;
    }
    
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER    1024
    #define BUFFER_SIZE         1024
    struct fds{
        int epollfd;
        int sockfd;
    };
    
    int setnonblocking(int fd){
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
    }
    
    void addfd(int epollfd, int fd, bool oneshot){
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET;
        if(oneshot){
            event.events |= EPOLLONESHOT;
        }
        epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
        setnonblocking(fd);
    }
    
    void reset_oneshot(int epollfd, int fd){
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
        epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
    }
    
    void *worker(void *arg){
        int sockfd = ((fds*)arg)->sockfd;
        int epollfd = ((fds*)arg)->epollfd;
        printf("start new thread to receive data on fd: %d\n", sockfd);
        char buf[BUFFER_SIZE];
        memset(buf, '\0', BUFFER_SIZE);
        while(1){
            int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
            if(ret == 0){
                close(sockfd);
                printf("foreiner closed the connection\n");
                break;
            }
            else if(ret < 0){
                if(errno == EAGAIN){
                    reset_oneshot(epollfd, sockfd);
                    printf("read later\n");
                    break;
                }
            }
            else{
                printf("get content: %s\n", buf);
                sleep(5);
            }
        }
        printf("end thread receving data on fd : %d\n", sockfd);
    }
    
    int main(int argc, char* argv[]){
        if(argc < 2){
            printf("usage: %s ip_address port_number\n", basename(argv[0]));
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi(argv[2]);
    
        int ret = 0;
        struct sockaddr_in address;
        bzero(&address, sizeof(address));
        address.sin_family = AF_INET;
        inet_pton(AF_INET, ip, &address.sin_addr);
        address.sin_port = htons(port);
    
        int listenfd = socket(PF_INET, SOCK_STREAM, 0);
        assert(listenfd >= 0);
    
        ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
        assert(ret != -1);
    
        ret = listen(listenfd, 5);
        assert(ret != -1);
    
        epoll_event events[MAX_EVENT_NUMBER];
        int epollfd = epoll_create(5);
        assert(epollfd != -1);
    
        addfd(epollfd, listenfd, false);
    
        while(1){
            int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
            if(ret < 0){
                printf("epoll failure\n");
                break;
            }
            for(int i = 0; i < ret; i++){
                int sockfd = events[i].data.fd;
                if(sockfd == listenfd){
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof(client_address);
                    int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                    addfd(epollfd, connfd, true);
                }
                else if(events[i].events & EPOLLIN){
                    pthread_t thread;
                    fds fds_for_new_worker;
                    fds_for_new_worker.epollfd = epollfd;
                    fds_for_new_worker.sockfd = sockfd;
                    pthread_create(&thread, NULL, worker, (void*)&fds_for_new_worker);
                }
                else{
                    printf("something else happened \n");
                }
            }
        }
        close(listenfd);
        return 0;
    }xxxxxxxxxx c
    
    下一篇:没有了