目录

epoll 惊群效应实测

惊群效应

惊群简单来说就是多个进程或者线程在等待同一个事件,当事件发生时,所有线程和进程都会被内核唤醒。唤醒后通常只有一个进程获得了该事件并进行处理,其他进程发现获取事件失败后又继续进入了等待状态,在一定程度上降低了系统性能。
常见的惊群问题有两种:
Accept惊群问题,多个accept的进程同时被唤醒,该问题已于 linux2.6 解决,本文不再讨论
Epoll惊群问题:虽然accept惊群问题已被内核解决,但当多个进程各自用epoll监听同一个listen fd时,新连接到来仍会让每个epoll都收到该fd的可读事件,所有进程都会被唤醒

epoll 惊群测试

测试思路

  • 主进程创建socket
  • 子进程把该socket注册到各自的epoll中并监听可读事件。epoll必须在fork之后创建,否则多个进程会共用同一个epoll,而某个进程accept得到的fd在其他进程中并不存在,其他进程收到该fd的事件后无法处理
  • 注册listen fd的可读状态,并触发accept
  • 观察输出信息

测试代码:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>



/*
 * Compile-time configuration for the epoll thundering-herd test.
 *
 * These are enum constants rather than `const int` objects so that they are
 * integer constant expressions in C and can be used as array sizes, including
 * for the file-scope array g_pids below.
 */
enum {
    MAX_PROC_NUM = 4,   /* number of child processes forked by main() */
    MAX_EVENTS = 128,   /* capacity of the epoll_wait() result array */
    PORT = 8081,        /* TCP port the listening socket is bound to */
    BUF_SIZE = 1024,    /* size of the receive buffer used for each read */
    CLIENT_SIZE = 128   /* listen() backlog; also the epoll_create() size hint */
};

/* Value returned by fork() for each child slot; read by sig_handler(). */
int g_pids[MAX_PROC_NUM] = {0};

/*
 * Signal handler used by the parent process: force-kills every forked child
 * recorded in g_pids and then terminates the calling process.
 *
 * signo: the delivered signal number; unused, every signal is handled alike.
 *
 * Only async-signal-safe functions (kill, _exit) are called here.
 */
void sig_handler(int signo) {
    int i;

    (void)signo;

    for (i = 0; i < MAX_PROC_NUM; i++) {
        /*
         * Skip slots that do not hold a real child PID: kill(0, ...) would
         * signal the caller's whole process group, and kill(-1, ...) (the
         * value left by a failed fork) would signal every process the caller
         * is allowed to signal.
         */
        if (g_pids[i] > 0) {
            kill(g_pids[i], SIGKILL);
        }
    }

    /* The children are gone; leave instead of returning to the sleep loop. */
    _exit(0);
}

/*
 * Event loop executed by each forked child.
 *
 * Creates a private epoll instance, registers the listening socket for
 * edge-triggered read events and then loops forever:
 *   - on a listening-socket event it logs the wake-up and attempts a single
 *     accept(); an accepted connection is made non-blocking and added to the
 *     epoll set;
 *   - on a client-socket event it reads once, echoes the received bytes back
 *     and closes the connection.
 *
 * listenfd: listening socket to watch.
 *
 * Returns -1 if the epoll instance cannot be created or the listening socket
 * cannot be registered; otherwise the loop never exits.
 */
int child_procedure(int listenfd) {
    struct epoll_event ev = {0};
    struct epoll_event events[MAX_EVENTS];
    char buf[BUF_SIZE];
    int pid = getpid();
    int epfd;
    int cnt;
    int i;

    /* epoll_create() reports failure with -1; 0 is a valid descriptor. */
    epfd = epoll_create(CLIENT_SIZE + 1);
    if (epfd < 0) {
        printf("Create epoll failed\n");
        return -1;
    }

    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listenfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) < 0) {
        printf("Add ev failed\n");
        close(epfd);
        return -1;
    }

    printf("Epoll init finished, process pid: %d\n", pid);

    for (;;) {
        /*
         * Block until an event arrives. A zero timeout would turn this loop
         * into a busy poll, so the process would never actually sleep and be
         * woken by the kernel.
         */
        cnt = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (cnt <= 0) {
            continue;
        }

        for (i = 0; i < cnt; i++) {
            int fd = events[i].data.fd;

            if (fd == listenfd) {
                /* New connection request. */
                int newfd;
                int flags;

                printf("Process %d receive a connection request\n", pid);
                newfd = accept(listenfd, NULL, NULL);
                if (newfd < 0) {
                    printf("Process %d accept failed\n", pid);
                    continue;
                }

                /* O_NONBLOCK is a file status flag: F_GETFL, not F_GETFD. */
                flags = fcntl(newfd, F_GETFL, 0);
                if (flags < 0 || fcntl(newfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                    printf("Process %d set nonblock failed\n", pid);
                    close(newfd);
                    continue;
                }

                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = newfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, newfd, &ev) < 0) {
                    printf("Process %d add client failed\n", pid);
                    close(newfd);
                }
            } else {
                ssize_t n = read(fd, buf, sizeof buf);

                printf("Process %d receive a msg, length %d\n", pid, (int)n);
                /* Echo only real data; n is -1 on error and 0 on EOF. */
                if (n > 0 && write(fd, buf, (size_t)n) < 0) {
                    printf("Process %d write failed\n", pid);
                }

                /* Deregister while the descriptor is still valid, then close. */
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                close(fd);
            }
        }
    }

    return 0;
}

/*
 * Entry point of the thundering-herd test.
 *
 * Creates a non-blocking TCP listening socket on PORT (all interfaces), forks
 * MAX_PROC_NUM children that each run child_procedure() on that socket, and
 * keeps the parent asleep until it is signalled.
 *
 * Returns -1 if the listening socket cannot be created, bound or put into
 * the listening state; in a child, returns whatever child_procedure()
 * returns.
 */
int main(void)
{
    struct sockaddr_in servaddr = {0};
    int listenfd;
    int flags;
    int i;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        printf("socket error\n");
        return -1;
    }

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);

    /* O_NONBLOCK is a file status flag: read it with F_GETFL, not F_GETFD. */
    flags = fcntl(listenfd, F_GETFL, 0);
    if (flags < 0 || fcntl(listenfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("set nonblock error\n");
        close(listenfd);
        return -1;
    }

    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof servaddr) == -1) {
        printf("bind error\n");
        close(listenfd);
        return -1;
    }

    if (listen(listenfd, CLIENT_SIZE) == -1) {
        printf("Listen failed\n");
        close(listenfd);
        return -1;
    }

    for (i = 0; i < MAX_PROC_NUM; i++) {
        g_pids[i] = fork();
        if (0 == g_pids[i]) {
            /* Child: leave the loop with i < MAX_PROC_NUM. */
            break;
        }
        if (g_pids[i] < 0) {
            printf("fork failed\n");
        }
    }

    if (i == MAX_PROC_NUM) {
        /*
         * Parent only: install the handlers after forking so that the
         * children are killed when the parent receives SIGINT or SIGTERM.
         * SIGKILL cannot be caught, so registering a handler for it always
         * fails; SIGTERM is handled instead.
         */
        signal(SIGINT, sig_handler);
        signal(SIGTERM, sig_handler);

        /* Parent: nothing else to do, sleep until signalled. */
        while (1) {
            sleep(100);
        }
    } else {
        /* Child: run the epoll event loop. */
        return child_procedure(listenfd);
    }

    return 0;
}

测试结果

启动程序后,客户端只发起一次连接,输出如下。可以看到 4 个子进程都收到了连接请求并尝试 accept,但只有一个进程成功,其余均失败:

Epoll init finished, process pid: 20628
Epoll init finished, process pid: 20629
Epoll init finished, process pid: 20630
Epoll init finished, process pid: 20631
Process 20630 receive a connection request
Process 20631 receive a connection request
Process 20628 receive a connection request
Process 20629 receive a connection request
Process 20630 receive a msg, length 169
Process 20631 accept failed
Process 20628 accept failed
Process 20629 accept failed

惊群问题改进

把同一个listen fd添加到多个epoll中的一个典型应用就是nginx。nginx增加了一把全局锁,同一时刻只有持有锁的进程会等待listen fd上的事件,这样就保证了同一个可读事件不会触发给多个进程。为了减少持锁的时间,nginx先将事件放入队列,处理完accept后立即解锁,后续的收发包并不占用这把全局锁。这把锁不单单用于解决惊群问题,还是进程间负载均衡的重要一环。
详细可参考:https://blog.csdn.net/initphp/article/details/52266844