KisFlow Golang Stream Computing Case Study (1): QuickStart

Download the kis-flow source
$ go get github.com/aceld/kis-flow
Reference: KisFlow Developer Documentation
1. KisFlow QuickStart (using configuration files)
Example source code: kis-flow-usage/2-quick_start_with_config at main · aceld/kis-flow-usage
First, create a project with the following file layout:
Project directory
├── Makefile
├── conf
│   ├── flow-CalStuAvgScore.yml
│   ├── func-AvgStuScore.yml
│   └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go
Flow
Define the current Flow. It is named "CalStuAvgScore" and is a data flow that computes students' average scores.

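Define two Functions. Function1 uses the Calculate mode and holds the logic for computing each student's average score; Function2 uses the Expand mode and prints the final result.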
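As a rough mental model of the Flow described above, the sketch below chains two named stages in plain Go using only the standard library. It is not KisFlow's implementation: `Row`, `Stage`, `registry`, `calculate`, `expand`, and `runFlow` are hypothetical names introduced purely for illustration.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for one record moving through the flow.
type Row map[string]float64

// Stage is a hypothetical signature: it takes the rows produced by the
// previous stage and returns the rows for the next one.
type Stage func(rows []Row) []Row

// registry maps a function name to its implementation (illustration only).
var registry = map[string]Stage{}

// calculate plays the role of the Calculate-mode function.
func calculate(rows []Row) []Row {
	out := make([]Row, 0, len(rows))
	for _, r := range rows {
		avg := (r["score_1"] + r["score_2"] + r["score_3"]) / 3
		out = append(out, Row{"stu_id": r["stu_id"], "avg_score": avg})
	}
	return out
}

// expand plays the role of the Expand-mode function: it prints each row
// and passes nothing further down the flow.
func expand(rows []Row) []Row {
	for _, r := range rows {
		fmt.Printf("stuid: [%v], avg score: [%v]\n", r["stu_id"], r["avg_score"])
	}
	return nil
}

// runFlow looks up each named stage in order and feeds it the output of
// the previous stage.
func runFlow(names []string, rows []Row) ([]Row, error) {
	for _, name := range names {
		stage, ok := registry[name]
		if !ok {
			return nil, fmt.Errorf("function %q is not registered", name)
		}
		rows = stage(rows)
	}
	return rows, nil
}

func main() {
	registry["AvgStuScore"] = calculate
	registry["PrintStuAvgScore"] = expand

	input := []Row{
		{"stu_id": 101, "score_1": 100, "score_2": 90, "score_3": 80},
		{"stu_id": 102, "score_1": 100, "score_2": 70, "score_3": 60},
	}
	if _, err := runFlow([]string{"AvgStuScore", "PrintStuAvgScore"}, input); err != nil {
		fmt.Println("err:", err)
	}
}
```

The point of the sketch is the separation of concerns: the ordered list of names describes the flow, while the registry supplies the logic, which is the same split the configuration files and Go source files make in the rest of this section.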
Config
The configuration files for the Flow and the Functions are as follows.
(1) Flow Config
conf/flow-CalStuAvgScore.yml
kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
 - fname: AvgStuScore
 - fname: PrintStuAvgScore
(2) Function1 Config
conf/func-AvgStuScore.yml
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
 name: student scores
 must:
 - stu_id
(3) Function2 Config
conf/func-PrintStuAvgScore.yml
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
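 name: student scores
 must:
 - stu_id
Main
Next comes the main logic, which has three steps:
- Load the configuration files and get the Flow instance;
- Submit the data;
- Run the Flow.
main.go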
package main
import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
)
func main() {
    ctx := context.Background()
    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }
    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")
    }
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
    // Run the flow
    if err := flow1.Run(ctx); err != nil {
        fmt.Println("err: ", err)
    }
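}

// init registers the FaaS callbacks under the names used in the config files.
func init() {
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Function1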
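The first function implements the calculation step. AvgStuScoreIn is the input data type, which currently carries three scores; AvgStuScoreOut is the output data type, which carries the average score.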
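To make the mapping from a submitted JSON row to these typed structs concrete, here is a minimal standalone sketch using only the standard library. It assumes rows are decoded through the structs' `json` tags, which the tags suggest but this article does not spell out; `scoreIn`, `scoreOut`, and `average` are hypothetical names, and the KisFlow serializer is deliberately left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// scoreIn mirrors the fields of AvgStuScoreIn, minus the KisFlow serializer.
type scoreIn struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

// scoreOut mirrors the fields of AvgStuScoreOut.
type scoreOut struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

// average decodes one JSON row and computes the mean of its three scores.
func average(raw string) (scoreOut, error) {
	var in scoreIn
	if err := json.Unmarshal([]byte(raw), &in); err != nil {
		return scoreOut{}, fmt.Errorf("decode row: %w", err)
	}
	// Convert to float64 before dividing so the fractional part is kept.
	avg := float64(in.Score1+in.Score2+in.Score3) / 3
	return scoreOut{StuId: in.StuId, AvgScore: avg}, nil
}

func main() {
	out, err := average(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	encoded, err := json.Marshal(out)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	fmt.Println(string(encoded))
}
```

Note the conversion to `float64` before the division: dividing the integer sum by 3 directly would truncate 230/3 to 76.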
faas_stu_score_avg.go
package main
import (
    "context"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)
type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}
type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

// AvgStuScore (FaaS) computes each student's average score.
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {
        out := AvgStuScoreOut{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // Commit the result row to the flow
        _ = flow.CommitRow(out)
    }
    return nil
}
Function2
The print function simply prints each row, as shown below.
faas_stu_score_avg_print.go
package main
import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)
type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}
type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }
    return nil
}
Output
Finally, run the program. The output is as follows:
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
2. KisFlow QuickStart (using the native interface with dynamic configuration)
Example source code: kis-flow-usage/1-quick_start at main · aceld/kis-flow-usage
Project directory
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go
Flow
Main
main.go
package main
import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/common"
    "github.com/aceld/kis-flow/config"
    "github.com/aceld/kis-flow/flow"
    "github.com/aceld/kis-flow/kis"
)
func main() {
    ctx := context.Background()
    // Create a new flow configuration
    myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)
    // Create new function configuration
    avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
    printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)
    // Create a new flow
    flow1 := flow.NewKisFlow(myFlowConfig1)
    // Link functions to the flow
    _ = flow1.Link(avgStuScoreConfig, nil)
    _ = flow1.Link(printStuScoreConfig, nil)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
    // Run the flow
    if err := flow1.Run(ctx); err != nil {
        fmt.Println("err: ", err)
    }
    return
}
func init() {
    // Register functions
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Function1
faas_stu_score_avg.go
package main
import (
    "context"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)
type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}
type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}
// AvgStuScore (FaaS) computes each student's average score.
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {
        out := AvgStuScoreOut{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // Commit the result row to the flow
        _ = flow.CommitRow(out)
    }
    return nil
}
Function2
faas_stu_score_avg_print.go
package main
import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)
type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}
type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }
    return nil
}
Output
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
Author: 刘丹冰Aceld, GitHub: github.com/aceld
KisFlow open-source project: github.com/aceld/kis-flow
This work is licensed under a CC license; reposts must credit the author and link to this article.
 