Golang Framework in Practice - KisFlow Stream Computing Framework (8) - KisFlow Action

7.1 Action Abort (Terminate the Flow)

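A KisFlow Action lets a Function control the Flow's scheduling logic while it executes. KisFlow provides several Actions for developers to choose from. This section starts with the simplest one, Abort (terminate the current Flow).

The final usage of Abort looks like this: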

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)  // terminate the Flow
}

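AbortFuncHandler() is the business callback of a Function and is defined by the developer. Normally, once the current Function finishes, the Flow moves on to the next Function. If the callback instead returns flow.Next(kis.ActionAbort), the next Function is not executed and the scheduling of the current Flow terminates immediately.

Let's first implement the Abort Action in KisFlow.

7.1.1 Abort Interface Definition

First, add the interface that Abort relies on to Flow.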

kis-flow/kis/flow.go

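type Flow interface {
    // Run schedules the Flow, calling and executing each Function in the Flow in order
    Run(ctx context.Context) error
    // Link connects the Functions in the Flow according to the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow commits Flow data to the Function layer that is about to execute
    CommitRow(row interface{}) error
    // Input returns the input data of the Function the Flow is currently executing
    Input() common.KisRowArr
    // GetName returns the name of the Flow
    GetName() string
    // GetThisFunction returns the Function currently being executed
    GetThisFunction() Function
    // GetThisFuncConf returns the configuration of the Function currently being executed
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector returns the Connector of the Function currently being executed
    GetConnector() (Connector, error)
    // GetConnConf returns the Connector configuration of the Function currently being executed
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig returns the configuration of the current Flow
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName returns the configuration of the Function with the given name in the current Flow
    GetFuncConfigByName(funcName string) *config.KisFuncConfig

    //  --- KisFlow Action ---
    // Next carries the Actions to apply when the Flow moves from the current Function to the next one
    Next(acts ...ActionFunc) error
}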

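This adds one method, Next(acts ...ActionFunc) error. Its parameter is variadic with element type ActionFunc, the Action-related function type we define for KisFlow. The Action module is defined as follows:

7.1.2 Action Module Definition

Action is the configuration module through which a Function instructs the Flow to take a special action during execution. Abort, described above, is one such Action. To define the module, create action.go under kis-flow/kis/: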

kis-flow/kis/action.go

package kis

// Action holds the Actions of a KisFlow execution
type Action struct {
    // Abort terminates the execution of the Flow
    Abort bool
}

// ActionFunc is the KisFlow functional option type
type ActionFunc func(ops *Action)

// LoadActions loads the Actions by running each ActionFunc in order
func LoadActions(acts []ActionFunc) Action {
    action := Action{}

    if acts == nil {
        return action
    }

    for _, act := range acts {
        act(&action)
    }

    return action
}

// ActionAbort terminates the execution of the Flow
func ActionAbort(action *Action) {
    action.Abort = true
}

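First, Action currently has only one behavior, Abort. A bool represents it: true means the Flow's scheduling must be terminated.
Second, type ActionFunc func(ops *Action) declares a function type whose parameter is a pointer to an Action. func ActionAbort(action *Action) is one concrete function of that type; its only job is to set the Abort member of the Action to true.

Finally, look at func LoadActions(acts []ActionFunc) Action. Its parameter is a slice of ActionFunc. LoadActions() creates a new Action{}, runs each function in []ActionFunc in order to modify the members of that Action{}, and returns the resulting Action{} to the caller.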
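
To make the functional-option pattern concrete, the following standalone sketch copies the Action types into a package main program and calls LoadActions() with and without an option. The main function is only an illustration and is not part of KisFlow. The sketch drops the nil check, because ranging over a nil slice in Go simply runs zero iterations.

```go
package main

import "fmt"

// Action mirrors the struct from kis/action.go.
type Action struct {
    Abort bool
}

// ActionFunc is a functional option that mutates an Action.
type ActionFunc func(ops *Action)

// LoadActions applies every option to a fresh Action and returns it.
func LoadActions(acts []ActionFunc) Action {
    action := Action{}
    for _, act := range acts { // a nil slice yields zero iterations
        act(&action)
    }
    return action
}

// ActionAbort sets the Abort flag.
func ActionAbort(action *Action) {
    action.Abort = true
}

func main() {
    fmt.Println(LoadActions(nil).Abort)                       // false: no option applied
    fmt.Println(LoadActions([]ActionFunc{ActionAbort}).Abort) // true: Abort option applied
}
```

Adding a new Action later only requires one more struct field and one more option function; the signature of LoadActions() stays the same.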

7.1.3 Implementing the Next Method

Next, the KisFlow module has to implement this interface. First add an Action member to KisFlow; it holds the Action carried after each Function finishes executing.

kis-flow/flow/kis_flow.go

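// KisFlow is the context that runs through the entire stream computation
type KisFlow struct {
    // Basic information
    Id   string                // Distributed instance ID of the Flow (used inside KisFlow to tell instances apart)
    Name string                // Human-readable name of the Flow
    Conf *config.KisFlowConfig // Flow configuration

    // Function list
    Funcs          map[string]kis.Function // All Function objects managed by this Flow, key: FunctionName
    FlowHead       kis.Function            // Head of the Function list owned by this Flow
    FlowTail       kis.Function            // Tail of the Function list owned by this Flow
    flock          sync.RWMutex            // Lock guarding reads and writes of the linked list
    ThisFunction   kis.Function            // KisFunction object the Flow is currently executing
    ThisFunctionId string                  // ID of the Function currently being executed
    PrevFunctionId string                  // ID of the Function one layer above the current one

    // Function list parameters
    funcParams map[string]config.FParam // Custom fixed parameters of the Flow for each Function, key: KisID of the Function instance, value: FParam
    fplock     sync.RWMutex             // Lock guarding reads and writes of funcParams

    // Data
    buffer common.KisRowArr  // Internal buffer that temporarily holds input data; one row is interface{}, multiple rows are []interface{}, i.e. KisBatch
    data   common.KisDataMap // Data source of every layer of the stream computation
    inPut  common.KisRowArr  // Input data of the current Function

    // +++++++++++++++++++++

    // KisFlow Action
    action kis.Action        // Action carried by the current Flow
}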

Then implement the Next() method of KisFlow:

kis-flow/flow/kis_flow.go

// Next carries the Actions to apply when the Flow moves from the current Function to the next one
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {

    // Load the Actions passed by the Function's FaaS callback
    flow.action = kis.LoadActions(acts)

    return nil
}

Each developer-defined business callback ends by calling flow.Next() to pass its Actions, so Next(acts ...kis.ActionFunc) error simply loads the Actions passed in and stores them in flow.action.
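
One consequence is worth spelling out: every call to Next() replaces the stored Action, so calling it with no arguments leaves the zero value behind. The sketch below illustrates this with a hypothetical miniFlow type that keeps only the action field; it is a stand-in for illustration, not the real KisFlow struct.

```go
package main

import "fmt"

type Action struct{ Abort bool }

type ActionFunc func(*Action)

func ActionAbort(a *Action) { a.Abort = true }

func LoadActions(acts []ActionFunc) Action {
    action := Action{}
    for _, act := range acts {
        act(&action)
    }
    return action
}

// miniFlow is a hypothetical stand-in for KisFlow with only the action field.
type miniFlow struct {
    action Action
}

// Next stores the Actions passed by the callback, replacing any earlier value.
func (f *miniFlow) Next(acts ...ActionFunc) error {
    f.action = LoadActions(acts)
    return nil
}

func main() {
    f := &miniFlow{}

    _ = f.Next(ActionAbort)
    fmt.Println(f.action.Abort) // true

    _ = f.Next() // no Actions: the stored Action goes back to its zero value
    fmt.Println(f.action.Abort) // false
}
```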

7.1.4 Controlling the Flow with Abort

Now that Abort is meant to control the Flow, KisFlow needs a member that records this state.

kis-flow/flow/kis_flow.go

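// KisFlow is the context that runs through the entire stream computation
type KisFlow struct {
    // Basic information
    Id   string                // Distributed instance ID of the Flow (used inside KisFlow to tell instances apart)
    Name string                // Human-readable name of the Flow
    Conf *config.KisFlowConfig // Flow configuration

    // Function list
    Funcs          map[string]kis.Function // All Function objects managed by this Flow, key: FunctionName
    FlowHead       kis.Function            // Head of the Function list owned by this Flow
    FlowTail       kis.Function            // Tail of the Function list owned by this Flow
    flock          sync.RWMutex            // Lock guarding reads and writes of the linked list
    ThisFunction   kis.Function            // KisFunction object the Flow is currently executing
    ThisFunctionId string                  // ID of the Function currently being executed
    PrevFunctionId string                  // ID of the Function one layer above the current one

    // Function list parameters
    funcParams map[string]config.FParam // Custom fixed parameters of the Flow for each Function, key: KisID of the Function instance, value: FParam
    fplock     sync.RWMutex             // Lock guarding reads and writes of funcParams

    // Data
    buffer common.KisRowArr  // Internal buffer that temporarily holds input data; one row is interface{}, multiple rows are []interface{}, i.e. KisBatch
    data   common.KisDataMap // Data source of every layer of the stream computation
    inPut  common.KisRowArr  // Input data of the current Function
    action kis.Action        // Action carried by the current Flow

    // +++++++++
    abort  bool              // Whether the Flow should be interrupted
}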

Every call to flow.Run() must reset the abort variable, and the scheduling loop must check flow.abort.

kis-flow/flow/kis_flow.go

// Run starts the stream computation of KisFlow, executing the flow from the first Function
func (flow *KisFlow) Run(ctx context.Context) error {

    // +++++++++
    // Reset abort
    flow.abort = false  // reset the abort state every time scheduling starts

    // ... ...

    // ... ...

    // Chained streaming calls
    for fn != nil && !flow.abort { // ++++ do not enter the next scheduling round once abort is set

        // ... ...
        // ... ...

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success

            // ... ...

            fn = fn.Next()
        }
    }

    return nil
}

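This way, whenever Call() invokes a Function's custom callback and the callback returns flow.Next(kis.ActionAbort), the Action state of the flow changes. What remains is to pass the Abort state of the Action on to the abort state of KisFlow, which is what actually stops the scheduling loop.

With the abort state in place, we can add one more rule to Flow execution: if the current Function commits no result data for its layer, that is, flow.buffer is empty, the Flow does not enter the next layer and the Run() call ends at the current layer.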

kis-flow/flow/kis_flow_data.go

// commitCurData commits the result data of the Function the Flow is currently executing
func (flow *KisFlow) commitCurData(ctx context.Context) error {

    // Check whether this layer produced result data; if not, leave the current Flow Run loop
    if len(flow.buffer) == 0 {
        // ++++++++++++
        flow.abort = true
        return nil
    }

    // ... ...
    // ... ...

    return nil
}

7.1.5 Capturing and Handling Actions

Next, implement a method dedicated to handling Actions. It is defined in kis-flow/flow/kis_flow_action.go:

kis-flow/flow/kis_flow_action.go

package flow

import (
    "context"
    "kis-flow/kis"
)

// dealAction handles the Action and decides where the Flow goes next
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    if err := flow.commitCurData(ctx); err != nil {
        return nil, err
    }

    // Update the cursor that points to the previous layer's Function ID
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action: force termination
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear the Action
    flow.action = kis.Action{}

    return fn, nil
}

Then adjust the Run() method of KisFlow slightly to call dealAction().

kis-flow/flow/kis_flow.go

// Run starts the stream computation of KisFlow, executing the flow from the first Function
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        // The flow is disabled by configuration
        return nil
    }

    // No Function has run yet, so there is no previous layer and PrevFunctionId is FirstVirtual
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // Commit the original data of the data stream
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    // Chained streaming calls
    for fn != nil && !flow.abort {

        // Record the Function the flow is currently executing
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // Get the source data the current Function has to process
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success

            // +++++++++++++++++++++++++++++++
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }
            // +++++++++++++++++++++++++++++++
        }
    }

    return nil
}
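
The interplay of Run(), Next() and dealAction() can be tried out in isolation. The sketch below is a simplified model, not the KisFlow implementation: stage, miniFlow and newDemoFlow are hypothetical names, and the data-commit step is left out so that only the abort control flow remains.

```go
package main

import "fmt"

type Action struct{ Abort bool }

type ActionFunc func(*Action)

func ActionAbort(a *Action) { a.Abort = true }

// stage is a hypothetical stand-in for a Function node in the linked list.
type stage struct {
    name string
    call func(f *miniFlow) error
    next *stage
}

// miniFlow is a hypothetical stand-in for KisFlow without any data handling.
type miniFlow struct {
    head    *stage
    action  Action
    abort   bool
    visited []string // names of the stages that actually ran
}

// Next stores the Actions returned by the stage callback.
func (f *miniFlow) Next(acts ...ActionFunc) error {
    f.action = Action{}
    for _, act := range acts {
        act(&f.action)
    }
    return nil
}

// dealAction turns the stored Action into flow state and picks the next stage.
func (f *miniFlow) dealAction(s *stage) *stage {
    next := s.next
    if f.action.Abort {
        f.abort = true
    }
    f.action = Action{} // clear the Action for the next stage
    return next
}

// Run walks the stage list until it ends or abort is set.
func (f *miniFlow) Run() error {
    f.abort = false
    f.visited = nil
    for s := f.head; s != nil && !f.abort; {
        f.visited = append(f.visited, s.name)
        if err := s.call(f); err != nil {
            return err
        }
        s = f.dealAction(s)
    }
    return nil
}

// newDemoFlow builds three stages; the middle one aborts.
func newDemoFlow() *miniFlow {
    pass := func(f *miniFlow) error { return f.Next() }
    stop := func(f *miniFlow) error { return f.Next(ActionAbort) }
    s3 := &stage{name: "stage3", call: pass}
    s2 := &stage{name: "stage2", call: stop, next: s3}
    s1 := &stage{name: "stage1", call: pass, next: s2}
    return &miniFlow{head: s1}
}

func main() {
    f := newDemoFlow()
    if err := f.Run(); err != nil {
        fmt.Println("run failed:", err)
        return
    }
    fmt.Println(f.visited) // [stage1 stage2]
}
```

Because abort is reset at the top of Run(), the same flow object can be run again after an aborted pass.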

7.1.6 Action Abort Unit Test

First create a new Function; its configuration file is as follows:

kis-flow/test/load_conf/func/func-AbortFunc.yml

kistype: func
fname: abortFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

The Function is named abortFunc. Its FaaS function is implemented as follows:

kis-flow/test/faas/faas_abort.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)
}

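This Function ends by calling flow.Next(kis.ActionAbort) to terminate the Flow. Next, create a new Flow with this Function in the middle and check that the Functions after it are not executed.
The configuration of the new Flow is as follows: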

kis-flow/test/load_conf/flow/flow-FlowName2.yml

kistype: flow
status: 1
flow_name: flowName2
flows:
  - fname: funcName1
  - fname: abortFunc
  - fname: funcName3

The Flow is named flowName2 and has three Functions. funcName1 and funcName3 were defined earlier; abortFunc is new and sits in the middle. If Abort works as intended, funcName3 will not be scheduled.

Next, implement the unit test.

kis-flow/test/kis_action_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestActionAbort(t *testing.T) {
    ctx := context.Background()

    // 0. Register the Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // add the abortFunc callback
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register the ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load the configuration files and build the Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get the Flow
    flow1 := kis.Pool().GetFlow("flowName2")

    // 3. Commit the original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Run flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

The following lines are the registration code. You can also put them in another file so that each test does not have to repeat them.

    // 0. Register the Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // add the abortFunc callback
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register the ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

cd into the kis-flow/test/ directory and run:

go test -test.v -test.paniconexit0 -test.run  TestActionAbort

The result is as follows:

=== RUN   TestActionAbort
Add KisPool FuncName=funcName1
Add KisPool FuncName=abortFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1

Add FlowRouter FlowName=flowName2

context.Background
====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call AbortFuncHandler ----
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
--- PASS: TestActionAbort (0.00s)
PASS
ok      kis-flow/test   0.487s

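The output shows that after AbortFuncHandler runs, execution does not continue; the Flow's Run() method exits.

7.2 Action DataReuse (Reuse Upper-Layer Data)

Action DataReuse reuses the upper layer's data: the result the current Function commits for the next layer is discarded, and the result data of the layer above the current Function is passed on instead as the data source of the next Function.

Let's implement Action DataReuse.

7.2.1 Adding the DataReuse Action

Add a DataReuse member of type bool to Action.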

kis-flow/kis/action.go

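// Action holds the Actions of a KisFlow execution
type Action struct {
    // +++++++++++++
    // DataReuse indicates whether to reuse the upper-layer Function's data
    DataReuse bool

    // Abort terminates the execution of the Flow
    Abort bool
}


// ActionDataReuse is the Next option that reuses the upper-layer Function's data
func ActionDataReuse(act *Action) {
    act.DataReuse = true
}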

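The code above also provides an ActionFunc named ActionDataReuse, whose implementation sets DataReuse to true.

7.2.2 Reusing Upper-Layer Data in the Next Layer

This requires one more data-commit method that commits the reused data. The logic is as follows: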

kis-flow/flow/kis_flow_data.go

// commitReuseData commits the upper layer's result data as the result of the current layer
func (flow *KisFlow) commitReuseData(ctx context.Context) error {

    // Check whether the upper layer has result data; if not, leave the current Flow Run loop
    if len(flow.data[flow.PrevFunctionId]) == 0 {
        flow.abort = true
        return nil
    }

    // This layer's result data equals the upper layer's result data (reuse it for this layer)
    flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]

    // Clear the buffer (with the ReuseData option, none of the committed data is carried to the next layer)
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

The logic is simple. The difference from commitCurData() is that commitCurData() commits the contents of flow.buffer to flow.data[flow.ThisFunctionId], whereas commitReuseData() commits the previous layer's result data to flow.data[flow.ThisFunctionId].
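
The reuse step can be tried on a plain map. In the sketch below, rows and miniFlow are hypothetical stand-ins for common.KisRowArr and KisFlow, and commitReuse follows the same steps as commitReuseData() without logging.

```go
package main

import "fmt"

// rows is a hypothetical stand-in for common.KisRowArr.
type rows []interface{}

// miniFlow is a hypothetical stand-in for KisFlow with only the data fields.
type miniFlow struct {
    data   map[string]rows
    buffer rows
    prevID string
    thisID string
    abort  bool
}

// commitReuse publishes the previous layer's rows under the current layer's key
// and discards whatever the current stage committed into the buffer.
func (f *miniFlow) commitReuse() {
    if len(f.data[f.prevID]) == 0 {
        f.abort = true // nothing to reuse: stop the flow
        return
    }
    f.data[f.thisID] = f.data[f.prevID]
    f.buffer = f.buffer[0:0]
}

func main() {
    f := &miniFlow{
        data:   map[string]rows{"prev": {"a", "b"}},
        buffer: rows{"ignored"},
        prevID: "prev",
        thisID: "this",
    }
    f.commitReuse()
    fmt.Println(f.data["this"], len(f.buffer)) // [a b] 0
}
```

Note that the slice assignment copies only the slice header, so both map entries share one backing array. If a later stage modified rows in place, the change would be visible under both keys; copying the rows would avoid that at the cost of an allocation.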

7.2.3 Handling the DataReuse Action

Then capture Action DataReuse in the dealAction() method:

kis-flow/flow/kis_flow_action.go

// dealAction handles the Action and decides where the Flow goes next
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // ++++++++++++++++
    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }


    // Update the cursor that points to the previous layer's Function ID
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action: force termination
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear the Action
    flow.action = kis.Action{}

    return fn, nil
}

7.2.4 Unit Test

Now unit-test DataReuse. First create a Function named dataReuseFunc, starting with its configuration file.

kis-flow/test/load_conf/func/func-dataReuseFunc.yml

kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

Also create a new Flow named flowName3 with the following configuration:

kis-flow/test/load_conf/flow/func-FlowName3.yml

kistype: flow
status: 1
flow_name: flowName3
flows:
  - fname: funcName1
  - fname: dataReuseFunc
  - fname: funcName3

The business logic of the dataReuseFunc Function is as follows:

kis-flow/test/faas/faas_data_reuse.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call DataReuseFuncHandler ----")

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // Compute the result data
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // Commit the result data
        _ = flow.CommitRow(resultStr)
    }

    return flow.Next(kis.ActionDataReuse)
}

Finally, implement the test case:

kis-flow/test/kis_action_test.go

func TestActionDataReuse(t *testing.T) {
    ctx := context.Background()

    // 0. Register the Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // add the dataReuseFunc callback
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register the ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load the configuration files and build the Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get the Flow
    flow1 := kis.Pool().GetFlow("flowName3")

    // 3. Commit the original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Run flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd into kis-flow/test/ and run:

go test -test.v -test.paniconexit0 -test.run  TestActionDataReuse

The result is:

=== RUN   TestActionDataReuse
Add KisPool FuncName=funcName1
Add KisPool FuncName=dataReuseFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call DataReuseFuncHandler ----
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
--- PASS: TestActionDataReuse (0.02s)
PASS
ok      kis-flow/test   0.523s

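The output shows that the data funcName3Handler receives is the data passed down from funcName1: the dataReuseFunc in the middle reused the upper layer's data for the next layer, where it became the data source of funcName3.

7.3 Action ForceEntryNext (Force Entry into the Next Layer)

7.3.1 The ForceEntryNext Action Property

In KisFlow as it stands, if the current Function commits no data (no result data for its layer), the next Function is not scheduled after the current one finishes. Some stream computations, however, need to keep going even without data, and the ForceEntryNext Action makes that possible.
First add a ForceEntryNext property to Action.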

kis-flow/kis/action.go

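// Action holds the Actions of a KisFlow execution
type Action struct {
    // DataReuse indicates whether to reuse the upper-layer Function's data
    DataReuse bool

    // By default, if the current Function produces 0 rows of result data, the following Functions are not executed
    // ForceEntryNext overrides that default rule and forces entry into the next Function even without data
    ForceEntryNext bool

    // Abort terminates the execution of the Flow
    Abort bool
}

// ActionForceEntryNext forces entry into the next layer
func ActionForceEntryNext(act *Action) {
    act.ForceEntryNext = true
}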

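The option function ActionForceEntryNext() sets this property.

7.3.2 Capturing the Action

In dealAction(), which captures Actions, add a check for this state. If it is set, flow.abort must be changed back to false so that the flow continues with the next layer.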

kis-flow/flow/kis_flow_action.go

// dealAction handles the Action and decides where the Flow goes next
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ++++++++++++++++++++++++++++
    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // Update the cursor that points to the previous layer's Function ID
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action: force termination
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear the Action
    flow.action = kis.Action{}

    return fn, nil
}
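
Since Next() is variadic, a callback may pass several Actions at once, and the order of the checks in dealAction() decides which one wins. The sketch below condenses those checks into a hypothetical helper, shouldAbort, which is not part of KisFlow; noData stands for the case in which the commit step found nothing to pass on and set flow.abort.

```go
package main

import "fmt"

// Action mirrors the three flags defined in kis/action.go.
type Action struct {
    DataReuse      bool
    ForceEntryNext bool
    Abort          bool
}

// shouldAbort is a hypothetical helper that replays the abort decisions of
// dealAction() in the same order.
func shouldAbort(noData bool, a Action) bool {
    abort := noData // set by the commit step when there is no data
    if a.ForceEntryNext {
        abort = false // overrides the no-data rule
    }
    if a.Abort {
        abort = true // checked last, so an explicit Abort wins
    }
    return abort
}

func main() {
    fmt.Println(shouldAbort(true, Action{}))                                  // true
    fmt.Println(shouldAbort(true, Action{ForceEntryNext: true}))              // false
    fmt.Println(shouldAbort(true, Action{ForceEntryNext: true, Abort: true})) // true
    fmt.Println(shouldAbort(false, Action{}))                                 // false
}
```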

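One detail: we must call commitVoidData(), which commits empty data. Without it, flow.buffer is still empty, so no data is committed and flow.data[flow.ThisFunctionId] never gets an entry; the key does not exist. When execution then reaches flow.getCurData(), the lookup raises a key-not-found error and panics. So an empty batch has to be committed to flow.data[flow.ThisFunctionId].
commitVoidData() is implemented as follows: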

kis-flow/flow/kis_flow_data.go

// commitVoidData commits an empty batch as the result of the current layer
func (flow *KisFlow) commitVoidData(ctx context.Context) error {
    if len(flow.buffer) != 0 {
        return nil
    }

    // Build the empty batch
    batch := make(common.KisRowArr, 0)

    // Commit the empty batch as this layer's result data
    flow.data[flow.ThisFunctionId] = batch

    log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}
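
The effect of the empty commit is easy to check on a plain map. The sketch below is only an assumption about how the lookup behaves: getRows is a hypothetical reader that reports a missing key as an error, standing in for getCurData(), whose body is not shown in this chapter. rows and miniFlow are likewise stand-ins.

```go
package main

import (
    "errors"
    "fmt"
)

// rows is a hypothetical stand-in for common.KisRowArr.
type rows []interface{}

// miniFlow is a hypothetical stand-in for KisFlow with only the data fields.
type miniFlow struct {
    data   map[string]rows
    buffer rows
    thisID string
}

// commitVoid stores an empty batch under the current key when nothing was buffered.
func (f *miniFlow) commitVoid() {
    if len(f.buffer) != 0 {
        return
    }
    f.data[f.thisID] = make(rows, 0)
}

// getRows is a hypothetical reader that treats a missing key as an error.
func (f *miniFlow) getRows(id string) (rows, error) {
    r, ok := f.data[id]
    if !ok {
        return nil, errors.New("no data committed for " + id)
    }
    return r, nil
}

func main() {
    f := &miniFlow{data: map[string]rows{}, thisID: "noResultFunc"}

    _, err := f.getRows(f.thisID)
    fmt.Println(err != nil) // true: the key does not exist yet

    f.commitVoid()
    r, err := f.getRows(f.thisID)
    fmt.Println(len(r), err == nil) // 0 true: the key exists with an empty batch
}
```

The point is the difference between a missing key and a key that maps to an empty batch: only the second lets the next layer start with zero rows of input.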

7.3.3 Unit Test without ForceEntryNext

First create the configuration of a Function named noResultFunc and implement its business callback.

kis-flow/test/load_conf/func/func-NoResultFunc.yml

kistype: func
fname: noResultFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next()
}

Here the Function ends by calling flow.Next() without passing any Action.
Then create a new Flow named flowName4 with the following configuration:

kis-flow/test/load_conf/flow-FlowName4.yml

kistype: flow
status: 1
flow_name: flowName4
flows:
  - fname: funcName1
  - fname: noResultFunc
  - fname: funcName3

Finally, write the unit test, with noResultFunc in the middle.

kis-flow/test/kis_action_test.go

func TestActionForceEntry(t *testing.T) {
    ctx := context.Background()

    // 0. Register the Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // add the noResultFunc callback
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register the ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load the configuration files and build the Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get the Flow
    flow1 := kis.Pool().GetFlow("flowName4")

    // 3. Commit the original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Run flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd into kis-flow/test/ and run:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

The result is as follows:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
--- PASS: TestActionForceEntry (0.02s)
PASS
ok      kis-flow/test   0.958s

Because noResultFunc does not commit any result data, the next Function is not executed. The last handler that runs is:

---> Call NoResultFuncHandler ----

7.3.4 Unit test with ForceEntryNext#

Now we add the ForceEntryNext Action: in NoResultFuncHandler(), return flow.Next(kis.ActionForceEntryNext), as follows:

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionForceEntryNext)
}

cd into kis-flow/test/ and run:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

The result is as follows:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
--- PASS: TestActionForceEntry (0.01s)
PASS
ok      kis-flow/test   0.348s

As the output shows, the third Function, funcName3Handler, is now executed, but it receives no data.
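The difference between the two test runs can be pictured with a small stand-alone simulation. This is only a sketch and not KisFlow's own code: `stage`, `runPipeline`, `passThrough`, `noResult` and the `forceEntryNext` field are hypothetical names, and the rule "zero committed rows stops the pipeline unless entry is forced" is modelled directly in the loop.

```go
package main

import "fmt"

// stage is a hypothetical stand-in for a KisFlow Function: it receives the
// rows committed by the previous stage and returns the rows it commits.
type stage struct {
	name           string
	forceEntryNext bool // models returning flow.Next(kis.ActionForceEntryNext)
	run            func(in []string) []string
}

// runPipeline executes the stages in order and returns the names of the
// stages that actually ran. A stage that commits zero rows stops the
// pipeline unless it asked to force entry into the next stage.
func runPipeline(stages []stage, input []string) []string {
	var executed []string
	rows := input
	for _, s := range stages {
		executed = append(executed, s.name)
		rows = s.run(rows)
		if len(rows) == 0 && !s.forceEntryNext {
			break
		}
	}
	return executed
}

// passThrough commits its input unchanged; noResult commits nothing.
func passThrough(in []string) []string { return in }
func noResult(in []string) []string    { return nil }

func main() {
	input := []string{"row1", "row2", "row3"}
	stages := []stage{
		{name: "funcName1", run: passThrough},
		{name: "noResultFunc", run: noResult},
		{name: "funcName3", run: passThrough},
	}

	// Default rule: the pipeline stops after noResultFunc.
	fmt.Println(runPipeline(stages, input)) // [funcName1 noResultFunc]

	// Forced entry: funcName3 runs, but with an empty input.
	stages[1].forceEntryNext = true
	fmt.Println(runPipeline(stages, input)) // [funcName1 noResultFunc funcName3]
}
```

The two printed slices correspond to the two test logs: without the Action the last handler called is NoResultFuncHandler, and with it funcName3Handler is reached with an empty input.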

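7.4 Action JumpFunc (jumping within the Flow)#

Next we implement the JumpFunc Action. JumpFunc lets the current Flow jump to any specified FuncName and continue executing from there (provided that the target FuncName exists in the current Flow).

Note: JumpFunc can easily produce an infinitely looping Flow, so use it with care when designing business logic.

7.4.1 Adding JumpFunc to Action#

First, add a JumpFunc field to Action. Note that JumpFunc is not a bool state but a string: the name of the Function to jump to.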

kis-flow/kis/action.go

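// Action holds the actions that control a KisFlow execution
type Action struct {
    // DataReuse: whether to reuse the previous Function's data
    DataReuse bool

    // By default, if this Function produces 0 rows of data, the following Functions are not executed
    // ForceEntryNext overrides that default rule and forces entry into the next Function even without data
    ForceEntryNext bool

    // ++++++++++
    // JumpFunc: jump to the specified Function and continue executing
    JumpFunc string

    // Abort: terminate the execution of the Flow
    Abort bool
}

// ActionJumpFunc returns an ActionFunc that assigns funcName to Action.JumpFunc
// (Note: this can easily make the Flow call itself in a cycle, causing an infinite loop)
func ActionJumpFunc(funcName string) ActionFunc {
    return func(act *Action) {
        act.JumpFunc = funcName
    }
}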

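ActionJumpFunc() is the configuration method that sets JumpFunc. Note that it is written a little differently from the earlier Action methods: rather than being an ActionFunc itself, it takes the target name and returns an anonymous function (a closure over funcName). When that function is later executed, it sets the JumpFunc field of the Action.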
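The two styles of Action method can be compared in a small self-contained sketch. The `Action` struct and `ActionJumpFunc` are copied from the listing above; `ActionForceEntryNext` is written here in the plain `func(*Action)` form implied by how it is passed to `flow.Next()`, and `applyActions` is a hypothetical helper standing in for whatever the framework does with the variadic arguments of `Next()`.

```go
package main

import "fmt"

// Action mirrors the struct from kis/action.go shown above.
type Action struct {
	DataReuse      bool
	ForceEntryNext bool
	JumpFunc       string
	Abort          bool
}

// ActionFunc is a function that modifies an Action.
type ActionFunc func(act *Action)

// ActionForceEntryNext is itself an ActionFunc: it needs no parameter,
// so it can be passed to Next() directly, without being called.
func ActionForceEntryNext(act *Action) {
	act.ForceEntryNext = true
}

// ActionJumpFunc needs a parameter, so it is a factory: calling it
// returns a closure that remembers funcName and sets it when executed.
func ActionJumpFunc(funcName string) ActionFunc {
	return func(act *Action) {
		act.JumpFunc = funcName
	}
}

// applyActions is a hypothetical helper (not taken from KisFlow): it
// starts from a zero-valued Action and runs every ActionFunc against it.
func applyActions(acts ...ActionFunc) Action {
	var action Action
	for _, act := range acts {
		if act != nil {
			act(&action)
		}
	}
	return action
}

func main() {
	action := applyActions(ActionForceEntryNext, ActionJumpFunc("funcName1"))
	fmt.Printf("%+v\n", action)
	// {DataReuse:false ForceEntryNext:true JumpFunc:funcName1 Abort:false}
}
```

Because the closure captures funcName at the call site, the caller can write `ActionJumpFunc("funcName1")` inline in the argument list of `Next()`, while parameterless Actions are passed by name.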

7.4.2 捕获 Action#

Next we capture the JumpFunc Action. It is enough to check whether JumpFunc is an empty string.

kis-flow/flow/kis_flow_action.go

// dealAction handles the Action and decides where the Flow goes next
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // ++++++++++++++++++++++++++++++++
    // JumpFunc Action
    if flow.action.JumpFunc != "" {
        if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
            // The requested JumpFunc is not in this flow
            return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
        }

        jumpFunction := flow.Funcs[flow.action.JumpFunc]
        // Point the previous-Function cursor at the jump target's predecessor
        flow.PrevFunctionId = jumpFunction.GetPrevId()
        fn = jumpFunction

        // A jump, when set, is forced
        flow.abort = false
    // ++++++++++++++++++++++++++++++++

    } else {

        // Update the previous-layer FunctionId cursor
        flow.PrevFunctionId = flow.ThisFunctionId
        fn = fn.Next()
    }

    // Abort Action: force termination
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear the Action
    flow.action = kis.Action{}

    return fn, nil
}

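If JumpFunc is set, the fn pointer for the next execution is changed to the jump target; otherwise the next Function is found as usual through fn.Next().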
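The pointer selection can be isolated in a minimal sketch. The `node` type and `nextNode` function are hypothetical simplifications, not KisFlow types: a `node` only knows its name and its successor, and the map plays the role of `flow.Funcs`.

```go
package main

import "fmt"

// node is a hypothetical, stripped-down Function: a name plus a link
// to the next Function in the Flow.
type node struct {
	name string
	next *node
}

// nextNode chooses the Function to run after cur. With an empty
// jumpFunc it follows the link; otherwise it looks the target up by
// name and fails if the target is not part of the flow.
func nextNode(funcs map[string]*node, cur *node, jumpFunc string) (*node, error) {
	if jumpFunc == "" {
		return cur.next, nil
	}
	target, ok := funcs[jumpFunc]
	if !ok {
		return nil, fmt.Errorf("flow jump -> %s is not in flow", jumpFunc)
	}
	return target, nil
}

func main() {
	f1 := &node{name: "funcName1"}
	f2 := &node{name: "funcName2"}
	jf := &node{name: "jumpFunc"}
	f1.next, f2.next = f2, jf
	funcs := map[string]*node{f1.name: f1, f2.name: f2, jf.name: jf}

	n, _ := nextNode(funcs, f1, "")
	fmt.Println(n.name) // funcName2

	n, _ = nextNode(funcs, jf, "funcName1")
	fmt.Println(n.name) // funcName1

	_, err := nextNode(funcs, jf, "missing")
	fmt.Println(err) // flow jump -> missing is not in flow
}
```

The real dealAction() does one more thing that this sketch leaves out: it resets PrevFunctionId to the target's predecessor, so that the target Function reads the same input it would have received on a normal pass.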

7.4.3 单元测试#

Next, define the configuration of a Function that performs the jump Action:

kis-flow/test/load_conf/func/func-jumpFunc.yml

kistype: func
fname: jumpFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

Then implement the Function's business logic:

kis-flow/test/faas/faas_jump.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call JumpFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionJumpFunc("funcName1"))
}

Here the final flow.Next(kis.ActionJumpFunc("funcName1")) tells the Flow to jump to the Function named funcName1.

Create a new Flow, flowName5, configured as follows:

kis-flow/test/load_conf/flow/flow-FlowName5.yml

kistype: flow
status: 1
flow_name: flowName5
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: jumpFunc

Then implement the unit test case:

kis-flow/test/kis_action_test.go

func TestActionJumpFunc(t *testing.T) {
    ctx := context.Background()

    // 0. Register the Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // add the jumpFunc handler

    // 0. Register the ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load the configuration files and build the Flows
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get the Flow
    flow1 := kis.Pool().GetFlow("flowName5")

    // 3. Commit the raw data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Run flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd into kis-flow/test/ and run:

go test -test.v -test.paniconexit0 -test.run  TestActionJumpFunc

The result is as follows:

... 
...

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call JumpFuncHandler ----
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----

 ... 
 ...

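The Flow is now scheduled in an endless loop (funcName1, funcName2, jumpFunc, then funcName1 again), which shows that the JumpFunc Action has taken effect.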
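In real business code an unconditional jump like this one is rarely what is wanted. One possible way to keep a jump from looping forever is to give it a budget. The sketch below is a stand-alone simulation under that assumption, not a KisFlow feature: `runWithJumpBudget` is a hypothetical function that walks the same three Functions and allows jumpFunc to jump back to funcName1 at most `maxJumps` times.

```go
package main

import "fmt"

// runWithJumpBudget walks funcName1 -> funcName2 -> jumpFunc and lets
// jumpFunc jump back to funcName1 at most maxJumps times. It returns
// the names of the Functions in the order they were executed.
func runWithJumpBudget(maxJumps int) []string {
	order := []string{"funcName1", "funcName2", "jumpFunc"}
	index := make(map[string]int, len(order))
	for i, name := range order {
		index[name] = i
	}

	var trace []string
	jumps := 0
	for i := 0; i < len(order); {
		name := order[i]
		trace = append(trace, name)

		// While budget remains, jumpFunc jumps back to funcName1.
		if name == "jumpFunc" && jumps < maxJumps {
			jumps++
			i = index["funcName1"]
			continue
		}
		// Otherwise fall through to the next Function (or the end).
		i++
	}
	return trace
}

func main() {
	fmt.Println(len(runWithJumpBudget(0))) // 3
	fmt.Println(len(runWithJumpBudget(2))) // 9
}
```

In a KisFlow handler the same idea would amount to returning flow.Next(kis.ActionJumpFunc("funcName1")) only while some counter or business condition allows it, and a plain flow.Next() otherwise.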

7.5 [V0.6] Source code#

github.com/aceld/kis-flow/releases...


Author: 刘丹冰 Aceld, github: github.com/aceld
KisFlow open-source project: github.com/aceld/kis-flow

This work is licensed under a CC license; reposts must credit the author and link to this article.