Go并发请求多个API

公司之前的很多项目都是PHP写的,有些功能接口返回的数据量比较大,所以业务上会按照数据模块拆分成多个内部接口,在主接口中并发调用,最后整理数据结果集后返回。正好最近在学习Go,想着这个并发请求封装类用Go应该怎么封装,正好学习一下。本着学习的想法发上来,有问题的地方欢迎大佬指正,一起学习。

请求库接口代码utils/multi_requester.go

package utils

import (
    "bytes"
    "encoding/json"
    "errors"
    "io"
    "log"
    "net/http"
    "sync"
    "time"
)

type MultiRequester struct {
    client             *http.Client
    requestPool        []*http.Request
    requestBox         map[*http.Request]map[string]interface{}
    requestID          string
    defaultInnerDomain string
    mu                 sync.Mutex
    responseCache      map[*http.Request]map[string]interface{}
}

func NewMultiRequester(timeout time.Duration) *MultiRequester {
    mr := &MultiRequester{
        client:             &http.Client{},
        requestPool:        make([]*http.Request, 0),
        requestBox:         make(map[*http.Request]map[string]interface{}),
        requestID:          "0",
        defaultInnerDomain: "",
        responseCache:      make(map[*http.Request]map[string]interface{}), // 初始化响应缓存
    }

    if timeout.Seconds() > 0 {
        mr.client.Timeout = timeout
    }

    return mr
}

func (mr *MultiRequester) getDefaultDomain() string {
    if mr.defaultInnerDomain == "" {
        // Implement the logic to get the default inner domain here
        // You can use a configuration file or a constant value
        // Example:
        // mr.defaultInnerDomain = "https://example.com"
    }
    return mr.defaultInnerDomain
}

func (mr *MultiRequester) Add(url string, params map[string]interface{}) *http.Request {
    commonParams := mr.getCommonParams()
    for k, v := range params {
        commonParams[k] = v
    }

    jsonParams, _ := json.Marshal(commonParams)
    req, _ := http.NewRequest("POST", url, bytes.NewBuffer(jsonParams))
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("User-Agent", "your_user_agent")

    mr.requestPool = append(mr.requestPool, req)
    mr.requestBox[req] = map[string]interface{}{"url": url, "params": params}

    return req
}

func (mr *MultiRequester) Exec() {
    var wg sync.WaitGroup

    for _, req := range mr.requestPool {
        wg.Add(1)
        go func(req *http.Request) {
            defer wg.Done()
            resp, err := mr.doRequest(req)

            if err != nil {
                // 处理请求错误
                log.Println("Error executing request:", err)
                return
            }

            // 将响应缓存到 responseCache 中
            mr.mu.Lock()
            mr.responseCache[req] = resp
            mr.mu.Unlock()
        }(req)
    }

    wg.Wait()
}

func (mr *MultiRequester) doRequest(req *http.Request) (map[string]interface{}, error) {
    resp, err := mr.client.Do(req)
    if err != nil {
        // Handle the error
        log.Println("client do err: ", err)
        return nil, err
    }

    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        // Handle the error
        log.Println("client read resp err: ", err)
        return nil, err
    }

    if resp.StatusCode != http.StatusOK {
        // 处理非 200 响应
        return nil, errors.New("Non-200 HTTP status code")
    }

    var responseMap map[string]interface{}
    err = json.Unmarshal(body, &responseMap)
    if err != nil {
        log.Println("client unmarshal err: ", err)
        return nil, err
    }

    return responseMap, nil
}

func (mr *MultiRequester) getCommonParams() map[string]interface{} {
    commonParams := map[string]interface{}{
        "inner_request": 1,
        "request_id":    mr.requestID,
    }
    return commonParams
}

func (mr *MultiRequester) GetContent(req *http.Request) map[string]interface{} {
    // Implement logic to extract content from the response
    // You can use a similar approach as in the "doRequest" method
    // Return the content as a map[string]interface{}
    mr.mu.Lock()
    defer mr.mu.Unlock()

    // 从 responseCache 中获取响应
    if resp, ok := mr.responseCache[req]; ok {
        return resp
    }

    // 如果请求不存在于 responseCache 中,返回 nil
    return nil
}

func (mr *MultiRequester) Cleanup() {
    mr.mu.Lock()
    defer mr.mu.Unlock()

    for req := range mr.requestBox {
        delete(mr.requestBox, req)
    }
}

// You can add additional methods and functionality as needed

测试代码utils/multi_requester_test.go

package utils

import (
    "fmt"
    "testing"
    "time"
)

func TestMultiRequester(t *testing.T) {
    begin := time.Now()

    // 创建一个 MultiRequester 实例,设置超时时间为5秒
    mr := NewMultiRequester(5 * time.Second)

    // 添加请求到 MultiRequester
    params1 := map[string]interface{}{
        "key1": "中国",
        "key2": "美国",
    }
    req1 := mr.Add("http://localhost:8080/endpoint1", params1)

    params2 := map[string]interface{}{
        "key3": "唐朝",
        "key4": "宋朝",
    }
    req2 := mr.Add("http://localhost:8080/endpoint2", params2)

    // 执行所有请求
    mr.Exec()

    // 获取请求1的响应内容
    content1 := mr.GetContent(req1)
    fmt.Println("Response from endpoint 1:", content1)

    // 获取请求2的响应内容
    content2 := mr.GetContent(req2)
    fmt.Println("Response from endpoint 2:", content2)

    // 清理资源
    mr.Cleanup()

    fmt.Printf("\ncost: %dms\n", time.Since(begin).Milliseconds())
}

模拟API接口 api.go

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// 定义一个结构体来表示请求的参数
type RequestBody1 struct {
    Key1 string `json:"key1"`
    Key2 string `json:"key2"`
}

type RequestBody2 struct {
    Key3 string `json:"key3"`
    Key4 string `json:"key4"`
}

// 定义一个结构体来表示响应的结果
type ResponseBody struct {
    Message string `json:"message"`
    Name    string `json:"name"`
    City    string `json:"city"`
}

func main() {
    // 创建一个 HTTP 服务器,监听端口 8080
    http.HandleFunc("/endpoint1", handleEndpoint1)
    http.HandleFunc("/endpoint2", handleEndpoint2)

    fmt.Println("Server is listening on :8080...")
    http.ListenAndServe(":8080", nil)
}

// 处理第一个 POST 请求的函数
func handleEndpoint1(w http.ResponseWriter, r *http.Request) {
    // 检查请求方法是否为 POST
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    // 解析请求体中的 JSON 数据
    var reqBody RequestBody1
    err := json.NewDecoder(r.Body).Decode(&reqBody)
    if err != nil {
        http.Error(w, "Invalid request body", http.StatusBadRequest)
        return
    }

    // 延迟1秒
    time.Sleep(1 * time.Second)

    // 处理请求参数并生成响应
    message := fmt.Sprintf("Received request with key1=%s and key2=%s", reqBody.Key1, reqBody.Key2)
    response := ResponseBody{Message: message, Name: reqBody.Key1, City: reqBody.Key2}

    // 将响应转换为 JSON 并返回给客户端
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// 处理第二个 POST 请求的函数
func handleEndpoint2(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    var reqBody RequestBody2
    err := json.NewDecoder(r.Body).Decode(&reqBody)
    if err != nil {
        http.Error(w, "Invalid request body", http.StatusBadRequest)
        return
    }

    // 延迟8秒
    time.Sleep(8 * time.Second)

    // 处理请求参数并生成响应
    message := fmt.Sprintf("Received request with key3=%s and key4=%s at endpoint2", reqBody.Key3, reqBody.Key4)
    response := ResponseBody{Message: message, Name: reqBody.Key3, City: reqBody.Key4}

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!