套接字

重要

在网络 IO 中,一定不要多线程使用同一个 fd。多线程如果使用同一个 fd 在读写之前都需要加排它锁,这样实际上和单线程也没什么区别了。之所以如此,是因为其它线程可能导致 fd 进入不可读写状态

大端序和小端序

大端序就是高位字节储存到低地址处,小端则相反。由于 PC 大多使用小端序,因此 小端序也被称为主机字节序 。但是在进行网络传输时统一使用大端序,因此 大端序也被成为网络字节序

大端序的判别很简单:

bool isBigEnd() {
   char b = (char)0x1234;
   return b == 0x12;
}

大小端之间的转换:

uint64_t ByteReverse(uint64_t data) {
   uint8_t* i = (uint8_t*)&data;
   uint8_t* j = (uint8_t*)&data + sizeof(data) - 1;
   for(; i < j; ++i, --j) swap(*i, *j);
   return data;
}

这样,对于:

uint64_t a = ByteReverse(0x1234567891234567);
cout << hex << a;

的结果为:

6745239178563412

备注

  • Java 虚拟机采用大端序

Linux 提供了一组函数用来在网络字节序和主机字节序之间进行转换:

uint32_t ntohl (uint32_t __netlong);
uint16_t ntohs (uint16_t __netshort);
uint32_t htonl (uint32_t __hostlong);
uint16_t htons (uint16_t __hostshort);

函数的名字含义为: host to network long、host to network short 等。

长整型一般用来转换 IP 地址,短整型一般用来转换端口号

手动进行大小端转换也比较简单,分为两步:

  1. 字节反转

  2. 数组反转

地址格式

一个完整的地址格式由 IP 地址加上端口号组成,这种形式唯一地标示了计算机网络中的一个进程。因此套接字实际上就是网络中的 IPC。

逻辑上,Unix 中表示地址格式的结构体为:

classDiagram sockaddr <|--sockaddr_in sockaddr <|--sockaddr_in6 class sockaddr { sa_family_t sa_family; char sa_data[]; } class sockaddr_in { sa_family_t sin_family; // 地址族 in_port_t sin_port; // 端口号 in_addr sin_addr; // IPv4 地址 } class sockaddr_in6 { sa_family_t sin6_family; in_port_t sin6_port; in6_addr sin6_addr; }

这只是逻辑上的关系,实际上由于 C 语言的限制,sockaddr_int 和 sockaddr_in6 需要使用强制类型转换转为 sockaddr。

in_addr_t 和 in6_addr_t 也是结构体,其内容物是一个名为 in_addr_t 和 in6_addr_t 的数据成员。

Unix 还提供了几组函数用来在二进制数据和点分十进制之间进行转换:

const char *inet_ntop(int af, const void *src,char *dst, socklen_t size);
 int inet_pton(int af, const char *src, void *dst);

这两个函数都同时支持 IPv4 和 IPv6

套接字描述符

套接字描述符代表了通信端点,可视为文件描述符的特化。因此,很多用与文件描述符的函数也可用于套接字描述符,套接字的定义为:

int socket(int domain, int type, int protocol);

参数 domain 用来确定通信特性,其参数为:

参数

描述

AF_INET

IPv4 因特网域

AF_INET6

IPv6 因特网域

AF_UNIX

UNIX 域

AF_UNSPEC

未指定

AF 的含义是 Address Family。根据实现,其参数可能有很多种,但是常用的就是这几种

type 确定了套接字的类型,其值为:

类型

描述

SOCK_DGRAM

UDP

SOCK_RAW

IP 协议的数据报接口

SOCK_SEQPACKET

基于报文的 TCP

SOCK_STREAM

TCP

绑定地址

在创建套接字文件描述符后,还需要将其与特定的地址关联起来(类似于将 fd 转换为 FILE*)。对于客户端而言,可以让操作系统选择一个任意的端口,但是服务器需要明确指出需要绑定的地址:

int bind(int sockfd, const sockaddr *addr, socklen_t addrlen);

监听地址

对于服务器而言,绑定地址后还需要对端口进行持续监听是否有连接进入,并决定是否接受连接:

int listen(int sockfd, int backlog);

backlog 用来提示连接队列的大小。其硬上限被储存到 SOMAXCONN 中

建立连接

对于面向连接的网络服务而言,在发送数据前必须先进行连接以建立一个可靠的网络链路

int connect(int sockfd, const sockaddr *addr, socklen_t addrlen);

要想连接成功,必须:

  • 目标服务是开启的

  • 服务器的等待队列未满

  • 服务器接受连接

在连接过程中,可能会产生一些瞬时错误,应用程序必须具备处理它的能力

重要

对于基于 BSD 的套接字实现(FreeBSD/MacOS)中,connect 一旦失败就没法继续用了。因此可移植的用法是每次 connect 失败后都关闭套接字并创建一个新的来用

接受连接

一旦服务器开始监听地址,并有连接进入,服务器就能够选择是否接受连接:

int accept(int sockfd, sockaddr * addr, socklen_t * addrlen);

accept 的返回值是代表客户端的套接字描述符。参数 addr 是输出参数,用来获取服务端的信息,如果不管兴趣可以设为 NULL。addrlen 代表了 addr 的长度

创建套接字

socker 中代表套接字的是结构体 sockaddr,但是为了使用方便,从中又特化出 sockaddr_in 用于 IPv4、sockaddr_in6 用于 IPv6:

struct sockaddr_in{
   sa_family_t sin_family; // 地址族
   u_int16_t sin_port; // 端口号
   struct in_addr sin_addr; // IPv4 地址
}

struct in_addr{
    u_int32_t s_addr;
}

其中,地址族为:

协议族

地址族

含义

PF_INET

AF_INET

IPv4

PF_INET6

AF_INET6

IPv6

地址族和协议族的值是相等的,因此可以相互替换

下面的函数可用于将点分十进制的地址转换为网络字节序整数表示的 IP 地址:

#include <arpa/inet.h>
int inet_pton(int af, const char* src, void* dst);

inet_pton 将 src 表示的 IP 地址转换到网络字节序,并将其储存到 dst 中。af 用来指定地址族

而函数 inet_ntop 则进行相反的转换

使用 listen 可以用来监听套接字。listen 有两个参数:第一个参数用来指定 fd,第二个参数用来指定最大的监听队列。当队列的长度超过指定的长度时,新的连接会被拒绝,同时客户端得到 ECONNREFUSED 信息

#include <spdlog/spdlog.h>

#include <thread>

extern "C" {
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
}

int main() {
   auto fd = socket(PF_INET, SOCK_STREAM, 0);
   if(fd == -1) {
      spdlog::critical("套接字创建失败");
      exit(-1);
   }
   sockaddr_in address;
   bzero(&address, sizeof(address));
   address.sin_family = AF_INET;
   inet_pton(AF_INET, "127.0.0.1", &address.sin_addr);
   address.sin_port = htons(9020);
   if(bind(fd, (sockaddr*)&address, sizeof(address)) == -1) {
      spdlog::critical("绑定端口失败");
      exit(-1);
   }
   if(listen(fd, 5) == -1) {
      spdlog::critical("监听失败");
      exit(-1);
   }
   while(true) { std::this_thread::yield(); }
   return 0;
}

接收连接

我们现在可以接收来自客户端的连接:

while(true) {
   sockaddr_in client;
   socklen_t   clientLen = sizeof(client);
   int         connfd    = accept(fd, (sockaddr*)&client, &clientLen);
   if(connfd == -1) {
      spdlog::error("连接客户端套接字失败");
      continue;
   }
   char* remote     = new char[INET_ADDRSTRLEN];
   auto  remotePtr  = std::make_shared<char*>(remote);
   auto  clientIp   = inet_ntop(AF_INET, &client.sin_addr, remote, INET_ADDRSTRLEN);
   auto  clientPort = ntohs(client.sin_port);
   std::cout << clientIp << ":" << clientPort << std::endl;
   close(connfd);
   std::this_thread::yield();
}

Linux 每次 fork 都会导致父进程中打开的套接字的引用计数加一,因此必须在父子进程中都调用 close 才能将连接关闭。如果需要立即关闭,则可以使用 shutdown 系统调用:

#include <sys/socket.h>
int shutdown(int sockfd, int howto);

第二个参数指定了 shutdown 的行为:

可选值

含义

SHUT_RD

关闭读端

SHUT_WR

关闭写端

SHUT_RDWR

关闭写端和读端

接收和发送数据

将数据接收再发送回去,就完成了一个 echo 服务器:

while(true) {
   sockaddr_in client;
   socklen_t   clientLen = sizeof(client);
   int         connfd    = accept(fd, (sockaddr*)&client, &clientLen);
   if(connfd == -1) {
      spdlog::error("连接客户端套接字失败");
      continue;
   }
   char buffer[1024];
   memset(buffer, '\0', 1024);
   int recvLen = recv(connfd, buffer, 1023, 0);
   std::cout << buffer << std::endl;
   send(connfd, buffer, recvLen, 0);
   close(connfd);
   std::this_thread::yield();
}

客户端

客户端与服务器前半部分相同,唯一不同的是客户端无需绑定到端口上,直接 connect 即可:

int main() {
   sockaddr_in serverAddress;
   bzero(&serverAddress, sizeof(serverAddress));
   serverAddress.sin_family = AF_INET;
   inet_pton(AF_INET, "127.0.0.1", &serverAddress.sin_addr);
   serverAddress.sin_port = htons(9020);
   int fd                 = socket(PF_INET, SOCK_STREAM, 0);
   if(fd == -1) {
      spdlog::critical("套接字创建失败");
      exit(-1);
   }
   if(connect(fd, (sockaddr*)&serverAddress, sizeof(serverAddress)) < 0){
      spdlog::critical("连接失败");
      exit(-1);
            }
   const char* oobData = "abc\n";
   const char* normalData = "123\b";
   send(fd, normalData, strlen(normalData), 0);
   send(fd, oobData, strlen(oobData), MSG_OOB);
   send(fd, normalData, strlen(normalData), 0);
   close(fd);
   return 0;
}

Select

select 是一种 IO 复用手段:

while(true) {
   sockaddr_in client;
   socklen_t   clientLen = sizeof(client);
   int         connfd    = accept(fd, (sockaddr*)&client, &clientLen);
   if(connfd == -1) {
      spdlog::error("连接客户端套接字失败");
      continue;
   }
   char buffer[1024];
   // 进入 select 阶段
   fd_set readFds;
   fd_set exceptionFds;
   FD_ZERO(&readFds);
   FD_ZERO(&exceptionFds);
   while(true) {
      memset(buffer, '\0', sizeof(buffer));
      FD_SET(connfd, &readFds);
      FD_SET(connfd, &exceptionFds);
      if(select(connfd + 1, &readFds, nullptr, &exceptionFds, nullptr) < 0) {
            spdlog::error("Select 失败");
            break;
      }
      // 解析可读事件
      if(FD_ISSET(connfd, &readFds)) {
            if(recv(connfd, buffer, sizeof(buffer) - 1, 0) <= 0) break;
            std::cout << "得到数据:" << buffer << std::endl;
      }
      if(FD_ISSET(connfd, &exceptionFds)) {
            if(recv(connfd, buffer, sizeof(buffer) - 1, MSG_OOB) <= 0) break;
            std::cout << "得到 OOB 数据:" << buffer << std::endl;
      }
   }
   close(connfd);
}

select 唯一能够处理的异常情况就是带外数据

epoll

epoll 可以支持的最大文件描述符数量为 /proc/sys/fs/epoll/max_user_watches

epoll 使用 RB-Tree 监听并维护所有文件描述符,但是还额外维护一个活跃链表。当某个监听的套接字活跃时,操作系统通过回调函数将活跃的套接字插入到此 list 中,因此 epoll_wait 只需要从内核态拷贝少量句柄到用户态即可

epoll相比于select并不是在所有情况下都要高效,例如在如果有少于1024个文件描述符监听,且大多数socket都是出于活跃繁忙的状态,这种情况下,select要比epoll更为高效,因为epoll会有更多次的系统调用,用户态和内核态会有更加频繁的切换。 1

epoll高效的本质在于:

  • 减少了用户态和内核态的文件句柄拷贝

  • 减少了对可读可写文件句柄的遍历

  • mmap 加速了内核与用户空间的信息传递,epoll是通过内核与用户mmap同一块内存,避免了无谓的内存拷贝

  • IO性能不会随着监听的文件描述的数量增长而下降

  • 使用红黑树存储fd,以及对应的回调函数,其插入,查找,删除的性能不错,相比于hash,不必预先分配很多的空间

epoll 使用流程可以简述为:

graph TD A[创建并监听套接字] --> B[调用 epoll_create 得到 ep 句柄] B --> C[调用 epoll_ctl 将套接字添加到 ep 列表中] C --> D[使用 epoll_wait 等待事件发生] D --> E[从 events 中攥取事件类型并进行处理] E --> D

epoll 具有 lt 和 et 两种模式:

#include <spdlog/spdlog.h>

#include <iostream>
#include <string>
#include <thread>

extern "C" {
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
}
#define BUFFER_SIZE      10
#define MAX_EVENT_NUMBER 1024
// 将文件描述符设置为非阻塞方式
int setNonBlocking(int fd) {
   int oldOption = fcntl(fd, F_GETFL);
   int newOption = oldOption | O_NONBLOCK;
   fcntl(fd, newOption);
   return oldOption;
}
// 将 fd 上的 EPOLLIN 注册到 efd 指向的 epoll 内核事件表中
void addFd(int epfd, int fd, bool enableEt) {
   epoll_event event;
   event.data.fd = fd;
   event.events  = EPOLLIN;
   if(enableEt) event.events |= EPOLLET;
   epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
   setNonBlocking(fd);
}

// LT 模式的工作流程
void ltMode(epoll_event* events, int number, int epfd, int listenfd) {
   char buf[BUFFER_SIZE];
   for(int i = 0; i < number; ++i) {
      int sockFd = events[i].data.fd;
      if(sockFd == listenfd) {
            sockaddr_in clientAddress;
            socklen_t   clientAddrLen = sizeof(clientAddress);
            int         connfd        = accept(listenfd, (sockaddr*)&clientAddress, &clientAddrLen);
            addFd(epfd, connfd, false);
      } else if(events[i].events & EPOLLIN) {
            // 读取数据
            std::cout << "触发事件" << std::endl;
            memset(buf, '\0', BUFFER_SIZE);
            if(recv(sockFd, buf, BUFFER_SIZE - 1, 0) <= 0) continue;
            std::cout << "得到数据:" << buf << std::endl;
      } else {
            std::cout << "没有事件发生" << std::endl;
      }
   }
}

void etMode(epoll_event* events, int number, int epfd, int listenfd) {
   char buf[BUFFER_SIZE];
   for(int i = 0; i < number; ++i) {
      int sockfd = events[i].data.fd;
      if(sockfd == listenfd) {
            sockaddr_in clientAddress;
            socklen_t   clientAddrLen = sizeof(clientAddress);
            int         connfd        = accept(listenfd, (sockaddr*)&clientAddress, &clientAddrLen);
            addFd(epfd, connfd, true);
      } else if(events[i].events & EPOLLIN) {
            // 这段代码不会被重复出发,因此一次就要独读出所有数据
            std::cout << "触发事件\n";
            while(true) {
               memset(buf, '\0', BUFFER_SIZE);
               int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
               if(ret < 0) {
                  if((errno == EAGAIN) || (errno == EWOULDBLOCK)) { std::cout << "数据读取完毕\n"; }
                  close(sockfd);
                  break;
               } else if(ret == 0)
                  close(sockfd);
               else
                  std::cout << "得到数据:" << buf << std::endl;
            }
      } else {
            std::cout << "没有触发事件" << std::endl;
      }
   }
}

int main() {
   sockaddr_in address;
   bzero(&address, sizeof(address));
   address.sin_family = AF_INET;
   inet_pton(AF_INET, "127.0.0.1", &address.sin_addr);
   address.sin_port = htons(9020);

   int fd = socket(PF_INET, SOCK_STREAM, 0);
   if(fd == -1) {
      spdlog::critical("套接字创建失败");
      exit(-1);
   }
   if(bind(fd, (sockaddr*)&address, sizeof(address)) == -1) {
      spdlog::critical("绑定端口失败");
      exit(-1);
   }
   if(listen(fd, 5) == -1) {
      spdlog::critical("监听失败");
      exit(-1);
   }

   epoll_event events[MAX_EVENT_NUMBER];
   int         epfd = epoll_create(5);
   if(epfd == -1) {
      spdlog::critical("创建 epoll 失败");
      exit(-1);
   }
   addFd(epfd, fd, true);
   while(true) {
      int ret = epoll_wait(epfd, events, MAX_EVENT_NUMBER, -1);
      if(ret < 0) {
            std::cout << "epoll 失败\n";
            break;
      }
      // ltMode(events, ret, epfd, fd);
      etMode(events, ret, epfd, fd);

   }
   close(fd);
   return 0;
}

也就是说边缘触发只触发一次,水平触发会一直触发

重要

ET 模式中的文件描述符应当是非阻塞的,否则读写操作会因为没有后续事件而一直处于阻塞状态

1

深入理解 Epoll