二、项目构建/基础模块(下)
首先我们要先定义 KisFlow 的核心结构体,KisFlow
结构体,通过上述的设计理念,我们得知,KisFlow 表示整个一条数据计算流的结构。其中每次数据在一个 Flow 上,依次执行挂载在当前 Flow 的 Function。
2.3.1 KisFunction 家族#
KisFunction 应该是一个链式调用,所以结构体的基本形态应该是一个链表,通过一次 Function 的调用结束后,默认可以调度到下一个 Function 的节点上。 在 KisFlow 中,一共有 save
,load
, calculate
, extend
, varify
等多种行为的 Funciton,所以这里我们采用上述五种 function 的模板类,方便今后在不同针对不同特征的 function 做更加灵活和功能隔离的拓展和改造。
整体的 KisFunction 的类图设计如下:
2.2.2 抽象层 KisFunction 定义#
在 kis-flow
中创建一个新的目录 function
用来存放 function 的代码。
首先抽象接口编写在 kis/
目录下。
kis-flow/kis/function.go
package kis
import (
"context"
"kis-flow/config"
)
// Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元,
// 任意个KisFunction可以组合成一个KisFlow
type Function interface {
// Call 执行流式计算逻辑
Call(ctx context.Context, flow Flow) error
// SetConfig 给当前Function实例配置策略
SetConfig(s *config.KisFuncConfig) error
// GetConfig 获取当前Function实例配置策略
GetConfig() *config.KisFuncConfig
// SetFlow 给当前Function实例设置所依赖的Flow实例
SetFlow(f Flow) error
// GetFlow 获取当前Functioin实力所依赖的Flow
GetFlow() Flow
// CreateId 给当前Funciton实力生成一个随机的实例KisID
CreateId()
// GetId 获取当前Function的FID
GetId() string
// GetPrevId 获取当前Function上一个Function节点FID
GetPrevId() string
// GetNextId 获取当前Function下一个Function节点FID
GetNextId() string
// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
Next() Function
// Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
Prev() Function
// SetN 设置下一层Function实例
SetN(f Function)
// SetP 设置上一层Function实例
SetP(f Function)
}
2.2.3 KisId 随机唯一实例 ID#
上述提出了一个新的概念 KisId
。 KisID 为 Function
的实例 ID,用于 KisFlow 内部区分不同的实例对象。KisId 和 Function Config 中的 Fid 的区别在于,Fid 用来形容一类 Funcion 策略的 ID,而 KisId 则为在 KisFlow 中 KisFunction 已经实例化的 实例对象 ID 这个 ID 是随机生成且唯一。
创建 kis-flow/id/
目录,且创建 kis_id.go 文件,实现有关 kis_id 生成的算法。
kis-flow/id/kis_id.go
package id
import (
"github.com/google/uuid"
"kis-flow/common"
"strings"
)
// KisID 获取一个中随机实例ID
// 格式为 "prefix1-[prefix2-][prefix3-]ID"
// 如:flow-1234567890
// 如:func-1234567890
// 如: conn-1234567890
// 如: func-1-1234567890
func KisID(prefix ...string) (kisId string) {
idStr := strings.Replace(uuid.New().String(), "-", "", -1)
kisId = formatKisID(idStr, prefix...)
return
}
func formatKisID(idStr string, prefix ...string) string {
var kisId string
for _, fix := range prefix {
kisId += fix
kisId += common.KisIdJoinChar
}
kisId += idStr
return kisId
}
kisId
模块提供 KisID()
方法,这里面依赖了第三方分布式 ID 生成库 github.com/google/uuid
,生成的随机 ID 为一个字符串,且调用者可以提供多个前缀,通过 -
符号进行拼接,得到的随机字符串 ID,如:func-1234567890
针对 KisId 的前缀,提供了一些字符串的枚举,如下:
kis-flow/common/const.go
// KisIdType 用户生成KisId的字符串前缀
const (
KisIdTypeFlow = "flow"
KisIdTypeConnnector = "conn"
KisIdTypeFunction = "func"
KisIdTypeGlobal = "global"
KisIdJoinChar = "-"
)
2.2.4 BaseFunction 基础父类#
按照设计,我们需要提供一个 BaseFunction
作为 Function
的一个子类,实现一些基础的功能接口。留下 Call()
让具体模式的 KisFunctionX
来重写实现,下面来进行对 BaseFunction 结构的定义。
在 kis-flow/function/
创建 kis_base_funciton.go
文件。
A. 结构定义#
kis-flow/function/kis_base_function.go
package function
import (
"context"
"errors"
"kis-flow/common"
"kis-flow/config"
"kis-flow/id"
"kis-flow/kis"
)
type BaseFunction struct {
// Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
Id string
Config *config.KisFuncConfig
// flow
Flow kis.Flow //上下文环境KisFlow
// link
N kis.Function //下一个流计算Function
P kis.Function //上一个流计算Function
}
B. 方法实现#
kis-flow/function/kis_base_function.go
// Call
// BaseFunction 为空实现,目的为了让其他具体类型的KisFunction,如KisFunction_V 来继承BaseFuncion来重写此方法
func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }
func (base *BaseFunction) Next() kis.Function {
return base.N
}
func (base *BaseFunction) Prev() kis.Function {
return base.P
}
func (base *BaseFunction) SetN(f kis.Function) {
base.N = f
}
func (base *BaseFunction) SetP(f kis.Function) {
base.P = f
}
func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
if s == nil {
return errors.New("KisFuncConfig is nil")
}
base.Config = s
return nil
}
func (base *BaseFunction) GetId() string {
return base.Id
}
func (base *BaseFunction) GetPrevId() string {
if base.P == nil {
//Function为首结点
return common.FunctionIdFirstVirtual
}
return base.P.GetId()
}
func (base *BaseFunction) GetNextId() string {
if base.N == nil {
//Function为尾结点
return common.FunctionIdLastVirtual
}
return base.N.GetId()
}
func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
return base.Config
}
func (base *BaseFunction) SetFlow(f kis.Flow) error {
if f == nil {
return errors.New("KisFlow is nil")
}
base.Flow = f
return nil
}
func (base *BaseFunction) GetFlow() kis.Flow {
return base.Flow
}
func (base *BaseFunction) CreateId() {
base.Id = id.KisID(common.KisIdTypeFunction)
}
这里注意 GetPrevId()
和 GetNextId()
两个方法实现,因为如果当前 Functioin
为双向链表中的第一个节点或者最后一个节点,那么他们的上一个或者下一个是没有节点的,那么 ID 也就不存在,为了在使用中不出现得不到 ID 的情况,我们提供了两个虚拟 FID,做特殊情况的边界处理,定义在 const.go 中。
kis-flow/common/const.go
const (
// FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID
FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
// FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID
FunctionIdLastVirtual = "FunctionIdLastVirtual"
)
2.2.5 KisFunction 的 V/S/L/C/E 等模式类定义#
下面分别实现 V/S/L/C/E 五种不同模式的 KisFunction 子类。这里分别用创建文件来实现。
A. KisFunctionV#
kis-flow/function/kis_function_v.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionV struct {
BaseFunction
}
func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionV, flow = %+v\n", flow)
// TODO 调用具体的Function执行方法
return nil
}
B. KisFunctionS#
kis-flow/function/kis_function_s.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionS struct {
BaseFunction
}
func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionS, flow = %+v\n", flow)
// TODO 调用具体的Function执行方法
return nil
}
C. KisFunctionL#
kis-flow/function/kis_function_l.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionL struct {
BaseFunction
}
func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionL, flow = %+v\n", flow)
// TODO 调用具体的Function执行方法
return nil
}
D. KisFunctionC#
kis-flow/function/kis_function_c.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionC struct {
BaseFunction
}
func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunction_C, flow = %+v\n", flow)
// TODO 调用具体的Function执行方法
return nil
}
E. KisFunctionE#
kis-flow/function/kis_function_e.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionE struct {
BaseFunction
}
func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionE, flow = %+v\n", flow)
// TODO 调用具体的Function执行方法
return nil
}
2.2.6 创建 KisFunction 实例#
下面提供一个创建具体模式 Function
的方法,这里采用简单工厂方法模式来实现创建对象。
kis-flow/function/kis_base_function.go
func (base *BaseFunction) CreateId() {
base.Id = id.KisID(common.KisIdTypeFunction)
}
// NewKisFunction 创建一个NsFunction
// flow: 当前所属的flow实例
// s : 当前function的配置策略
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
var f kis.Function
//工厂生产泛化对象
switch common.KisMode(config.FMode) {
case common.V:
f = new(KisFunctionV)
break
case common.S:
f = new(KisFunctionS)
case common.L:
f = new(KisFunctionL)
case common.C:
f = new(KisFunctionC)
case common.E:
f = new(KisFunctionE)
default:
//LOG ERROR
return nil
}
// 生成随机实例唯一ID
f.CreateId()
//设置基础信息属性
if err := f.SetConfig(config); err != nil {
panic(err)
}
if err := f.SetFlow(flow); err != nil {
panic(err)
}
return f
}
注意 NewKisFunction()
方法返回的是一个抽象的 interface Function
。
还要注意,目前到这里没有实现 Flow
对象,但是 KisFunciton 的创建需要依赖传递一个 Flow
对象,我们现在可以暂时简单创建一个 Flow
对象的构造方法,之后在实现 Flow
章节再完善这部分的代码。
在 kis-filw/kis/
中创建 flow.go
文件。
kis-flow/kis/flow.go
package kis
import (
"context"
"kis-flow/config"
)
type Flow interface {
// TODO
}
在 kis-flow/flow/
下创建 kis_flow.go
文件,实现如下:
kis-flow/flow/kis_flow.go
package flow
import "kis-flow/config"
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
Id string
Name string
// TODO
}
// TODO for test
// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// 基础信息
flow.Id = id.KisID(common.KisIdTypeFlow)
flow.Name = conf.FlowName
return flow
}
2.2.7 单元测试 KisFunction 创建实例#
现在来对上述的 KisFunction 实例的创建做一个简单的单元测试,在 kis-flow/test/
创建 kis_function_test.go
文件。
kis-flow/test/kis_function_test.go
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/flow"
"kis-flow/function"
"testing"
)
func TestNewKisFunction(t *testing.T) {
ctx := context.Background()
// 1. 创建一个KisFunction配置实例
source := config.KisSource{
Name: "公众号抖音商城户订单数据",
Must: []string{"order_id", "user_id"},
}
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
if myFuncConfig1 == nil {
panic("myFuncConfig1 is nil")
}
// 2. 创建一个 KisFlow 配置实例
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. 创建一个KisFlow对象
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 创建一个KisFunction对象
func1 := function.NewKisFunction(flow1, myFuncConfig1)
if err := func1.Call(ctx, flow1); err != nil {
t.Errorf("func1.Call() error = %v", err)
}
}
流程很简单,分为四个小步骤:
- 创建一个 KisFunction 配置实例
- 创建一个 KisFlow 配置实例
- 创建一个 KisFlow 对象
- 创建一个 KisFunction 对象
cd 到 kis-flow/test/
目录下执行:
go test -test.v -test.paniconexit0 -test.run TestNewKisFunction
结果如下:
=== RUN TestNewKisFunction
KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
--- PASS: TestNewKisFunction (0.00s)
PASS
ok kis-flow/test 1.005s
我们已经调用到了具体的 KisFunciton_C
实例的 Call()
方法。
2.5 【V0.1】 源代码#
github.com/aceld/kis-flow/releases...
作者:刘丹冰 Aceld github: github.com/aceld
KisFlow 开源项目地址:github.com/aceld/kis-flow
推荐文章: