datetime:2022/12/30 11:00
author:nzb
I/O复用-select
1、导语
多进程/线程并发模型,为每个socket分配一个进程/线程。
IO多路复用:通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
应用:适用于针对大量的io请求的情况,对于服务器必须在同时处理来自客户端的大量的io操作的时候,就非常适合
与多进程和多线程技术相比,I/O多路复用技术的最大优势就是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。
目前支持I/O多路复用的系统调用有select
, pselect
, poll
, epoll
,
但他们本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
select
, pselect
, poll
, epoll
都是属于IO设计模式Reactor的IO策略。
2、IO多路复用使用场景
IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:
- 当客户处理多个描述符时(一般是交互式输入和网络套接口),必须使用I/O复用。
- 当一个客户同时处理多个套接口时,这种情况是可能的,但很少出现。
- 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
- 如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
- 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
3、select
3.1、select基本原理
select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。
3.2、select基本流程
3.3、select函数原型
该函数准许进程指示内核等待多个事件中的任何一个发送,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒自己。函数原型如下:
#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
// 返回值:就绪描述符的数目,超时返回0,出错返回-1
// 函数参数介绍如下:
//(1)第一个参数maxfdp1指定待测试的描述字个数,它的值是待测试的最大描述字加1(因此把该参数命名为maxfdp1)描述字0、1、2...(maxfdp1-1)均将被测试(文件描述符是从0开始的)。
//(2)中间的三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述字。如果对某一个的条件不感兴趣,就可以把它设为空指针。
// writeset的write会阻塞,但是阻塞时间是非常短的,所以一般需要监听,设置为空
struct fd_set; //可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:
void FD_ZERO(fd_set *fdset); //清空集合
void FD_SET(int fd, fd_set *fdset); //将一个给定的文件描述符加入集合之中
void FD_CLR(int fd, fd_set *fdset); //将一个给定的文件描述符从集合中删除
int FD_ISSET(int fd, fd_set *fdset); // 检查集合中指定的文件描述符是否可以读写
//(3)timeout指定等待的时间,告知内核等待所指定描述字中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。
struct timeval {
long tv_sec; //seconds
long tv_usec; //microseconds
};
/*
这个参数有三种可能:
(1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
(2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
(3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。
*/
位图Bitmap的原理
3.4、select优点
- 跨平台。(几乎所有的平台都支持)
- 时间精度高。(ns级别)
3.6、select缺点
- 最大限制:单个进程能够监视的文件描述符的数量存在最大限制。(基于数组存储的赶脚)一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。它由FD_SETSIZE设置,32位机默认是1024个。64位机默认是2048.
- 时间复杂度: 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低,时间复杂度O(n)。 当套接字比较多的时候,每次select()
都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。
它仅仅知道有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度 ,同时处理的流越多,无差别轮询时间就越长。 - 内存拷贝:需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
3.7、 Select的超时机制
- int maxfdp 是指集合中所有描述符的最大值加1
- fd_set *readfds 监视是否有新的socket连接,或现有的描述符是否有数据可读。
- fd_set *writefds 监视是否可以向描述符中写入数据,只要缓存没满,所监视的描述符都可以写,select立即返回。
- fd_set *exceptfds 监视描述符中的异常,从未使用过
- struct timeval *timeout 超时机制。
3.8、select模型会丢失事件和数据吗?
答:不会。select采用水平触发的方式,如果报告fd后事件没有被处理或者数据没有被完全读取,那么下次select时会再次报告该id,也就是说select不会丢失事件和数据。
3.9、select的其它用途
在Unix(Linux)世界里,一切皆文件,文件就是一串二进制流,不管socket、管道、终端、设备等都是文件,一切都是流,在信息交换的过程中, 都是对这些流进行数据的收发操作,简称为I/O操作(input and output), 往流中读出数据,系统调用read,写入数据,系统调用write。
select是I/O复用函数,除了用于网络通信,还可以用于文件、管道、终端、设备等操作,但开发场景比较少。
3.7、示例代码
tcpselect.cpp
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/fcntl.h>
// 初始化服务端的监听端口。
int initserver(int port);
int main(int argc, char *argv[]) {
if (argc != 2) {
printf("usage: ./tcpselect port\n");
return -1;
}
// 初始化服务端用于监听的socket。
int listensock = initserver(atoi(argv[1]));
printf("listensock=%d\n", listensock);
if (listensock < 0) {
printf("initserver() failed.\n");
return -1;
}
fd_set readfdset; // 读事件的集合,包括监听socket和客户端连接上来的socket。
int maxfd; // readfdset中socket的最大值。
// 初始化结构体,把listensock添加到集合中。
FD_ZERO(&readfdset);
FD_SET(listensock, &readfdset);
maxfd = listensock;
while (1) {
// 调用select函数时,会改变socket集合的内容,所以要把socket集合保存下来,传一个临时的给select。
fd_set tmpfdset = readfdset;
int infds = select(maxfd + 1, &tmpfdset, NULL, NULL, NULL);
// printf("select infds=%d\n",infds);
// 返回失败。
if (infds < 0) {
printf("select() failed.\n");
perror("select()");
break;
}
// 超时,在本程序中,select函数最后一个参数为空,不存在超时的情况,但以下代码还是留着。
if (infds == 0) {
printf("select() timeout.\n");
continue;
}
// 检查有事情发生的socket,包括监听和客户端连接的socket。
// 这里是客户端的socket事件,每次都要遍历整个集合,因为可能有多个socket有事件。
for (int eventfd = 0; eventfd <= maxfd; eventfd++) {
if (FD_ISSET(eventfd, &tmpfdset) <= 0) continue;
if (eventfd == listensock) {
// 如果发生事件的是listensock,表示有新的客户端连上来。
struct sockaddr_in client;
socklen_t len = sizeof(client);
int clientsock = accept(listensock, (struct sockaddr *) &client, &len);
if (clientsock < 0) {
printf("accept() failed.\n");
continue;
}
printf("client(socket=%d) connected ok.\n", clientsock);
// 把新的客户端socket加入集合。
FD_SET(clientsock, &readfdset);
if (maxfd < clientsock) maxfd = clientsock;
continue;
} else {
// 客户端有数据过来或客户端的socket连接被断开。
char buffer[1024];
memset(buffer, 0, sizeof(buffer));
// 读取客户端的数据。
ssize_t isize = read(eventfd, buffer, sizeof(buffer));
// 发生了错误或socket被对方关闭。
if (isize <= 0) {
printf("client(eventfd=%d) disconnected.\n", eventfd);
close(eventfd); // 关闭客户端的socket。
FD_CLR(eventfd, &readfdset); // 从集合中移去客户端的socket。
// 重新计算maxfd的值,注意,只有当eventfd==maxfd时才需要计算。
if (eventfd == maxfd) {
for (int ii = maxfd; ii > 0; ii--) {
if (FD_ISSET(ii, &readfdset)) {
maxfd = ii;
break;
}
}
printf("maxfd=%d\n", maxfd);
}
continue;
}
printf("recv(eventfd=%d,size=%d):%s\n", eventfd, isize, buffer);
// 把收到的报文发回给客户端。
write(eventfd, buffer, strlen(buffer));
}
}
}
return 0;
}
// 初始化服务端的监听端口。
int initserver(int port) {
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
printf("socket() failed.\n");
return -1;
}
// Linux如下
int opt = 1;
unsigned int len = sizeof(opt);
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, len);
setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &opt, len);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if (bind(sock, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
printf("bind() failed.\n");
close(sock);
return -1;
}
if (listen(sock, 5) != 0) {
printf("listen() failed.\n");
close(sock);
return -1;
}
return sock;
}
client.cpp
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("usage:./tcpclient ip port\n");
return -1;
}
int sockfd;
struct sockaddr_in servaddr;
char buf[1024];
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("socket() failed.\n");
return -1;
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(atoi(argv[2]));
servaddr.sin_addr.s_addr = inet_addr(argv[1]);
if (connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) != 0) {
printf("connect(%s:%s) failed.\n", argv[1], argv[2]);
close(sockfd);
return -1;
}
printf("connect ok.\n");
for (int ii = 0; ii < 10000; ii++) {
// 从命令行输入内容。
memset(buf, 0, sizeof(buf));
printf("please input:");
scanf("%s", buf);
// sprintf(buf,"1111111111111111111111ii=%08d",ii);
if (write(sockfd, buf, strlen(buf)) <= 0) {
printf("write() failed.\n");
close(sockfd);
return -1;
}
memset(buf, 0, sizeof(buf));
if (read(sockfd, buf, sizeof(buf)) <= 0) {
printf("read() failed.\n");
close(sockfd);
return -1;
}
printf("recv:%s\n", buf);
// close(sockfd); break;
}
}