Golang框架实战-KisFlow流式计算框架(10)-Flow多副本

9.1 多副本能力

KisFlow如果在执行流体中,需要被多个Goroutine来并发使用,可能需要同一个配置的创建多个Flow来匹配多个并发的计算流,所以Flow需要一个创建副本的能力。本章将实现这部分的能力。

9.1.1 Flow新增接口

首先,给Flow的抽象层新增一个接口Fork(),原型如下:

kis-flow/kis/flow.go

type Flow interface {
    // Run 调度Flow,依次调度Flow中的Function并且执行
    Run(ctx context.Context) error
    // Link 将Flow中的Function按照配置文件中的配置进行连接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow数据到即将执行的Function层
    CommitRow(row interface{}) error
    // Input 得到flow当前执行Function的输入源数据
    Input() common.KisRowArr
    // GetName 得到Flow的名称
    GetName() string
    // GetThisFunction 得到当前正在执行的Function
    GetThisFunction() Function
    // GetThisFuncConf 得到当前正在执行的Function的配置
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector 得到当前正在执行的Function的Connector
    GetConnector() (Connector, error)
    // GetConnConf 得到当前正在执行的Function的Connector的配置
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig 得到当前Flow的配置
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName 得到当前Flow的配置
    GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
    Next(acts ...ActionFunc) error
    // GetCacheData 得到当前Flow的缓存数据
    GetCacheData(key string) interface{}
    // SetCacheData 设置当前Flow的缓存数据
    SetCacheData(key string, value interface{}, Exp time.Duration)
    // GetMetaData 得到当前Flow的临时数据
    GetMetaData(key string) interface{}
    // SetMetaData 设置当前Flow的临时数据
    SetMetaData(key string, value interface{})
    // GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
    GetFuncParam(key string) string
    // GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
    GetFuncParamAll() config.FParam


    // +++++++++++++++++++++++++
    // Fork 得到Flow的一个副本(深拷贝)
    Fork(ctx context.Context) Flow
}

Fork()会根据一个已有的KisFlow实例,完全克隆一个资源隔离的但是具有相同配置的KisFlow实例。
具体的实现方法如下:

kis-flow/flow/kis_flow.go

// Fork 得到Flow的一个副本(深拷贝)
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {

    config := flow.Conf

    // 通过之前的配置生成一个新的Flow
    newFlow := NewKisFlow(config)

    for _, fp := range flow.Conf.Flows {
        if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
            //当前function没有配置Params
            newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
        } else {
            //当前function有配置Params
            newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
        }
    }

    log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams)
    log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs())

    return newFlow
}

Fork()中,首先会根据flow的配置信息,重新创建一个KisFlow实例,并且将flow所关联的Params等配置信息一同拷贝,最后通过Link()将新建的Function和Flow连接起来。

上述代码为了调试,给Flow新增了一个打印全部FuncParams信息的接口GetFuncParamsAllFuncs(),具体的实现方式如下:

kis-flow/kis/flow.go

type Flow interface {
    // ... ...
    // ... ...

    // GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value
    GetFuncParamsAllFuncs() map[string]config.FParam

    // ... ...
}

kis-flow/flow/kis_flow_data.go

// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
    flow.fplock.RLock()
    defer flow.fplock.RUnlock()

    return flow.funcParams
}

9.2 单元测试

下面我们来测试一个Fork能力,单元测试代码如下:

kis-flow/test/kis_fork_test.go

func TestForkFlow(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName1")

    flow1Clone1 := flow1.Fork(ctx)

    // 3. 提交原始数据
    _ = flow1Clone1.CommitRow("This is Data1 from Test")
    _ = flow1Clone1.CommitRow("This is Data2 from Test")
    _ = flow1Clone1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1Clone1.Run(ctx); err != nil {
        panic(err)
    }
}

首先我们先创建flowName1的flow实例,然后通过fork()得到flowClone1,然后执行flowClone1的调度流程。

cd到kis-flow/test/下执行:

go test -test.v -test.paniconexit0 -test.run TestForkFlow

结果如下:

=== RUN   TestForkFlow
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
context.Background
=====>Flow Fork, oldFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]]

context.Background
=====>Flow Fork, newFlow.funcParams = map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]]

context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d180 ThisFunctionId:func-9406285e2fa94bd582dab4a875771a97 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d200 ThisFunctionId:func-de7c4e4175b74a898cb43863e53b3215 PrevFunctionId:func-9406285e2fa94bd582dab4a875771a97 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]

KisFunctionC, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d300 ThisFunctionId:func-614511f5142e4023b80373517f3ea0a7 PrevFunctionId:func-de7c4e4175b74a898cb43863e53b3215 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 2
--- PASS: TestForkFlow (0.03s)
PASS
ok      kis-flow/test   0.996s

通过结果可以看出,flowClone1和flowName1具有相同的配置信息。

9.3 【V0.8】源代码

github.com/aceld/kis-flow/releases...


作者:刘丹冰Aceld github: github.com/aceld
KisFlow开源项目地址:github.com/aceld/kis-flow

本作品采用《CC 协议》,转载必须注明作者和本文链接
刘丹冰Aceld
刘丹冰Aceld
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!