大彬

大彬 查看完整档案

杭州编辑浙江大学  |  计算机 编辑云象区块链  |  区块链架构师 编辑 lessisbetter.site 编辑
编辑

公众号:Go语言充电站
二维码:https://segmentfault.com/img/...

个人动态

大彬 赞了文章 · 2020-08-17

Linux IO模式及 select、poll、epoll详解

注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。

同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的。所以先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO。

一 概念说明

在进行解释之前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/O

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。

注:总而言之就是很耗资源,具体的可以参考这篇文章:进程切换

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

二 IO模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
clipboard.png

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
clipboard.png

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

clipboard.png

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步 I/O(asynchronous IO)

inux下的asynchronous IO其实用得很少。先看一下它的流程:
clipboard.png

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

总结

blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

各个IO Model的比较如图所示:
clipboard.png

通过上面的图片,可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

三 I/O 多路复用之select、poll、epoll详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。(这里啰嗦下)

select

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操作过程

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是需要监听的fd(文件描述符)
- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

二 工作模式

 epoll对文件描述符的操作有两种模式:LT(level trigger)ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
  LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
  ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

1. LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。

2. ET模式

ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

3. 总结

假如有这样一个例子:
1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
2. 这个时候从管道的另一端被写入了2KB的数据
3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
4. 然后我们读取了1KB的数据
5. 调用epoll_wait(2)......

LT模式:
如果是LT模式,那么在第5步调用epoll_wait(2)之后,仍然能受到通知。

ET模式:
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。

当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,
读数据的时候需要考虑的是当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取:

while(rs){
  buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);
  if(buflen < 0){
    // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
    // 在这里就当作是该次事件已处理处.
    if(errno == EAGAIN){
        break;
    }
    else{
        return;
    }
  }
  else if(buflen == 0){
     // 这里表示对端的socket已正常关闭.
  }

 if(buflen == sizeof(buf){
      rs = 1;   // 需要再次读取
 }
 else{
      rs = 0;
 }
}

Linux中的EAGAIN含义

Linux环境下开发经常会碰到很多错误(设置errno),其中EAGAIN是其中比较常见的一个错误(比如用在非阻塞操作中)。
从字面上来看,是提示再试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。

例如,以 O_NONBLOCK的标志打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。
又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其再调用一次(也许下次就能成功)。

三 代码演示

下面是一段不完整的代码且格式不对,意在表述上面的过程,去掉了一些模板代码。

#define IPADDRESS   "127.0.0.1"
#define PORT        8787
#define MAXSIZE     1024
#define LISTENQ     5
#define FDSIZE      1000
#define EPOLLEVENTS 100

listenfd = socket_bind(IPADDRESS,PORT);

struct epoll_event events[EPOLLEVENTS];

//创建一个描述符
epollfd = epoll_create(FDSIZE);

//添加监听描述符事件
add_event(epollfd,listenfd,EPOLLIN);

//循环等待
for ( ; ; ){
    //该函数返回已经准备好的描述符事件数目
    ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
    //处理接收到的连接
    handle_events(epollfd,events,ret,listenfd,buf);
}

//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
     int i;
     int fd;
     //进行遍历;这里只要遍历已经准备好的io事件。num并不是当初epoll_create时的FDSIZE。
     for (i = 0;i < num;i++)
     {
         fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
         if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
         else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
         else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
     }
}

//添加事件
static void add_event(int epollfd,int fd,int state){
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd){
     int clifd;     
     struct sockaddr_in cliaddr;     
     socklen_t  cliaddrlen;     
     clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);     
     if (clifd == -1)         
     perror("accpet error:");     
     else {         
         printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);                       //添加一个客户描述符和事件         
         add_event(epollfd,clifd,EPOLLIN);     
     } 
}

//读处理
static void do_read(int epollfd,int fd,char *buf){
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)     {         
        perror("read error:");         
        close(fd); //记住close fd        
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }
    else if (nread == 0)     {         
        fprintf(stderr,"client close.\n");
        close(fd); //记住close fd       
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }     
    else {         
        printf("read message is : %s",buf);        
        //修改描述符对应的事件,由读改为写         
        modify_event(epollfd,fd,EPOLLOUT);     
    } 
}

//写处理
static void do_write(int epollfd,int fd,char *buf) {     
    int nwrite;     
    nwrite = write(fd,buf,strlen(buf));     
    if (nwrite == -1){         
        perror("write error:");        
        close(fd);   //记住close fd       
        delete_event(epollfd,fd,EPOLLOUT);  //删除监听    
    }else{
        modify_event(epollfd,fd,EPOLLIN); 
    }    
    memset(buf,0,MAXSIZE); 
}

//删除事件
static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

//修改事件
static void modify_event(int epollfd,int fd,int state){     
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

//注:另外一端我就省了

四 epoll总结

在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)

epoll的优点主要是一下几个方面:
1. 监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。

  1. IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。

如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

参考

用户空间与内核空间,进程上下文与中断上下文[总结]
进程切换
维基百科-文件描述符
Linux 中直接 I/O 机制的介绍
IO - 同步,异步,阻塞,非阻塞 (亡羊补牢篇)
Linux中select poll和epoll的区别
IO多路复用之select总结
IO多路复用之poll总结
IO多路复用之epoll总结

查看原文

赞 597 收藏 1054 评论 64

大彬 发布了文章 · 2019-09-09

Go内存分配那些事,就这么简单!

原文链接:https://mp.weixin.qq.com/s/3g...

新老朋友好久不见,我是大彬,这篇文章准备了很久,不是在拖延,而是中间做了一些其他事情,耽搁了一些。

这篇文章主要介绍Go内存分配和Go内存管理,会轻微涉及内存申请和释放,以及Go垃圾回收。

从非常宏观的角度看,Go的内存管理就是下图这个样子,我们今天主要关注其中标红的部分。

Go内存管理

友情提醒:

文章有点长,建议先收藏,后阅读,绝对是学习内存管理的好资料。

本文基于go1.11.2,不同版本Go的内存管理可能存在差别,比如1.9与1.11的mheap定义就是差别比较大的,后续看源码的时候,请注意你的go版本,但无论你用哪个go版本,这都是一个优秀的资料,因为内存管理的思想和框架始终未变。

Go这门语言抛弃了C/C++中的开发者管理内存的方式:主动申请与主动释放,增加了逃逸分析和GC,将开发者从内存管理中释放出来,让开发者有更多的精力去关注软件设计,而不是底层的内存问题。这是Go语言成为高生产力语言的原因之一。

我们不需要精通内存的管理,因为它确实很复杂,但掌握内存的管理,可以让你写出更高质量的代码,另外,还能助你定位Bug。

这篇文章采用层层递进的方式,依次会介绍关于存储的基本知识,Go内存管理的“前辈”TCMalloc,然后是Go的内存管理和分配,最后是总结。这么做的目的是,希望各位能通过全局的认识和思考,拥有更好的编码思维和架构思维。

最后,这不是一篇源码分析文章,因为Go源码分析的文章已经有很多了,这些源码文章能够帮助你去学习具体的工程实践和奇淫巧计了,文章的末尾会推荐一些优秀文章,如果你对内存感兴趣,建议每一篇都去看一下,挑出自己喜欢的,多花时间研究下。

1. 存储基础知识回顾

这部分我们简单回顾一下计算机存储体系、虚拟内存、栈和堆,以及堆内存的管理,这部分内容对理解和掌握Go内存管理比较重要,建议忘记或不熟悉的朋友不要跳过。

存储金字塔

img

这幅图表达了计算机的存储体系,从上至下依次是:

  • CPU寄存器
  • Cache
  • 内存
  • 硬盘等辅助存储设备
  • 鼠标等外接设备

从上至下,访问速度越来越慢,访问时间越来越长。

你有没有思考过下面2个简单的问题,如果没有不妨想想:

  1. 如果CPU直接访问硬盘,CPU能充分利用吗?
  2. 如果CPU直接访问内存,CPU能充分利用吗?

CPU速度很快,但硬盘等持久存储很慢,如果CPU直接访问磁盘,磁盘可以拉低CPU的速度,机器整体性能就会低下,为了弥补这2个硬件之间的速率差异,所以在CPU和磁盘之间增加了比磁盘快很多的内存。

CPU和内存速率差异

然而,CPU跟内存的速率也不是相同的,从上图可以看到,CPU的速率提高的很快(摩尔定律),然而内存速率增长的很慢,虽然CPU的速率现在增加的很慢了,但是内存的速率也没增加多少,速率差距很大,从1980年开始CPU和内存速率差距在不断拉大,为了弥补这2个硬件之间的速率差异,所以在CPU跟内存之间增加了比内存更快的Cache,Cache是内存数据的缓存,可以降低CPU访问内存的时间。

不要以为有了Cache就万事大吉了,CPU的速率还在不断增大,Cache也在不断改变,从最初的1级,到后来的2级,到当代的3级Cache,(有兴趣看cache历史)

MBP的CPU和Cache信息

三级Cache分别是L1、L2、L3,它们的速率是三个不同的层级,L1速率最快,与CPU速率最接近,是RAM速率的100倍,L2速率就降到了RAM的25倍,L3的速率更靠近RAM的速率。

看到这了,你有没有Get到整个存储体系的分层设计自顶向下,速率越来越低,访问时间越来越长,从磁盘到CPU寄存器,上一层都可以看做是下一层的缓存。

看了分层设计,我们看一下内存,毕竟我们是介绍内存管理的文章。

虚拟内存

虚拟内存是当代操作系统必备的一项重要功能了,它向进程屏蔽了底层了RAM和磁盘,并向进程提供了远超物理内存大小的内存空间。我们看一下虚拟内存的分层设计

虚拟内存原理

上图展示了某进程访问数据,当Cache没有命中的时候,访问虚拟内存获取数据的过程。

访问内存,实际访问的是虚拟内存,虚拟内存通过页表查看,当前要访问的虚拟内存地址,是否已经加载到了物理内存,如果已经在物理内存,则取物理内存数据,如果没有对应的物理内存,则从磁盘加载数据到物理内存,并把物理内存地址和虚拟内存地址更新到页表。

有没有Get到:物理内存就是磁盘存储缓存层

另外,在没有虚拟内存的时代,物理内存对所有进程是共享的,多进程同时访问同一个物理内存存在并发访问问题。引入虚拟内存后,每个进程都要各自的虚拟内存,内存的并发访问问题的粒度从多进程级别,可以降低到多线程级别

栈和堆

我们现在从虚拟内存,再进一层,看虚拟内存中的栈和堆,也就是进程对内存的管理。

虚拟内存布局

上图展示了一个进程的虚拟内存划分,代码中使用的内存地址都是虚拟内存地址,而不是实际的物理内存地址。栈和堆只是虚拟内存上2块不同功能的内存区域:

  • 栈在高地址,从高地址向低地址增长。
  • 堆在低地址,从低地址向高地址增长。

栈和堆相比有这么几个好处

  1. 栈的内存管理简单,分配比堆上快。
  2. 栈的内存不需要回收,而堆需要,无论是主动free,还是被动的垃圾回收,这都需要花费额外的CPU。
  3. 栈上的内存有更好的局部性,堆上内存访问就不那么友好了,CPU访问的2块数据可能在不同的页上,CPU访问数据的时间可能就上去了。

堆内存管理

内存管理

我们再进一层,当我们说内存管理的时候,主要是指堆内存的管理,因为栈的内存管理不需要程序去操心。这小节看下堆内存管理干的是啥,如上图所示主要是3部分:分配内存块,回收内存块和组织内存块

在一个最简单的内存管理中,堆内存最初会是一个完整的大块,即未分配内存,当来申请的时候,就会从未分配内存,分割出一个小内存块(block),然后用链表把所有内存块连接起来。需要一些信息描述每个内存块的基本信息,比如大小(size)、是否使用中(used)和下一个内存块的地址(next),内存块实际数据存储在data中。

内存块链表

一个内存块包含了3类信息,如下图所示,元数据、用户数据和对齐字段,内存对齐是为了提高访问效率。下图申请5Byte内存的时候,就需要进行内存对齐。

内存块和对齐

释放内存实质是把使用的内存块从链表中取出来,然后标记为未使用,当分配内存块的时候,可以从未使用内存块中有先查找大小相近的内存块,如果找不到,再从未分配的内存中分配内存。

上面这个简单的设计中还没考虑内存碎片的问题,因为随着内存不断的申请和释放,内存上会存在大量的碎片,降低内存的使用率。为了解决内存碎片,可以将2个连续的未使用的内存块合并,减少碎片。

以上就是内存管理的基本思路,关于基本的内存管理,想了解更多,可以阅读这篇文章《Writing a Memory Allocator》,本节的3张图片也是来自这片文章。

2. TCMalloc

TCMalloc是Thread Cache Malloc的简称,是Go内存管理的起源,Go的内存管理是借鉴了TCMalloc,随着Go的迭代,Go的内存管理与TCMalloc不一致地方在不断扩大,但其主要思想、原理和概念都是和TCMalloc一致的,如果跳过TCMalloc直接去看Go的内存管理,也许你会似懂非懂。

掌握TCMalloc的理念,无需去关注过多的源码细节,就可以为掌握Go的内存管理打好基础,基础打好了,后面知识才扎实。

在Linux里,其实有不少的内存管理库,比如glibc的ptmalloc,FreeBSD的jemalloc,Google的tcmalloc等等,为何会出现这么多的内存管理库?本质都是在多线程编程下,追求更高内存管理效率:更快的分配是主要目的。

那如何更快的分配内存?

我们前面提到:

引入虚拟内存后,让内存的并发访问问题的粒度从多进程级别,降低到多线程级别。

这是更快分配内存的第一个层次

同一进程的所有线程共享相同的内存空间,他们申请内存时需要加锁,如果不加锁就存在同一块内存被2个线程同时访问的问题。

TCMalloc的做法是什么呢?为每个线程预分配一块缓存,线程申请小内存时,可以从缓存分配内存,这样有2个好处:

  1. 为线程预分配缓存需要进行1次系统调用,后续线程申请小内存时,从缓存分配,都是在用户态执行,没有系统调用,缩短了内存总体的分配和释放时间,这是快速分配内存的第二个层次
  2. 多个线程同时申请小内存时,从各自的缓存分配,访问的是不同的地址空间,无需加锁,把内存并发访问的粒度进一步降低了,这是快速分配内存的第三个层次

基本原理

下面就简单介绍下TCMalloc,细致程度够我们理解Go的内存管理即可。

声明:我没有研究过TCMalloc,以下介绍根据TCMalloc官方资料和其他博主资料总结而来,错误之处请朋友告知我。

TCMalloc概要图

结合上图,介绍TCMalloc的几个重要概念:

  1. Page:操作系统对内存管理以页为单位,TCMalloc也是这样,只不过TCMalloc里的Page大小与操作系统里的大小并不一定相等,而是倍数关系。《TCMalloc解密》里称x64下Page大小是8KB。
  2. Span:一组连续的Page被称为Span,比如可以有2个页大小的Span,也可以有16页大小的Span,Span比Page高一个层级,是为了方便管理一定大小的内存区域,Span是TCMalloc中内存管理的基本单位。
  3. ThreadCache:每个线程各自的Cache,一个Cache包含多个空闲内存块链表,每个链表连接的都是内存块,同一个链表上内存块的大小是相同的,也可以说按内存块大小,给内存块分了个类,这样可以根据申请的内存大小,快速从合适的链表选择空闲内存块。由于每个线程有自己的ThreadCache,所以ThreadCache访问是无锁的。
  4. CentralCache:是所有线程共享的缓存,也是保存的空闲内存块链表,链表的数量与ThreadCache中链表数量相同,当ThreadCache内存块不足时,可以从CentralCache取,当ThreadCache内存块多时,可以放回CentralCache。由于CentralCache是共享的,所以它的访问是要加锁的。
  5. PageHeap:PageHeap是堆内存的抽象,PageHeap存的也是若干链表,链表保存的是Span,当CentralCache没有内存的时,会从PageHeap取,把1个Span拆成若干内存块,添加到对应大小的链表中,当CentralCache内存多的时候,会放回PageHeap。如下图,分别是1页Page的Span链表,2页Page的Span链表等,最后是large span set,这个是用来保存中大对象的。毫无疑问,PageHeap也是要加锁的。

PageHeap

上文提到了小、中、大对象,Go内存管理中也有类似的概念,我们瞄一眼TCMalloc的定义:

  1. 小对象大小:0~256KB
  2. 中对象大小:257~1MB
  3. 大对象大小:>1MB

小对象的分配流程:ThreadCache -> CentralCache -> HeapPage,大部分时候,ThreadCache缓存都是足够的,不需要去访问CentralCache和HeapPage,无锁分配加无系统调用,分配效率是非常高的。

中对象分配流程:直接在PageHeap中选择适当的大小即可,128 Page的Span所保存的最大内存就是1MB。

大对象分配流程:从large span set选择合适数量的页面组成span,用来存储数据。

通过本节的介绍,你应当对TCMalloc主要思想有一定了解了,我建议再回顾一下上面的内容。

本节图片皆来自《TCMalloc解密》,图片版权归原作者所有。

精彩文章推荐

本文对于TCMalloc的介绍并不多,重要的是3个快速分配内存的层次,如果想了解更多,可阅读下面文章。

  1. TCMalloc

必读,通过这篇你能掌握TCMalloc的原理和性能,对掌握Go的内存管理有非常大的帮助,虽然如今Go的内存管理与TCMalloc已经相差很大,但是,这是Go内存管理的起源和“大道”,这篇文章顶看十几篇Go内存管理的文章。

  1. TCMalloc解密

可选异常详细,包含大量精美图片,看完得花小时级别,理解就需要更多时间了,看完这篇不需要看其他TCMalloc的文章了。

  1. TCMalloc介绍

可选,算是TCMalloc的文档的中文版,多数是从英文版翻译过来的,如果你英文不好,看看。

3. Go内存管理

前面铺垫了那么多,终于到了本文核心的地方。前面的铺垫不是不重要,相反它们很重要,Go语言内存管理源自前面的基础知识和内存管理思维,如果你跳过了前面的内容,建议你回头看一看,它可以帮助你更好的掌握Go内存管理。

前文提到Go内存管理源自TCMalloc,但它比TCMalloc还多了2件东西:逃逸分析和垃圾回收,这是2项提高生产力的绝佳武器。

这一大章节,我们先介绍Go内存管理和Go内存分配,最后涉及一点垃圾回收和内存释放。

Go内存管理的基本概念

前面计算机基础知识回顾,是一种自上而下,从宏观到微观的介绍方式,把目光引入到今天的主题。

Go内存管理的许多概念在TCMalloc中已经有了,含义是相同的,只是名字有一些变化。先给大家上一幅宏观的图,借助图一起来介绍。

Go内存管理

Page

与TCMalloc中的Page相同,x64下1个Page的大小是8KB。上图的最下方,1个浅蓝色的长方形代表1个Page。

Span

与TCMalloc中的Span相同,Span是内存管理的基本单位,代码中为mspan一组连续的Page组成1个Span,所以上图一组连续的浅蓝色长方形代表的是一组Page组成的1个Span,另外,1个淡紫色长方形为1个Span。

mcache

mcache与TCMalloc中的ThreadCache类似,mcache保存的是各种大小的Span,并按Span class分类,小对象直接从mcache分配内存,它起到了缓存的作用,并且可以无锁访问

但mcache与ThreadCache也有不同点,TCMalloc中是每个线程1个ThreadCache,Go中是每个P拥有1个mcache,因为在Go程序中,当前最多有GOMAXPROCS个线程在用户态运行,所以最多需要GOMAXPROCS个mcache就可以保证各线程对mcache的无锁访问,线程的运行又是与P绑定的,把mcache交给P刚刚好。

mcentral

mcentral与TCMalloc中的CentralCache类似,是所有线程共享的缓存,需要加锁访问,它按Span class对Span分类,串联成链表,当mcache的某个级别Span的内存被分配光时,它会向mcentral申请1个当前级别的Span。

但mcentral与CentralCache也有不同点,CentralCache是每个级别的Span有1个链表,mcache是每个级别的Span有2个链表,这和mcache申请内存有关,稍后我们再解释。

mheap

mheap与TCMalloc中的PageHeap类似,它是堆内存的抽象,把从OS申请出的内存页组织成Span,并保存起来。当mcentral的Span不够用时会向mheap申请,mheap的Span不够用时会向OS申请,向OS的内存申请是按页来的,然后把申请来的内存页生成Span组织起来,同样也是需要加锁访问的。

但mheap与PageHeap也有不同点:mheap把Span组织成了树结构,而不是链表,并且还是2棵树,然后把Span分配到heapArena进行管理,它包含地址映射和span是否包含指针等位图,这样做的主要原因是为了更高效的利用内存:分配、回收和再利用。

大小转换

除了以上内存块组织概念,还有几个重要的大小概念,一定要拿出来讲一下,不要忽视他们的重要性,他们是内存分配、组织和地址转换的基础。

Go内存大小转换

  1. object size:代码里简称size,指申请内存的对象大小。
  2. size class:代码里简称class,它是size的级别,相当于把size归类到一定大小的区间段,比如size[1,8]属于size class 1,size(8,16]属于size class 2。
  3. span class:指span的级别,但span class的大小与span的大小并没有正比关系。span class主要用来和size class做对应,1个size class对应2个span class,2个span class的span大小相同,只是功能不同,1个用来存放包含指针的对象,一个用来存放不包含指针的对象,不包含指针对象的Span就无需GC扫描了。
  4. num of page:代码里简称npage,代表Page的数量,其实就是Span包含的页数,用来分配内存。

在介绍这几个大小之间的换算前,我们得先看下图这个表,这个表决定了映射关系。

最上面2行是我手动加的,前3列分别是size class,object size和span size,根据这3列做size、size class和num of page之间的转换。

仔细看一遍这个表,再向下看转换是如何实现的。

Go内存分配表

在Go内存大小转换那幅图中已经标记各大小之间的转换,分别是数组:class_to_sizesize_to_class*class_to_allocnpages,这3个数组内容,就是跟上表的映射关系匹配的。比如class_to_size,从上表看class 1对应的保存对象大小为8,所以class_to_size[1]=8,span大小为8192Byte,即8KB,为1页,所以class_to_allocnpages[1]=1

Size转换

为何不使用函数计算各种转换,而是写成数组?

有1个很重要的原因:空间换时间。你如果仔细观察了,上表中的转换,并不能通过简单的公式进行转换,比如size和size class的关系,并不是正比的。这些数据是使用较复杂的公式计算出来的,公式在makesizeclass.go中,这其中存在指数运算与for循环,造成每次大小转换的时间复杂度为O(N*2^N)。另外,对一个程序而言,内存的申请和管理操作是很多的,如果不能快速完成,就是非常的低效。把以上大小转换写死到数组里,做到了把大小转换的时间复杂度直接降到O(1)。

其他转换表字段

第4列num of objects代表是当前size class级别的Span可以保存多少对象数量,第5列tail waste是span%obj计算的结果,因为span的大小并不一定是对象大小的整数倍。

最后一列max waste代表最大浪费的内存百分比,计算方法在printComment函数中:

func printComment(w io.Writer, classes []class) {
    fmt.Fprintf(w, "// %-5s  %-9s  %-10s  %-7s  %-10s  %-9s\n", "class", "bytes/obj", "bytes/span", "objects", "tail waste", "max waste")
    prevSize := 0
    for i, c := range classes {
        if i == 0 {
            continue
        }
        spanSize := c.npages * pageSize
        objects := spanSize / c.size
        tailWaste := spanSize - c.size*(spanSize/c.size)
        maxWaste := float64((c.size-prevSize-1)*objects+tailWaste) / float64(spanSize)
        prevSize = c.size
        fmt.Fprintf(w, "// %5d  %9d  %10d  %7d  %10d  %8.2f%%\n", i, c.size, spanSize, objects, tailWaste, 100*maxWaste)
    }
    fmt.Fprintf(w, "\n")
}

Span最浪费内存的场景是:Span内的每一个对象空间保存的对象,实际占用内存是前一个class中对象的大小加1,这样无法占用低一级的Span。一个对象空间未被占用的内存就被浪费了,所以一个Span内对象空间所浪费的内存为:所有对象空间浪费的内存之和+tail waste。

((c.size - (preSize+1)) * objects + tailWaste) / spanSize

Span内object分布情况

感谢foobar的提醒max waste的计算。

Go内存分配

涉及的概念已经讲完了,我们看下Go内存分配原理。

Go中的内存分类并不像TCMalloc那样分成小、中、大对象,但是它的小对象里又细分了一个Tiny对象,Tiny对象指大小在1Byte到16Byte之间并且不包含指针的对象。小对象和大对象只用大小划定,无其他区分。

Go内存对象分类

小对象是在mcache中分配的,而大对象是直接从mheap分配的,从小对象的内存分配看起。

小对象分配

Go内存管理

大小转换这一小节,我们介绍了转换表,size class从1到66共66个,代码中_NumSizeClasses=67代表了实际使用的size class数量,即67个,从0到67,size class 0实际并未使用到。

上文提到1个size class对应2个span class:

numSpanClasses = _NumSizeClasses * 2

numSpanClasses为span class的数量为134个,所以span class的下标是从0到133,所以上图中mcache标注了的span class是,span class 0span class 133。每1个span class都指向1个span,也就是mcache最多有134个span。

为对象寻找span

寻找span的流程如下:

  1. 计算对象所需内存大小size
  2. 根据size到size class映射,计算出所需的size class
  3. 根据size class和对象是否包含指针计算出span class
  4. 获取该span class指向的span。

以分配一个不包含指针的,大小为24Byte的对象为例。

根据映射表:

// class  bytes/obj  bytes/span  objects  tail waste  max waste
//     1          8        8192     1024           0     87.50%
//     2         16        8192      512           0     43.75%
//     3         32        8192      256           0     46.88%
//     4         48        8192      170          32     31.52%

size class 3,它的对象大小范围是(16,32]Byte,24Byte刚好在此区间,所以此对象的size class为3。

Size class到span class的计算如下:

// noscan为true代表对象不包含指针
func makeSpanClass(sizeclass uint8, noscan bool) spanClass {
    return spanClass(sizeclass<<1) | spanClass(bool2int(noscan))
}

所以,对应的span class为:

span class = 3 << 1 | 1 = 7

所以该对象需要的是span class 7指向的span。

从span分配对象空间

Span可以按对象大小切成很多份,这些都可以从映射表上计算出来,以size class 3对应的span为例,span大小是8KB,每个对象实际所占空间为32Byte,这个span就被分成了256块,可以根据span的起始地址计算出每个对象块的内存地址。

Span内对象

随着内存的分配,span中的对象内存块,有些被占用,有些未被占用,比如上图,整体代表1个span,蓝色块代表已被占用内存,绿色块代表未被占用内存。

当分配内存时,只要快速找到第一个可用的绿色块,并计算出内存地址即可,如果需要还可以对内存块数据清零。

span没有空间怎么分配对象

span内的所有内存块都被占用时,没有剩余空间继续分配对象,mcache会向mcentral申请1个span,mcache拿到span后继续分配对象。

mcentral向mcache提供span

mcentral和mcache一样,都是0~133这134个span class级别,但每个级别都保存了2个span list,即2个span链表:

  1. nonempty:这个链表里的span,所有span都至少有1个空闲的对象空间。这些span是mcache释放span时加入到该链表的。
  2. empty:这个链表里的span,所有的span都不确定里面是否有空闲的对象空间。当一个span交给mcache的时候,就会加入到empty链表。

这2个东西名称一直有点绕,建议直接把empty理解为没有对象空间就好了。

mcentral

实际代码中每1个span class对应1个mcentral,图里把所有mcentral抽象成1个整体了。

mcache向mcentral要span时,mcentral会先从nonempty搜索满足条件的span,如果每找到再从emtpy搜索满足条件的span,然后把找到的span交给mcache。

mheap的span管理

mheap里保存了2棵二叉排序树,按span的page数量进行排序:

  1. free:free中保存的span是空闲并且非垃圾回收的span。
  2. scav:scav中保存的是空闲并且已经垃圾回收的span。

如果是垃圾回收导致的span释放,span会被加入到scav,否则加入到free,比如刚从OS申请的的内存也组成的Span。

mheap

mheap中还有arenas,有一组heapArena组成,每一个heapArena都包含了连续的pagesPerArena个span,这个主要是为mheap管理span和垃圾回收服务。

mheap本身是一个全局变量,它其中的数据,也都是从OS直接申请来的内存,并不在mheap所管理的那部分内存内。

mcentral向mheap要span

mcentral向mcache提供span时,如果emtpy里也没有符合条件的span,mcentral会向mheap申请span。

mcentral需要向mheap提供需要的内存页数和span class级别,然后它优先从free中搜索可用的span,如果没有找到,会从scav中搜索可用的span,如果还没有找到,它会向OS申请内存,再重新搜索2棵树,必然能找到span。如果找到的span比需求的span大,则把span进行分割成2个span,其中1个刚好是需求大小,把剩下的span再加入到free中去,然后设置需求span的基本信息,然后交给mcentral。

mheap向OS申请内存

当mheap没有足够的内存时,mheap会向OS申请内存,把申请的内存页保存到span,然后把span插入到free树 。

在32位系统上,mheap还会预留一部分空间,当mheap没有空间时,先从预留空间申请,如果预留空间内存也没有了,才向OS申请。

大对象分配

大对象的分配比小对象省事多了,99%的流程与mcentral向mheap申请内存的相同,所以不重复介绍了,不同的一点在于mheap会记录一点大对象的统计信息,见mheap.alloc_m()

Go垃圾回收和内存释放

如果只申请和分配内存,内存终将枯竭,Go使用垃圾回收收集不再使用的span,调用mspan.scavenge()把span释放给OS(并非真释放,只是告诉OS这片内存的信息无用了,如果你需要的话,收回去好了),然后交给mheap,mheap对span进行span的合并,把合并后的span加入scav树中,等待再分配内存时,由mheap进行内存再分配,Go垃圾回收也是一个很强的主题,计划后面单独写一篇文章介绍。

现在我们关注一下,Go程序是怎么把内存释放给操作系统的?

释放内存的函数是sysUnused,它会被mspan.scavenge()调用:

// MAC下的实现
func sysUnused(v unsafe.Pointer, n uintptr) {
    // MADV_FREE_REUSABLE is like MADV_FREE except it also propagates
    // accounting information about the process to task_info.
    madvise(v, n, _MADV_FREE_REUSABLE)
}

注释说_MADV_FREE_REUSABLEMADV_FREE的功能类似,它的功能是给内核提供一个建议:这个内存地址区间的内存已经不再使用,可以回收。但内核是否回收,以及什么时候回收,这就是内核的事情了。如果内核真把这片内存回收了,当Go程序再使用这个地址时,内核会重新进行虚拟地址到物理地址的映射。所以在内存充足的情况下,内核也没有必要立刻回收内存。

4. Go栈内存

最后提一下栈内存。从一个宏观的角度看,内存管理不应当只有堆,也应当有栈。

每个goroutine都有自己的栈,栈的初始大小是2KB,100万的goroutine会占用2G,但goroutine的栈会在2KB不够用时自动扩容,当扩容为4KB的时候,百万goroutine会占用4GB。

关于goroutine栈内存管理,有篇很好的文章,饿了么框架技术部的专栏文章:《聊一聊goroutine stack》,把里面的一段内容摘录下,你感受下:

可以看到在rpc调用(grpc invoke)时,栈会发生扩容(runtime.morestack),也就意味着在读写routine内的任何rpc调用都会导致栈扩容, 占用的内存空间会扩大为原来的两倍,4kB的栈会变为8kB,100w的连接的内存占用会从8G扩大为16G(全双工,不考虑其他开销),这简直是噩梦。

另外,再推荐一篇曹大翻译的一篇汇编入门文章,里面也介绍了扩栈:第一章: Go 汇编入门 ,顺便入门一下汇编。

5. 总结

内存分配原理就不再回顾了,强调2个重要的思想:

  1. 使用缓存提高效率。在存储的整个体系中到处可见缓存的思想,Go内存分配和管理也使用了缓存,利用缓存一是减少了系统调用的次数,二是降低了锁的粒度,减少加锁的次数,从这2点提高了内存管理效率。
  2. 以空间换时间,提高内存管理效率。空间换时间是一种常用的性能优化思想,这种思想其实非常普遍,比如Hash、Map、二叉排序树等数据结构的本质就是空间换时间,在数据库中也很常见,比如数据库索引、索引视图和数据缓存等,再如Redis等缓存数据库也是空间换时间的思想。

6. 参考资料

除了文章中已经推荐的文章,再推荐几篇值得读的文章:

  1. 全成的内存分配文章,有不少帮助:https://juejin.im/post/5c888a...
  2. 异常详细的源码分析文章,看完这篇我就不想写源码分析的文章了:https://www.cnblogs.com/zkweb...
  3. 从硬件讲起的一篇文章,也是有点意思:https://www.infoq.cn/article/...
  4. 这篇文章的总流程图很棒:http://media.newbmiao.com/blo...

7. 彩蛋

在查阅资料时,多篇文章都提到了这本书《The Linux Programming Interface》,关于Thread Cache有兴趣去读一下本书第31章。


  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/07/06/go-memory-allocation/

<div style="color:#0096FF; text-align:center">关注公众号,获取最新Golang文章</div>
<img data-original="http://img.lessisbetter.site/...; style="border:0" align=center />

查看原文

赞 29 收藏 16 评论 2

大彬 发布了文章 · 2019-09-09

Go是如何实现protobuf的编解码的(2):源码

原文链接:https://mp.weixin.qq.com/s/oY...

这是一篇姊妹篇文章,浅析一下Go是如何实现protobuf编解码的:

  1. Go是如何实现protobuf的编解码的(1): 原理
  2. Go是如何实现protobuf的编解码的(2): 源码

本编是第二篇。

前言

上一篇文章Go是如何实现protobuf的编解码的(1):原理
中已经指出了Go语言数据和Protobuf数据的编解码是由包github.com/golang/protobuf/proto完成的,本编就来分析一下proto包是如何实现编解码的。

编解码原理

编解码包都有支持的编解码类型,我们暂且把这些类型称为底层类型,编解码的本质是:

  1. 为每一个底层类型配备一个或多个编解码函数
  2. 把一个结构体的字段,递归的拆解成底层类型,然后选择合适的函数进行编码或解码操作

接下来先看编码,再看解码。

编码

约定:以下所有的代码片,如果是request.pb.go或main.go中的代码,会在第一行标记文件名,否则都是proto包的源码。
// main.go
package main

import (
    "fmt"

    "./types"
    "github.com/golang/protobuf/proto"
)

func main() {
    req := &types.Request{Data: "Hello Dabin"}

    // Marshal
    encoded, err := proto.Marshal(req)
    if err != nil {
        fmt.Printf("Encode to protobuf data error: %v", err)
    }
    ...
}

编码调用的是proto.Marshal函数,它可以完成的是Go语言数据序列化成protobuf数据,返回序列化结果或错误。

proto编译成的Go结构体都是符合Message接口的,从Marshal可知Go结构体有3种序列化方式:

  1. pb Message满足newMarshaler接口,则调用XXX_Marshal()进行序列化。
  2. pb满足Marshaler接口,则调用Marshal()进行序列化,这种方式适合某类型自定义序列化规则的情况。
  3. 否则,使用默认的序列化方式,创建一个Warpper,利用wrapper对pb进行序列化,后面会介绍方式1实际就是使用方式3。
// Marshal takes a protocol buffer message
// and encodes it into the wire format, returning the data.
// This is the main entry point.
func Marshal(pb Message) ([]byte, error) {
    if m, ok := pb.(newMarshaler); ok {
        siz := m.XXX_Size()
        b := make([]byte, 0, siz)
        return m.XXX_Marshal(b, false)
    }
    if m, ok := pb.(Marshaler); ok {
        // If the message can marshal itself, let it do it, for compatibility.
        // NOTE: This is not efficient.
        return m.Marshal()
    }
    // in case somehow we didn't generate the wrapper
    if pb == nil {
        return nil, ErrNil
    }
    var info InternalMessageInfo
    siz := info.Size(pb)
    b := make([]byte, 0, siz)
    return info.Marshal(b, pb, false)
}

newMarshalerMarshaler如下:

// newMarshaler is the interface representing objects that can marshal themselves.
//
// This exists to support protoc-gen-go generated messages.
// The proto package will stop type-asserting to this interface in the future.
//
// DO NOT DEPEND ON THIS.
type newMarshaler interface {
    XXX_Size() int
    XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
}

// Marshaler is the interface representing objects that can marshal themselves.
type Marshaler interface {
    Marshal() ([]byte, error)
}

Request实现了newMarshaler接口,XXX_Marshal实现如下,它实际是调用了xxx_messageInfo_Request.Marshalxxx_messageInfo_Request是定义在request.pb.go中的一个全局变量,类型就是InternalMessageInfo,实际就是前文提到的wrapper。

// request.pb.go
func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    print("Called xxx marshal\n")
    panic("I want see stack trace")
    return xxx_messageInfo_Request.Marshal(b, m, deterministic)
}

var xxx_messageInfo_Request proto.InternalMessageInfo

本质上,XXX_Marshal也是wrapper,后面才是真正序列化的主体函数在proto包中。

InternalMessageInfo主要是用来缓存序列化和反序列化需要用到的信息。

// InternalMessageInfo is a type used internally by generated .pb.go files.
// This type is not intended to be used by non-generated code.
// This type is not subject to any compatibility guarantee.
type InternalMessageInfo struct {
    marshal   *marshalInfo   // marshal信息
    unmarshal *unmarshalInfo // unmarshal信息
    merge     *mergeInfo
    discard   *discardInfo
}

InternalMessageInfo.Marshal首先是获取待序列化类型的序列化信息u marshalInfo,然后利用u.marshal进行序列化。

// Marshal is the entry point from generated code,
// and should be ONLY called by generated code.
// It marshals msg to the end of b.
// a is a pointer to a place to store cached marshal info.
func (a *InternalMessageInfo) Marshal(b []byte, msg Message, deterministic bool) ([]byte, error) {
    // 获取该message类型的MarshalInfo,这些信息都缓存起来了
    // 大量并发时无需重复创建
    u := getMessageMarshalInfo(msg, a)
    // 入参校验
    ptr := toPointer(&msg)
    if ptr.isNil() {
        // We get here if msg is a typed nil ((*SomeMessage)(nil)),
        // so it satisfies the interface, and msg == nil wouldn't
        // catch it. We don't want crash in this case.
        return b, ErrNil
    }
    // 根据MarshalInfo对数据进行marshal
    return u.marshal(b, ptr, deterministic)
}

由于每种类型的序列化信息是一致的,所以getMessageMarshalInfo对序列化信息进行了缓存,缓存在a.marshal中,如果a中不存在marshal信息,则去生成,但不进行初始化,然后保存到a中。

func getMessageMarshalInfo(msg interface{}, a *InternalMessageInfo) *marshalInfo {
    // u := a.marshal, but atomically.
    // We use an atomic here to ensure memory consistency.
    // 从InternalMessageInfo中读取
    u := atomicLoadMarshalInfo(&a.marshal)
    // 读取不到代表未保存过
    if u == nil {
        // Get marshal information from type of message.
        t := reflect.ValueOf(msg).Type()
        if t.Kind() != reflect.Ptr {
            panic(fmt.Sprintf("cannot handle non-pointer message type %v", t))
        }
        u = getMarshalInfo(t.Elem())
        // Store it in the cache for later users.
        // a.marshal = u, but atomically.
        atomicStoreMarshalInfo(&a.marshal, u)
    }
    return u
}

getMarshalInfo只是创建了一个marshalInfo对象,填充了字段typ,剩余的字段未填充。

// getMarshalInfo returns the information to marshal a given type of message.
// The info it returns may not necessarily initialized.
// t is the type of the message (NOT the pointer to it).
// 获取MarshalInfo结构体,如果不存在则使用message类型t创建1个
func getMarshalInfo(t reflect.Type) *marshalInfo {
    marshalInfoLock.Lock()
    u, ok := marshalInfoMap[t]
    if !ok {
        u = &marshalInfo{typ: t}
        marshalInfoMap[t] = u
    }
    marshalInfoLock.Unlock()
    return u
}

// marshalInfo is the information used for marshaling a message.
type marshalInfo struct {
    typ          reflect.Type
    fields       []*marshalFieldInfo
    unrecognized field                      // offset of XXX_unrecognized
    extensions   field                      // offset of XXX_InternalExtensions
    v1extensions field                      // offset of XXX_extensions
    sizecache    field                      // offset of XXX_sizecache
    initialized  int32                      // 0 -- only typ is set, 1 -- fully initialized
    messageset   bool                       // uses message set wire format
    hasmarshaler bool                       // has custom marshaler
    sync.RWMutex                            // protect extElems map, also for initialization
    extElems     map[int32]*marshalElemInfo // info of extension elements
}

marshalInfo.marshal是Marshal真实主体,会判断u是否已经初始化,如果未初始化调用computeMarshalInfo计算Marshal需要的信息,实际就是填充marshalInfo中的各种字段。

u.hasmarshaler代表当前类型是否实现了Marshaler接口,直接调用Marshal函数进行序列化。可以确定Marshal函数的序列化方式2,即实现Marshaler接口的方法,最后肯定也会调用marshalInfo.marshal

该函数的主体是一个for循环,依次遍历该类型的每一个字段,对required属性进行校验,然后按字段类型,调用f.marshaler对该字段类型进行序列化。这个f.marshaler哪来的呢?

// marshal is the main function to marshal a message. It takes a byte slice and appends
// the encoded data to the end of the slice, returns the slice and error (if any).
// ptr is the pointer to the message.
// If deterministic is true, map is marshaled in deterministic order.
// 该函数是Marshal的主体函数,把消息编码为数据后,追加到b之后,最后返回b。
// deterministic为true代表map会以确定的顺序进行编码。
func (u *marshalInfo) marshal(b []byte, ptr pointer, deterministic bool) ([]byte, error) {
    // 初始化marshalInfo的基础信息
    // 主要是根据已有信息填充该结构体的一些字段
    if atomic.LoadInt32(&u.initialized) == 0 {
        u.computeMarshalInfo()
    }

    // If the message can marshal itself, let it do it, for compatibility.
    // NOTE: This is not efficient.
    // 如果该类型实现了Marshaler接口,即能够对自己Marshal,则自行Marshal
    // 结果追加到b
    if u.hasmarshaler {
        m := ptr.asPointerTo(u.typ).Interface().(Marshaler)
        b1, err := m.Marshal()
        b = append(b, b1...)
        return b, err
    }

    var err, errLater error
    // The old marshaler encodes extensions at beginning.
    // 检查扩展字段,把message的扩展字段追加到b
    if u.extensions.IsValid() {
        // offset函数用来根据指针偏移量获取message的指定字段
        e := ptr.offset(u.extensions).toExtensions()
        if u.messageset {
            b, err = u.appendMessageSet(b, e, deterministic)
        } else {
            b, err = u.appendExtensions(b, e, deterministic)
        }
        if err != nil {
            return b, err
        }
    }
    if u.v1extensions.IsValid() {
        m := *ptr.offset(u.v1extensions).toOldExtensions()
        b, err = u.appendV1Extensions(b, m, deterministic)
        if err != nil {
            return b, err
        }
    }

    // 遍历message的每一个字段,检查并做编码,然后追加到b
    for _, f := range u.fields {
        if f.required {
            // 如果required的字段未设置,则记录错误,所有的marshal工作完成后再处理
            if ptr.offset(f.field).getPointer().isNil() {
                // Required field is not set.
                // We record the error but keep going, to give a complete marshaling.
                if errLater == nil {
                    errLater = &RequiredNotSetError{f.name}
                }
                continue
            }
        }
        // 字段为指针类型,并且为nil,代表未设置,该字段无需编码
        if f.isPointer && ptr.offset(f.field).getPointer().isNil() {
            // nil pointer always marshals to nothing
            continue
        }
        // 利用这个字段的marshaler进行编码
        b, err = f.marshaler(b, ptr.offset(f.field), f.wiretag, deterministic)
        if err != nil {
            if err1, ok := err.(*RequiredNotSetError); ok {
                // required字段但未设置错误
                // Required field in submessage is not set.
                // We record the error but keep going, to give a complete marshaling.
                if errLater == nil {
                    errLater = &RequiredNotSetError{f.name + "." + err1.field}
                }
                continue
            }
            // “动态数组”中包含nil元素
            if err == errRepeatedHasNil {
                err = errors.New("proto: repeated field " + f.name + " has nil element")
            }
            if err == errInvalidUTF8 {
                if errLater == nil {
                    fullName := revProtoTypes[reflect.PtrTo(u.typ)] + "." + f.name
                    errLater = &invalidUTF8Error{fullName}
                }
                continue
            }
            return b, err
        }
    }
    // 为识别的类型字段,直接转为bytes,追加到b
    // computeMarshalInfo中已经收集这些字段
    if u.unrecognized.IsValid() {
        s := *ptr.offset(u.unrecognized).toBytes()
        b = append(b, s...)
    }
    return b, errLater
}

computeMarshalInfo实际上就是对要序列化的类型,进行一次全面检查,设置好序列化要使用的数据,这其中就包含了各字段的序列化函数f.marshaler。我们就重点关注下这部分,struct的每一个字段都会分配一个marshalFieldInfo,代表这个字段序列化需要的信息,会调用computeMarshalFieldInfo会填充这个对象。

// computeMarshalInfo initializes the marshal info.
func (u *marshalInfo) computeMarshalInfo() {
    // 加锁,代表了不能同时计算marshal信息
    u.Lock()
    defer u.Unlock()
    // 计算1次即可
    if u.initialized != 0 { // non-atomic read is ok as it is protected by the lock
        return
    }

    // 获取要marshal的message类型
    t := u.typ
    u.unrecognized = invalidField
    u.extensions = invalidField
    u.v1extensions = invalidField
    u.sizecache = invalidField

    // If the message can marshal itself, let it do it, for compatibility.
    // 判断当前类型是否实现了Marshal接口,如果实现标记为类型自有marshaler
    // 没用类型断言是因为t是Type类型,不是保存在某个接口的变量
    // NOTE: This is not efficient.
    if reflect.PtrTo(t).Implements(marshalerType) {
        u.hasmarshaler = true
        atomic.StoreInt32(&u.initialized, 1)
        // 可以直接返回了,后面使用自有的marshaler编码
        return
    }

    // get oneof implementers
    // 看*t实现了以下哪个接口,oneof特性
    var oneofImplementers []interface{}
    switch m := reflect.Zero(reflect.PtrTo(t)).Interface().(type) {
    case oneofFuncsIface:
        _, _, _, oneofImplementers = m.XXX_OneofFuncs()
    case oneofWrappersIface:
        oneofImplementers = m.XXX_OneofWrappers()
    }

    n := t.NumField()

    // deal with XXX fields first
    // 遍历t的每一个XXX字段
    for i := 0; i < t.NumField(); i++ {
        f := t.Field(i)
        // 跳过非XXX开头的字段
        if !strings.HasPrefix(f.Name, "XXX_") {
            continue
        }
        // 处理以下几个protobuf自带的字段
        switch f.Name {
        case "XXX_sizecache":
            u.sizecache = toField(&f)
        case "XXX_unrecognized":
            u.unrecognized = toField(&f)
        case "XXX_InternalExtensions":
            u.extensions = toField(&f)
            u.messageset = f.Tag.Get("protobuf_messageset") == "1"
        case "XXX_extensions":
            u.v1extensions = toField(&f)
        case "XXX_NoUnkeyedLiteral":
            // nothing to do
        default:
            panic("unknown XXX field: " + f.Name)
        }
        n--
    }

    // normal fields
    // 处理message的普通字段
    fields := make([]marshalFieldInfo, n) // batch allocation
    u.fields = make([]*marshalFieldInfo, 0, n)
    for i, j := 0, 0; i < t.NumField(); i++ {
        f := t.Field(i)

        // 跳过XXX字段
        if strings.HasPrefix(f.Name, "XXX_") {
            continue
        }

        // 取fields的下一个有效字段,指针类型
        // j代表了fields有效字段数量,n是包含了XXX字段的总字段数量
        field := &fields[j]
        j++
        field.name = f.Name
        // 填充到u.fields
        u.fields = append(u.fields, field)
        // 字段的tag里包含“protobuf_oneof”特殊处理
        if f.Tag.Get("protobuf_oneof") != "" {
            field.computeOneofFieldInfo(&f, oneofImplementers)
            continue
        }
        // 字段里不包含“protobuf”,代表不是protoc自动生成的字段
        if f.Tag.Get("protobuf") == "" {
            // field has no tag (not in generated message), ignore it
            // 删除刚刚保存的字段信息
            u.fields = u.fields[:len(u.fields)-1]
            j--
            continue
        }
        // 填充字段的marshal信息
        field.computeMarshalFieldInfo(&f)
    }

    // fields are marshaled in tag order on the wire.
    // 字段排序
    sort.Sort(byTag(u.fields))

    // 初始化完成
    atomic.StoreInt32(&u.initialized, 1)
}

回顾一下Request的定义,它包含1个字段Data,后面protobuf:...描述了protobuf要使用的信息,"bytes,..."这段被称为tags,用逗号进行分割后,其中:

  • tags[0]: bytes,代表Data类型的数据要被转换为bytes
  • tags[1]: 1,代表了字段的ID
  • tags[2]: opt,代表可行,非必须
  • tags[3]: name=data,proto文件中的名称
  • tags[4]: proto3,代表使用的protobuf版本
// request.pb.go
type Request struct{
    Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
    ...
}

computeMarshalFieldInfo首先要获取字段ID和要转换的类型,填充到marshalFieldInfo,然后调用setMarshaler利用字段f和tags获取该字段类型的序列化函数。

// computeMarshalFieldInfo fills up the information to marshal a field.
func (fi *marshalFieldInfo) computeMarshalFieldInfo(f *reflect.StructField) {
    // parse protobuf tag of the field.
    // tag has format of "bytes,49,opt,name=foo,def=hello!"
    // 获取"protobuf"的完整tag,然后使用,分割,得到上面的格式
    tags := strings.Split(f.Tag.Get("protobuf"), ",")
    if tags[0] == "" {
        return
    }
    // tag的编号,即message中设置的string name = x,则x就是这个字段的tag id
    tag, err := strconv.Atoi(tags[1])
    if err != nil {
        panic("tag is not an integer")
    }
    // 要转换成的类型,bytes,varint等等
    wt := wiretype(tags[0])
    // 设置字段是required还是opt
    if tags[2] == "req" {
        fi.required = true
    }
    // 设置field和tag信息到marshalFieldInfo
    fi.setTag(f, tag, wt)
    // 根据当前的tag信息(类型等),选择marshaler函数
    fi.setMarshaler(f, tags)
}

setMarshaler的重点是typeMarshalertypeMarshaler这个函数非常长,其实就是根据类型设置返回对于的序列化函数,比如Bool、Int32、Uint32...,如果是结构体、切片等复合类型,就可以形成递归了。

// setMarshaler fills up the sizer and marshaler in the info of a field.
func (fi *marshalFieldInfo) setMarshaler(f *reflect.StructField, tags []string) {
    // map类型字段特殊处理
    switch f.Type.Kind() {
    case reflect.Map:
        // map field
        fi.isPointer = true
        fi.sizer, fi.marshaler = makeMapMarshaler(f)
        return
    case reflect.Ptr, reflect.Slice:
        // 指针字段和切片字段标记指针类型
        fi.isPointer = true
    }

    // 根据字段类型和tag选择marshaler
    fi.sizer, fi.marshaler = typeMarshaler(f.Type, tags, true, false)
}

// typeMarshaler returns the sizer and marshaler of a given field.
// t is the type of the field.
// tags is the generated "protobuf" tag of the field.
// If nozero is true, zero value is not marshaled to the wire.
// If oneof is true, it is a oneof field.
// 函数非常长,省略内容
func typeMarshaler(t reflect.Type, tags []string, nozero, oneof bool) (sizer, marshaler) {
    ...
    switch t.Kind() {
    case reflect.Bool:
        if pointer {
            return sizeBoolPtr, appendBoolPtr
        }
        if slice {
            if packed {
                return sizeBoolPackedSlice, appendBoolPackedSlice
            }
            return sizeBoolSlice, appendBoolSlice
        }
        if nozero {
            return sizeBoolValueNoZero, appendBoolValueNoZero
        }
        return sizeBoolValue, appendBoolValue
    case reflect.Uint32:
    ...
    case reflect.Int32:
    ....
    case reflect.Struct:
    ...
}

以下是Bool和String类型的2个序列化函数示例:

func appendBoolValue(b []byte, ptr pointer, wiretag uint64, _ bool) ([]byte, error) {
    v := *ptr.toBool()
    b = appendVarint(b, wiretag)
    if v {
        b = append(b, 1)
    } else {
        b = append(b, 0)
    }
    return b, nil
}
func appendStringValue(b []byte, ptr pointer, wiretag uint64, _ bool) ([]byte, error) {
    v := *ptr.toString()
    b = appendVarint(b, wiretag)
    b = appendVarint(b, uint64(len(v)))
    b = append(b, v...)
    return b, nil
}

所以序列化后的[]byte,应当是符合这种模式:

| wiretag | data | wiretag | data | ... | data |

OK,以上就是编码的主要流程,简单回顾一下:

  1. proto.Marshal会调用*.pb.go中自动生成的Wrapper函数,Wrapper函数会调用InternalMessageInfo进行序列化,然后才步入序列化的正题
  2. 首先获取要序列化类型的marshal信息u,如果u没有初始化,则进行初始化,即设置好结构体每个字段的序列化函数,以及其他信息
  3. 遍历结构体的每个字段,使用u中的信息为每个字段进行编码,并把加过追加到[]byte,所以字段编码完成,则返回序列化的结果[]byte或者错误。

解码

解码的流程其实与编码很类似,会是上面回顾的3大步骤,主要的区别在步骤2:它要获取的是序列化类型的unmarshal信息u,如果u没有初始化,会进行初始化,设置的是结构体每个字段的反序列化函数,以及其他信息。

所以解码的函数解析会简要的过一遍,不再有编码那么详细的解释。

下面是proto包中反序列化的接口和函数定义:

// Unmarshaler is the interface representing objects that can
// unmarshal themselves.  The argument points to data that may be
// overwritten, so implementations should not keep references to the
// buffer.
// Unmarshal implementations should not clear the receiver.
// Any unmarshaled data should be merged into the receiver.
// Callers of Unmarshal that do not want to retain existing data
// should Reset the receiver before calling Unmarshal.
type Unmarshaler interface {
    Unmarshal([]byte) error
}

// newUnmarshaler is the interface representing objects that can
// unmarshal themselves. The semantics are identical to Unmarshaler.
//
// This exists to support protoc-gen-go generated messages.
// The proto package will stop type-asserting to this interface in the future.
//
// DO NOT DEPEND ON THIS.
type newUnmarshaler interface {
    // 实现了XXX_Unmarshal
    XXX_Unmarshal([]byte) error
}

// Unmarshal parses the protocol buffer representation in buf and places the
// decoded result in pb.  If the struct underlying pb does not match
// the data in buf, the results can be unpredictable.
//
// Unmarshal resets pb before starting to unmarshal, so any
// existing data in pb is always removed. Use UnmarshalMerge
// to preserve and append to existing data.
func Unmarshal(buf []byte, pb Message) error {
    pb.Reset()
    // pb自己有unmarshal函数,实现了newUnmarshaler接口
    if u, ok := pb.(newUnmarshaler); ok {
        return u.XXX_Unmarshal(buf)
    }
    // pb自己有unmarshal函数,实现了Unmarshaler接口
    if u, ok := pb.(Unmarshaler); ok {
        return u.Unmarshal(buf)
    }
    // 使用默认的Unmarshal
    return NewBuffer(buf).Unmarshal(pb)
}

Request实现了Unmarshaler接口:

// request.pb.go
func (m *Request) XXX_Unmarshal(b []byte) error {
    return xxx_messageInfo_Request.Unmarshal(m, b)
}

反序列化也是使用InternalMessageInfo进行。

// Unmarshal is the entry point from the generated .pb.go files.
// This function is not intended to be used by non-generated code.
// This function is not subject to any compatibility guarantee.
// msg contains a pointer to a protocol buffer struct.
// b is the data to be unmarshaled into the protocol buffer.
// a is a pointer to a place to store cached unmarshal information.
func (a *InternalMessageInfo) Unmarshal(msg Message, b []byte) error {
    // Load the unmarshal information for this message type.
    // The atomic load ensures memory consistency.
    // 获取保存在a中的unmarshal信息
    u := atomicLoadUnmarshalInfo(&a.unmarshal)
    if u == nil {
        // Slow path: find unmarshal info for msg, update a with it.
        u = getUnmarshalInfo(reflect.TypeOf(msg).Elem())
        atomicStoreUnmarshalInfo(&a.unmarshal, u)
    }
    // Then do the unmarshaling.
    // 执行unmarshal
    err := u.unmarshal(toPointer(&msg), b)
    return err
}

以下是反序列化的主题函数,u未初始化时会调用computeUnmarshalInfo设置反序列化需要的信息。

// unmarshal does the main work of unmarshaling a message.
// u provides type information used to unmarshal the message.
// m is a pointer to a protocol buffer message.
// b is a byte stream to unmarshal into m.
// This is top routine used when recursively unmarshaling submessages.
func (u *unmarshalInfo) unmarshal(m pointer, b []byte) error {
    if atomic.LoadInt32(&u.initialized) == 0 {
        // 为u填充unmarshal信息,以及设置每个字段类型的unmarshaler函数
        u.computeUnmarshalInfo()
    }
    if u.isMessageSet {
        return unmarshalMessageSet(b, m.offset(u.extensions).toExtensions())
    }
    var reqMask uint64 // bitmask of required fields we've seen.
    var errLater error
    for len(b) > 0 {
        // Read tag and wire type.
        // Special case 1 and 2 byte varints.
        var x uint64
        if b[0] < 128 {
            x = uint64(b[0])
            b = b[1:]
        } else if len(b) >= 2 && b[1] < 128 {
            x = uint64(b[0]&0x7f) + uint64(b[1])<<7
            b = b[2:]
        } else {
            var n int
            x, n = decodeVarint(b)
            if n == 0 {
                return io.ErrUnexpectedEOF
            }
            b = b[n:]
        }
        // 获取tag和wire标记
        tag := x >> 3
        wire := int(x) & 7

        // Dispatch on the tag to one of the unmarshal* functions below.
        // 根据tag选择该类型的unmarshalFieldInfo:f
        var f unmarshalFieldInfo
        if tag < uint64(len(u.dense)) {
            f = u.dense[tag]
        } else {
            f = u.sparse[tag]
        }
        // 如果该类型有unmarshaler函数,则执行解码和错误处理
        if fn := f.unmarshal; fn != nil {
            var err error
            // 从b解析,然后填充到f的对应字段
            b, err = fn(b, m.offset(f.field), wire)
            if err == nil {
                reqMask |= f.reqMask
                continue
            }
            if r, ok := err.(*RequiredNotSetError); ok {
                // Remember this error, but keep parsing. We need to produce
                // a full parse even if a required field is missing.
                if errLater == nil {
                    errLater = r
                }
                reqMask |= f.reqMask
                continue
            }
            if err != errInternalBadWireType {
                if err == errInvalidUTF8 {
                    if errLater == nil {
                        fullName := revProtoTypes[reflect.PtrTo(u.typ)] + "." + f.name
                        errLater = &invalidUTF8Error{fullName}
                    }
                    continue
                }
                return err
            }
            // Fragments with bad wire type are treated as unknown fields.
        }

        // Unknown tag.
        // 跳过未知tag,可能是proto中的message定义升级了,增加了一些字段,使用老版本的,就不识别新的字段
        if !u.unrecognized.IsValid() {
            // Don't keep unrecognized data; just skip it.
            var err error
            b, err = skipField(b, wire)
            if err != nil {
                return err
            }
            continue
        }
        // 检查未识别字段是不是extension
        // Keep unrecognized data around.
        // maybe in extensions, maybe in the unrecognized field.
        z := m.offset(u.unrecognized).toBytes()
        var emap map[int32]Extension
        var e Extension
        for _, r := range u.extensionRanges {
            if uint64(r.Start) <= tag && tag <= uint64(r.End) {
                if u.extensions.IsValid() {
                    mp := m.offset(u.extensions).toExtensions()
                    emap = mp.extensionsWrite()
                    e = emap[int32(tag)]
                    z = &e.enc
                    break
                }
                if u.oldExtensions.IsValid() {
                    p := m.offset(u.oldExtensions).toOldExtensions()
                    emap = *p
                    if emap == nil {
                        emap = map[int32]Extension{}
                        *p = emap
                    }
                    e = emap[int32(tag)]
                    z = &e.enc
                    break
                }
                panic("no extensions field available")
            }
        }

        // Use wire type to skip data.
        var err error
        b0 := b
        b, err = skipField(b, wire)
        if err != nil {
            return err
        }
        *z = encodeVarint(*z, tag<<3|uint64(wire))
        *z = append(*z, b0[:len(b0)-len(b)]...)

        if emap != nil {
            emap[int32(tag)] = e
        }
    }
    // 校验解析到的required字段的数量,如果与u中记录的不匹配,则报错
    if reqMask != u.reqMask && errLater == nil {
        // A required field of this message is missing.
        for _, n := range u.reqFields {
            if reqMask&1 == 0 {
                errLater = &RequiredNotSetError{n}
            }
            reqMask >>= 1
        }
    }
    return errLater
}

设置字段反序列化函数的过程不看了,看一下怎么选函数的,typeUnmarshaler是为字段类型,选择反序列化函数,这些函数选择与序列化函数是一一对应的。

// typeUnmarshaler returns an unmarshaler for the given field type / field tag pair.
func typeUnmarshaler(t reflect.Type, tags string) unmarshaler {
    ...
    // Figure out packaging (pointer, slice, or both)
    slice := false
    pointer := false
    if t.Kind() == reflect.Slice && t.Elem().Kind() != reflect.Uint8 {
        slice = true
        t = t.Elem()
    }
    if t.Kind() == reflect.Ptr {
        pointer = true
        t = t.Elem()
    }
    ...
    switch t.Kind() {
    case reflect.Bool:
        if pointer {
            return unmarshalBoolPtr
        }
        if slice {
            return unmarshalBoolSlice
        }
        return unmarshalBoolValue
    }
}

unmarshalBoolValue是默认的Bool类型反序列化函数,会把protobuf数据b解码,然后转换为bool类型v,最后赋值给字段f。

func unmarshalBoolValue(b []byte, f pointer, w int) ([]byte, error) {
    if w != WireVarint {
        return b, errInternalBadWireType
    }
    // Note: any length varint is allowed, even though any sane
    // encoder will use one byte.
    // See https://github.com/golang/protobuf/issues/76
    x, n := decodeVarint(b)
    if n == 0 {
        return nil, io.ErrUnexpectedEOF
    }
    // TODO: check if x>1? Tests seem to indicate no.
    // toBool是返回bool类型的指针
    // 完成对字段f的赋值
    v := x != 0
    *f.toBool() = v
    return b[n:], nil
}

总结

本文分析了Go语言protobuf数据的序列化和反序列过程,可以简要概括为:

  1. proto.Marshalproto.Unmarshal会调用*.pb.go中自动生成的Wrapper函数,Wrapper函数会调用InternalMessageInfo进行(反)序列化,然后才步入(反)序列化的正题
  2. 首先获取要目标类型的(um)marshal信息u,如果u没有初始化,则进行初始化,即设置好结构体每个字段的(反)序列化函数,以及其他信息
  3. 遍历结构体的每个字段,使用u中的信息为每个字段进行编码,生成序列化的结果,或进行解码,给结构体成员进行赋值

参考文章

以下参考文章都值得阅读:

查看原文

赞 3 收藏 2 评论 0

大彬 发布了文章 · 2019-09-09

Go是如何实现protobuf的编解码的(1):原理

原文链接:https://mp.weixin.qq.com/s/O8...

这是一篇姊妹篇文章,浅析一下Go是如何实现protobuf编解码的:

  1. Go是如何实现protobuf的编解码的(1): 原理
  2. Go是如何实现protobuf的编解码的(2): 源码

本编是第一篇。

Protocol Buffers介绍

Protocol buffers缩写为protobuf,是由Google创造的一种用于序列化的标记语言,项目Github仓库:https://github.com/protocolbu...

Protobuf主要用于不同的编程语言的协作RPC场景下,定义需要序列化的数据格式。Protobuf本质上仅仅是一种用于交互的结构式定义,从功能上和XML、JSON等各种其他的交互形式都并无本质不同,只负责定义不负责数据编解码

其官方介绍如下:

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

Protocol buffers的多语言支持

protobuf是支持多种编程语言的,即多种编程语言的类型数据可以转换成protobuf定义的类型数据,各种语言的类型对应可以看此介绍

我们介绍一下protobuf对多语言的支持原理。protobuf有个程序叫protoc,它是一个编译程序,负责把proto文件编译成对应语言的文件,它已经支持了C++、C#、Java、Python,而对于Go和Dart需要安装插件才能配合生成对于语言的文件。

对于C++,protoc可以把a.proto,编译成a.pb.ha.pb.cc

对于Go,protoc需要使用插件protoc-gen-go,把a.proto,编译成a.pb.go,其中包含了定义的数据类型,它的序列化和反序列化函数等。

敲黑板,对Go语言,protoc只负责利用protoc-gen-go把proto文件编译成Go语言文件,并不负责序列化和反序列化,生成的Go语言文件中的序列化和反序列化操作都是只是wrapper。

那Go语言对protobuf的序列化和反序列化,是由谁完成的?

github.com/golang/protobuf/proto完成,它负责把结构体等序列化成proto数据([]byte),把proto数据反序列化成Go结构体。

OK,原理部分就铺垫这些,看一个简单样例,了解protoc和protoc-gen-go的使用,以及进行序列化和反序列化操作。

一个Hello World样例

根据上面的介绍,Go语言使用protobuf我们要先安装2个工具:protoc和protoc-gen-go。

安装protoc和protoc-gen-go

首先去下载页下载符合你系统的protoc,本文示例版本如下:

➜  protoc-3.9.0-osx-x86_64 tree .
.
├── bin
│   └── protoc
├── include
│   └── google
│       └── protobuf
│           ├── any.proto
│           ├── api.proto
│           ├── compiler
│           │   └── plugin.proto
│           ├── descriptor.proto
│           ├── duration.proto
│           ├── empty.proto
│           ├── field_mask.proto
│           ├── source_context.proto
│           ├── struct.proto
│           ├── timestamp.proto
│           ├── type.proto
│           └── wrappers.proto
└── readme.txt

5 directories, 14 files

protoc的安装步骤在readme.txt中:

To install, simply place this binary somewhere in your PATH.

protoc-3.9.0-osx-x86_64/bin加入到PATH。

If you intend to use the included well known types then don't forget to
copy the contents of the 'include' directory somewhere as well, for example
into '/usr/local/include/'.

如果使用已经定义好的类型,即上面include目录*.proto文件中的类型,把include目录下文件,拷贝到/usr/local/include/

安装protoc-gen-go:

go get –u github.com/golang/protobuf/protoc-gen-go

检查安装,应该能查到这2个程序的位置:

➜  fabric git:(release-1.4) which protoc
/usr/local/bin/protoc
➜  fabric git:(release-1.4) which protoc-gen-go
/Users/shitaibin/go/bin/protoc-gen-go

Hello world

创建了一个使用protoc的小玩具,项目地址Github: golang_step_by_step

它的目录结构如下:

➜  protobuf git:(master) tree helloworld1
helloworld1
├── main.go
├── request.proto
└── types
    └── request.pb.go

定义proto文件

使用proto3,定义一个Request,request.proto内容如下:

// file: request.proto
syntax = "proto3";
package helloworld;
option go_package="./types";

message Request {
    string data = 1;
}
  • syntax:protobuf版本,现在是proto3
  • package:不完全等价于Go的package,最好另行设定go_package,指定根据protoc文件生成的go语言文件的package名称。
  • message:会编译成Go的struct

    • string data = 1:代表request的成员data是string类型,该成员的id是1,protoc给每个成员都定义一个编号,编解码的时候使用编号代替使用成员名称,压缩数据量。

编译proto文件

$ protoc --go_out=. ./request.proto

--go_out指明了要把./request.proto编译成Go语言文件,生成的是./types/request.pb.go,注意观察一下为Request结构体生产的2个方法XXX_UnmarshalXXX_Marshal,文件内容如下:

// file: ./types/request.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: request.proto

package types

import (
    fmt "fmt"
    math "math"

    proto "github.com/golang/protobuf/proto"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type Request struct {
    Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
    // 以下是protobuf自动填充的字段,protobuf需要使用
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

func (m *Request) Reset()         { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage()    {}
func (*Request) Descriptor() ([]byte, []int) {
    return fileDescriptor_7f73548e33e655fe, []int{0}
}

// 反序列化函数
func (m *Request) XXX_Unmarshal(b []byte) error {
    return xxx_messageInfo_Request.Unmarshal(m, b)
}
// 序列化函数
func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    return xxx_messageInfo_Request.Marshal(b, m, deterministic)
}
func (m *Request) XXX_Merge(src proto.Message) {
    xxx_messageInfo_Request.Merge(m, src)
}
func (m *Request) XXX_Size() int {
    return xxx_messageInfo_Request.Size(m)
}
func (m *Request) XXX_DiscardUnknown() {
    xxx_messageInfo_Request.DiscardUnknown(m)
}

var xxx_messageInfo_Request proto.InternalMessageInfo

// 获取字段
func (m *Request) GetData() string {
    if m != nil {
        return m.Data
    }
    return ""
}

func init() {
    proto.RegisterType((*Request)(nil), "helloworld.Request")
}

func init() { proto.RegisterFile("request.proto", fileDescriptor_7f73548e33e655fe) }

var fileDescriptor_7f73548e33e655fe = []byte{
    // 91 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0x4a, 0x2d, 0x2c,
    0x4d, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x48, 0xcd, 0xc9, 0xc9,
    0x2f, 0xcf, 0x2f, 0xca, 0x49, 0x51, 0x92, 0xe5, 0x62, 0x0f, 0x82, 0x48, 0x0a, 0x09, 0x71, 0xb1,
    0xa4, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x4e, 0x9c, 0x51,
    0xec, 0x7a, 0xfa, 0x25, 0x95, 0x05, 0xa9, 0xc5, 0x49, 0x6c, 0x60, 0xcd, 0xc6, 0x80, 0x00, 0x00,
    0x00, 0xff, 0xff, 0x2e, 0x52, 0x69, 0xb5, 0x4d, 0x00, 0x00, 0x00,
}

编写Go语言程序

下面这段测试程序就是创建了一个请求,序列化又反序列化的过程。

// file: main.go
package main

import (
    "fmt"

    "./types"
    "github.com/golang/protobuf/proto"
)

func main() {
    req := &types.Request{Data: "Hello LIB"}

    // Marshal
    encoded, err := proto.Marshal(req)
    if err != nil {
        fmt.Printf("Encode to protobuf data error: %v", err)
    }

    // Unmarshal
    var unmarshaledReq types.Request
    err = proto.Unmarshal(encoded, &unmarshaledReq)
    if err != nil {
        fmt.Printf("Unmarshal to struct error: %v", err)
    }

    fmt.Printf("req: %v\n", req.String())
    fmt.Printf("unmarshaledReq: %v\n", unmarshaledReq.String())
}

运行结果:

➜  helloworld1 git:(master) go run main.go
req: data:"Hello LIB"
unmarshaledReq: data:"Hello LIB"

以上都是铺垫,下一节的proto包怎么实现编解码才是重点,protobuf用法可以去翻:

  1. 官方介绍:protoc3介绍编码介绍Go教程
  2. 煎鱼grpc系列文章

参考文章

查看原文

赞 3 收藏 3 评论 0

大彬 赞了文章 · 2019-07-22

从实践到原理,带你参透 gRPC

image

原文地址:从实践到原理,带你参透 gRPC

gRPC 在 Go 语言中大放异彩,越来越多的小伙伴在使用,最近也在公司安利了一波,希望能通过这篇文章能带你一览 gRPC 的爱与恨。本文篇幅较长,希望你做好阅读准备,目录如下:

image

简述

gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持。

gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特性。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

调用模型

image

1、客户端(gRPC Stub)调用 A 方法,发起 RPC 调用。

2、对请求信息使用 Protobuf 进行对象序列化压缩(IDL)。

3、服务端(gRPC Server)接收到请求后,解码请求体,进行业务逻辑处理并返回。

4、对响应结果使用 Protobuf 进行对象序列化压缩(IDL)。

5、客户端接受到服务端响应,解码请求体。回调被调用的 A 方法,唤醒正在等待响应(阻塞)的客户端调用并返回响应结果。

调用方式

一、Unary RPC:一元 RPC

image

Server

type SearchService struct{}

func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
    return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil
}

const PORT = "9001"

func main() {
    server := grpc.NewServer()
    pb.RegisterSearchServiceServer(server, &SearchService{})

    lis, err := net.Listen("tcp", ":"+PORT)
    ...

    server.Serve(lis)
}
  • 创建 gRPC Server 对象,你可以理解为它是 Server 端的抽象对象。
  • 将 SearchService(其包含需要被调用的服务端接口)注册到 gRPC Server。 的内部注册中心。这样可以在接受到请求时,通过内部的 “服务发现”,发现该服务端接口并转接进行逻辑处理。
  • 创建 Listen,监听 TCP 端口。
  • gRPC Server 开始 lis.Accept,直到 Stop 或 GracefulStop。

Client

func main() {
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
    ...
    defer conn.Close()

    client := pb.NewSearchServiceClient(conn)
    resp, err := client.Search(context.Background(), &pb.SearchRequest{
        Request: "gRPC",
    })
    ...
}
  • 创建与给定目标(服务端)的连接句柄。
  • 创建 SearchService 的客户端对象。
  • 发送 RPC 请求,等待同步响应,得到回调后返回响应结果。

二、Server-side streaming RPC:服务端流式 RPC

image

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    for n := 0; n <= 6; n++ {
        stream.Send(&pb.StreamResponse{
            Pt: &pb.StreamPoint{
                ...
            },
        })
    }

    return nil
}

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.List(context.Background(), r)
    ...
    
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    return nil
}

三、Client-side streaming RPC:客户端流式 RPC

image

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{...}})
        }
        ...

    }

    return nil
}

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Record(context.Background())
    ...
    
    for n := 0; n < 6; n++ {
        stream.Send(r)
    }

    resp, err := stream.CloseAndRecv()
    ...

    return nil
}

四、Bidirectional streaming RPC:双向流式 RPC

image

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    for {
        stream.Send(&pb.StreamResponse{...})
        r, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        ...
    }

    return nil
}

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Route(context.Background())
    ...

    for n := 0; n <= 6; n++ {
        stream.Send(r)
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    stream.CloseSend()

    return nil
}

客户端与服务端是如何交互的

在开始分析之前,我们要先 gRPC 的调用有一个初始印象。那么最简单的就是对 Client 端调用 Server 端进行抓包去剖析,看看整个过程中它都做了些什么事。如下图:

image

  • Magic
  • SETTINGS
  • HEADERS
  • DATA
  • SETTINGS
  • WINDOW_UPDATE
  • PING
  • HEADERS
  • DATA
  • HEADERS
  • WINDOW_UPDATE
  • PING

我们略加整理发现共有十二个行为,是比较重要的。在开始分析之前,建议你自己先想一下,它们的作用都是什么?大胆猜测一下,带着疑问去学习效果更佳。

行为分析

Magic

image

Magic 帧的主要作用是建立 HTTP/2 请求的前言。在 HTTP/2 中,要求两端都要发送一个连接前言,作为对所使用协议的最终确认,并确定 HTTP/2 连接的初始设置,客户端和服务端各自发送不同的连接前言。

而上图中的 Magic 帧是客户端的前言之一,内容为 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n,以确定启用 HTTP/2 连接。

SETTINGS

image

image

SETTINGS 帧的主要作用是设置这一个连接的参数,作用域是整个连接而并非单一的流。

而上图的 SETTINGS 帧都是空 SETTINGS 帧,图一是客户端连接的前言(Magic 和 SETTINGS 帧分别组成连接前言)。图二是服务端的。另外我们从图中可以看到多个 SETTINGS 帧,这是为什么呢?是因为发送完连接前言后,客户端和服务端还需要有一步互动确认的动作。对应的就是带有 ACK 标识 SETTINGS 帧。

HEADERS

image

HEADERS 帧的主要作用是存储和传播 HTTP 的标头信息。我们关注到 HEADERS 里有一些眼熟的信息,分别如下:

  • method:POST
  • scheme:http
  • path:/proto.SearchService/Search
  • authority::10001
  • content-type:application/grpc
  • user-agent:grpc-go/1.20.0-dev

你会发现这些东西非常眼熟,其实都是 gRPC 的基础属性,实际上远远不止这些,只是设置了多少展示多少。例如像平时常见的 grpc-timeoutgrpc-encoding 也是在这里设置的。

DATA

image

DATA 帧的主要作用是装填主体信息,是数据帧。而在上图中,可以很明显看到我们的请求参数 gRPC 存储在里面。只需要了解到这一点就可以了。

HEADERS, DATA, HEADERS

image

在上图中 HEADERS 帧比较简单,就是告诉我们 HTTP 响应状态和响应的内容格式。

imgae

在上图中 DATA 帧主要承载了响应结果的数据集,图中的 gRPC Server 就是我们 RPC 方法的响应结果。

image

在上图中 HEADERS 帧主要承载了 gRPC 状态 和 gRPC 状态消息,图中的 grpc-statusgrpc-message 就是我们的 gRPC 调用状态的结果。

其它步骤

WINDOW_UPDATE

主要作用是管理和流的窗口控制。通常情况下打开一个连接后,服务器和客户端会立即交换 SETTINGS 帧来确定流控制窗口的大小。默认情况下,该大小设置为约 65 KB,但可通过发出一个 WINDOW_UPDATE 帧为流控制设置不同的大小。

image

PING/PONG

主要作用是判断当前连接是否仍然可用,也常用于计算往返时间。其实也就是 PING/PONG,大家对此应该很熟。

小结

image

  • 在建立连接之前,客户端/服务端都会发送连接前言(Magic+SETTINGS),确立协议和配置项。
  • 在传输数据时,是会涉及滑动窗口(WINDOW_UPDATE)等流控策略的。
  • 传播 gRPC 附加信息时,是基于 HEADERS 帧进行传播和设置;而具体的请求/响应数据是存储的 DATA 帧中的。
  • 请求/响应结果会分为 HTTP 和 gRPC 状态响应两种类型。
  • 客户端发起 PING,服务端就会回应 PONG,反之亦可。

这块 gRPC 的基础使用,你可以看看我另外的 《gRPC 入门系列》,相信对你一定有帮助。

浅谈理解

服务端

image

为什么四行代码,就能够起一个 gRPC Server,内部做了什么逻辑。你有想过吗?接下来我们一步步剖析,看看里面到底是何方神圣。

一、初始化

// grpc.NewServer()
func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range opt {
        o(&opts)
    }
    s := &Server{
        lis:    make(map[net.Listener]bool),
        opts:   opts,
        conns:  make(map[io.Closer]bool),
        m:      make(map[string]*service),
        quit:   make(chan struct{}),
        done:   make(chan struct{}),
        czData: new(channelzData),
    }
    s.cv = sync.NewCond(&s.mu)
    ...

    return s
}

这块比较简单,主要是实例 grpc.Server 并进行初始化动作。涉及如下:

  • lis:监听地址列表。
  • opts:服务选项,这块包含 Credentials、Interceptor 以及一些基础配置。
  • conns:客户端连接句柄列表。
  • m:服务信息映射。
  • quit:退出信号。
  • done:完成信号。
  • czData:用于存储 ClientConn,addrConn 和 Server 的channelz 相关数据。
  • cv:当优雅退出时,会等待这个信号量,直到所有 RPC 请求都处理并断开才会继续处理。

二、注册

pb.RegisterSearchServiceServer(server, &SearchService{})

步骤一:Service API interface

// search.pb.go
type SearchServiceServer interface {
    Search(context.Context, *SearchRequest) (*SearchResponse, error)
}

func RegisterSearchServiceServer(s *grpc.Server, srv SearchServiceServer) {
    s.RegisterService(&_SearchService_serviceDesc, srv)
}

还记得我们平时编写的 Protobuf 吗?在生成出来的 .pb.go 文件中,会定义出 Service APIs interface 的具体实现约束。而我们在 gRPC Server 进行注册时,会传入应用 Service 的功能接口实现,此时生成的 RegisterServer 方法就会保证两者之间的一致性。

步骤二:Service API IDL

你想乱传糊弄一下?不可能的,请乖乖定义与 Protobuf 一致的接口方法。但是那个 &_SearchService_serviceDesc 又有什么作用呢?代码如下:

// search.pb.go
var _SearchService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "proto.SearchService",
    HandlerType: (*SearchServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Search",
            Handler:    _SearchService_Search_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "search.proto",
}

这看上去像服务的描述代码,用来向内部表述 “我” 都有什么。涉及如下:

  • ServiceName:服务名称
  • HandlerType:服务接口,用于检查用户提供的实现是否满足接口要求
  • Methods:一元方法集,注意结构内的 Handler 方法,其对应最终的 RPC 处理方法,在执行 RPC 方法的阶段会使用。
  • Streams:流式方法集
  • Metadata:元数据,是一个描述数据属性的东西。在这里主要是描述 SearchServiceServer 服务

步骤三:Register Service

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    ...
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        ...
    }
    s.m[sd.ServiceName] = srv
}

在最后一步中,我们会将先前的服务接口信息、服务描述信息给注册到内部 service 去,以便于后续实际调用的使用。涉及如下:

  • server:服务的接口信息
  • md:一元服务的 RPC 方法集
  • sd:流式服务的 RPC 方法集
  • mdata:metadata,元数据

小结

在这一章节中,主要介绍的是 gRPC Server 在启动前的整理和注册行为,看上去很简单,但其实一切都是为了后续的实际运行的预先准备。因此我们整理一下思路,将其串联起来看看,如下:

image

三、监听

接下来到了整个流程中,最重要也是大家最关注的监听/处理阶段,核心代码如下:

func (s *Server) Serve(lis net.Listener) error {
    ...
    var tempDelay time.Duration 
    for {
        rawConn, err := lis.Accept()
        if err != nil {
            if ne, ok := err.(interface {
                Temporary() bool
            }); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                ...
                timer := time.NewTimer(tempDelay)
                select {
                case <-timer.C:
                case <-s.quit:
                    timer.Stop()
                    return nil
                }
                continue
            }
            ...
            return err
        }
        tempDelay = 0

        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
    }
}

Serve 会根据外部传入的 Listener 不同而调用不同的监听模式,这也是 net.Listener 的魅力,灵活性和扩展性会比较高。而在 gRPC Server 中最常用的就是 TCPConn,基于 TCP Listener 去做。接下来我们一起看看具体的处理逻辑,如下:

image

  • 循环处理连接,通过 lis.Accept 取出连接,如果队列中没有需处理的连接时,会形成阻塞等待。
  • lis.Accept 失败,则触发休眠机制,若为第一次失败那么休眠 5ms,否则翻倍,再次失败则不断翻倍直至上限休眠时间 1s,而休眠完毕后就会尝试去取下一个 “它”。
  • lis.Accept 成功,则重置休眠的时间计数和启动一个新的 goroutine 调用 handleRawConn 方法去执行/处理新的请求,也就是大家很喜欢说的 “每一个请求都是不同的 goroutine 在处理”。
  • 在循环过程中,包含了 “退出” 服务的场景,主要是硬关闭和优雅重启服务两种情况。

客户端

image

一、创建拨号连接

// grpc.Dial(":"+PORT, grpc.WithInsecure())
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }
    ...
    chainUnaryClientInterceptors(cc)
    chainStreamClientInterceptors(cc)

    ...
}

grpc.Dial 方法实际上是对于 grpc.DialContext 的封装,区别在于 ctx 是直接传入 context.Background。其主要功能是创建与给定目标的客户端连接,其承担了以下职责:

  • 初始化 ClientConn
  • 初始化(基于进程 LB)负载均衡配置
  • 初始化 channelz
  • 初始化重试规则和客户端一元/流式拦截器
  • 初始化协议栈上的基础信息
  • 相关 context 的超时控制
  • 初始化并解析地址信息
  • 创建与服务端之间的连接

连没连

之前听到有的人说调用 grpc.Dial 后客户端就已经与服务端建立起了连接,但这对不对呢?我们先鸟瞰全貌,看看正在跑的 goroutine。如下:

image

我们可以有几个核心方法一直在等待/处理信号,通过分析底层源码可得知。涉及如下:

func (ac *addrConn) connect()
func (ac *addrConn) resetTransport()
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time)
func (ac *addrConn) getReadyTransport()

在这里主要分析 goroutine 提示的 resetTransport 方法,看看都做了啥。核心代码如下:

func (ac *addrConn) resetTransport() {
    for i := 0; ; i++ {
        if ac.state == connectivity.Shutdown {
            return
        }
        ...
        connectDeadline := time.Now().Add(dialDuration)
        ac.updateConnectivityState(connectivity.Connecting)
        newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
        if err != nil {
            if ac.state == connectivity.Shutdown {
                return
            }
            ac.updateConnectivityState(connectivity.TransientFailure)
            timer := time.NewTimer(backoffFor)
            select {
            case <-timer.C:
                ...
            }
            continue
        }

        if ac.state == connectivity.Shutdown {
            newTr.Close()
            return
        }
        ...
        if !healthcheckManagingState {
            ac.updateConnectivityState(connectivity.Ready)
        }
        ...

        if ac.state == connectivity.Shutdown {
            return
        }
        ac.updateConnectivityState(connectivity.TransientFailure)
    }
}

在该方法中会不断地去尝试创建连接,若成功则结束。否则不断地根据 Backoff 算法的重试机制去尝试创建连接,直到成功为止。从结论上来讲,单纯调用 DialContext 是异步建立连接的,也就是并不是马上生效,处于 Connecting 状态,而正式下要到达 Ready 状态才可用。

真的连了吗

image

在抓包工具上提示一个包都没有,那么这算真正连接了吗?我认为这是一个表述问题,我们应该尽可能的严谨。如果你真的想通过 DialContext 方法就打通与服务端的连接,则需要调用 WithBlock 方法,虽然会导致阻塞等待,但最终连接会到达 Ready 状态(握手成功)。如下图:

image

二、实例化 Service API

type SearchServiceClient interface {
    Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error)
}

type searchServiceClient struct {
    cc *grpc.ClientConn
}

func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient {
    return &searchServiceClient{cc}
}

这块就是实例 Service API interface,比较简单。

三、调用

// search.pb.go
func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {
    out := new(SearchResponse)
    err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

proto 生成的 RPC 方法更像是一个包装盒,把需要的东西放进去,而实际上调用的还是 grpc.invoke 方法。如下:

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

通过概览,可以关注到三块调用。如下:

  • newClientStream:获取传输层 Trasport 并组合封装到 ClientStream 中返回,在这块会涉及负载均衡、超时控制、 Encoding、 Stream 的动作,与服务端基本一致的行为。
  • cs.SendMsg:发送 RPC 请求出去,但其并不承担等待响应的功能。
  • cs.RecvMsg:阻塞等待接受到的 RPC 方法响应结果。

连接

// clientconn.go
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
        FullMethodName: method,
    })
    if err != nil {
        return nil, nil, toRPCErr(err)
    }
    return t, done, nil
}

newClientStream 方法中,我们通过 getTransport 方法获取了 Transport 层中抽象出来的 ClientTransport 和 ServerTransport,实际上就是获取一个连接给后续 RPC 调用传输使用。

四、关闭连接

// conn.Close()
func (cc *ClientConn) Close() error {
    defer cc.cancel()
    ...
    cc.csMgr.updateState(connectivity.Shutdown)
    ...
    cc.blockingpicker.close()
    if rWrapper != nil {
        rWrapper.close()
    }
    if bWrapper != nil {
        bWrapper.close()
    }

    for ac := range conns {
        ac.tearDown(ErrClientConnClosing)
    }
    if channelz.IsOn() {
        ...
        channelz.AddTraceEvent(cc.channelzID, ted)
        channelz.RemoveEntry(cc.channelzID)
    }
    return nil
}

该方法会取消 ClientConn 上下文,同时关闭所有底层传输。涉及如下:

  • Context Cancel
  • 清空并关闭客户端连接
  • 清空并关闭解析器连接
  • 清空并关闭负载均衡连接
  • 添加跟踪引用
  • 移除当前通道信息

Q&A

1. gRPC Metadata 是通过什么传输?

image

2. 调用 grpc.Dial 会真正的去连接服务端吗?

会,但是是异步连接的,连接状态为正在连接。但如果你设置了 grpc.WithBlock 选项,就会阻塞等待(等待握手成功)。另外你需要注意,当未设置 grpc.WithBlock 时,ctx 超时控制对其无任何效果。

3. 调用 ClientConn 不 Close 会导致泄露吗?

会,除非你的客户端不是常驻进程,那么在应用结束时会被动地回收资源。但如果是常驻进程,你又真的忘记执行 Close 语句,会造成的泄露。如下图:

3.1. 客户端

image

3.2. 服务端

image

3.3. TCP

image

4. 不控制超时调用的话,会出现什么问题?

短时间内不会出现问题,但是会不断积蓄泄露,积蓄到最后当然就是服务无法提供响应了。如下图:

image

5. 为什么默认的拦截器不可以传多个?

func chainUnaryClientInterceptors(cc *ClientConn) {
    interceptors := cc.dopts.chainUnaryInts
    if cc.dopts.unaryInt != nil {
        interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
    }
    var chainedInt UnaryClientInterceptor
    if len(interceptors) == 0 {
        chainedInt = nil
    } else if len(interceptors) == 1 {
        chainedInt = interceptors[0]
    } else {
        chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
            return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
        }
    }
    cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

6. 真的需要用到多个拦截器的话,怎么办?

可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptorgrpc.StreamInterceptor 链式方法,方便快捷省心。

单单会用还不行,我们再深剖一下,看看它是怎么实现的。核心代码如下:

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
    n := len(interceptors)
    if n > 1 {
        lastI := n - 1
        return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
            var (
                chainHandler grpc.UnaryInvoker
                curI         int
            )

            chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
                if curI == lastI {
                    return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
                }
                curI++
                err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
                curI--
                return err
            }

            return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
        }
    }
    ...
}

当拦截器数量大于 1 时,从 interceptors[1] 开始递归,每一个递归的拦截器 interceptors[i] 会不断地执行,最后才真正的去执行 handler 方法。同时也经常有人会问拦截器的执行顺序是什么,通过这段代码你得出结论了吗?

7. 频繁创建 ClientConn 有什么问题?

这个问题我们可以反向验证一下,假设不公用 ClientConn 看看会怎么样?如下:

func BenchmarkSearch(b *testing.B) {
    for i := 0; i < b.N; i++ {
        conn, err := GetClientConn()
        if err != nil {
            b.Errorf("GetClientConn err: %v", err)
        }
        _, err = Search(context.Background(), conn)
        if err != nil {
            b.Errorf("Search err: %v", err)
        }
    }
}

输出结果:

    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
FAIL
exit status 1

当你的应用场景是存在高频次同时生成/调用 ClientConn 时,可能会导致系统的文件句柄占用过多。这种情况下你可以变更应用程序生成/调用 ClientConn 的模式,又或是池化它,这块可以参考 grpc-go-pool 项目。

8. 客户端请求失败后会默认重试吗?

会不断地进行重试,直到上下文取消。而重试时间方面采用 backoff 算法作为的重连机制,默认的最大重试时间间隔是 120s。

9. 为什么要用 HTTP/2 作为传输协议?

许多客户端要通过 HTTP 代理来访问网络,gRPC 全部用 HTTP/2 实现,等到代理开始支持 HTTP/2 就能透明转发 gRPC 的数据。不光如此,负责负载均衡、访问控制等等的反向代理都能无缝兼容 gRPC,比起自己设计 wire protocol 的 Thrift,这样做科学不少。@ctiller @滕亦飞

10. 在 Kubernetes 中 gRPC 负载均衡有问题?

gRPC 的 RPC 协议是基于 HTTP/2 标准实现的,HTTP/2 的一大特性就是不需要像 HTTP/1.1 一样,每次发出请求都要重新建立一个新连接,而是会复用原有的连接。

所以这将导致 kube-proxy 只有在连接建立时才会做负载均衡,而在这之后的每一次 RPC 请求都会利用原本的连接,那么实际上后续的每一次的 RPC 请求都跑到了同一个地方。

注:使用 k8s service 做负载均衡的情况下

总结

  • gRPC 基于 HTTP/2 + Protobuf。
  • gRPC 有四种调用方式,分别是一元、服务端/客户端流式、双向流式。
  • gRPC 的附加信息都会体现在 HEADERS 帧,数据在 DATA 帧上。
  • Client 请求若使用 grpc.Dial 默认是异步建立连接,当时状态为 Connecting。
  • Client 请求若需要同步则调用 WithBlock(),完成状态为 Ready。
  • Server 监听是循环等待连接,若没有则休眠,最大休眠时间 1s;若接收到新请求则起一个新的 goroutine 去处理。
  • grpc.ClientConn 不关闭连接,会导致 goroutine 和 Memory 等泄露。
  • 任何内/外调用如果不加超时控制,会出现泄漏和客户端不断重试。
  • 特定场景下,如果不对 grpc.ClientConn 加以调控,会影响调用。
  • 拦截器如果不用 go-grpc-middleware 链式处理,会覆盖。
  • 在选择 gRPC 的负责均衡模式时,需要谨慎。

参考

查看原文

赞 111 收藏 68 评论 4

大彬 评论了文章 · 2019-05-22

Golang并发模型:轻松入门协程池

goroutine是非常轻量的,不会暂用太多资源,基本上有多少任务,我们可以开多少goroutine去处理。但有时候,我们还是想控制一下。

比如,我们有A、B两类工作,不想把太多资源花费在B类务上,而是花在A类任务上。对于A,我们可以来1个开一个goroutine去处理,对于B,我们可以使用一个协程池,协程池里有5个线程去处理B类任务,这样B消耗的资源就不会太多。

控制使用资源并不是协程池目的,使用协程池是为了更好并发、程序鲁棒性、容错性等。废话少说,快速入门协程池才是这篇文章的目的。

协程池指的是预先分配固定数量的goroutine处理相同的任务,和线程池是类似的,不同点是协程池中处理任务的是协程,线程池中处理任务的是线程。

最简单的协程池模型

简单协程池模型

上面这个图展示了最简单的协程池的样子。先把协程池作为一个整体看,它使用2个通道,左边的jobCh是任务通道,任务会从这个通道中流进来,右边的retCh是结果通道,协程池处理任务后得到的结果会写入这个通道。至于协程池中,有多少协程处理任务,这是外部不关心的。

看一下协程池内部,图中画了5个goroutine,实际goroutine的数量是依具体情况而定的。协程池内每个协程都从jobCh读任务、处理任务,然后将结果写入到retCh

示例

模型看懂了,看个小例子吧。

示例代码1

workerPool()会创建1个简单的协程池,协程的数量可以通入参数n执行,并且还指定了jobChretCh两个参数。

worker()是协程池中的协程,入参分布是它的ID、job通道和结果通道。使用for-rangejobCh读取任务,直到jobCh关闭,然后一个最简单的任务:生成1个字符串,证明自己处理了某个任务,并把字符串作为结果写入retCh

示例代码2

main()启动genJob获取存放任务的通道jobCh,然后创建retCh,它的缓存空间是200,并使用workerPool启动一个有5个协程的协程池。1s之后,关闭retCh,然后开始从retCh中读取协程池处理结果,并打印。

genJob启动一个协程,并生产n个任务,写入到jobCh

示例运行结果如下,一共产生了10个任务,显示大部分工作都被worker 2这个协程抢走了,如果我们设置的任务成千上万,协程池长时间处理任务,每个协程处理的工作数量就会均衡很多。

➜ go run simple_goroutine_pool.go
worker 2 processed job: 4
worker 2 processed job: 5
worker 2 processed job: 6
worker 2 processed job: 7
worker 2 processed job: 8
worker 2 processed job: 9
worker 0 processed job: 1
worker 3 processed job: 2
worker 4 processed job: 3
worker 1 processed job: 0

回顾

最简单的协程池模型就这么简单,再回头看下协程池及周边由哪些组成:

  1. 协程池内的一定数量的协程。
  2. 任务队列,即jobCh,存在协程池不能立即处理任务的情况,所以需要队列把任务先暂存。
  3. 结果队列,即retCh,同上,协程池处理任务的结果,也存在不能被下游立刻提取的情况,要暂时保存。

协程池最简要(核心)的逻辑是所有协程从任务读取任务,处理后把结果存放到结果队列。

Go并发系列文章

  1. Golang并发模型:轻松入门流水线模型
  2. Golang并发模型:轻松入门流水线FAN模式
  3. Golang并发模型:并发协程的优雅退出
  4. Golang并发模型:轻松入门select
  5. Golang并发模型:select进阶
  6. Golang并发模型:轻松入门协程池
  1. 如果这篇文章对你有帮助,请点个赞/喜欢,鼓励我持续分享,感谢
  2. 如果喜欢本文,随意转载,但请保留此原文链接
  3. 博客文章列表,点此可查看

一起学Golang-分享有料的Go语言技术

查看原文

大彬 评论了文章 · 2019-05-20

实战Go内存泄露

最近解决了我们项目中的一个内存泄露问题,事实再次证明pprof是一个好工具,但掌握好工具的正确用法,才能发挥好工具的威力,不然就算你手里有屠龙刀,也成不了天下第一,本文就是带你用pprof定位内存泄露问题。

关于Go的内存泄露有这么一句话不知道你听过没有:

10次内存泄露,有9次是goroutine泄露。

我所解决的问题,也是goroutine泄露导致的内存泄露,所以这篇文章主要介绍Go程序的goroutine泄露,掌握了如何定位和解决goroutine泄露,就掌握了内存泄露的大部分场景

本文草稿最初数据都是生产坏境数据,为了防止敏感内容泄露,全部替换成了demo数据,demo的数据比生产环境数据简单多了,更适合入门理解,有助于掌握pprof。

go pprof基本知识

定位goroutine泄露会使用到pprof,pprof是Go的性能工具,在开始介绍内存泄露前,先简单介绍下pprof的基本使用,更详细的使用给大家推荐了资料。

什么是pprof

pprof是Go的性能分析工具,在程序运行过程中,可以记录程序的运行信息,可以是CPU使用情况、内存使用情况、goroutine运行情况等,当需要性能调优或者定位Bug时候,这些记录的信息是相当重要。

基本使用

使用pprof有多种方式,Go已经现成封装好了1个:net/http/pprof,使用简单的几行命令,就可以开启pprof,记录运行信息,并且提供了Web服务,能够通过浏览器和命令行2种方式获取运行数据。

看个最简单的pprof的例子:

文件:golang_step_by_step/pprof/pprof/demo.go

package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
)

func main() {
    // 开启pprof,监听请求
    ip := "0.0.0.0:6060"
    if err := http.ListenAndServe(ip, nil); err != nil {
        fmt.Printf("start pprof failed on %s\n", ip)
    }
}

提醒:本文所有代码部分可左右滑动

浏览器方式

image-20190516173924325

输入网址ip:port/debug/pprof/打开pprof主页,从上到下依次是5类profile信息

  1. block:goroutine的阻塞信息,本例就截取自一个goroutine阻塞的demo,但block为0,没掌握block的用法
  2. goroutine:所有goroutine的信息,下面的full goroutine stack dump是输出所有goroutine的调用栈,是goroutine的debug=2,后面会详细介绍。
  3. heap:堆内存的信息
  4. mutex:锁的信息
  5. threadcreate:线程信息

这篇文章我们主要关注goroutine和heap,这两个都会打印调用栈信息,goroutine里面还会包含goroutine的数量信息,heap则是内存分配信息,本文用不到的地方就不展示了,最后推荐几篇文章大家去看。

命令行方式

当连接在服务器终端上的时候,是没有浏览器可以使用的,Go提供了命令行的方式,能够获取以上5类信息,这种方式用起来更方便。

使用命令go tool pprof url可以获取指定的profile文件,此命令会发起http请求,然后下载数据到本地,之后进入交互式模式,就像gdb一样,可以使用命令查看运行信息,以下是5类请求的方式:

# 下载cpu profile,默认从当前开始收集30s的cpu使用情况,需要等待30s
go tool pprof http://localhost:6060/debug/pprof/profile   # 30-second CPU profile
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=120     # wait 120s

# 下载heap profile
go tool pprof http://localhost:6060/debug/pprof/heap      # heap profile

# 下载goroutine profile
go tool pprof http://localhost:6060/debug/pprof/goroutine # goroutine profile

# 下载block profile
go tool pprof http://localhost:6060/debug/pprof/block     # goroutine blocking profile

# 下载mutex profile
go tool pprof http://localhost:6060/debug/pprof/mutex

上面的pprof/demo.go太简单了,如果去获取内存profile,几乎获取不到什么,换一个Demo进行内存profile的展示:

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

上面这个demo会不断的申请内存,把它编译运行起来,然后执行:

$ go tool pprof http://localhost:6060/debug/pprof/heap

Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in /home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz       //<--- 下载到的内存profile文件
File: demo // 程序名称
Build ID: a9069a125ee9c0df3713b2149ca859e8d4d11d5a
Type: inuse_space
Time: May 16, 2019 at 8:55pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) help  // 使用help打印所有可用命令
  Commands:
    callgrind        Outputs a graph in callgrind format
    comments         Output all profile comments
    disasm           Output assembly listings annotated with samples
    dot              Outputs a graph in DOT format
    eog              Visualize graph through eog
    evince           Visualize graph through evince
    gif              Outputs a graph image in GIF format
    gv               Visualize graph through gv
    kcachegrind      Visualize report in KCachegrind
    list             Output annotated source for functions matching regexp
    pdf              Outputs a graph in PDF format
    peek             Output callers/callees of functions matching regexp
    png              Outputs a graph image in PNG format
    proto            Outputs the profile in compressed protobuf format
    ps               Outputs a graph in PS format
    raw              Outputs a text representation of the raw profile
    svg              Outputs a graph in SVG format
    tags             Outputs all tags in the profile
    text             Outputs top entries in text form
    top              Outputs top entries in text form
    topproto         Outputs top entries in compressed protobuf format
    traces           Outputs all profile samples in text form
    tree             Outputs a text rendering of call graph
    web              Visualize graph through web browser
    weblist          Display annotated source in a web browser
    o/options        List options and their current values
    quit/exit/^D     Exit pprof
    
    ....

以上信息我们只关注2个地方:

  1. 下载得到的文件:/home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz,这其中包含了程序名demo,profile类型alloc已分配的内存,inuse代表使用中的内存。
  2. help可以获取帮助,最先会列出支持的命令,想掌握pprof,要多看看,多尝试。

关于命令,本文只会用到3个,我认为也是最常用的:toplisttraces,分别介绍一下。

top

按指标大小列出前10个函数,比如内存是按内存占用多少,CPU是按执行时间多少。

(pprof) top
Showing nodes accounting for 814.62MB, 100% of 814.62MB total
      flat  flat%   sum%        cum   cum%
  814.62MB   100%   100%   814.62MB   100%  main.main
         0     0%   100%   814.62MB   100%  runtime.main

top会列出5个统计数据:

  • flat: 本函数占用的内存量。
  • flat%: 本函数内存占使用中内存总量的百分比。
  • sum%: 前面每一行flat百分比的和,比如第2行虽然的100% 是 100% + 0%。
  • cum: 是累计量,加入main函数调用了函数f,函数f占用的内存量,也会记进来。
  • cum%: 是累计量占总量的百分比。

list

查看某个函数的代码,以及该函数每行代码的指标信息,如果函数名不明确,会进行模糊匹配,比如list main会列出main.mainruntime.main

(pprof) list main.main  // 精确列出函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  814.62MB   814.62MB     25:        buf = append(buf, make([]byte, 1024*1024)...)
         .          .     26:    }
         .          .     27:}
         .          .     28:
(pprof) list main  // 匹配所有函数名带main的函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
..... // 省略几行
         .          .     28:
ROUTINE ======================== runtime.main in /usr/lib/go-1.10/src/runtime/proc.go
         0   814.62MB (flat, cum)   100% of Total
         .          .    193:        // A program compiled with -buildmode=c-archive or c-shared
..... // 省略几行

可以看到在main.main中的第25行占用了814.62MB内存,左右2个数据分别是flat和cum,含义和top中解释的一样。

traces

打印所有调用栈,以及调用栈的指标信息。

(pprof) traces
File: demo2
Type: inuse_space
Time: May 16, 2019 at 7:08pm (CST)
-----------+-------------------------------------------------------
     bytes:  813.46MB
  813.46MB   main.main
             runtime.main
-----------+-------------------------------------------------------
     bytes:  650.77MB
         0   main.main
             runtime.main
....... // 省略几十行

每个- - - - - 隔开的是一个调用栈,能看到runtime.main调用了main.main,并且main.main中占用了813.46MB内存。

其他的profile操作和内存是类似的,这里就不展示了。

这里只是简单介绍本文用到的pprof的功能,pprof功能很强大,也经常和benchmark结合起来,但这不是本文的重点,所以就不多介绍了,为大家推荐几篇文章,一定要好好研读、实践:

  1. Go官方博客关于pprof的介绍,很详细,也包含样例,可以实操:Profiling Go Programs
  2. 跟煎鱼也讨论过pprof,煎鱼的这篇文章也很适合入门: Golang 大杀器之性能剖析 PProf

什么是内存泄露

内存泄露指的是程序运行过程中已不再使用的内存,没有被释放掉,导致这些内存无法被使用,直到程序结束这些内存才被释放的问题。

Go虽然有GC来回收不再使用的堆内存,减轻了开发人员对内存的管理负担,但这并不意味着Go程序不再有内存泄露问题。在Go程序中,如果没有Go语言的编程思维,也不遵守良好的编程实践,就可能埋下隐患,造成内存泄露问题。

怎么发现内存泄露

在Go中发现内存泄露有2种方法,一个是通用的监控工具,另一个是go pprof:

  1. 监控工具:固定周期对进程的内存占用情况进行采样,数据可视化后,根据内存占用走势(持续上升),很容易发现是否发生内存泄露。
  2. go pprof:适合没有监控工具的情况,使用Go提供的pprof工具判断是否发生内存泄露。

这2种方式分别介绍一下。

监控工具查看进程内在占用情况

如果使用云平台部署Go程序,云平台都提供了内存查看的工具,可以查看OS的内存占用情况和某个进程的内存占用情况,比如阿里云,我们在1个云主机上只部署了1个Go服务,所以OS的内存占用情况,基本是也反映了进程内存占用情况,OS内存占用情况如下,可以看到随着时间的推进,内存的占用率在不断的提高,这是内存泄露的最明显现象

image-20190512111200988

如果没有云平台这种内存监控工具,可以制作一个简单的内存记录工具。

1、建立一个脚本prog_mem.sh,获取进程占用的物理内存情况,脚本内容如下:

#!/bin/bash
prog_name="your_programe_name"
prog_mem=$(pidstat  -r -u -h -C $prog_name |awk 'NR==4{print $12}')
time=$(date "+%Y-%m-%d %H:%M:%S")
echo $time"\tmemory(Byte)\t"$prog_mem >>~/record/prog_mem.log

2、然后使用crontab建立定时任务,每分钟记录1次。使用crontab -e编辑crontab配置,在最后增加1行:

*/1 * * * * ~/record/prog_mem.sh

脚本输出的内容保存在prog_mem.log,只要大体浏览一下就可以发现内存的增长情况,判断是否存在内存泄露。如果需要可视化,可以直接黏贴prog_mem.log内容到Excel等表格工具,绘制内存占用图。

image-20190512172935195

go pprof发现存在内存问题

有情提醒:如果对pprof不了解,可以先看go pprof基本知识,这是下一节,看完再倒回来看。

如果你Google或者百度,Go程序内存泄露的文章,它总会告诉你使用pprof heap,能够生成漂亮的调用路径图,火焰图等等,然后你根据调用路径就能定位内存泄露问题,我最初也是对此深信不疑,尝试了若干天后,只是发现内存泄露跟某种场景有关,根本找不到内存泄露的根源,如果哪位朋友用heap就能定位内存泄露的线上问题,麻烦介绍下

后来读了Dave的《High Performance Go Workshop》,刷新了对heap的认识,内存pprof的简要内容如下:

image-20190512114048868

Dave讲了以下几点:

  1. 内存profiling记录的是堆内存分配的情况,以及调用栈信息,并不是进程完整的内存情况,猜测这也是在go pprof中称为heap而不是memory的原因。
  2. 栈内存的分配是在调用栈结束后会被释放的内存,所以并不在内存profile中
  3. 内存profiling是基于抽样的,默认是每1000次堆内存分配,执行1次profile记录。
  4. 因为内存profiling是基于抽样和它跟踪的是已分配的内存,而不是使用中的内存,(比如有些内存已经分配,看似使用,但实际以及不使用的内存,比如内存泄露的那部分),所以不能使用内存profiling衡量程序总体的内存使用情况
  5. Dave个人观点:使用内存profiling不能够发现内存泄露

基于目前对heap的认知,我有2个观点:

  1. heap能帮助我们发现内存问题,但不一定能发现内存泄露问题,这个看法与Dave是类似的。heap记录了内存分配的情况,我们能通过heap观察内存的变化,增长与减少,内存主要被哪些代码占用了,程序存在内存问题,这只能说明内存有使用不合理的地方,但并不能说明这是内存泄露。
  2. heap在帮助定位内存泄露原因上贡献的力量微乎其微。如第一条所言,能通过heap找到占用内存多的位置,但这个位置通常不一定是内存泄露,就算是内存泄露,也只是内存泄露的结果,并不是真正导致内存泄露的根源。

接下来,我介绍怎么用heap发现问题,然后再解释为什么heap几乎不能定位内存泄露的根因。

怎么用heap发现内存问题

使用pprof的heap能够获取程序运行时的内存信息,在程序平稳运行的情况下,每个一段时间使用heap获取内存的profile,然后使用base能够对比两个profile文件的差别,就像diff命令一样显示出增加和减少的变化,使用一个简单的demo来说明heap和base的使用,依然使用demo2进行展示。

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

将上面代码运行起来,执行以下命令获取profile文件,Ctrl-D退出,1分钟后再获取1次。

go tool pprof http://localhost:6060/debug/pprof/heap

我已经获取到了两个profile文件:

$ ls
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

使用base把001文件作为基准,然后用002和001对比,先执行toptop的对比,然后执行list main列出main函数的内存对比,结果如下:

$ go tool pprof -base pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

File: demo2
Type: inuse_space
Time: May 14, 2019 at 2:33pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 970.34MB, 32.30% of 3003.99MB total
      flat  flat%   sum%        cum   cum%
  970.34MB 32.30% 32.30%   970.34MB 32.30%  main.main   // 看这
         0     0% 32.30%   970.34MB 32.30%  runtime.main
(pprof)
(pprof)
(pprof) list main.main
Total: 2.93GB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  970.34MB   970.34MB (flat, cum) 32.30% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  970.34MB   970.34MB     25:        buf = append(buf, make([]byte, 1024*1024)...) // 看这
         .          .     26:    }
         .          .     27:}
         .          .     28:

top列出了main.mainruntime.mainmain.main就是我们编写的main函数,runtime.main是runtime包中的main函数,也就是所有main函数的入口,这里不多介绍了,有兴趣可以看之前的调度器文章《Go调度器系列(2)宏观看调度器》

top显示main.main 第2次内存占用,比第1次内存占用多了970.34MB。

list main.main告诉了我们增长的内存都在这一行:

buf = append(buf, make([]byte, 1024*1024)...)

001和002 profile的文件不进去看了,你本地测试下计算差值,绝对是刚才对比出的970.34MB。

heap“不能”定位内存泄露

heap能显示内存的分配情况,以及哪行代码占用了多少内存,我们能轻易的找到占用内存最多的地方,如果这个地方的数值还在不断怎大,基本可以认定这里就是内存泄露的位置。

曾想按图索骥,从内存泄露的位置,根据调用栈向上查找,总能找到内存泄露的原因,这种方案看起来是不错的,但实施起来却找不到内存泄露的原因,结果是事半功倍。

原因在于一个Go程序,其中有大量的goroutine,这其中的调用关系也许有点复杂,也许内存泄露是在某个三方包里。举个栗子,比如下面这幅图,每个椭圆代表1个goroutine,其中的数字为编号,箭头代表调用关系。heap profile显示g111(最下方标红节点)这个协程的代码出现了泄露,任何一个从g101到g111的调用路径都可能造成了g111的内存泄露,有2类可能:

  1. 该goroutine只调用了少数几次,但消耗了大量的内存,说明每个goroutine调用都消耗了不少内存,内存泄露的原因基本就在该协程内部
  2. 该goroutine的调用次数非常多,虽然每个协程调用过程中消耗的内存不多,但该调用路径上,协程数量巨大,造成消耗大量的内存,并且这些goroutine由于某种原因无法退出,占用的内存不会释放,内存泄露的原因在到g111调用路径上某段代码实现有问题,造成创建了大量的g111

第2种情况,就是goroutine泄露,这是通过heap无法发现的,所以heap在定位内存泄露这件事上,发挥的作用不大

image-20190512144150064


goroutine泄露怎么导致内存泄露

什么是goroutine泄露

如果你启动了1个goroutine,但并没有符合预期的退出,直到程序结束,此goroutine才退出,这种情况就是goroutine泄露。

提前思考:什么会导致goroutine无法退出/阻塞?

goroutine泄露怎么导致内存泄露

每个goroutine占用2KB内存,泄露1百万goroutine至少泄露2KB * 1000000 = 2GB内存,为什么说至少呢?

goroutine执行过程中还存在一些变量,如果这些变量指向堆内存中的内存,GC会认为这些内存仍在使用,不会对其进行回收,这些内存谁都无法使用,造成了内存泄露。

所以goroutine泄露有2种方式造成内存泄露:

  1. goroutine本身的栈所占用的空间造成内存泄露。
  2. goroutine中的变量所占用的堆内存导致堆内存泄露,这一部分是能通过heap profile体现出来的。

Dave在文章中也提到了,如果不知道何时停止一个goroutine,这个goroutine就是潜在的内存泄露:

7.1.1 Know when to stop a goroutine

If you don’t know the answer, that’s a potential memory leak as the goroutine will pin its stack’s memory on the heap, as well as any heap allocated variables reachable from the stack.

怎么确定是goroutine泄露引发的内存泄露

掌握了前面的pprof命令行的基本用法,很快就可以确认是否是goroutine泄露导致内存泄露,如果你不记得了,马上回去看一下go pprof基本知识

判断依据:在节点正常运行的情况下,隔一段时间获取goroutine的数量,如果后面获取的那次,某些goroutine比前一次多,如果多获取几次,是持续增长的,就极有可能是goroutine泄露

goroutine导致内存泄露的demo:

文件:golang_step_by_step/pprof/goroutine/leak_demo1.go

// goroutine泄露导致内存泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    outCh := make(chan int)
    // 死代码,永不读取
    go func() {
        if false {
            <-outCh
        }
        select {}
    }()

    // 每s起100个goroutine,goroutine会阻塞,不释放内存
    tick := time.Tick(time.Second / 100)
    i := 0
    for range tick {
        i++
        fmt.Println(i)
        alloc1(outCh)
    }
}

func alloc1(outCh chan<- int) {
    go alloc2(outCh)
}

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

编译并运行以上代码,然后使用go tool pprof获取gorourine的profile文件。

go tool pprof http://localhost:6060/debug/pprof/goroutine

已经通过pprof命令获取了2个goroutine的profile文件:

$ ls
/home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz
/home/ubuntu/pprof/pprof.leak_demo.goroutine.002.pb.gz

同heap一样,我们可以使用base对比2个goroutine profile文件:

$go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz

File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)

可以看到运行到runtime.gopark的goroutine数量增加了20312个。再通过002文件,看一眼执行到gopark的goroutine数量,即挂起的goroutine数量:

go tool pprof pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:47pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 24330, 100% of 24331 total
Dropped 32 nodes (cum <= 121)
      flat  flat%   sum%        cum   cum%
     24330   100%   100%      24330   100%  runtime.gopark
         0     0%   100%      24326   100%  main.alloc2
         0     0%   100%      24326   100%  main.alloc2.func1
         0     0%   100%      24326   100%  runtime.chansend
         0     0%   100%      24326   100%  runtime.chansend1
         0     0%   100%      24327   100%  runtime.goparkunlock

显示有24330个goroutine被挂起,这不是goroutine泄露这是啥?已经能确定八九成goroutine泄露了。

是什么导致如此多的goroutine被挂起而无法退出?接下来就看怎么定位goroutine泄露。


定位goroutine泄露的2种方法

使用pprof有2种方式,一种是web网页,一种是go tool pprof命令行交互,这两种方法查看goroutine都支持,但有轻微不同,也有各自的优缺点。

我们先看Web的方式,再看命令行交互的方式,这两种都很好使用,结合起来用也不错。

Web可视化查看

Web方式适合web服务器的端口能访问的情况,使用起来方便,有2种方式:

  1. 查看某条调用路径上,当前阻塞在此goroutine的数量
  2. 查看所有goroutine的运行栈(调用路径),可以显示阻塞在此的时间

方式一

url请求中设置debug=1:

http://ip:port/debug/pprof/goroutine?debug=1

效果如下:

看起来密密麻麻的,其实简单又十分有用,看上图标出来的部分,手机上图看起来可能不方便,那就放大图片,或直接看下面各字段的含义:

  1. goroutine profile: total 32023:32023是goroutine的总数量
  2. 32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 ...:32015代表当前有32015个goroutine运行这个调用栈,并且停在相同位置,@后面的十六进制,现在用不到这个数据,所以暂不深究了。
  3. 下面是当前goroutine的调用栈,列出了函数和所在文件的行数,这个行数对定位很有帮助,如下:
32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 0x6d8559 0x6d831b 0x45abe1
#    0x6d8558    main.alloc2.func1+0xf8    /home/ubuntu/heap/leak_demo.go:53
#    0x6d831a    main.alloc2+0x2a    /home/ubuntu/heap/leak_demo.go:54

根据上面的提示,就能判断32015个goroutine运行到leak_demo.go的53行:

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

阻塞的原因是outCh这个写操作无法完成,outCh是无缓冲的通道,并且由于以下代码是死代码,所以goroutine始终没有从outCh读数据,造成outCh阻塞,进而造成无数个alloc2的goroutine阻塞,形成内存泄露:

if false {
    <-outCh
}

方式二

url请求中设置debug=2:

http://ip:port/debug/pprof/goroutine?debug=2

第2种方式和第1种方式是互补的,它可以看到每个goroutine的信息:

  1. goroutine 20 [chan send, 2 minutes]:20是goroutine id,[]中是当前goroutine的状态,阻塞在写channel,并且阻塞了2分钟,长时间运行的系统,你能看到阻塞时间更长的情况。
  2. 同时,也可以看到调用栈,看当前执行停到哪了:leak_demo.go的53行,
goroutine 20 [chan send, 2 minutes]:
main.alloc2.func1(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:53 +0xf9  // 这
main.alloc2(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:54 +0x2b
created by main.alloc1
    /home/ubuntu/heap/leak_demo.go:42 +0x3f

命令行交互式方法

Web的方法是简单粗暴,无需登录服务器,浏览器打开看看就行了。但就像前面提的,没有浏览器可访问时,命令行交互式才是最佳的方式,并且也是手到擒来,感觉比Web一样方便。

命令行交互式只有1种获取goroutine profile的方法,不像Web网页分debug=1debug=22中方式,并将profile文件保存到本地:

// 注意命令没有`debug=1`,debug=1,加debug有些版本的go不支持
$ go tool pprof http://0.0.0.0:6060/debug/pprof/goroutine
Fetching profile over HTTP from http://localhost:6061/debug/pprof/goroutine
Saved profile in /home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz  // profile文件保存位置
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)

命令行只需要掌握3个命令就好了,上面介绍过了,详细的倒回去看top, list, traces

  1. top:显示正运行到某个函数goroutine的数量
  2. traces:显示所有goroutine的调用栈
  3. list:列出代码详细的信息。

我们依然使用leak_demo.go这个demo,

$  go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)
(pprof) traces
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
-----------+-------------------------------------------------------
     20312   runtime.gopark
             runtime.goparkunlock
             runtime.chansend
             runtime.chansend1 // channel发送
             main.alloc2.func1 // alloc2中的匿名函数
             main.alloc2
-----------+-------------------------------------------------------

top命令在怎么确定是goroutine泄露引发的内存泄露介绍过了,直接看traces命令,traces能列出002中比001中多的那些goroutine的调用栈,这里只有1个调用栈,有20312个goroutine都执行这个调用路径,可以看到alloc2中的匿名函数alloc2.func1调用了写channel的操作,然后阻塞挂起了goroutine,使用list列出alloc2.func1的代码,显示有20312个goroutine阻塞在53行:

(pprof) list main.alloc2.func1
Total: 20312
ROUTINE ======================== main.alloc2.func1 in /home/ubuntu/heap/leak_demo.go
         0      20312 (flat, cum)   100% of Total
         .          .     48:        // 分配内存,假用一下
         .          .     49:        buf := make([]byte, 1024*1024*10)
         .          .     50:        _ = len(buf)
         .          .     51:        fmt.Println("alloc done")
         .          .     52:
         .      20312     53:        outCh <- 0  // 看这
         .          .     54:    }()
         .          .     55:}
         .          .     56:

友情提醒:使用list命令的前提是程序的源码在当前机器,不然可没法列出源码。服务器上,通常没有源码,那我们咋办呢?刚才介绍了Web查看的方式,那里会列出代码行数,我们可以使用wget下载网页:

$ wget http://localhost:6060/debug/pprof/goroutine?debug=1

下载网页后,使用编辑器打开文件,使用关键字main.alloc2.func1进行搜索,找到与当前相同的调用栈,就可以看到该goroutine阻塞在哪一行了,不要忘记使用debug=2还可以看到阻塞了多久和原因,Web方式中已经介绍了,此处省略代码几十行。


总结

文章略长,但全是干货,感谢阅读到这。然读到着了,跟定很想掌握pprof,建议实践一把,现在和大家温习一把本文的主要内容。

goroutine泄露的本质

goroutine泄露的本质是channel阻塞,无法继续向下执行,导致此goroutine关联的内存都无法释放,进一步造成内存泄露。

goroutine泄露的发现和定位

利用好go pprof获取goroutine profile文件,然后利用3个命令top、traces、list定位内存泄露的原因。

goroutine泄露的场景

泄露的场景不仅限于以下两类,但因channel相关的泄露是最多的。

  1. channel的读或者写:

    1. 无缓冲channel的阻塞通常是写操作因为没有读而阻塞
    2. 有缓冲的channel因为缓冲区满了,写操作阻塞
    3. 期待从channel读数据,结果没有goroutine写
  2. select操作,select里也是channel操作,如果所有case上的操作阻塞,goroutine也无法继续执行。

编码goroutine泄露的建议

为避免goroutine泄露造成内存泄露,启动goroutine前要思考清楚:

  1. goroutine如何退出?
  2. 是否会有阻塞造成无法退出?如果有,那么这个路径是否会创建大量的goroutine?

示例源码

本文所有示例源码,及历史文章、代码都存储在Github,阅读原文可直接跳转,Github:https://github.com/Shitaibin/golang_step_by_step/tree/master/pprof

推荐阅读

这些既是参考资料也是推荐阅读的文章,不容错过。

【Go Blog关于pprof详细介绍和Demo】 https://blog.golang.org/profi...

【Dave关于高性能Go程序的workshop】 https://dave.cheney.net/high-...

【煎鱼pprof文章,很适合入门 Golang大杀器之性能剖析PProf】 https://segmentfault.com/a/11...

【SO上goroutine调用栈各字段的介绍】https://stackoverflow.com/a/3...

【我的老文,有runtime.main的介绍,想学习调度器,可以看下系列文章 Go调度器系列(2)宏观看调度器】http://lessisbetter.site/2019...

  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/05/18/go-goroutine-leak/

查看原文

大彬 发布了文章 · 2019-05-18

实战Go内存泄露

最近解决了我们项目中的一个内存泄露问题,事实再次证明pprof是一个好工具,但掌握好工具的正确用法,才能发挥好工具的威力,不然就算你手里有屠龙刀,也成不了天下第一,本文就是带你用pprof定位内存泄露问题。

关于Go的内存泄露有这么一句话不知道你听过没有:

10次内存泄露,有9次是goroutine泄露。

我所解决的问题,也是goroutine泄露导致的内存泄露,所以这篇文章主要介绍Go程序的goroutine泄露,掌握了如何定位和解决goroutine泄露,就掌握了内存泄露的大部分场景

本文草稿最初数据都是生产坏境数据,为了防止敏感内容泄露,全部替换成了demo数据,demo的数据比生产环境数据简单多了,更适合入门理解,有助于掌握pprof。

go pprof基本知识

定位goroutine泄露会使用到pprof,pprof是Go的性能工具,在开始介绍内存泄露前,先简单介绍下pprof的基本使用,更详细的使用给大家推荐了资料。

什么是pprof

pprof是Go的性能分析工具,在程序运行过程中,可以记录程序的运行信息,可以是CPU使用情况、内存使用情况、goroutine运行情况等,当需要性能调优或者定位Bug时候,这些记录的信息是相当重要。

基本使用

使用pprof有多种方式,Go已经现成封装好了1个:net/http/pprof,使用简单的几行命令,就可以开启pprof,记录运行信息,并且提供了Web服务,能够通过浏览器和命令行2种方式获取运行数据。

看个最简单的pprof的例子:

文件:golang_step_by_step/pprof/pprof/demo.go

package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
)

func main() {
    // 开启pprof,监听请求
    ip := "0.0.0.0:6060"
    if err := http.ListenAndServe(ip, nil); err != nil {
        fmt.Printf("start pprof failed on %s\n", ip)
    }
}

提醒:本文所有代码部分可左右滑动

浏览器方式

image-20190516173924325

输入网址ip:port/debug/pprof/打开pprof主页,从上到下依次是5类profile信息

  1. block:goroutine的阻塞信息,本例就截取自一个goroutine阻塞的demo,但block为0,没掌握block的用法
  2. goroutine:所有goroutine的信息,下面的full goroutine stack dump是输出所有goroutine的调用栈,是goroutine的debug=2,后面会详细介绍。
  3. heap:堆内存的信息
  4. mutex:锁的信息
  5. threadcreate:线程信息

这篇文章我们主要关注goroutine和heap,这两个都会打印调用栈信息,goroutine里面还会包含goroutine的数量信息,heap则是内存分配信息,本文用不到的地方就不展示了,最后推荐几篇文章大家去看。

命令行方式

当连接在服务器终端上的时候,是没有浏览器可以使用的,Go提供了命令行的方式,能够获取以上5类信息,这种方式用起来更方便。

使用命令go tool pprof url可以获取指定的profile文件,此命令会发起http请求,然后下载数据到本地,之后进入交互式模式,就像gdb一样,可以使用命令查看运行信息,以下是5类请求的方式:

# 下载cpu profile,默认从当前开始收集30s的cpu使用情况,需要等待30s
go tool pprof http://localhost:6060/debug/pprof/profile   # 30-second CPU profile
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=120     # wait 120s

# 下载heap profile
go tool pprof http://localhost:6060/debug/pprof/heap      # heap profile

# 下载goroutine profile
go tool pprof http://localhost:6060/debug/pprof/goroutine # goroutine profile

# 下载block profile
go tool pprof http://localhost:6060/debug/pprof/block     # goroutine blocking profile

# 下载mutex profile
go tool pprof http://localhost:6060/debug/pprof/mutex

上面的pprof/demo.go太简单了,如果去获取内存profile,几乎获取不到什么,换一个Demo进行内存profile的展示:

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

上面这个demo会不断的申请内存,把它编译运行起来,然后执行:

$ go tool pprof http://localhost:6060/debug/pprof/heap

Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in /home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz       //<--- 下载到的内存profile文件
File: demo // 程序名称
Build ID: a9069a125ee9c0df3713b2149ca859e8d4d11d5a
Type: inuse_space
Time: May 16, 2019 at 8:55pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) help  // 使用help打印所有可用命令
  Commands:
    callgrind        Outputs a graph in callgrind format
    comments         Output all profile comments
    disasm           Output assembly listings annotated with samples
    dot              Outputs a graph in DOT format
    eog              Visualize graph through eog
    evince           Visualize graph through evince
    gif              Outputs a graph image in GIF format
    gv               Visualize graph through gv
    kcachegrind      Visualize report in KCachegrind
    list             Output annotated source for functions matching regexp
    pdf              Outputs a graph in PDF format
    peek             Output callers/callees of functions matching regexp
    png              Outputs a graph image in PNG format
    proto            Outputs the profile in compressed protobuf format
    ps               Outputs a graph in PS format
    raw              Outputs a text representation of the raw profile
    svg              Outputs a graph in SVG format
    tags             Outputs all tags in the profile
    text             Outputs top entries in text form
    top              Outputs top entries in text form
    topproto         Outputs top entries in compressed protobuf format
    traces           Outputs all profile samples in text form
    tree             Outputs a text rendering of call graph
    web              Visualize graph through web browser
    weblist          Display annotated source in a web browser
    o/options        List options and their current values
    quit/exit/^D     Exit pprof
    
    ....

以上信息我们只关注2个地方:

  1. 下载得到的文件:/home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz,这其中包含了程序名demo,profile类型alloc已分配的内存,inuse代表使用中的内存。
  2. help可以获取帮助,最先会列出支持的命令,想掌握pprof,要多看看,多尝试。

关于命令,本文只会用到3个,我认为也是最常用的:toplisttraces,分别介绍一下。

top

按指标大小列出前10个函数,比如内存是按内存占用多少,CPU是按执行时间多少。

(pprof) top
Showing nodes accounting for 814.62MB, 100% of 814.62MB total
      flat  flat%   sum%        cum   cum%
  814.62MB   100%   100%   814.62MB   100%  main.main
         0     0%   100%   814.62MB   100%  runtime.main

top会列出5个统计数据:

  • flat: 本函数占用的内存量。
  • flat%: 本函数内存占使用中内存总量的百分比。
  • sum%: 前面每一行flat百分比的和,比如第2行虽然的100% 是 100% + 0%。
  • cum: 是累计量,加入main函数调用了函数f,函数f占用的内存量,也会记进来。
  • cum%: 是累计量占总量的百分比。

list

查看某个函数的代码,以及该函数每行代码的指标信息,如果函数名不明确,会进行模糊匹配,比如list main会列出main.mainruntime.main

(pprof) list main.main  // 精确列出函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  814.62MB   814.62MB     25:        buf = append(buf, make([]byte, 1024*1024)...)
         .          .     26:    }
         .          .     27:}
         .          .     28:
(pprof) list main  // 匹配所有函数名带main的函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
..... // 省略几行
         .          .     28:
ROUTINE ======================== runtime.main in /usr/lib/go-1.10/src/runtime/proc.go
         0   814.62MB (flat, cum)   100% of Total
         .          .    193:        // A program compiled with -buildmode=c-archive or c-shared
..... // 省略几行

可以看到在main.main中的第25行占用了814.62MB内存,左右2个数据分别是flat和cum,含义和top中解释的一样。

traces

打印所有调用栈,以及调用栈的指标信息。

(pprof) traces
File: demo2
Type: inuse_space
Time: May 16, 2019 at 7:08pm (CST)
-----------+-------------------------------------------------------
     bytes:  813.46MB
  813.46MB   main.main
             runtime.main
-----------+-------------------------------------------------------
     bytes:  650.77MB
         0   main.main
             runtime.main
....... // 省略几十行

每个- - - - - 隔开的是一个调用栈,能看到runtime.main调用了main.main,并且main.main中占用了813.46MB内存。

其他的profile操作和内存是类似的,这里就不展示了。

这里只是简单介绍本文用到的pprof的功能,pprof功能很强大,也经常和benchmark结合起来,但这不是本文的重点,所以就不多介绍了,为大家推荐几篇文章,一定要好好研读、实践:

  1. Go官方博客关于pprof的介绍,很详细,也包含样例,可以实操:Profiling Go Programs
  2. 跟煎鱼也讨论过pprof,煎鱼的这篇文章也很适合入门: Golang 大杀器之性能剖析 PProf

什么是内存泄露

内存泄露指的是程序运行过程中已不再使用的内存,没有被释放掉,导致这些内存无法被使用,直到程序结束这些内存才被释放的问题。

Go虽然有GC来回收不再使用的堆内存,减轻了开发人员对内存的管理负担,但这并不意味着Go程序不再有内存泄露问题。在Go程序中,如果没有Go语言的编程思维,也不遵守良好的编程实践,就可能埋下隐患,造成内存泄露问题。

怎么发现内存泄露

在Go中发现内存泄露有2种方法,一个是通用的监控工具,另一个是go pprof:

  1. 监控工具:固定周期对进程的内存占用情况进行采样,数据可视化后,根据内存占用走势(持续上升),很容易发现是否发生内存泄露。
  2. go pprof:适合没有监控工具的情况,使用Go提供的pprof工具判断是否发生内存泄露。

这2种方式分别介绍一下。

监控工具查看进程内在占用情况

如果使用云平台部署Go程序,云平台都提供了内存查看的工具,可以查看OS的内存占用情况和某个进程的内存占用情况,比如阿里云,我们在1个云主机上只部署了1个Go服务,所以OS的内存占用情况,基本是也反映了进程内存占用情况,OS内存占用情况如下,可以看到随着时间的推进,内存的占用率在不断的提高,这是内存泄露的最明显现象

image-20190512111200988

如果没有云平台这种内存监控工具,可以制作一个简单的内存记录工具。

1、建立一个脚本prog_mem.sh,获取进程占用的物理内存情况,脚本内容如下:

#!/bin/bash
prog_name="your_programe_name"
prog_mem=$(pidstat  -r -u -h -C $prog_name |awk 'NR==4{print $12}')
time=$(date "+%Y-%m-%d %H:%M:%S")
echo $time"\tmemory(Byte)\t"$prog_mem >>~/record/prog_mem.log

2、然后使用crontab建立定时任务,每分钟记录1次。使用crontab -e编辑crontab配置,在最后增加1行:

*/1 * * * * ~/record/prog_mem.sh

脚本输出的内容保存在prog_mem.log,只要大体浏览一下就可以发现内存的增长情况,判断是否存在内存泄露。如果需要可视化,可以直接黏贴prog_mem.log内容到Excel等表格工具,绘制内存占用图。

image-20190512172935195

go pprof发现存在内存问题

有情提醒:如果对pprof不了解,可以先看go pprof基本知识,这是下一节,看完再倒回来看。

如果你Google或者百度,Go程序内存泄露的文章,它总会告诉你使用pprof heap,能够生成漂亮的调用路径图,火焰图等等,然后你根据调用路径就能定位内存泄露问题,我最初也是对此深信不疑,尝试了若干天后,只是发现内存泄露跟某种场景有关,根本找不到内存泄露的根源,如果哪位朋友用heap就能定位内存泄露的线上问题,麻烦介绍下

后来读了Dave的《High Performance Go Workshop》,刷新了对heap的认识,内存pprof的简要内容如下:

image-20190512114048868

Dave讲了以下几点:

  1. 内存profiling记录的是堆内存分配的情况,以及调用栈信息,并不是进程完整的内存情况,猜测这也是在go pprof中称为heap而不是memory的原因。
  2. 栈内存的分配是在调用栈结束后会被释放的内存,所以并不在内存profile中
  3. 内存profiling是基于抽样的,默认是每1000次堆内存分配,执行1次profile记录。
  4. 因为内存profiling是基于抽样和它跟踪的是已分配的内存,而不是使用中的内存,(比如有些内存已经分配,看似使用,但实际以及不使用的内存,比如内存泄露的那部分),所以不能使用内存profiling衡量程序总体的内存使用情况
  5. Dave个人观点:使用内存profiling不能够发现内存泄露

基于目前对heap的认知,我有2个观点:

  1. heap能帮助我们发现内存问题,但不一定能发现内存泄露问题,这个看法与Dave是类似的。heap记录了内存分配的情况,我们能通过heap观察内存的变化,增长与减少,内存主要被哪些代码占用了,程序存在内存问题,这只能说明内存有使用不合理的地方,但并不能说明这是内存泄露。
  2. heap在帮助定位内存泄露原因上贡献的力量微乎其微。如第一条所言,能通过heap找到占用内存多的位置,但这个位置通常不一定是内存泄露,就算是内存泄露,也只是内存泄露的结果,并不是真正导致内存泄露的根源。

接下来,我介绍怎么用heap发现问题,然后再解释为什么heap几乎不能定位内存泄露的根因。

怎么用heap发现内存问题

使用pprof的heap能够获取程序运行时的内存信息,在程序平稳运行的情况下,每个一段时间使用heap获取内存的profile,然后使用base能够对比两个profile文件的差别,就像diff命令一样显示出增加和减少的变化,使用一个简单的demo来说明heap和base的使用,依然使用demo2进行展示。

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

将上面代码运行起来,执行以下命令获取profile文件,Ctrl-D退出,1分钟后再获取1次。

go tool pprof http://localhost:6060/debug/pprof/heap

我已经获取到了两个profile文件:

$ ls
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

使用base把001文件作为基准,然后用002和001对比,先执行toptop的对比,然后执行list main列出main函数的内存对比,结果如下:

$ go tool pprof -base pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

File: demo2
Type: inuse_space
Time: May 14, 2019 at 2:33pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 970.34MB, 32.30% of 3003.99MB total
      flat  flat%   sum%        cum   cum%
  970.34MB 32.30% 32.30%   970.34MB 32.30%  main.main   // 看这
         0     0% 32.30%   970.34MB 32.30%  runtime.main
(pprof)
(pprof)
(pprof) list main.main
Total: 2.93GB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  970.34MB   970.34MB (flat, cum) 32.30% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  970.34MB   970.34MB     25:        buf = append(buf, make([]byte, 1024*1024)...) // 看这
         .          .     26:    }
         .          .     27:}
         .          .     28:

top列出了main.mainruntime.mainmain.main就是我们编写的main函数,runtime.main是runtime包中的main函数,也就是所有main函数的入口,这里不多介绍了,有兴趣可以看之前的调度器文章《Go调度器系列(2)宏观看调度器》

top显示main.main 第2次内存占用,比第1次内存占用多了970.34MB。

list main.main告诉了我们增长的内存都在这一行:

buf = append(buf, make([]byte, 1024*1024)...)

001和002 profile的文件不进去看了,你本地测试下计算差值,绝对是刚才对比出的970.34MB。

heap“不能”定位内存泄露

heap能显示内存的分配情况,以及哪行代码占用了多少内存,我们能轻易的找到占用内存最多的地方,如果这个地方的数值还在不断怎大,基本可以认定这里就是内存泄露的位置。

曾想按图索骥,从内存泄露的位置,根据调用栈向上查找,总能找到内存泄露的原因,这种方案看起来是不错的,但实施起来却找不到内存泄露的原因,结果是事半功倍。

原因在于一个Go程序,其中有大量的goroutine,这其中的调用关系也许有点复杂,也许内存泄露是在某个三方包里。举个栗子,比如下面这幅图,每个椭圆代表1个goroutine,其中的数字为编号,箭头代表调用关系。heap profile显示g111(最下方标红节点)这个协程的代码出现了泄露,任何一个从g101到g111的调用路径都可能造成了g111的内存泄露,有2类可能:

  1. 该goroutine只调用了少数几次,但消耗了大量的内存,说明每个goroutine调用都消耗了不少内存,内存泄露的原因基本就在该协程内部
  2. 该goroutine的调用次数非常多,虽然每个协程调用过程中消耗的内存不多,但该调用路径上,协程数量巨大,造成消耗大量的内存,并且这些goroutine由于某种原因无法退出,占用的内存不会释放,内存泄露的原因在到g111调用路径上某段代码实现有问题,造成创建了大量的g111

第2种情况,就是goroutine泄露,这是通过heap无法发现的,所以heap在定位内存泄露这件事上,发挥的作用不大

image-20190512144150064


goroutine泄露怎么导致内存泄露

什么是goroutine泄露

如果你启动了1个goroutine,但并没有符合预期的退出,直到程序结束,此goroutine才退出,这种情况就是goroutine泄露。

提前思考:什么会导致goroutine无法退出/阻塞?

goroutine泄露怎么导致内存泄露

每个goroutine占用2KB内存,泄露1百万goroutine至少泄露2KB * 1000000 = 2GB内存,为什么说至少呢?

goroutine执行过程中还存在一些变量,如果这些变量指向堆内存中的内存,GC会认为这些内存仍在使用,不会对其进行回收,这些内存谁都无法使用,造成了内存泄露。

所以goroutine泄露有2种方式造成内存泄露:

  1. goroutine本身的栈所占用的空间造成内存泄露。
  2. goroutine中的变量所占用的堆内存导致堆内存泄露,这一部分是能通过heap profile体现出来的。

Dave在文章中也提到了,如果不知道何时停止一个goroutine,这个goroutine就是潜在的内存泄露:

7.1.1 Know when to stop a goroutine

If you don’t know the answer, that’s a potential memory leak as the goroutine will pin its stack’s memory on the heap, as well as any heap allocated variables reachable from the stack.

怎么确定是goroutine泄露引发的内存泄露

掌握了前面的pprof命令行的基本用法,很快就可以确认是否是goroutine泄露导致内存泄露,如果你不记得了,马上回去看一下go pprof基本知识

判断依据:在节点正常运行的情况下,隔一段时间获取goroutine的数量,如果后面获取的那次,某些goroutine比前一次多,如果多获取几次,是持续增长的,就极有可能是goroutine泄露

goroutine导致内存泄露的demo:

文件:golang_step_by_step/pprof/goroutine/leak_demo1.go

// goroutine泄露导致内存泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    outCh := make(chan int)
    // 死代码,永不读取
    go func() {
        if false {
            <-outCh
        }
        select {}
    }()

    // 每s起100个goroutine,goroutine会阻塞,不释放内存
    tick := time.Tick(time.Second / 100)
    i := 0
    for range tick {
        i++
        fmt.Println(i)
        alloc1(outCh)
    }
}

func alloc1(outCh chan<- int) {
    go alloc2(outCh)
}

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

编译并运行以上代码,然后使用go tool pprof获取gorourine的profile文件。

go tool pprof http://localhost:6060/debug/pprof/goroutine

已经通过pprof命令获取了2个goroutine的profile文件:

$ ls
/home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz
/home/ubuntu/pprof/pprof.leak_demo.goroutine.002.pb.gz

同heap一样,我们可以使用base对比2个goroutine profile文件:

$go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz

File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)

可以看到运行到runtime.gopark的goroutine数量增加了20312个。再通过002文件,看一眼执行到gopark的goroutine数量,即挂起的goroutine数量:

go tool pprof pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:47pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 24330, 100% of 24331 total
Dropped 32 nodes (cum <= 121)
      flat  flat%   sum%        cum   cum%
     24330   100%   100%      24330   100%  runtime.gopark
         0     0%   100%      24326   100%  main.alloc2
         0     0%   100%      24326   100%  main.alloc2.func1
         0     0%   100%      24326   100%  runtime.chansend
         0     0%   100%      24326   100%  runtime.chansend1
         0     0%   100%      24327   100%  runtime.goparkunlock

显示有24330个goroutine被挂起,这不是goroutine泄露这是啥?已经能确定八九成goroutine泄露了。

是什么导致如此多的goroutine被挂起而无法退出?接下来就看怎么定位goroutine泄露。


定位goroutine泄露的2种方法

使用pprof有2种方式,一种是web网页,一种是go tool pprof命令行交互,这两种方法查看goroutine都支持,但有轻微不同,也有各自的优缺点。

我们先看Web的方式,再看命令行交互的方式,这两种都很好使用,结合起来用也不错。

Web可视化查看

Web方式适合web服务器的端口能访问的情况,使用起来方便,有2种方式:

  1. 查看某条调用路径上,当前阻塞在此goroutine的数量
  2. 查看所有goroutine的运行栈(调用路径),可以显示阻塞在此的时间

方式一

url请求中设置debug=1:

http://ip:port/debug/pprof/goroutine?debug=1

效果如下:

看起来密密麻麻的,其实简单又十分有用,看上图标出来的部分,手机上图看起来可能不方便,那就放大图片,或直接看下面各字段的含义:

  1. goroutine profile: total 32023:32023是goroutine的总数量
  2. 32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 ...:32015代表当前有32015个goroutine运行这个调用栈,并且停在相同位置,@后面的十六进制,现在用不到这个数据,所以暂不深究了。
  3. 下面是当前goroutine的调用栈,列出了函数和所在文件的行数,这个行数对定位很有帮助,如下:
32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 0x6d8559 0x6d831b 0x45abe1
#    0x6d8558    main.alloc2.func1+0xf8    /home/ubuntu/heap/leak_demo.go:53
#    0x6d831a    main.alloc2+0x2a    /home/ubuntu/heap/leak_demo.go:54

根据上面的提示,就能判断32015个goroutine运行到leak_demo.go的53行:

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

阻塞的原因是outCh这个写操作无法完成,outCh是无缓冲的通道,并且由于以下代码是死代码,所以goroutine始终没有从outCh读数据,造成outCh阻塞,进而造成无数个alloc2的goroutine阻塞,形成内存泄露:

if false {
    <-outCh
}

方式二

url请求中设置debug=2:

http://ip:port/debug/pprof/goroutine?debug=2

第2种方式和第1种方式是互补的,它可以看到每个goroutine的信息:

  1. goroutine 20 [chan send, 2 minutes]:20是goroutine id,[]中是当前goroutine的状态,阻塞在写channel,并且阻塞了2分钟,长时间运行的系统,你能看到阻塞时间更长的情况。
  2. 同时,也可以看到调用栈,看当前执行停到哪了:leak_demo.go的53行,
goroutine 20 [chan send, 2 minutes]:
main.alloc2.func1(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:53 +0xf9  // 这
main.alloc2(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:54 +0x2b
created by main.alloc1
    /home/ubuntu/heap/leak_demo.go:42 +0x3f

命令行交互式方法

Web的方法是简单粗暴,无需登录服务器,浏览器打开看看就行了。但就像前面提的,没有浏览器可访问时,命令行交互式才是最佳的方式,并且也是手到擒来,感觉比Web一样方便。

命令行交互式只有1种获取goroutine profile的方法,不像Web网页分debug=1debug=22中方式,并将profile文件保存到本地:

// 注意命令没有`debug=1`,debug=1,加debug有些版本的go不支持
$ go tool pprof http://0.0.0.0:6060/debug/pprof/goroutine
Fetching profile over HTTP from http://localhost:6061/debug/pprof/goroutine
Saved profile in /home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz  // profile文件保存位置
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)

命令行只需要掌握3个命令就好了,上面介绍过了,详细的倒回去看top, list, traces

  1. top:显示正运行到某个函数goroutine的数量
  2. traces:显示所有goroutine的调用栈
  3. list:列出代码详细的信息。

我们依然使用leak_demo.go这个demo,

$  go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)
(pprof) traces
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
-----------+-------------------------------------------------------
     20312   runtime.gopark
             runtime.goparkunlock
             runtime.chansend
             runtime.chansend1 // channel发送
             main.alloc2.func1 // alloc2中的匿名函数
             main.alloc2
-----------+-------------------------------------------------------

top命令在怎么确定是goroutine泄露引发的内存泄露介绍过了,直接看traces命令,traces能列出002中比001中多的那些goroutine的调用栈,这里只有1个调用栈,有20312个goroutine都执行这个调用路径,可以看到alloc2中的匿名函数alloc2.func1调用了写channel的操作,然后阻塞挂起了goroutine,使用list列出alloc2.func1的代码,显示有20312个goroutine阻塞在53行:

(pprof) list main.alloc2.func1
Total: 20312
ROUTINE ======================== main.alloc2.func1 in /home/ubuntu/heap/leak_demo.go
         0      20312 (flat, cum)   100% of Total
         .          .     48:        // 分配内存,假用一下
         .          .     49:        buf := make([]byte, 1024*1024*10)
         .          .     50:        _ = len(buf)
         .          .     51:        fmt.Println("alloc done")
         .          .     52:
         .      20312     53:        outCh <- 0  // 看这
         .          .     54:    }()
         .          .     55:}
         .          .     56:

友情提醒:使用list命令的前提是程序的源码在当前机器,不然可没法列出源码。服务器上,通常没有源码,那我们咋办呢?刚才介绍了Web查看的方式,那里会列出代码行数,我们可以使用wget下载网页:

$ wget http://localhost:6060/debug/pprof/goroutine?debug=1

下载网页后,使用编辑器打开文件,使用关键字main.alloc2.func1进行搜索,找到与当前相同的调用栈,就可以看到该goroutine阻塞在哪一行了,不要忘记使用debug=2还可以看到阻塞了多久和原因,Web方式中已经介绍了,此处省略代码几十行。


总结

文章略长,但全是干货,感谢阅读到这。然读到着了,跟定很想掌握pprof,建议实践一把,现在和大家温习一把本文的主要内容。

goroutine泄露的本质

goroutine泄露的本质是channel阻塞,无法继续向下执行,导致此goroutine关联的内存都无法释放,进一步造成内存泄露。

goroutine泄露的发现和定位

利用好go pprof获取goroutine profile文件,然后利用3个命令top、traces、list定位内存泄露的原因。

goroutine泄露的场景

泄露的场景不仅限于以下两类,但因channel相关的泄露是最多的。

  1. channel的读或者写:

    1. 无缓冲channel的阻塞通常是写操作因为没有读而阻塞
    2. 有缓冲的channel因为缓冲区满了,写操作阻塞
    3. 期待从channel读数据,结果没有goroutine写
  2. select操作,select里也是channel操作,如果所有case上的操作阻塞,goroutine也无法继续执行。

编码goroutine泄露的建议

为避免goroutine泄露造成内存泄露,启动goroutine前要思考清楚:

  1. goroutine如何退出?
  2. 是否会有阻塞造成无法退出?如果有,那么这个路径是否会创建大量的goroutine?

示例源码

本文所有示例源码,及历史文章、代码都存储在Github,阅读原文可直接跳转,Github:https://github.com/Shitaibin/golang_step_by_step/tree/master/pprof

推荐阅读

这些既是参考资料也是推荐阅读的文章,不容错过。

【Go Blog关于pprof详细介绍和Demo】 https://blog.golang.org/profi...

【Dave关于高性能Go程序的workshop】 https://dave.cheney.net/high-...

【煎鱼pprof文章,很适合入门 Golang大杀器之性能剖析PProf】 https://segmentfault.com/a/11...

【SO上goroutine调用栈各字段的介绍】https://stackoverflow.com/a/3...

【我的老文,有runtime.main的介绍,想学习调度器,可以看下系列文章 Go调度器系列(2)宏观看调度器】http://lessisbetter.site/2019...

  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/05/18/go-goroutine-leak/

查看原文

赞 76 收藏 50 评论 9

大彬 发布了文章 · 2019-04-15

Go调度器系列(4)源码阅读与探索

各位朋友,这次想跟大家分享一下Go调度器源码阅读相关的知识和经验,网络上已经有很多剖析源码的好文章,所以这篇文章不是又一篇源码剖析文章,注重的不是源码分析分享,而是带给大家一些学习经验,希望大家能更好的阅读和掌握Go调度器的实现

本文主要分2个部分:

  1. 解决如何阅读源码的问题。阅读源码本质是把脑海里已经有的调度设计,看看到底是不是这么实现的,是怎么实现的。
  2. 带给你一个探索Go调度器实现的办法。源码都到手了,你可以修改、窥探,通过这种方式解决阅读源码过程中的疑问,验证一些想法。比如:负责调度的是g0,怎么才能schedule()在执行时,当前是g0呢?

如何阅读源码

阅读前提

阅读Go源码前,最好已经掌握Go调度器的设计和原理,如果你还无法回答以下问题:

  1. 为什么需要Go调度器?
  2. Go调度器与系统调度器有什么区别和关系/联系?
  3. G、P、M是什么,三者的关系是什么?
  4. P有默认几个?
  5. M同时能绑定几个P?
  6. M怎么获得G?
  7. M没有G怎么办?
  8. 为什么需要全局G队列?
  9. Go调度器中的负载均衡的2种方式是什么?
  10. work stealing是什么?什么原理?
  11. 系统调用对G、P、M有什么影响?
  12. Go调度器抢占是什么样的?一定能抢占成功吗?

建议阅读Go调度器系列文章,以及文章中的参考资料:

  1. Go调度器系列(1)起源
  2. Go调度器系列(2)宏观看调度器
  3. Go调度器系列(3)图解调度原理

优秀源码资料推荐

既然你已经能回答以上问题,说明你对Go调度器的设计已经有了一定的掌握,关于Go调度器源码的优秀资料已经有很多,我这里推荐2个:

  1. 雨痕的Go源码剖析六章并发调度,不止是源码,是以源码为基础进行了详细的Go调度器介绍:ttps://github.com/qyuhen/book
  2. Go夜读第12期,golang中goroutine的调度,M、P、G各自的一生状态,以及转换关系:https://reading.developerlear...

Go调度器的源码还涉及GC等,阅读源码时,可以暂时先跳过,主抓调度的逻辑。

另外,Go调度器涉及汇编,也许你不懂汇编,不用担心,雨痕的文章对汇编部分有进行解释。

最后,送大家一幅流程图,画出了主要的调度流程,大家也可边阅读边画,增加理解,高清版可到博客下载(原图原文跳转)

如何探索调度器

这部分教你探索Go调度器的源码,验证想法,主要思想就是,下载Go的源码,添加调试打印,编译修改的源文件,生成修改的go,然后使用修改go运行测试代码,观察结果。

下载和编译Go

  1. Github下载,并且换到go1.11.2分支,本文所有代码修改都基于go1.11.2版本。
$ GODIR=$GOPATH/src/github.com/golang/go
$ mkdir -p $GODIR
$ cd $GODIR/..
$ git clone https://github.com/golang/go.git
$ cd go
$ git fetch origin go1.11.2
$ git checkout origin/go1.11.2
$ git checkout -b go1.11.2
$ git checkout go1.11.2
  1. 初次编译,会跑测试,耗时长一点
$ cd $GODIR/src
$ ./all.bash
  1. 以后每次修改go源码后可以这样,4分钟左右可以编译完成
$ cd  $GODIR/src
$ time ./make.bash
Building Go cmd/dist using /usr/local/go.
Building Go toolchain1 using /usr/local/go.
Building Go bootstrap cmd/go (go_bootstrap) using Go toolchain1.
Building Go toolchain2 using go_bootstrap and Go toolchain1.
Building Go toolchain3 using go_bootstrap and Go toolchain2.
Building packages and commands for linux/amd64.
---
Installed Go for linux/amd64 in /home/xxx/go/src/github.com/golang/go
Installed commands in /home/xxx/go/src/github.com/golang/go/bin

real    1m11.675s
user    4m4.464s
sys    0m18.312s

编译好的go和gofmt在$GODIR/bin目录。

$ ll $GODIR/bin
total 16044
-rwxrwxr-x 1 vnt vnt 13049123 Apr 14 10:53 go
-rwxrwxr-x 1 vnt vnt  3377614 Apr 14 10:53 gofmt
  1. 为了防止我们修改的go和过去安装的go冲突,创建igo软连接,指向修改的go。
$ mkdir -p ~/testgo/bin
$ cd ~/testgo/bin
$ ln -sf $GODIR/bin/go igo
  1. 最后,把~/testgo/bin加入到PATH,就能使用igo来编译代码了,运行下igo,应当获得go1.11.2的版本:
$ igo version
go version go1.11.2 linux/amd64

当前,已经掌握编译和使用修改的go的办法,接下来就以1个简单的例子,教大家如何验证想法。

验证schedule()由g0执行

阅读源码的文章,你已经知道了g0是负责调度的,并且g0是全局变量,可在runtime包的任何地方直接使用,看到schedule()代码如下(所在文件:$GODIR/src/runtime/proc.go):

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    // 获取当前g,调度时这个g应当是g0
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    // m已经被某个g锁定,先停止当前m,等待g可运行时,再执行g,并且还得到了g所在的p
    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // 省略...
}

问题:既然g0是负责调度的,为何schedule()每次还都执行_g_ := getg(),直接使用g0不行吗?schedule()真的是g0执行的吗?

《Go调度器系列(2)宏观看调度器》这篇文章中我曾介绍了trace的用法,阅读代码时发现使用debug.schedtraceprint()函数可以用作打印调试信息,那我们是不是可以使用这种方法打印我们想获取的信息呢?当然可以。

另外,注意print()并不是fmt.Print(),也不是C语言的printf,所以不是格式化输出,它是汇编实现的,我们不深入去了解它的实现了,现在要掌握它的用法:

// The print built-in function formats its arguments in an
// implementation-specific way and writes the result to standard error.
// Print is useful for bootstrapping and debugging; it is not guaranteed
// to stay in the language.
func print(args ...Type)

从上面可以看到,它接受可变长参数,我们使用的时候只需要传进去即可,但要手动控制格式。

我们修改schedule()函数,使用debug.schedtrace > 0控制打印,加入3行代码,把goid给打印出来,如果始终打印goid为0,则代表调度确实是由g0执行的:

if debug.schedtrace > 0 {
    print("schedule(): goid = ", _g_.goid, "\n") // 会是0吗?是的
}

schedule()如下:

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    // 获取当前g,调度时这个g应当是g0
    _g_ := getg()

    if debug.schedtrace > 0 {
        print("schedule(): goid = ", _g_.goid, "\n") // 会是0吗?是的
    }

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }
    // ...
}

编译igo:

$ cd  $GODIR/src
$ ./make.bash

编写一个简单的demo(不能更简单):

package main

func main() {
}

结果如下,你会发现所有的schedule()函数调用都打印goid = 0,足以证明Go调度器的调度由g0完成(如果你认为还是缺乏说服力,可以写复杂一些的demo):

$ GODEBUG=schedtrace=1000 igo run demo1.go
schedule(): goid = 0
schedule(): goid = 0
SCHED 0ms: gomaxprocs=8 idleprocs=6 threads=4 spinningthreads=1 idlethreads=0 runqueue=0 [0 0 0 0 0 0 0 0]
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
schedule(): goid = 0
// 省略几百行

启发比结论更重要,希望各位朋友在学习Go调度器的时候,能多一些自己的探索和研究,而不仅仅停留在看看别人文章之上

参考资料

  1. Installing Go from source
  1. 如果这篇文章对你有帮助,请点个赞/喜欢,感谢
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019...

image

查看原文

赞 20 收藏 16 评论 0

大彬 评论了文章 · 2019-04-15

【Go源码分析】Go scheduler 源码分析

作者:孙伟

1、进程/线程/协程基本概念

  • 一个进程可以有多个线程,一般情况下固定2MB内存块来做栈,用来保存当前被调用/挂起的函数内部的变量,CPU在执行调度的时候切换的是线程,如果下一个线程也是当前进程的,就只有线程切换,“很快”就能完成;如果下一个线程不是当前的进程,就需要切换进程,这就得费点时间了。
  • 线程分为内核态线程用户态线程,用户态线程需要绑定内核态线程,CPU并不能感知用户态线程的存在,它只知道它在运行1个线程,这个线程实际是内核态线程。
  • 用户态线程实际有个名字叫协程(co-routine),为了容易区分,我们使用协程指用户态线程,使用线程指内核态线程。
  • 协程跟线程是有区别的,线程由CPU调度是抢占式的,协程由用户态调度是协作式的,一个协程让出CPU后,才执行下一个协程。

协程和线程绑定关系有以下3种:

  • N:1,N个协程绑定1个线程,优点就是协程在用户态线程即完成切换,不会陷入到内核态,这种切换非常的轻量快速。但也有很大的缺点,1个进程的所有协程都绑定在1个线程上,一是某个程序用不了硬件的多核加速能力,二是一旦某协程阻塞,造成线程阻塞,本进程的其他协程都无法执行了,根本就没有并发的能力了。
  • 1:1,1个协程绑定1个线程,这种最容易实现。协程的调度都由CPU完成了,不存在N:1缺点,但有一个缺点是协程的创建、删除和切换的代价都由CPU完成,有点略显昂贵了。
  • M:N,M个协程绑定N个线程,是N:1和1:1类型的结合,克服了以上2种模型的缺点,但实现起来最为复杂。

2、Golang简介

2.1 Goroutine 概念

因为线程切换需要很大的上下文,这种切换消耗了大量CPU时间,所以Go的并行单元并不是传统意义上的线程,而是采用更轻量的协程(goroutine)来处理,大大提高了并行度,因此Go被称为“最并行的语言”。

2.2与其他并发模型的对比

  • Python等解释性语言采用的是多进程并发模型,进程的上下文是最大的,所以切换耗费巨大,同时由于多进程通信只能用socket通讯,或者专门设置共享内存,给编程带来了极大的困扰与不便;
  • C++等语言通常会采用多线程并发模型,相比进程,线程的上下文要小很多,而且多个线程之间本来就是共享内存的,所以编程相比要轻松很多。但是线程的启动和销毁,切换依然要耗费大量CPU时间;于是出现了线程池技术,将线程先储存起来,保持一定的数量,来避免频繁开启/关闭线程的时间消耗,但是这种初级的技术存在一些问题,比如有线程一直被IO阻塞,这样的话这个线程一直占据着坑位,导致后面的任务排不到队,拿不到线程来执行;
  • Go的并发较为复杂,Go采用了更轻量的数据结构来代替线程,这种数据结构相比线程更轻量,他有自己的栈,切换起来更快。然而真正执行并发的还是线程,Go通过调度器将goroutine调度到线程中执行,并适时地释放和创建新的线程,并且当一个正在运行的goroutine进入阻塞(常见场景就是等待IO)时,将其脱离占用的线程,将其他准备好运行的goroutine放在该线程上执行。通过较为复杂的调度手段,使得整个系统获得极高的并行度同时又不耗费大量的CPU资源。

2.3 Goroutine的特点

  • 非阻塞。Goroutine的引入是为了方便高并发程序的编写。一个Goroutine在进行阻塞操作(比如系统调用)时,会把当前线程中的其他Goroutine移交到其他线程中继续执行,从而避免了整个程序的阻塞。
  • 调度器。虽然Golang引入了垃圾回收(gc),在执行gc时就要求Goroutine是停止的,但Go通过自己实现调度器,也可以方便的实现该功能。 通过多个Goroutine来实现并发程序,既有异步IO的优势,又具有多线程、多进程编写程序的便利性。
  • 自己维护堆栈。当然引入Goroutine,也意味着引入了极大的复杂性。一个Goroutine既要包含要执行的代码,又要包含用于执行该代码的栈、PC(PC值=当前程序执行位置+8)和SP指针。堆栈指针需要保证各种模式下程序完成性。

既然每个Goroutine都有自己的栈,那么在创建Goroutine时,就要同时创建对应的栈。Goroutine在执行时,栈空间会不停增长。栈通常是连续增长的,由于每个进程中的各个线程共享虚拟内存空间,当有多个线程时,就需要为每个线程分配不同起始地址的栈。这就需要在分配栈之前先预估每个线程栈的大小。如果线程数量非常多,就很容易栈溢出。

为了解决这个问题,就有了Split Stacks 技术:创建栈时,只分配一块比较小的内存,如果进行某次函数调用导致栈空间不足时,就会在其他地方分配一块新的栈空间。新的空间不需要和老的栈空间连续。函数调用的参数会拷贝到新的栈空间中,接下来的函数执行都在新栈空间中进行。Golang的栈管理方式与此类似,但是为了更高的效率,使用了连续栈( Golang连续栈) 实现方式也是先分配一块固定大小的栈,在栈空间不足时,分配一块更大的栈,并把旧的栈全部拷贝到新栈中。这样避免了Split Stacks方法可能导致的频繁内存分配和释放。

Goroutine的执行是可以被抢占的。如果一个Goroutine一直占用CPU,长时间没有被调度过,就会被runtime抢占掉,把CPU时间交给其他Goroutine。 这个可以通过 debug/goroutine 阻塞实现。

2.4 结构体

  • M:指go中的工作者线程,是真正执行代码的单元;
  • P:是一种调度goroutine的上下文,goroutine依赖于P进行调度,P是真正的并行单元;
  • G:即goroutine,是go语言中的一段代码(以一个函数的形式展现),最小的并行单元;

P必须绑定在M上才能运行,M必须绑定了P才能运行,而一般情况下,最多有MAXPROCS(通常等于CPU数量)个P,但是可能有很多个M,真正运行的只有绑定了M的P,所以P是真正的并行单元。

每个P有一个自己的runnableG队列,可以从里面拿出一个G来运行,同时也有一个全局的runnable G队列,G通过P依附在M上面执行。不单独使用全局的runnable G队列的原因是,分布式的队列有利于减小临界区大小,想一想多个线程同时请求可用的G的时候,如果只有全局的资源,那么这个全局的锁会导致多少线程一直在等待。

但是如果一个正在执行的G进入了阻塞,典型的例子就是等待IO,那么他和它所在的M会在那边等待,而上下文P会传递到其他可用的M上面,这样这个阻塞就不会影响程序的并行度。

G结构体

type g struct {
   // Stack parameters.
   // stack describes the actual stack memory: [stack.lo, stack.hi).
   // stackguard0 is the stack pointer compared in the Go stack growth prologue.
   // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
   // stackguard1 is the stack pointer compared in the C stack growth prologue.
   // It is stack.lo+StackGuard on g0 and gsignal stacks.
   // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
   stack       stack   // offset known to runtime/cgo //描述了真实的栈内存,包括上下界、
   stackguard0 uintptr // offset known to liblink
   stackguard1 uintptr // offset known to liblink
 
   _panic         *_panic // innermost panic - offset known to liblink
   _defer         *_defer // innermost defer
   m              *m      // current m; offset known to arm liblink  //当前的M
   sched          gobuf    //goroutine切换时,用于保存g的上下文
   syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
   syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
   stktopsp       uintptr        // expected sp at top of stack, to check in traceback
   param          unsafe.Pointer // passed parameter on wakeup 用于传递参数,睡眠时 其他goroutine可以设置param,唤醒时该goroutine可以获取
   atomicstatus   uint32
   stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
   goid           int64   //goroutine 的ID
   waitsince      int64  // approx time when the g become blocked  g被阻塞的 大概时间
   waitreason     string // if status==Gwaiting
   schedlink      guintptr
   preempt        bool     // preemption signal, duplicates stackguard0 = stackpreempt
   paniconfault   bool     // panic (instead of crash) on unexpected fault address
   preemptscan    bool     // preempted g does scan for gc
   gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
   gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
   throwsplit     bool     // must not split stack
   raceignore     int8     // ignore race detection events
   sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
   sysexitticks   int64    // cputicks when syscall has returned (for tracing)
   traceseq       uint64   // trace event sequencer
   tracelastp     puintptr // last P emitted an event for this goroutine
   lockedm        muintptr    //G被锁定只能在这个M运行
   sig            uint32
   writebuf       []byte
   sigcode0       uintptr
   sigcode1       uintptr
   sigpc          uintptr
   gopc           uintptr // pc of go statement that created this goroutine
   startpc        uintptr // pc of goroutine function
   racectx        uintptr
   waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
   cgoCtxt        []uintptr      // cgo traceback context
   labels         unsafe.Pointer // profiler labels
   timer          *timer         // cached timer for time.Sleep
   selectDone     uint32         // are we participating in a select and did someone win the race?
 
   // Per-G GC state
 
   // gcAssistBytes is this G's GC assist credit in terms of
   // bytes allocated. If this is positive, then the G has credit
   // to allocate gcAssistBytes bytes without assisting. If this
   // is negative, then the G must correct this by performing
   // scan work. We track this in bytes to make it fast to update
   // and check for debt in the malloc hot path. The assist ratio
   // determines how this corresponds to scan work debt.
   gcAssistBytes int64
}

Gobuf结构体

type gobuf struct {
    sp   uintptr
    pc   uintptr
    g    guintptr
    ctxt unsafe.Pointer
    ret  sys.Uintreg
    lr   uintptr
    bp   uintptr // for GOEXPERIMENT=framepointer
}

其中最主要的当然是sched了,保存了goroutine的上下文。goroutine切换的时候不同于线程有OS来负责这部分数据,而是由一个gobuf对象来保存,这样能够更加轻量级,再来看看gobuf的结构

M结构体

type m struct {
    g0      *g     // 带有调度栈的goroutine
    gsignal       *g         // 处理信号的goroutine
    tls           [6]uintptr // thread-local storage
    mstartfn      func()
    curg          *g       // 当前运行的goroutine
    caughtsig     guintptr
    p             puintptr // 关联p和执行的go代码
    nextp         puintptr
    id            int32
    mallocing     int32 // 状态
    spinning      bool // m是否out of work
    blocked       bool // m是否被阻塞
    inwb          bool // m是否在执行写屏蔽
    printlock     int8
    incgo         bool // m在执行cgo吗
    fastrand      uint32
    ncgocall      uint64      // cgo调用的总数
    ncgo          int32       // 当前cgo调用的数目
    park          note
    alllink       *m // 用于链接allm
    schedlink     muintptr
    mcache        *mcache // 当前m的内存缓存
    lockedg       *g // 锁定g在当前m上执行,而不会切换到其他m
    createstack   [32]uintptr // thread创建的栈
}

结构体M中有两个G是需要关注一下的:

  • 一个是curg,代表结构体M当前绑定的结构体G。
  • 另一个是g0,是带有调度栈的goroutine,这是一个比较特殊的goroutine。普通的goroutine的栈是在堆上分配的可增长的栈,而g0的栈是M对应的线程的栈。所有调度相关的代码,会先切换到该goroutine的栈中再执行。也就是说线程的栈也是用的g实现,而不是使用的OS的。

P结构体

type p struct {
    lock mutex
    id          int32
    status      uint32 // 状态,可以为pidle/prunning/...
    link        puintptr
    schedtick   uint32     // 每调度一次加1
    syscalltick uint32     // 每一次系统调用加1
    sysmontick  sysmontick
    m           muintptr   // 回链到关联的m
    mcache      *mcache
    racectx     uintptr
    goidcache    uint64 // goroutine的ID的缓存
    goidcacheend uint64
    // 可运行的goroutine的队列
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    runnext guintptr // 下一个运行的g
    sudogcache []*sudog
    sudogbuf   [128]*sudog
    palloc persistentAlloc // per-P to avoid mutex
    pad [sys.CacheLineSize]byte
}

其中P的状态有Pidle, Prunning, Psyscall, Pgcstop, Pdead;在其内部队列runqhead里面有可运行的goroutine,P优先从内部获取执行的g,这样能够提高效率。

Schedt结构体

type schedt struct {
   goidgen  uint64
    lastpoll uint64
    lock mutex
    midle        muintptr // idle状态的m
    nmidle       int32    // idle状态的m个数
    nmidlelocked int32    // lockde状态的m个数
    mcount       int32    // 创建的m的总数
    maxmcount    int32    // m允许的最大个数
    ngsys uint32 // 系统中goroutine的数目,会自动更新
    pidle      puintptr // idle的p
    npidle     uint32
    nmspinning uint32
    // 全局的可运行的g队列
    runqhead guintptr
    runqtail guintptr
    runqsize int32
    // dead的G的全局缓存
    gflock       mutex
    gfreeStack   *g
    gfreeNoStack *g
    ngfree       int32
    // sudog的缓存中心
    sudoglock  mutex
    sudogcache *sudog
}

大多数需要的信息都已放在了结构体M、G和P中,schedt结构体只是一个壳。可以看到,其中有M的idle队列,P的idle队列,以及一个全局的就绪的G队列。schedt结构体中的Lock是非常必须的,如果M或P等做一些非局部的操作,它们一般需要先锁住调度器。

2.5具体函数

goroutine调度器的代码在/src/runtime/proc.go中,一些比较关键的函数分析如下。

2.5.1 schedule函数

schedule函数在runtime需要进行调度时执行,为当前的P寻找一个可以运行的G并执行它,寻找顺序如下:

  • 1) 调用runqget函数来从P自己的runnable G队列中得到一个可以执行的G;
  • 2) 如果1)失败,则调用findrunnable函数去寻找一个可以执行的G;
  • 3) 如果2)也没有得到可以执行的G,那么结束调度,从上次的现场继续执行。
  • 4) 注意)//偶尔会先检查一次全局可运行队列,以确保公平性。否则,两个goroutine可以完全占用本地runqueue。 通过 schedtick计数 %61来保证

代码如下:

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
   _g_ := getg()
 
   if _g_.m.locks != 0 {
      throw("schedule: holding locks")
   }
 
   if _g_.m.lockedg != 0 {
      stoplockedm()
      execute(_g_.m.lockedg.ptr(), false) // Never returns.
   }
 
   // We should not schedule away from a g that is executing a cgo call,
   // since the cgo call is using the m's g0 stack.
   if _g_.m.incgo {
      throw("schedule: in cgo")
   }
 
top:
   if sched.gcwaiting != 0 {
      gcstopm()
      goto top
   }
   if _g_.m.p.ptr().runSafePointFn != 0 {
      runSafePointFn()
   }
 
   var gp *g
   var inheritTime bool
   if trace.enabled || trace.shutdown {
      gp = traceReader()
      if gp != nil {
         casgstatus(gp, _Gwaiting, _Grunnable)
         traceGoUnpark(gp, 0)
      }
   }
   if gp == nil && gcBlackenEnabled != 0 {
      gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
   }
   if gp == nil {
      // Check the global runnable queue once in a while to ensure fairness.
      // Otherwise two goroutines can completely occupy the local runqueue
      // by constantly respawning each other.
      if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
         lock(&sched.lock)
         gp = globrunqget(_g_.m.p.ptr(), 1)
         unlock(&sched.lock)
      }
   }
   if gp == nil {
      gp, inheritTime = runqget(_g_.m.p.ptr())
      if gp != nil && _g_.m.spinning {
         throw("schedule: spinning with local work")
      }
   }
   if gp == nil {
      gp, inheritTime = findrunnable() // blocks until work is available
   }
 
   // This thread is going to run a goroutine and is not spinning anymore,
   // so if it was marked as spinning we need to reset it now and potentially
   // start a new spinning M.
   if _g_.m.spinning {
      resetspinning()
   }
 
   if gp.lockedm != 0 {
      // Hands off own p to the locked m,
      // then blocks waiting for a new p.
      startlockedm(gp)
      goto top
   }
 
   execute(gp, inheritTime)
}

2.5.2 findrunnable函数

findrunnable函数负责给一个P寻找可以执行的G,它的寻找顺序如下:

  • 1) 调用runqget函数来从P自己的runnable G队列中得到一个可以执行的G;
  • 2) 如果1)失败,调用globrunqget函数从全局runnableG队列中得到一个可以执行的G;
  • 3) 如果2)失败,调用netpoll(非阻塞)函数取一个异步回调的G
  • 4) 如果3)失败,尝试从其他P那里偷取一半数量的G过来;
  • 5) 如果4)失败,再次调用globrunqget函数从全局runnableG队列中得到一个可以执行的G;
  • 6) 如果5)失败,调用netpoll(阻塞)函数取一个异步回调的G;
  • 7) 如果6)仍然没有取到G,那么调用stopm函数停止这个M。

代码如下:

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
   _g_ := getg()
 
   // The conditions here and in handoffp must agree: if
   // findrunnable would return a G to run, handoffp must start
   // an M.
 
top:
   _p_ := _g_.m.p.ptr()
   if sched.gcwaiting != 0 {
      gcstopm()
      goto top
   }
   if _p_.runSafePointFn != 0 {
      runSafePointFn()
   }
   if fingwait && fingwake {
      if gp := wakefing(); gp != nil {
         ready(gp, 0, true)
      }
   }
   if *cgo_yield != nil {
      asmcgocall(*cgo_yield, nil)
   }
 
   // local runq
   if gp, inheritTime := runqget(_p_); gp != nil {
      return gp, inheritTime
   }
 
   // global runq
   if sched.runqsize != 0 {
      lock(&sched.lock)
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      if gp != nil {
         return gp, false
      }
   }
 
   // Poll network.
   // This netpoll is only an optimization before we resort to stealing.
   // We can safely skip it if there are no waiters or a thread is blocked
   // in netpoll already. If there is any kind of logical race with that
   // blocked thread (e.g. it has already returned from netpoll, but does
   // not set lastpoll yet), this thread will do blocking netpoll below
   // anyway.
   if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
      if gp := netpoll(false); gp != nil { // non-blocking
         // netpoll returns list of goroutines linked by schedlink.
         injectglist(gp.schedlink.ptr())
         casgstatus(gp, _Gwaiting, _Grunnable)
         if trace.enabled {
            traceGoUnpark(gp, 0)
         }
         return gp, false
      }
   }
 
   // Steal work from other P's.
   procs := uint32(gomaxprocs)
   if atomic.Load(&sched.npidle) == procs-1 {
      // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
      // New work can appear from returning syscall/cgocall, network or timers.
      // Neither of that submits to local run queues, so no point in stealing.
      goto stop
   }
   // If number of spinning M's >= number of busy P's, block.
   // This is necessary to prevent excessive CPU consumption
   // when GOMAXPROCS>>1 but the program parallelism is low.
   if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
      goto stop
   }
   if !_g_.m.spinning {
      _g_.m.spinning = true
      atomic.Xadd(&sched.nmspinning, 1)
   }
   for i := 0; i < 4; i++ {
      for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
         if sched.gcwaiting != 0 {
            goto top
         }
         stealRunNextG := i > 2 // first look for ready queues with more than 1 g
         if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
            return gp, false
         }
      }
   }
 
stop:
 
   // We have nothing to do. If we're in the GC mark phase, can
   // safely scan and blacken objects, and have work to do, run
   // idle-time marking rather than give up the P.
   if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
      _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
      gp := _p_.gcBgMarkWorker.ptr()
      casgstatus(gp, _Gwaiting, _Grunnable)
      if trace.enabled {
         traceGoUnpark(gp, 0)
      }
      return gp, false
   }
 
   // Before we drop our P, make a snapshot of the allp slice,
   // which can change underfoot once we no longer block
   // safe-points. We don't need to snapshot the contents because
   // everything up to cap(allp) is immutable.
   allpSnapshot := allp
 
   // return P and block
   lock(&sched.lock)
   if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
      unlock(&sched.lock)
      goto top
   }
   if sched.runqsize != 0 {
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      return gp, false
   }
   if releasep() != _p_ {
      throw("findrunnable: wrong p")
   }
   pidleput(_p_)
   unlock(&sched.lock)
 
   // Delicate dance: thread transitions from spinning to non-spinning state,
   // potentially concurrently with submission of new goroutines. We must
   // drop nmspinning first and then check all per-P queues again (with
   // #StoreLoad memory barrier in between). If we do it the other way around,
   // another thread can submit a goroutine after we've checked all run queues
   // but before we drop nmspinning; as the result nobody will unpark a thread
   // to run the goroutine.
   // If we discover new work below, we need to restore m.spinning as a signal
   // for resetspinning to unpark a new worker thread (because there can be more
   // than one starving goroutine). However, if after discovering new work
   // we also observe no idle Ps, it is OK to just park the current thread:
   // the system is fully loaded so no spinning threads are required.
   // Also see "Worker thread parking/unparking" comment at the top of the file.
   wasSpinning := _g_.m.spinning
   if _g_.m.spinning {
      _g_.m.spinning = false
      if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
         throw("findrunnable: negative nmspinning")
      }
   }
 
   // check all runqueues once again
   for _, _p_ := range allpSnapshot {
      if !runqempty(_p_) {
         lock(&sched.lock)
         _p_ = pidleget()
         unlock(&sched.lock)
         if _p_ != nil {
            acquirep(_p_)
            if wasSpinning {
               _g_.m.spinning = true
               atomic.Xadd(&sched.nmspinning, 1)
            }
            goto top
         }
         break
      }
   }
 
   // Check for idle-priority GC work again.
   if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
      lock(&sched.lock)
      _p_ = pidleget()
      if _p_ != nil && _p_.gcBgMarkWorker == 0 {
         pidleput(_p_)
         _p_ = nil
      }
      unlock(&sched.lock)
      if _p_ != nil {
         acquirep(_p_)
         if wasSpinning {
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
         }
         // Go back to idle GC check.
         goto stop
      }
   }
 
   // poll network
   if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
      if _g_.m.p != 0 {
         throw("findrunnable: netpoll with p")
      }
      if _g_.m.spinning {
         throw("findrunnable: netpoll with spinning")
      }
      gp := netpoll(true) // block until new work is available
      atomic.Store64(&sched.lastpoll, uint64(nanotime()))
      if gp != nil {
         lock(&sched.lock)
         _p_ = pidleget()
         unlock(&sched.lock)
         if _p_ != nil {
            acquirep(_p_)
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
               traceGoUnpark(gp, 0)
            }
            return gp, false
         }
         injectglist(gp)
      }
   }
   stopm()
   goto top
}

2.5.3 newproc函数

newproc函数负责创建一个可以运行的G并将其放在当前的P的runnable G队列中,它是类似”go func() { … }”语句真正被编译器翻译后的调用,核心代码在newproc1函数。这个函数执行顺序如下:

  • 1) 获得当前的G所在的 P,然后从free G队列中取出一个G;
  • 2) 如果1)取到则对这个G进行参数配置,否则新建一个G;
  • 3) 将G加入P的runnable G队列。

代码如下:

// Go1.10.8版本默认stack大小为2KB

_StackMin = 2048
// 创建一个g对象,然后放到g队列
// 等待被执行

// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
   _g_ := getg()
 
   if fn == nil {
      _g_.m.throwing = -1 // do not dump full stacks
      throw("go of nil func value")
   }
   _g_.m.locks++ // disable preemption because it can be holding p in a local var
   siz := narg
   siz = (siz + 7) &^ 7
 
   // We could allocate a larger initial stack if necessary.
   // Not worth it: this is almost always an error.
   // 4*sizeof(uintreg): extra space added below
   // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
   if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
      throw("newproc: function arguments too large for new goroutine")
   }
 
   _p_ := _g_.m.p.ptr()
   newg := gfget(_p_)
   if newg == nil {
      newg = malg(_StackMin)
      casgstatus(newg, _Gidle, _Gdead)
      allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
   }
   if newg.stack.hi == 0 {
      throw("newproc1: newg missing stack")
   }
 
   if readgstatus(newg) != _Gdead {
      throw("newproc1: new g is not Gdead")
   }
 
   totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
   totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
   sp := newg.stack.hi - totalSize
   spArg := sp
   if usesLR {
      // caller's LR
      *(*uintptr)(unsafe.Pointer(sp)) = 0
      prepGoExitFrame(sp)
      spArg += sys.MinFrameSize
   }
   if narg > 0 {
      memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
      // This is a stack-to-stack copy. If write barriers
      // are enabled and the source stack is grey (the
      // destination is always black), then perform a
      // barrier copy. We do this *after* the memmove
      // because the destination stack may have garbage on
      // it.
      if writeBarrier.needed && !_g_.m.curg.gcscandone {
         f := findfunc(fn.fn)
         stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
         // We're in the prologue, so it's always stack map index 0.
         bv := stackmapdata(stkmap, 0)
         bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
      }
   }
 
   memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
   newg.sched.sp = sp
   newg.stktopsp = sp
   newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
   newg.sched.g = guintptr(unsafe.Pointer(newg))
   gostartcallfn(&newg.sched, fn)
   newg.gopc = callerpc
   newg.startpc = fn.fn
   if _g_.m.curg != nil {
      newg.labels = _g_.m.curg.labels
   }
   if isSystemGoroutine(newg) {
      atomic.Xadd(&sched.ngsys, +1)
   }
   newg.gcscanvalid = false
   casgstatus(newg, _Gdead, _Grunnable)
 
   if _p_.goidcache == _p_.goidcacheend {
      // Sched.goidgen is the last allocated id,
      // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
      // At startup sched.goidgen=0, so main goroutine receives goid=1.
      _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
      _p_.goidcache -= _GoidCacheBatch - 1
      _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
   }
   newg.goid = int64(_p_.goidcache)
   _p_.goidcache++
   if raceenabled {
      newg.racectx = racegostart(callerpc)
   }
   if trace.enabled {
      traceGoCreate(newg, newg.startpc)
   }
   runqput(_p_, newg, true)
 
   if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
      wakep()
   }
   _g_.m.locks--
   if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
      _g_.stackguard0 = stackPreempt
   }
}

2.5.4 goexit0函数

goexit函数是当G退出时调用的。这个函数对G进行一些设置后,将它放入free G列表中,供以后复用,之后调用schedule函数调度。

// goexit continuation on g0.
func goexit0(gp *g) {
   _g_ := getg()
 
   //设置g的 status从 _Grunning变为 _Gdead
   casgstatus(gp, _Grunning, _Gdead)
   if isSystemGoroutine(gp) {
      atomic.Xadd(&sched.ngsys, -1)
   }
   //对该g 进行释放设置 基本为nil /0
   gp.m = nil
   locked := gp.lockedm != 0
   gp.lockedm = 0
   _g_.m.lockedg = 0
   gp.paniconfault = false
   gp._defer = nil // should be true already but just in case.
   gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
   gp.writebuf = nil
   gp.waitreason = ""
   gp.param = nil
   gp.labels = nil
   gp.timer = nil
 
   if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
      // Flush assist credit to the global pool. This gives
      // better information to pacing if the application is
      // rapidly creating an exiting goroutines.
      scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
      atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
      gp.gcAssistBytes = 0
   }
 
   // Note that gp's stack scan is now "valid" because it has no
   // stack.
   gp.gcscanvalid = true
   dropg()
 
   if _g_.m.lockedInt != 0 {
      print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
      throw("internal lockOSThread error")
   }
   _g_.m.lockedExt = 0
   //把这个g 推到free G 列表
   gfput(_g_.m.p.ptr(), gp)
   if locked {
      // The goroutine may have locked this thread because
      // it put it in an unusual kernel state. Kill it
      // rather than returning it to the thread pool.
 
      // Return to mstart, which will release the P and exit
      // the thread.
      if GOOS != "plan9" { // See golang.org/issue/22227.
         gogo(&_g_.m.g0.sched)
      }
   }
   schedule()
}

2.5.5 handoffp函数

handoffp函数将P从系统调用或阻塞的M中传递出去,如果P还有runnable G队列,那么新开一个M,调用startm函数,新开的M不空旋。

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
   // handoffp must start an M in any situation where
   // findrunnable would return a G to run on _p_.
 
 
   //如果这个P的队列不为空或调度内的size不为空 那么 进行startm 且不空旋
   if !runqempty(_p_) || sched.runqsize != 0 {
      startm(_p_, false)
      return
   }
   //如果正在进行GC处理  同上
   if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
      startm(_p_, false)
      return
   }
   //如果没活可做了,检查下有没有 空闲/自旋的 M
   //否则 不需要我们做自旋
   if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
      startm(_p_, true)
      return
   }
   //调度上锁  将这个P 摘除走
   lock(&sched.lock)
   if sched.gcwaiting != 0 {
      _p_.status = _Pgcstop
      sched.stopwait--
      if sched.stopwait == 0 {
         notewakeup(&sched.stopnote)
      }
      unlock(&sched.lock)
      return
   }
   if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
      sched.safePointFn(_p_)
      sched.safePointWait--
      if sched.safePointWait == 0 {
         notewakeup(&sched.safePointNote)
      }
   }
   if sched.runqsize != 0 {
      unlock(&sched.lock)
      startm(_p_, false)
      return
   }
   // If this is the last running P and nobody is polling network,
   // need to wakeup another M to poll network.
   if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
      unlock(&sched.lock)
      startm(_p_, false)
      return
   }
   pidleput(_p_)
   unlock(&sched.lock)
}

2.5.6 startm函数

startm函数调度一个M或者必要时创建一个M来运行指定的P。

// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
   //加锁
   lock(&sched.lock)
   if _p_ == nil {
       
      _p_ = pidleget()
      if _p_ == nil {
         unlock(&sched.lock)
         if spinning {
            // The caller incremented nmspinning, but there are no idle Ps,
            // so it's okay to just undo the increment and give up.
            if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
               throw("startm: negative nmspinning")
            }
         }
         return
      }
   }
    
   mp := mget()
   unlock(&sched.lock)
   if mp == nil {
      var fn func()
      if spinning {
         // The caller incremented nmspinning, so set m.spinning in the new M.
         fn = mspinning
      }
      newm(fn, _p_)
      return
   }
    
   if mp.spinning {
      throw("startm: m is spinning")
   }
   if mp.nextp != 0 {
      throw("startm: m has p")
   }
   if spinning && !runqempty(_p_) {
      throw("startm: p has runnable gs")
   }
   // The caller incremented nmspinning, so set m.spinning in the new M.
   mp.spinning = spinning
   mp.nextp.set(_p_)
   notewakeup(&mp.park)
}

2.5.7 sysmon函数

sysmon函数是Go runtime启动时创建的,负责监控所有goroutine的状态,判断是否需要GC,进行netpoll等操作。sysmon函数中会调用retake函数进行抢占式调度。

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
   lock(&sched.lock)
   sched.nmsys++
   checkdead()
   unlock(&sched.lock)
 
   // If a heap span goes unused for 5 minutes after a garbage collection,
   // we hand it back to the operating system.
   scavengelimit := int64(5 * 60 * 1e9)
 
   if debug.scavenge > 0 {
      // Scavenge-a-lot for testing.
      forcegcperiod = 10 * 1e6
      scavengelimit = 20 * 1e6
   }
 
   lastscavenge := nanotime()
   nscavenge := 0
 
   lasttrace := int64(0)
   idle := 0 // how many cycles in succession we had not wokeup somebody
   delay := uint32(0)
   for {
      if idle == 0 { // start with 20us sleep...
         delay = 20
      } else if idle > 50 { // start doubling the sleep after 1ms...
         delay *= 2
      }
      if delay > 10*1000 { // up to 10ms
         delay = 10 * 1000
      }
      usleep(delay)
      if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
         lock(&sched.lock)
         if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
            atomic.Store(&sched.sysmonwait, 1)
            unlock(&sched.lock)
            // Make wake-up period small enough
            // for the sampling to be correct.
            maxsleep := forcegcperiod / 2
            if scavengelimit < forcegcperiod {
               maxsleep = scavengelimit / 2
            }
            shouldRelax := true
            if osRelaxMinNS > 0 {
               next := timeSleepUntil()
               now := nanotime()
               if next-now < osRelaxMinNS {
                  shouldRelax = false
               }
            }
            if shouldRelax {
               osRelax(true)
            }
            notetsleep(&sched.sysmonnote, maxsleep)
            if shouldRelax {
               osRelax(false)
            }
            lock(&sched.lock)
            atomic.Store(&sched.sysmonwait, 0)
            noteclear(&sched.sysmonnote)
            idle = 0
            delay = 20
         }
         unlock(&sched.lock)
      }
      // trigger libc interceptors if needed
      if *cgo_yield != nil {
         asmcgocall(*cgo_yield, nil)
      }
      // poll network if not polled for more than 10ms
      lastpoll := int64(atomic.Load64(&sched.lastpoll))
      now := nanotime()
      if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
         atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
         gp := netpoll(false) // non-blocking - returns list of goroutines
         if gp != nil {
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before injectglist.
            // Otherwise it can lead to the following situation:
            // injectglist grabs all P's but before it starts M's to run the P's,
            // another M returns from syscall, finishes running its G,
            // observes that there is no work to do and no other running M's
            // and reports deadlock.
            incidlelocked(-1)
            injectglist(gp)
            incidlelocked(1)
         }
      }
      // retake P's blocked in syscalls
      // and preempt long running G's
      if retake(now) != 0 {
         idle = 0
      } else {
         idle++
      }
      // check if we need to force a GC
      if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
         lock(&forcegc.lock)
         forcegc.idle = 0
         forcegc.g.schedlink = 0
         injectglist(forcegc.g)
         unlock(&forcegc.lock)
      }
      // scavenge heap once in a while
      if lastscavenge+scavengelimit/2 < now {
         mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
         lastscavenge = now
         nscavenge++
      }
      if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
         lasttrace = now
         schedtrace(debug.scheddetail > 0)
      }
   }
}

2.5.8 retake函数

枚举所有的P 如果P在系统调用中(_Psyscall), 且经过了一次sysmon循环(20us~10ms), 则抢占这个P, 调用handoffp解除M和P之间的关联, 如果P在运行中(_Prunning), 且经过了一次sysmon循环并且G运行时间超过forcePreemptNS(10ms), 则抢占这个P

并设置g.preempt = true,g.stackguard0 = stackPreempt。

为什么设置了stackguard就可以实现抢占?
因为这个值用于检查当前栈空间是否足够, go函数的开头会比对这个值判断是否需要扩张栈。
newstack函数判断g.stackguard0等于stackPreempt, 就知道这是抢占触发的, 这时会再检查一遍是否要抢占。
抢占机制保证了不会有一个G长时间的运行导致其他G无法运行的情况发生。

func retake(now int64) uint32 {
   n := 0
   // Prevent allp slice changes. This lock will be completely
   // uncontended unless we're already stopping the world.
   lock(&allpLock)
   // We can't use a range loop over allp because we may
   // temporarily drop the allpLock. Hence, we need to re-fetch
   // allp each time around the loop.
   for i := 0; i < len(allp); i++ {
      _p_ := allp[i]
      if _p_ == nil {
         // This can happen if procresize has grown
         // allp but not yet created new Ps.
         continue
      }
      pd := &_p_.sysmontick
      s := _p_.status
      if s == _Psyscall {
         // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
         t := int64(_p_.syscalltick)
         if int64(pd.syscalltick) != t {
            pd.syscalltick = uint32(t)
            pd.syscallwhen = now
            continue
         }
         // On the one hand we don't want to retake Ps if there is no other work to do,
         // but on the other hand we want to retake them eventually
         // because they can prevent the sysmon thread from deep sleep.
         if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
            continue
         }
         // Drop allpLock so we can take sched.lock.
         unlock(&allpLock)
         // Need to decrement number of idle locked M's
         // (pretending that one more is running) before the CAS.
         // Otherwise the M from which we retake can exit the syscall,
         // increment nmidle and report deadlock.
         incidlelocked(-1)
         if atomic.Cas(&_p_.status, s, _Pidle) {
            if trace.enabled {
               traceGoSysBlock(_p_)
               traceProcStop(_p_)
            }
            n++
            _p_.syscalltick++
            handoffp(_p_)
         }
         incidlelocked(1)
         lock(&allpLock)
      } else if s == _Prunning {
         // Preempt G if it's running for too long.
         t := int64(_p_.schedtick)
         if int64(pd.schedtick) != t {
            pd.schedtick = uint32(t)
            pd.schedwhen = now
            continue
         }
         if pd.schedwhen+forcePreemptNS > now {
            continue
         }
         preemptone(_p_)
      }
   }
   unlock(&allpLock)
   return uint32(n)
}

3、调度器总结

3.1 调度器的两大思想

  • 复用线程:协程本身就是运行在一组线程之上,不需要频繁的创建、销毁线程,而是对线程的复用。在调度器中复用线程还有2个体现:1)work stealing,当本线程无可运行的G时,尝试从其他线程绑定的P偷取G,而不是销毁线程。2)handoff,当本线程因为G进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的线程执行。
  • 利用并行:GOMAXPROCS设置P的数量,当GOMAXPROCS大于1时,就最多有GOMAXPROCS个线程处于运行状态,这些线程可能分布在多个CPU核上同时运行,使得并发利用并行。另外,GOMAXPROCS也限制了并发的程度,比如GOMAXPROCS = 核数/2,则最多利用了一半的CPU核进行并行。

3.2调度器的两小策略:

  • 抢占:在coroutine中要等待一个协程主动让出CPU才执行下一个协程,在Go中,一个goroutine最多占用CPU 10ms,防止其他goroutine被饿死,这就是goroutine不同于coroutine的一个地方。
  • 全局G队列:在新的调度器中依然有全局G队列,但功能已经被弱化了,当M执行work stealing从其他P偷不到G时,它可以从全局G队列获取G。

4、参考资料

查看原文

认证与成就

  • 获得 676 次点赞
  • 获得 4 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 4 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-11-10
个人主页被 5.6k 人浏览