golang netpoller

前言

开始讲netpoller之前先讲一下socket网络编程和epoll,不管go也好,nginx也好,只要是开启网络服务就绕不开socket,它是网络服务的基础。下一章网络编程先讲讲socket的基础函数。

网络编程

socket(购买电话机)

int socket(int domain, int type, int protocol);

socket函数对应于普通文件的打开操作。普通文件的打开操作返回一个文件描述字,而socket()用于创建一个socket描述符(socket descriptor),它唯一标识一个socket。这个socket描述字跟文件描述字一样,后续的操作都有用到它,把它作为参数,通过它来进行一些读写操作。

正如可以给fopen的传入不同参数值,以打开不同的文件。创建socket的时候,也可以指定不同的参数创建不同的socket描述符,socket函数的三个参数分别为:
domain:即协议域,又称为协议族(family)。常用的协议族有,AF_INET(IPv4)、AF_INET6(IPv6)、AF_LOCAL(或称AF_UNIX,Unix域socket)、AF_ROUTE等等。协议族决定了socket的地址类型,在通信中必须采用对应的地址,如AF_INET决定了要用ipv4地址(32位的)与端口号(16位的)的组合、AF_UNIX决定了要用一个绝对路径名作为地址。
type:指定socket类型。常用的socket类型有,SOCK_STREAM(流式套接字)、SOCK_DGRAM(数据报式套接字)、SOCK_RAW、SOCK_PACKET、SOCK_SEQPACKET等等 ,linux2.1.17版本后增加了SOCK_NONBLOCK,代表了非阻塞模式
示例:

int s = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);

protocol:就是指定协议。常用的协议有,IPPROTO_TCP、PPTOTO_UDP、IPPROTO_SCTP、IPPROTO_TIPC等,它们分别对应TCP传输协议、UDP传输协议、STCP传输协议、TIPC传输协议。

bind(接电话线)

bind()函数把一个地址族中的特定地址赋给socket。例如对应AF_INET、AF_INET6就是把一个ipv4或ipv6地址和端口号组合赋给socket。

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

函数的三个参数分别为:
sockfd:即socket描述字,它是通过socket()函数创建了,唯一标识一个socket。bind()函数就是将给这个描述字绑定一个名字

addr:一个const struct sockaddr *指针,指向要绑定给sockfd的协议地址,例如”127.0.0.1:80”

addrlen: 对应的是地址的长度。

listen(等电话中)

int  listen(int sockfd, int backlog);

backlog: socket可以排队的最大连接个数
当有多个客户端一起请求的时候,服务端不可能来多少就处理多少,这样如果并发太多,就会因为性能的因素发生拥塞,然后造成雪崩。所有就搞了一个队列,先将请求放在队列里面,一个个来。socket_listen里面的第二个参数backlog就是设置这个队列的长度。如果将队列长度设置成10,那么如果有20个请求一起过来,服务端就会先放10个请求进入这个队列,因为长度只有10。然后其他的就直接拒绝。tcp协议这时候不会发送rst给客户端,这样的话客户端就会重新发送SYN,以便能进入这个队列。
如果三次握手完成了,就会将完成三次握手的请求取出来,放入另一个队列中,这样队列就空出一个位置,其他重发SYN的请求就可以进入队列中。

connect(打电话)

int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

sockaddr:socket地址,”127.0.0.1:8080”
addrlen:地址长度

accept(接电话)

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

sockaddr:一个指针,接收客户端socket地址,”127.0.0.1:36820”
addrlen:客户端地址长度

read/write/recv/send (通话)

read函数是负责从fd中读取内容。
write函数将buf中的nbytes字节内容写入文件描述符fd。

golang netpoller

io多路复用

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O 事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第 5 位置为 1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET 判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

struct eventpoll{
    .... 
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root  rbr; 
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
    ....
};
#include <sys/epoll.h>  
int epoll_create(int size); 
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

函数

epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

epoll_wailt

等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

优势

由于epoll的实现机制与select/poll机制完全不同,上面所说的 select的缺点在epoll上不复存在。

设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?

在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

epoll的设计和实现与select完全不同。把原先的select/poll调用分成了3个部分:

1)调用epoll_create()建立一个epoll对象

2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字

3)调用epoll_wait收集发生的事件的连接

如此一来,要实现上面说是的场景,只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是非频繁调用的,而 epoll_wait 则是会被高频调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为: ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdllist 双向链表中是否有就绪的 fd,当 rdllist 为空(无就绪 fd)时挂起当前进程,直到 rdllist 非空时进程才被唤醒并返回。

边缘/水平触发模式

Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率

Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符

为什么ET模式要用非阻塞socket

假设buffer有1024字节,一次读取的数据很大,超过1024的大小:这种情况就需要读取多次,也就是要调用多次recv函数,通常我们会用while循环一直读取,无论ET还是LT,这样LT模式就显得很鸡肋,所以一般不用LT。在ET模式下,(1)、如果用阻塞IO+while循环,当最后一个数据读取完后,程序是无法立刻跳出while循环的,因为阻塞IO会在 while(true){ int len=recv(); }这里阻塞住,除非对方关闭连接或者recv出错,这样程序就无法继续往下执行,这一次的epoll_wait没有办法处理其它的连接,会造成延迟、并发度下降。(2)、如果是非阻塞IO+while循环当读取完数据后,recv会立即返回-1,并将errno设置为EAGAIN或EWOULDBLOCK,这就表示数据已经读取完成,已经没有数据了,可以退出循环了。这样就不会像阻塞IO一样卡在那里,这就减少了不必要的等待时间,性能自然更高。

网络编程+epoll代码示例

int main(){  
    struct epoll_event ev, events[MAX_EVENTS];  
    int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;  
    struct sockaddr_in local, remote;  
    char buf[BUFSIZ];  

    //创建listen socket  
    if( (listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK , 0)) < 0) {  
        perror("sockfd\n");  
        exit(1);  
    }  
    bzero(&local, sizeof(local));  
    local.sin_family = AF_INET;  
    local.sin_addr.s_addr = htonl(INADDR_ANY);;  
    local.sin_port = htons(PORT);  
    if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) {  
        perror("bind\n");  
        exit(1);  
    }  
    listen(listenfd, 20);  

    epfd = epoll_create(MAX_EVENTS);  
    if (epfd == -1) {  
        perror("epoll_create");  
        exit(EXIT_FAILURE);  
    }  

    ev.events = EPOLLIN;  
    ev.data.fd = listenfd;  
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {  
        perror("epoll_ctl: listen_sock");  
        exit(EXIT_FAILURE);  
    }  

    for (;;) {  
        nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);  
        if (nfds == -1) {  
            perror("epoll_pwait");  
            exit(EXIT_FAILURE);  
        }  
        for (i = 0; i < nfds; ++i) {  
            fd = events[i].data.fd;  
            if (fd == listenfd) {  
                while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,   
                                (size_t *)&addrlen)) > 0) {  
                    ev.events = EPOLLIN | EPOLLET;  
                    ev.data.fd = conn_sock;  
                    if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock,  
                                &ev) == -1) {  
                        perror("epoll_ctl: add");  
                        exit(EXIT_FAILURE);  
                    }  
                }  
                if (conn_sock == -1) {  
                    if (errno != EAGAIN && errno != ECONNABORTED   
                            && errno != EPROTO && errno != EINTR)   
                        perror("accept");  
                }  
                continue;  
            }    
            if (events[i].events & EPOLLIN) {  
                n = 0;  
                while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {  
                    n += nread;  
                }  
                if (nread == -1 && errno != EAGAIN) {  
                    perror("read error");  
                }  
                ev.data.fd = fd;  
                ev.events = events[i].events | EPOLLOUT;  
                if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) {  
                    perror("epoll_ctl: mod");  
                }  
            }  
            if (events[i].events & EPOLLOUT) {  
                sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11);  
                int nwrite, data_size = strlen(buf);  
                n = data_size;  
                while (n > 0) {  
                    nwrite = write(fd, buf + data_size - n, n);  
                    if (nwrite < n) {  
                        if (nwrite == -1 && errno != EAGAIN) {  
                            perror("write error");  
                        }  
                        break;  
                    }  
                    n -= nwrite;  
                }  
                close(fd);  
            }  
        }  
    }  
    return 0;  
}  

go netpoller

Go netpoller 基本原理

Go netpoller 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

Go 是一门跨平台的编程语言,而不同平台针对特定的功能有不用的实现,这当然也包括了 I/O 多路复用技术,比如 Linux 里的 I/O 多路复用有 selectpollepoll,而 freeBSD 或者 MacOS 里则是 kqueue,而 Windows 里则是基于异步 I/O 实现的 iocp,等等;因此,Go 为了实现底层 I/O 多路复用的跨平台,分别基于上述的这些不同平台的系统调用实现了多版本的 netpollers

数据结构

type netFD struct {
    pfd poll.FD

    // immutable until Close
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

type FD struct {
   // Lock sysfd and serialize access to Read and Write methods.
  fdmu fdMutex

  // System file descriptor. Immutable until Close.
  Sysfd int

  // I/O poller.
  pd pollDesc

  // Writev cache.
  iovecs *[]syscall.Iovec

  // Semaphore signaled when file is closed.
  csema uint32

  // Non-zero if this file has been set to blocking mode.
  isBlocking uint32

  // Whether this is a streaming descriptor, as opposed to a
 // packet-based descriptor like a UDP socket. Immutable.  IsStream bool

  // Whether a zero byte read indicates EOF. This is false for a
 // message based socket connection.  ZeroReadIsEOF bool

  // Whether this is a file rather than a network socket.
  isFile bool
}

代码示例(服务端)

package main

import (
   "fmt"
 "log" "net")

func main() {
   listen, err := net.Listen("tcp", ":8888")
   if err != nil {
      log.Println("listen error: ", err)
      return
  }

   for {
      conn, err := listen.Accept()
      if err != nil {
         log.Println("accept error: ", err)
         break
  }

      // start a new goroutine to handle the new connection.
  go HandleConn(conn)
   }
}

func HandleConn(conn net.Conn) {
   defer conn.Close()
   packet := make([]byte, 1024)
   for {
      // block here if socket is not available for reading data.
  n, err := conn.Read(packet)
      if err != nil {
         log.Println("read socket error: ", err)
         return
  }
      fmt.Println(string(packet[:n]))
      // same as above, block here if socket is not available for writing.
  _, _ = conn.Write([]byte("hi"))
   }
}

代码示例(客户端)

package main

import (
    "fmt"
    "net"
    "testing"
)


func TestClient(t *testing.T) {
    conn, err := net.Dial("tcp", "127.0.0.1:8888")
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    packet := make([]byte, 1024)
    d := "hello"
    _, err = conn.Write([]byte(d))
    if err != nil {
        panic(err)
    }

    n, err := conn.Read(packet)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(packet[:n]))
}

epoll对应方法

#include <sys/epoll.h>  
int epoll_create(int size);  
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

net.Listen

调用 net.Listen 之后,底层会通过 Linux 的系统调用 socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD ,接着调用 netFD 的 listenStream 方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化

  1. 调用 epollcreate1 (netpollinit) 创建一个 epoll 实例 epfd,作为整个 runtime 的唯一 event-loop 使用;
  2. netpollBreakRd 通知信号量封装成 epollevent 事件结构体注册(netpollopen)进 epoll 实例。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服务端的 netFD 在 listen 时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列
  2. netFD 在 accept 时将返回的 connFD 也加入 epoll 的事件队列
  3. netFD 在读写时出现 syscall.EAGAIN 错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的 waitRead 中返回

Conn.Read/Conn.Write

read/write 的工作流程如下:

  1. accept 成功时会生成客户端connFD,并将fd 加入 epoll 的事件队列
  2. netFD 在read/write时出现 syscall.EAGAIN 错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的 waitRead 中返回
    核心是调用 syscall.Read/syscall.Write 系统调用

epoll_wait时机

首先,client 连接 server 的时候,listener 通过 accept 调用接收新 connection,每一个新 connection 都启动一个 goroutine 处理,accept 调用会把该 connection 的 fd 连带所在的 goroutine 上下文信息封装注册到 epoll 的监听列表里去,当 goroutine 调用 conn.Read 或者 conn.Write 等需要阻塞等待的函数时,会被 gopark 给封存起来并使之休眠,让 P 去执行本地调度队列里的下一个可执行的 goroutine,往后 Go scheduler 会在循环调度的 runtime.schedule() 函数以及 sysmon 监控线程中调用 runtime.netpoll 以获取可运行的 goroutine 列表并通过调用 injectglist 把剩下的 g 放入全局调度队列或者当前 P 本地调度队列去重新执行。

那么当 I/O 事件发生之后,netpoller 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 runtime.netpoll

runtime.netpoll 的核心逻辑是:

  1. 根据调用方的入参 delay,设置对应的调用 epollwait 的 timeout 值;
  2. 调用 epollwait 等待发生了可读/可写事件的 fd;
  3. 循环 epollwait 返回的事件列表,处理对应的事件类型, 组装可运行的 goroutine 链表并返回。

golang netpoller

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 2

文是好文,融合了很多文章的观点,但是记得要标明出处

2年前 评论
xuefeng (楼主) 2年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!