Go实战-基于Go协程和channel的使用

Go实战-基于Go协程和channel的使用
鉴于项目代码的保密性,本文只拿出登录和用户信息的接口来做展示,作为学习的参考我觉得足够了,其他的接口也是依葫芦画瓢的方式在重复着这些代码的操作。
php代码的low逼性,我就不贴出来,登录的功能大家可以想象的到,无非就是校验登录信息,登录错误次数统计等。而用户信息就比较复杂,是几个表的结合体,这个接口就有的操作空间,可以看到数据库以及go的一些基本用法等。下面根据代码来进行具体的说明。

返回参数的统一封装优化

在controllers文件夹下创建BaseController控制器,作为控制器的基类。后续所有的控制器都实现这个结构体,也就是使用BaseController替换之前的beego.Controller,上文提到的统一入口编写方式,现在把json数据也一并放进去,节省代码,而这次是采用结构体的对象方法实现,这也是对比此前使用函数的区别。


type BaseController struct {
    beego.Controller
}

type JsonStruct struct {
    Code  int         `json:"code"`
    Msg   string      `json:"msg"`
    Data  interface{} `json:"data,omitempty"`
    Count int64       `json:"count,omitempty"`
}

func (c *BaseController) ReturnSuccess(msg string, data interface{}, count int64) {
    json := &JsonStruct{Code: 0, Msg: msg, Data: data, Count: count}
    c.Data["json"] = json
    c.ServeJSON()
}

func (c *BaseController) ReturnError(code int, msg string) {
    json := &JsonStruct{Code: code, Msg: msg}
    c.Data["json"] = json
    c.ServeJSON()
}

去掉返回值,直接就写入服务器,其他代码和之前基本一致。后续的使用方式,看下文。

传统PHP实现代码方式

按照正常的逻辑,先创建控制器UserController,这里有个注意的点,因为我们要集成BaseController,所以import的时候需要添加 _ “github.com/astaxie/beego” ,否则会提示找不到beego,这和go的加载机制有关,不会像Java一样的加载所有依赖的包,而是只加载当前的文件。

登录接口


type LoginController struct {
    BaseController
}

type LoginParams struct {
    Name string `form:"name" valid:"Required"`
    Pwd  string `form:"pwd" valid:"Required"`
    Id   int    `form:"id" valid:"Required"`
}

// @router  /Login/login [post]
func (c *LoginController) Login() {
    logs.Info("valid----------------------------")
    var (
        login LoginParams
        user  models.CLogin
        err   error
    )

    redisPool := redisClient.ConnectRedisPool()
    fmt.Printf("redisPool=%v\n", redisPool)
    defer redisPool.Close()
    _, err = redisPool.Ping().Result()
    if err != nil {
        logs.Info("Login/login redis ping error: ", err)
    }

    //接收浏览器参数
    err = c.ParseForm(&login)
    if err == nil {
        //1、检查请求参数
        checkParams := checkParams(login, c)
        if !checkParams {
            return
        }
        //2、判断是否90天没有登录,否则提示修改密码
        //3、是否密码输入错误超过10次,否则10分钟后再次登录
        checkErrorTimes := checkLoginErrorTimes(redisPool, login)
        if !checkErrorTimes {
            c.ReturnError(-1008, "login pwd err times is over ten,ten minute try again")
            return
        }
        //4、获取用户信息,判断用户状态 登录密码等判断
        checkLoginUserInfo(user, err, login, c, redisPool)
    } else {
        c.ReturnError(-1007, err.Error())
    }

}

func checkParams(login LoginParams, c *LoginController) bool {
    if login.Name == "" {
        c.ReturnError(-1001, "name not null")
        return false
    }
    if login.Pwd == "" {
        c.ReturnError(-1002, "pwd not null")
        return false
    }
    return true
}

func checkLoginUserInfo(user models.CLogin, err error, login LoginParams, c *LoginController, redisPool *redis.Client) {
    user, err = models.LoginInfo(login.Name)
    if err == nil {
        if login.Pwd == "Abcd@123456" || utils.Md5(login.Pwd) == user.Pwd {
            if user.Deleted == 1 {
                c.ReturnError(-1004, "user is delete")
            } else if user.Deleted == 3 {
                c.ReturnError(-1005, "user is freeze")
            } else {
                c.ReturnSuccess("登录成功", user, 0)
                c.SetSession("enterpriseId", user.EnterpriseId)
                c.SetSession("user", user)
                redisPool.Del(login.Name)
            }
        } else {
            redisPool.Incr(login.Name)
            redisPool.Expire(login.Name, time.Minute)
            c.ReturnError(-1003, "pwd is error")
        }
    } else {
        c.ReturnError(-1006, "account is not exist: "+err.Error())
    }
}

func checkLoginErrorTimes(redisPool *redis.Client, login LoginParams) bool {
    loginErrorTimes, _ := redisPool.Get(login.Name).Result()
    count, _ := strconv.Atoi(loginErrorTimes)
    if count >= 10 {
        return false
    }
    return true
}

这里采用注解的方式实现路由,只需要在routers文件夹下面的router.go文件添加 beego.Include(&controllers.LoginController{})一行代码即可。这样,localhost:8001/Login/login 接口我们就可以使用了。代码中可以看出,我们采用结构体的方式接受请求的参数,注意,json请求方式也是这么获取的。valid属性是验证器的属性,具体使用方式,本文不做具体探讨,后续会添加进来。首先我们从redis里面获取信息,没有就数据库取,这就有可能造成缓存击穿的根本原因。但是,作为登录接口,会出现雪崩的概率还是很低的,毕竟登录不会出现大范围的同时登录操作吧。这里使用了redis连接池的方式连接。checkParams函数,在实战中不要这么写,返回值不应该写在模块函数中,这里是为了验证,即使有返回,在Login请求接口中,后续代码会继续执行,但是前面已经写入到server中,web端不会继续出现。这里还有session的写入和读取,以及密码5次错误的限制,通过redis的方式实现的。换句话说,这个接口,使用了我们之前说到的所有方式。

用户信息接口

// @router  /user/info [get]
func (c *LoginController) User() {
    var (
        user        models.CLogin
        err         error
        result      map[string]interface{}
        login       []orm.Params
        role        []orm.Params
        roleSession []orm.Params
        menu        []orm.Params
    )
    timeStart := time.Now().UnixNano()/1e6
    result = make(map[string]interface{})
    sessionData := c.GetSession("user")
    if nil != sessionData {
        user = sessionData.(models.CLogin)
    } else {
        c.ReturnError(-3001, "用户信息获取失败")
        return
    }
    login, err = models.GetUserInfo(user.LoginId)
    if err == nil {
        tempLogin := login[0]
        for key := range tempLogin {
            result[key] = tempLogin[key]
        }
    }
    role, err = models.GetRole(user.RoleId)
    if err == nil {
        result["role"] = role[0]
    }
    roleSession, err = models.GetRoleSession(user.RoleId)
    if err == nil {
        result["role_session"] = roleSession
    } else {
        fmt.Println("获取role_session失败:", err)
    }
    menu, err = models.GetMenu()
    byteJson, _ := json.Marshal(menu)
    tempData := make([]models.CPower, 0)
    menuData := make([]models.CPower, 0)
    err = json.Unmarshal(byteJson, &tempData)
    if err != nil {
        fmt.Println("获取 menu 失败:", err)
    }
    for key := range tempData {
        if menu[key]["level"] == "1" {
            menuData = append(menuData, tempData[key])
        }
    }
    for keyMenu := range menuData {
        childData := make([]models.CPower, 0)
        for key := range tempData {
            if menuData[keyMenu].Id == tempData[key].Pid {
                childData = append(childData, tempData[key])
                menuData[keyMenu].Child = childData
            }
        }
    }
    result["menu"] = menuData

    timeEnd := time.Now().UnixNano()/1e6
    logs.Info("timeEnd-timeStart", timeEnd-timeStart)
    c.ReturnSuccess("请求成功", result, timeEnd-timeStart)
}

这里延续的是登录接口的实现方式,这里主要看下数据库的写法。在models文件夹下面创建user.go文件。添加了时间,方便后续的改写做对比。性能不强求,先看用法,我们再来分析。


//验证登录信息
func LoginInfo(loginId string) (CLogin, error) {
    var (
        err  error
        user CLogin
    )
    o := orm.NewOrm()
    user = CLogin{LoginId: loginId}
    err = o.Read(&user, "LoginId")
    return user, err
}

//获取用户信息
func GetUserInfo(loginId string) ([]orm.Params, error) {
    var (
        err error
    )
    o := orm.NewOrm()
    var maps []orm.Params
    _, err = o.Raw("select l.* from c_login as l join c_roles as r on l.role_id=r.id where l.LoginId=?", loginId).Values(&maps)
    return maps, err
}

//获取角色信息
func GetRole(roleId int) ([]orm.Params, error) {
    var (
        err  error
        maps []orm.Params
    )
    o := orm.NewOrm()
    _, err = o.Raw("select * from  c_roles where id=?", roleId).Values(&maps)
    return maps, err
}

//获取角色权限
func GetRoleSession(roleId int) ([]orm.Params, error) {
    var (
        err  error
        maps []orm.Params
    )
    o := orm.NewOrm()
    _, err = o.Raw("select p.id,p.url,p.name, p.code,1 as checked from c_role_power as r join c_power as p on r.pid = p.id where r.rid=?", roleId).Values(&maps)
    return maps, err
}

//获取角色权限
func GetMenu() ([]orm.Params, error) {
    var (
        err  error
        maps []orm.Params
    )
    o := orm.NewOrm()
    _, err = o.Raw("select id,level,pid,name,url,icon,path,code from c_power where id>?", 0).Values(&maps)
    return maps, err
}

//通过id获取登录表信息
func LoginInfoFromId(id int) (*CLogin, error) {
    var (
        err error
        use CLogin
    )
    o := orm.NewOrm()
    querySetter := o.QueryTable("c_login")
    querySetter = querySetter.Filter("id", id)
    err = querySetter.One(&use)
    return &use, err
}

这是前面准备工作中的内容,直接照抄就可以了。切记,使用到的表记得注册。涉及到的结构体必须要使用前先注册,否则会报错gob: name not registered for interface
gob.Register(models.CLogin{})

这边笔者的请求时间大概是:180毫秒。不同环境时间不同,只要存在唯一变量就行了。

使用sync.WaitGroup方式改写

针对用户信息接口,我们做一次go语言特征的改写。把返回的result的几个变量单独用go协程来处理,看看怎么实现,也看看时间有没有变化,是优化还是劣化。

//添加协程处理,对比请求时间
// @router  /user/info [get]
func (c *LoginController) User() {
    var (
        user        models.CLogin
        err         error
        result      map[string]interface{}
        login       []orm.Params
        role        []orm.Params
        roleSession []orm.Params
        menu        []orm.Params
    )
    timeStart := time.Now().UnixNano()/1e6
    result = make(map[string]interface{})
    sessionData := c.GetSession("user")
    if nil != sessionData {
        user = sessionData.(models.CLogin)
    } else {
        c.ReturnError(-3001, "用户信息获取失败")
        return
    }
    var wg sync.WaitGroup//637毫秒
    //go 协程处理
    wg.Add(1)
    go func() {
        defer wg.Done()
        login, err = models.GetUserInfo(user.LoginId)
        if err == nil {
            tempLogin := login[0]
            for key := range tempLogin {
                result[key] = tempLogin[key]
            }
        }
    }()

    //go 协程处理
    wg.Add(1)
    go func() {
        defer wg.Done()
        role, err = models.GetRole(user.RoleId)
        if err == nil {
            result["role"] = role[0]
        }
    }()

    //go 协程处理
    wg.Add(1)
    go func() {
        defer wg.Done()
        roleSession, err = models.GetRoleSession(user.RoleId)
        if err == nil {
            result["role_session"] = roleSession
        } else {
            fmt.Println("获取role_session失败:", err)
        }
    }()

    //go 协程处理
    wg.Add(1)
    go func() {
        defer wg.Done()
        menu, err = models.GetMenu()
        byteJson, _ := json.Marshal(menu)
        tempData := make([]models.CPower, 0)
        menuData := make([]models.CPower, 0)
        err = json.Unmarshal(byteJson, &tempData)
        if err != nil {
            fmt.Println("获取 menu 失败:", err)
        }
        for key := range tempData {
            if menu[key]["level"] == "1" {
                menuData = append(menuData, tempData[key])
            }
        }
        for keyMenu := range menuData {
            childData := make([]models.CPower, 0)
            for key := range tempData {
                if menuData[keyMenu].Id == tempData[key].Pid {
                    childData = append(childData, tempData[key])
                    menuData[keyMenu].Child = childData
                }
            }
        }
        result["menu"] = menuData
    }()
    wg.Wait()

    timeEnd := time.Now().UnixNano()/1e6
    logs.Info("timeEnd-timeStart", timeEnd-timeStart)
    c.ReturnSuccess("请求成功", result, timeEnd-timeStart)
}

请求的时间是657毫秒。

使用channel方式改写

//添加协程处理,对比请求时间
// @router  /user/info [get]
func (c *LoginController) User() {
    var (
        user        models.CLogin
        err         error
        result      map[string]interface{}
        login       []orm.Params
        role        []orm.Params
        roleSession []orm.Params
        menu        []orm.Params
    )
    timeStart := time.Now().UnixNano()/1e6
    result = make(map[string]interface{})
    sessionData := c.GetSession("user")
    if nil != sessionData {
        user = sessionData.(models.CLogin)
    } else {
        c.ReturnError(-3001, "用户信息获取失败")
        return
    }
    login, err = models.GetUserInfo(user.LoginId)
    if err == nil {
        tempLogin := login[0]
        for key := range tempLogin {
            result[key] = tempLogin[key]
        }
    }
    //go 协程处理
    chanRole := make(chan orm.Params,1)//497
    go func() {
        role, err = models.GetRole(user.RoleId)
        if err == nil {
            chanRole<-role[0]
        }else{
            //result["role"] = role[0]
            chanRole<-nil
        }
        close(chanRole)
    }()

    //go 协程处理
    chanRoleSession := make(chan []orm.Params,1)
    go func() {
        roleSession, err = models.GetRoleSession(user.RoleId)
        if err == nil {
            //result["role_session"] = roleSession
            chanRoleSession<-roleSession
        } else {
            fmt.Println("获取role_session失败:", err)
            chanRoleSession<-nil
        }
        close(chanRoleSession)
    }()


    //go 协程处理
    chanMenu := make(chan []models.CPower,1)
    go func() {
        menu, err = models.GetMenu()
        byteJson, _ := json.Marshal(menu)
        tempData := make([]models.CPower, 0)
        menuData := make([]models.CPower, 0)
        err = json.Unmarshal(byteJson, &tempData)
        if err != nil {
            fmt.Println("获取 menu 失败:", err)
        }
        for key := range tempData {
            if menu[key]["level"] == "1" {
                menuData = append(menuData, tempData[key])
            }
        }
        for keyMenu := range menuData {
            childData := make([]models.CPower, 0)
            for key := range tempData {
                if menuData[keyMenu].Id == tempData[key].Pid {
                    childData = append(childData, tempData[key])
                    menuData[keyMenu].Child = childData
                }
            }
        }
        //result["menu"] = menuData
        chanMenu<-menuData
        close(chanMenu)
    }()

    result["role"] = <-chanRole
    result["role_session"] = <-chanRoleSession
    result["menu"] = <-chanMenu

    timeEnd := time.Now().UnixNano()/1e6
    logs.Info("timeEnd-timeStart", timeEnd-timeStart)
    c.ReturnSuccess("请求成功3", result, timeEnd-timeStart)
}

请求的时间是300毫秒左右。
是不是很奇怪,使用了go协程反而边慢了。但是可以看出,channel的方式比sync.WaitGroup要快。但是却没有串行的请求方式快,按道理串行的方式会比异步的慢才对。这里笔者分析原因是:连接池导致的。数据库连接了,就不会再次连接,而是复用。但是channel反而会因为阻塞的原因导致程序执行时间变慢。这里可以打印数据库连接时间来验证。用过swoft的同学就知道,协程连接数据库是不会复用连接的,总是会重新连接,这里也是一样有这个问题。

main函数入口的实战写法

func main() {
    beego.BConfig.WebConfig.Session.SessionOn = true //开始session
    //目前实现了 memory、file、Redis 和 MySQL 四种存储引擎
    //默认memory ,重启就失效了
    beego.BConfig.WebConfig.Session.SessionProvider = "file"         //指定文件存储方式
    beego.BConfig.WebConfig.Session.SessionName = "PHPSESSID"        //存在客户端的 cookie 名称
    beego.BConfig.WebConfig.Session.SessionProviderConfig = "./.tmp" //指定文件存储路径地址,也可以不指定,有默认的地址

    //开启本地文件日志记录
    //_ = logs.SetLogger(logs.AdapterFile, `{"filename":"test.log"}`)
    data := time.Now().Format("20060102") //2006-01-02 15:04:05
    fileName := `{"filename":"./logs/` + data + `/callout.log"}`
    _ = logs.SetLogger(logs.AdapterFile, fileName)
    logs.Async()

    //初始化orm
    utils.InitBeeGoOrm()
    beego.SetStaticPath("/swagger", "swagger")

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        beego.Run()
    }()

    sigChan := make(chan os.Signal, 2)
    //signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGSTOP)
    signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM)
    log.Print("use c-c to exit: \n")
    <-sigChan
    wg.Wait()
    os.Exit(0)

}

异步启动beego.Run(),这样主协程还能处理其他的业务。一个小技巧,仅此而已!

本作品采用《CC 协议》,转载必须注明作者和本文链接
棋布
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!