并发请求数据时遇到的问题

1. 运行环境

go version go1.25.0 windows/amd64

2. 问题描述?

一个json文件中有多个需要请求的url地址,形如 http://localhost:8080/:id,
创建json文件代码

func CreateJSONFile() {
    var urls []string
    for i := 0; i < 1000; i++ {
        urls = append(urls, fmt.Sprintf("http://localhost:8080/%d", i))
    }
    data, _ := json.Marshal(urls)
    file, err := os.Create("urls.json")
    if err != nil {
        fmt.Println("文件创建失败:", err)
        return
    }
    defer file.Close()
    _, err = file.Write(data)
    if err != nil {
        fmt.Println("写入文件失败:", err)
        return
    }
    fmt.Println("数据成功写入")
}

服务器端返回其ID值,

//java  spring代码
@RestController
public class Ctl {
    @GetMapping("/{id}")
    public String ctl(@PathVariable String id) {
        return id;
    }
}

对服务端进行并发请求,go代码如下(扇入/扇出)

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "os"
    "sync"
)

var urls []string

func init() {
    bytes, err := os.ReadFile("urls.json")
    if err != nil {
        fmt.Println(err)
    }
    _ = json.Unmarshal(bytes, &urls)
}

func worker(url string, ch chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get(url)
    if err != nil {
        fmt.Println(err)
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    ch <- string(body)
}

func main() {
    var wg sync.WaitGroup
    var ret []string
    ch := make(chan string, 5)

    for _, url := range urls {
        wg.Add(1)
        go worker(url, ch, &wg)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    for s := range ch {
        ret = append(ret, s)
    }

    fmt.Println(ret)
}

当请求的url较少时,能够正常运行。

Go
当请求的url数量大于400时就会出错

Go

3. 您期望得到的结果?

正确的结果

4. 您实际得到的结果?

梦想星辰大海
最佳答案

错误原因是并发请求太多了,对方拒绝,可以加个队列,然后启动固定的消费者,从而达到限制请求并发数。

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
    "sync"
    "time"
)

func worker(url string, ch chan string) {
    resp, err := http.Get(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        _ = resp.Body.Close()
    }()
    body, _ := io.ReadAll(resp.Body)
    ch <- string(body)
}

func main() {
    go func() {
        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            http.Error(w, r.URL.Query().Get("id"), http.StatusOK)
        })
        _ = http.ListenAndServe("127.0.0.1:8080", nil)
    }()
    time.Sleep(1 * time.Second)
    p := NewParallel(10)
    ch := make(chan string, 5)
    t := time.Now()
    go func() {
        for i := 0; i < 1000; i++ {
            fixI := i
            p.Add(func() {
                worker(fmt.Sprintf("http://localhost:8080/?id=%d", fixI), ch)
            })
        }
        p.Wait()
        close(ch)
    }()
    var ret []string
    for s := range ch {
        ret = append(ret, s)
    }
    fmt.Println(time.Since(t), len(ret))
}

type Parallel struct {
    queue chan func()
    wg    sync.WaitGroup
}

func (r *Parallel) Add(fn func()) {
    r.queue <- fn
}

func (r *Parallel) Wait() {
    close(r.queue)
    r.wg.Wait()
}

func (r *Parallel) run(fn func()) {
    defer func() {
        if err := recover(); err != nil {
            log.Println("Parallel panic", "err", err)
        }
    }()
    fn()
}

func NewParallel(worker int) *Parallel {
    p := &Parallel{
        queue: make(chan func(), worker*2),
        wg:    sync.WaitGroup{},
    }
    for i := 0; i < worker; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for fn := range p.queue {
                p.run(fn)
            }
        }()
    }
    return p
}
2周前 评论
讨论数量: 6

从错误信息看,是请求的接口服务主动拒绝

3周前 评论
terryzzh (楼主) 3周前
terryzzh (楼主) 3周前
梦想星辰大海 2周前

java默认自带连接池,go不自带。

限制协程数量比如100个,httpclient配置transport,比如:

var client = &http.Client{
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 100,
    },
    Timeout: 5 * time.Second, // 防止卡死
}

http.Get(url) 判断err,如果err不为空,那么resp就是nil。肯定会报错。

3周前 评论
梦想星辰大海

错误原因是并发请求太多了,对方拒绝,可以加个队列,然后启动固定的消费者,从而达到限制请求并发数。

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
    "sync"
    "time"
)

func worker(url string, ch chan string) {
    resp, err := http.Get(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        _ = resp.Body.Close()
    }()
    body, _ := io.ReadAll(resp.Body)
    ch <- string(body)
}

func main() {
    go func() {
        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            http.Error(w, r.URL.Query().Get("id"), http.StatusOK)
        })
        _ = http.ListenAndServe("127.0.0.1:8080", nil)
    }()
    time.Sleep(1 * time.Second)
    p := NewParallel(10)
    ch := make(chan string, 5)
    t := time.Now()
    go func() {
        for i := 0; i < 1000; i++ {
            fixI := i
            p.Add(func() {
                worker(fmt.Sprintf("http://localhost:8080/?id=%d", fixI), ch)
            })
        }
        p.Wait()
        close(ch)
    }()
    var ret []string
    for s := range ch {
        ret = append(ret, s)
    }
    fmt.Println(time.Since(t), len(ret))
}

type Parallel struct {
    queue chan func()
    wg    sync.WaitGroup
}

func (r *Parallel) Add(fn func()) {
    r.queue <- fn
}

func (r *Parallel) Wait() {
    close(r.queue)
    r.wg.Wait()
}

func (r *Parallel) run(fn func()) {
    defer func() {
        if err := recover(); err != nil {
            log.Println("Parallel panic", "err", err)
        }
    }()
    fn()
}

func NewParallel(worker int) *Parallel {
    p := &Parallel{
        queue: make(chan func(), worker*2),
        wg:    sync.WaitGroup{},
    }
    for i := 0; i < worker; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for fn := range p.queue {
                p.run(fn)
            }
        }()
    }
    return p
}
2周前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!