go io操作

go io操作

go相关的sdk 具体代码在此,您的star 、点赞评论都是我更新的动力

理论知识

wxler.github.io/2021/02/19/134758/

相关概念

io操作在日常开发中接触的基本上都是磁盘io和网络io

input 就是将磁盘或网卡中的数据读取到我们用户空间的内存中。

output 将用户空间内存中的数据写到磁盘或网卡中。

具体流程是 读:磁盘/网卡–>内核缓冲区–>用户空间,写则相反。

img

拓展知识dma

wxler.github.io/2021/02/19/134758/...

从磁盘/网卡拷贝数据到内核缓冲区的工作由cpu负责,转为磁盘/网卡自己来做,提高cpu的效率。

io类型

概念:文件描述符。linux系统中一切都是文件, 文件描述符(File Descriptor, FD)是操作系统内核为每个打开的文件分配的一个非负整数,用于唯一标识该文件 。可以抽象理解为操作系统读取文件的接口。

如何让操作系统知道该文件描述符已经准备好了被读取和被写入?

1、BIO (Blocking I/O),称之为同步阻塞I/O ,每个文件描述符启动一个线程,检查该文件描述符是否就绪,如果没就是就阻塞等待该文件描述符就绪。

2、NIO (Non-blocking IO),称之为非阻塞IO,只启动一个线程,不断检查所有文件描述符是否就绪,如果没有就绪就立刻返回,而检查的过程会进行系统调用消耗资源

3、IO多路复用。多路复用IO把轮询多个文件描述符放在内核空间里执行,即让内核负责监听所有的文件描述符(这样就不会有用户态和系统态的切换),当有文件描述符就绪,就将这些文件描述符返回给用户进程。

img

多路复用模式包含三种,即select、poll和epoll,这几种模式主要区别在于对就绪的文件描述符的方式

select 最多能将1024个文件描述符加入到监听的集合中,当有文件描述符就绪后立刻返回。select只知道有文件描述符就绪了,但不知道是哪个,任然需要变量整个集合找出就绪的。

poll poll和select类似,但是poll使用链表存储文件描述符,没有1024的限制。

epoll 使用红黑树,存储监听的文件描述符。当文件描述符就绪后使用链表存储就绪的文件描述符。不用像select和poll遍历所有监听的文件描述符找出就绪的, epoll的工作模式 水平触发(LT模式),边缘触发(ET模式)

LT如何文件描述符就绪,你不处理则会一直通知,ET只会通知一次。

4、**AIO ( Asynchronous I/O)**:异步非阻塞I/O模型。传输过程如下:

img

可以看到,异步非阻塞I/O在判断数据有没有准备好(即文件描述符是否就绪)和真正读数据两个阶段都是非阻塞的。AIO在第一次执行系统调用后,会注册一个回调函数,内核在检测到某文件描述符是否就绪,调用该回调函数执行真正的读操作,将数据从内核空间拷贝到用户空间,然后返回给用户使用。在整个过程,用户进程都是非阻塞状态,可以做其它的事情。类似js的回调函数。

零拷贝

将一个文件通过网络发送的流程:

磁盘数据–>系统内核缓冲区–>用户空间–>系统内核缓冲区–>网卡.涉及多次将数据四次缓冲区的拷贝,并经历了四次内核态和用户态的切换。

下图是不使用zero copy的网络IO传输过程:

img

零拷贝则不用将数据拷贝到用户空间。直接在两个内核缓冲区之间拷贝。

ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count)

go相关sdk

io多路复用

go 主要是使用

“golang.org/x/sys/unix”, “syscall”这两个包

使用epoll创建一个tcp服务器

//go:build linux
// +build linux

package main

import (
    "errors"
    "log"
    "syscall"
)

func main() {
    // 创建 socket
    fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
    if err != nil {
        log.Fatalf("Error creating socket: %s", err)
    }
    defer syscall.Close(fd)

    //// 设置 socket 为非阻塞模式
    if err = syscall.SetNonblock(fd, true); err != nil {
        log.Fatalf("Error setting non-blocking mode: %s", err)
    }

    // 绑定地址和端口
    addr := syscall.SockaddrInet4{Port: 8080}
    copy(addr.Addr[:], []byte{0, 0, 0, 0}) // 绑定到所有 IP 地址
    if err = syscall.Bind(fd, &addr); err != nil {
        log.Fatalf("Error binding socket: %s", err)
    }

    // 开始监听
    if err = syscall.Listen(fd, syscall.SOMAXCONN); err != nil {
        log.Fatalf("Error listening on socket: %s", err)
    }

    // 创建 epoll 实例
    epfd, err := syscall.EpollCreate1(0)
    if err != nil {
        log.Fatalf("Error creating epoll instance: %s", err)
    }
    defer syscall.Close(epfd)

    // 将监听 socket 添加到 epoll 实例
    if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{
        Events: syscall.EPOLLIN,
        Fd:     int32(fd),
    }); err != nil {
        log.Fatalf("Error adding listener to epoll: %s", err)
    }

    // 事件循环
    events := make([]syscall.EpollEvent, 10)
    for {
        n, err := syscall.EpollWait(epfd, events, -1)
        if err != nil {
            if errors.Is(err, syscall.EINTR) {
                log.Printf("Epoll wait was interrupted, retrying...")
                continue
            }
            log.Fatalf("Error waiting for epoll events: %s", err)
        }
        log.Printf("Received %d events", n)
        for i := 0; i < n; i++ {
            if events[i].Fd == int32(fd) {
                // 接受新连接
                nfd, _, err := syscall.Accept(fd)
                if err != nil {
                    if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) {
                        log.Printf("No connections are available to accept")
                        continue
                    }
                    log.Printf("Error accepting connection: %s", err)
                    continue
                }

                // 设置 socket 为非阻塞模式
                if err = syscall.SetNonblock(nfd, true); err != nil {
                    log.Fatalf("Error setting non-blocking mode: %s", err)
                }
                // 将新连接添加到 epoll 实例
                if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, nfd, &syscall.EpollEvent{
                    Events: syscall.EPOLLIN | -syscall.EPOLLET,
                    Fd:     int32(nfd),
                }); err != nil {
                    log.Printf("Error adding connection to epoll: %s", err)
                    syscall.Close(nfd)
                    continue
                }

                log.Printf("Accepted new connection")
            } else {
                log.Printf("Ready for connection %+v", events[i])
                // 处理已就绪的连接
                cfd := int(events[i].Fd)
                go handleConnection(cfd, epfd)
            }
        }
    }
}

func handleConnection(cfd int, epfd int) {
    // defer syscall.Close(cfd)
    buf := make([]byte, 1024)

    for {
        n, err := syscall.Read(cfd, buf)
        if err != nil {
            log.Printf("Error reading from connection: %s", err)
            if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) || errors.Is(err, syscall.EBADF) {
                // 非阻塞模式下无数据可读
                break
            }
            return
        }
        if n == 0 {
            // 连接已关闭
            log.Printf("Connection closed")
            return
        }
        log.Printf("Received data: %s", string(buf[:n]))
        // 回显数据
        if _, err = syscall.Write(cfd, buf[:n]); err != nil {
            log.Printf("Error writing to connection: %s", err)
            return
        }
        return
    }
}

零拷贝

sendfile

sendfile 实现大文件不经过用户空间,直接传输。

//go:build linux
// +build linux

package main

import (
    "fmt"
    "golang.org/x/sys/unix"
    "log"
    "net/http"
    "os"
)

// UploadHandler handles file upload and saves it using sendfile
func UploadHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != "POST" {
        http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
        return
    }

    // 10mb以上存临时文件,否则存储内存。
    err := r.ParseMultipartForm(10 << 20) // 10 MB
    if err != nil {
        log.Printf("Error parsing form: %v", err)
        http.Error(w, "Error parsing form", http.StatusBadRequest)
        return
    }

    // Get the file from form data
    file, handler, err := r.FormFile("file")
    if err != nil {
        log.Printf("Error retrieving file: %v", err)
        http.Error(w, "Error retrieving file", http.StatusBadRequest)
        return
    }
    defer file.Close()

    // Create a destination file
    dstPath := handler.Filename
    dstFile, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
    if err != nil {
        log.Printf("Error creating destination file: %v", err)
        http.Error(w, "Error creating destination file", http.StatusInternalServerError)
        return
    }
    defer dstFile.Close()

    // Get file descriptors
    srcFd := int(file.(*os.File).Fd())
    dstFd := int(dstFile.Fd())

    // Get the file size
    fi, err := file.(*os.File).Stat()
    if err != nil {
        log.Printf("Error getting file info: %v", err)
        http.Error(w, "Error getting file info", http.StatusInternalServerError)
        return
    }
    fileSize := fi.Size()

    // Use sendfile to transfer data
    _, err = unix.Sendfile(dstFd, srcFd, nil, int(fileSize))
    if err != nil {
        log.Printf("Error using sendfile: %v", err)
        http.Error(w, "Error using sendfile", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "File uploaded successfully: %s\n", handler.Filename)
}

func main() {
    http.HandleFunc("/upload", UploadHandler)
    fmt.Println("Server started at :8080")
    http.ListenAndServe(":8080", nil)
}

mmap

mmap 是内存映射文件(Memory Mapped File)的缩写,是一种将文件的内容映射到进程的虚拟地址空间中的技术。通过 mmap,应用程序可以将文件或设备(如物理内存)的内容与内存地址关联,从而可以像访问内存一样访问文件内容。这种技术可以提高文件读写的效率,特别是对于大文件的操作,因为它避免了将文件内容拷贝到用户空间的过程。

package main

import (
    "fmt"
    "golang.org/x/exp/mmap"
    "log"
)

func main() {
    // 打开文件进行内存映射
    reader, err := mmap.Open("example.txt")
    if err != nil {
        log.Fatalf("Error opening file: %v", err)
    }
    defer reader.Close()

    // 获取文件大小
    size := reader.Len()
    fmt.Printf("File size: %d bytes\n", size)

    // 读取文件内容
    data := make([]byte, size)
    _, err = reader.ReadAt(data, 0)
    if err != nil {
        log.Fatalf("Error reading file: %v", err)
    }

    // 打印文件内容
    fmt.Println("File content:")
    fmt.Println(string(data))
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!