ferry/pkg/service/handle.go
2020-07-15 01:40:56 +08:00

677 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"encoding/json"
"errors"
"ferry/models/base"
"ferry/models/process"
"ferry/models/workOrder"
"ferry/pkg/connection"
"fmt"
"reflect"
"time"
"github.com/jinzhu/gorm"
"github.com/gin-gonic/gin"
)
/*
@Author : lanyulei
@Desc : 处理工单
*/
/*
-- 节点 --
start: 开始节点
userTask: 审批节点
receiveTask: 处理节点
scriptTask: 任务节点
end: 结束节点
-- 网关 --
exclusiveGateway: 排他网关
parallelGateway: 并行网关
inclusiveGateway: 包容网关
*/
// Handle carries the working state for processing a single work order transition.
// Most fields are filled in by HandleWorkOrder and then read by the helper methods
// (Countersign, circulation, ConditionalJudgment, completeAllParallel, commonProcessing).
type Handle struct {
// cirHistoryList holds the work order's circulation history rows; HandleWorkOrder loads
// them ordered by id descending (newest first).
cirHistoryList []workOrder.CirculationHistory
// workOrderId is the id of the work order being handled.
workOrderId int
// updateValue collects the column values that circulation writes to the work order;
// the visible code uses the keys "related_person" and "state".
updateValue map[string]interface{}
// stateValue is the node returned by processState.GetNode for the source state.
stateValue map[string]interface{}
// targetStateValue is the node returned by processState.GetNode for the target state;
// gateway handling may replace it with the node that follows the gateway.
targetStateValue map[string]interface{}
// workOrderData holds the "form_data" values of the work order's template data rows;
// ConditionalJudgment decodes each entry as a JSON object.
workOrderData [][]byte
// workOrderDetails is the work order row loaded at the start of HandleWorkOrder.
workOrderDetails workOrder.Info
// endHistory controls whether the closing history row is written when the target is an
// end node; it is set to false while a countersign or a parallel join is still pending.
endHistory bool
// flowProperties is copied from the HandleWorkOrder argument; commonProcessing treats 0
// as a rejection and jumps directly.
flowProperties int
// circulationValue is not assigned anywhere in the visible code.
// NOTE(review): HandleWorkOrder receives a parameter of the same name but never stores it here — confirm whether that is intended.
circulationValue string
// processState holds the decoded process structure and provides GetNode / GetEdge lookups.
processState ProcessState
// tx is the database transaction opened by HandleWorkOrder for all writes.
tx *gorm.DB
}
// fmtDuration rounds d to the nearest minute and renders it as zero-padded
// hours and minutes, e.g. "01小时 30分钟".
func fmtDuration(d time.Duration) string {
	rounded := d.Round(time.Minute)
	hours := rounded / time.Hour
	// Whatever is left after removing the whole hours is expressed in minutes.
	minutes := (rounded - hours*time.Hour) / time.Minute
	return fmt.Sprintf("%02d小时 %02d分钟", hours, minutes)
}
// Countersign handles a countersigned node. It counts the history rows in which another
// configured processor of the current state has already handled the work order, and only
// moves the work order on (via circulation) once that count equals the number of
// processors minus one, i.e. the current user is the last one outstanding.
//
// The current user is read from the "userId" key of c.
func (h *Handle) Countersign(c *gin.Context) (err error) {
	var states []map[string]interface{}
	if err = json.Unmarshal(h.workOrderDetails.State, &states); err != nil {
		return
	}

	// Index the work order's current states by id and remember the one being handled.
	var current map[string]interface{}
	knownStates := make(map[string]interface{})
	for _, state := range states {
		knownStates[state["id"].(string)] = state["label"]
		if state["id"].(string) == h.stateValue["id"].(string) {
			current = state
		}
	}

	signed := 0
	for _, record := range h.cirHistoryList {
		// Stop at the first history row that does not originate from one of the current states.
		if _, known := knownStates[record.Source]; !known {
			break
		}
		for _, processor := range current["processor"].([]interface{}) {
			if record.ProcessorId == c.GetInt("userId") {
				continue
			}
			if record.Source != current["id"].(string) {
				continue
			}
			if record.ProcessorId == int(processor.(float64)) {
				signed++
			}
		}
	}

	// Someone else still has to sign: leave the work order where it is.
	if signed != len(current["processor"].([]interface{}))-1 {
		return
	}

	h.endHistory = true
	err = h.circulation()
	return
}
// circulation persists the pending transition: it JSON-encodes updateValue["state"] and
// writes it, together with updateValue["related_person"], to the work order row inside
// the handle's transaction. The transaction is rolled back if the update fails.
func (h *Handle) circulation() (err error) {
	encodedState, err := json.Marshal(h.updateValue["state"])
	if err != nil {
		return
	}

	columns := map[string]interface{}{
		"state":          encodedState,
		"related_person": h.updateValue["related_person"],
	}
	err = h.tx.Model(&workOrder.Info{}).Where("id = ?", h.workOrderId).Updates(columns).Error
	if err != nil {
		h.tx.Rollback()
	}
	return
}
// ConditionalJudgment evaluates a single condition expression against the work order's
// form data.
//
// condExpr is read through three keys:
//   - "key":   name of the form field to look up (must be a string);
//   - "sign":  one of "==", "!=", ">", ">=", "<", "<=";
//   - "value": operand the field is compared with; it must have the same type as the
//     form value (string or float64, the types encoding/json produces for JSON strings
//     and numbers).
//
// The field is taken from the first form-data entry that contains the key. An error is
// returned when a form-data entry is not a valid JSON object, when the field is missing
// or null, when the field type is unsupported, when the operand type does not match, or
// when the sign is unknown. result is false whenever err is non-nil.
func (h *Handle) ConditionalJudgment(condExpr map[string]interface{}) (result bool, err error) {
	key, ok := condExpr["key"].(string)
	if !ok {
		err = errors.New("条件表达式的 key 必须为字符串。")
		return
	}

	// Look the field up in each form in turn; the first form that has the key wins.
	var fieldValue interface{}
	for _, data := range h.workOrderData {
		var formData map[string]interface{}
		if err = json.Unmarshal(data, &formData); err != nil {
			return
		}
		if v, found := formData[key]; found {
			fieldValue = v
			break
		}
	}
	if fieldValue == nil {
		err = errors.New("未查询到对应的表单数据。")
		return
	}

	// Work out how the two operands are ordered, then let evalCondSign apply the sign.
	var lt, eq, gt bool
	actual := reflect.ValueOf(fieldValue)
	switch actual.Kind() {
	case reflect.String:
		expected, matches := condExpr["value"].(string)
		if !matches {
			err = errors.New("条件值的类型与表单数据的类型不一致。")
			return
		}
		s := actual.String()
		lt, eq, gt = s < expected, s == expected, s > expected
	case reflect.Float64:
		expected, matches := condExpr["value"].(float64)
		if !matches {
			err = errors.New("条件值的类型与表单数据的类型不一致。")
			return
		}
		f := actual.Float()
		lt, eq, gt = f < expected, f == expected, f > expected
	default:
		err = errors.New("条件判断目前仅支持字符串、整型。")
		return
	}

	return evalCondSign(condExpr["sign"], lt, eq, gt)
}

// evalCondSign applies a comparison sign to the pre-computed ordering of two operands
// (lt: left < right, eq: left == right, gt: left > right).
func evalCondSign(sign interface{}, lt, eq, gt bool) (bool, error) {
	switch sign {
	case "==":
		return eq, nil
	case "!=":
		return !eq, nil
	case ">":
		return gt, nil
	case ">=":
		return gt || eq, nil
	case "<":
		return lt, nil
	case "<=":
		return lt || eq, nil
	default:
		return false, errors.New("目前仅支持6种常规判断类型包括等于、不等于、大于、大于等于、小于、小于等于")
	}
}
// completeAllParallel checks, for a parallel gateway, whether the other branches have
// already finished. Walking the circulation history in its stored order, it removes from
// the work order's current states every state that has a history row leading to target,
// stopping at the first row that matches none of them. statusOk is true when exactly one
// state remains and it is the state currently being handled.
func (h *Handle) completeAllParallel(c *gin.Context, target string) (statusOk bool, err error) {
	var pending []map[string]interface{}
	if decodeErr := json.Unmarshal(h.workOrderDetails.State, &pending); decodeErr != nil {
		err = fmt.Errorf("反序列化失败,%v", decodeErr.Error())
		return
	}

	for _, record := range h.cirHistoryList {
		matched := -1
		for i, state := range pending {
			if record.Source == state["id"].(string) && record.Target == target {
				matched = i
				break
			}
		}
		if matched < 0 {
			// This row does not belong to any pending branch: stop scanning.
			break
		}
		pending = append(pending[:matched], pending[matched+1:]...)
	}

	statusOk = len(pending) == 1 && pending[0]["id"].(string) == h.stateValue["id"]
	return
}
// commonProcessing decides how a task node transition is applied. A rejection
// (flowProperties == 0) jumps immediately. Otherwise, when the current node has more than
// one assignee and its "isCounterSign" flag is true, the transition goes through
// Countersign; in every other case the work order is moved on directly.
func (h *Handle) commonProcessing(c *gin.Context) (err error) {
	// Rejections bypass countersigning entirely.
	if h.flowProperties == 0 {
		if err = h.circulation(); err != nil {
			err = fmt.Errorf("工单跳转失败,%v", err.Error())
		}
		return
	}

	needCountersign := false
	if assignees := h.stateValue["assignValue"]; assignees != nil && len(assignees.([]interface{})) > 1 {
		if flag, present := h.stateValue["isCounterSign"]; present {
			needCountersign = flag.(bool)
		}
	}

	if needCountersign {
		h.endHistory = false
		return h.Countersign(c)
	}
	return h.circulation()
}
// HandleWorkOrder moves a work order from sourceState to targetState and records the
// transition in the circulation history.
//
// It loads the work order, its process structure, its form data and its circulation
// history, adds the current user to the related-person list, and then, inside a database
// transaction, updates the work order according to the class of the target node (gateway,
// start, task or end) and writes the history rows. Once the transaction has committed,
// the de-duplicated task list is handed to ExecTask in a new goroutine.
//
// Parameters:
//   - c: request context; "userId" and "nickname" are read from it.
//   - workOrderId: id of the work order being handled.
//   - tasks: task names to run after a successful commit; the "task" entries of the
//     source node are appended to them.
//   - targetState, sourceState: node ids resolved through processState.GetNode.
//   - circulationValue: stored in the Circulation field of the history row.
//   - flowProperties: stored on the handle; commonProcessing treats 0 as a rejection.
//
// A panic raised during processing (for example by an unchecked type assertion on process
// data) is converted into the returned error. The transaction is rolled back on every
// exit path that does not reach a successful commit, and a failed Begin or Commit is
// reported to the caller instead of being ignored.
func (h *Handle) HandleWorkOrder(
	c *gin.Context,
	workOrderId int,
	tasks []string,
	targetState string,
	sourceState string,
	circulationValue string,
	flowProperties int,
) (err error) {
	h.workOrderId = workOrderId
	h.flowProperties = flowProperties
	h.endHistory = true

	var (
		execTasks          []string
		relatedPersonList  []int
		cirHistoryValue    []workOrder.CirculationHistory
		cirHistoryData     workOrder.CirculationHistory
		costDurationValue  string
		sourceEdges        []map[string]interface{}
		targetEdges        []map[string]interface{}
		condExprStatus     bool
		relatedPersonValue []byte
		parallelStatusOk   bool
		processInfo        process.Info
		// txOpen is true from a successful Begin until Commit has been attempted.
		txOpen bool
	)

	defer func() {
		if r := recover(); r != nil {
			switch e := r.(type) {
			case string:
				err = errors.New(e)
			case error:
				err = e
			default:
				err = errors.New("未知错误")
			}
		}
		// Release the transaction on every path that did not commit: error returns,
		// early returns and recovered panics alike. Rolling back a transaction that a
		// helper already rolled back is harmless, so the result is deliberately ignored.
		if txOpen {
			h.tx.Rollback()
		}
	}()

	// Load the work order.
	err = connection.DB.Self.Model(&workOrder.Info{}).Where("id = ?", workOrderId).Find(&h.workOrderDetails).Error
	if err != nil {
		return
	}

	// Load the process definition and decode its structure.
	err = connection.DB.Self.Model(&process.Info{}).Where("id = ?", h.workOrderDetails.Process).Find(&processInfo).Error
	if err != nil {
		return
	}
	err = json.Unmarshal(processInfo.Structure, &h.processState.Structure)
	if err != nil {
		return
	}

	// Current (source) node.
	h.stateValue, err = h.processState.GetNode(sourceState)
	if err != nil {
		return
	}

	// Target node.
	h.targetStateValue, err = h.processState.GetNode(targetState)
	if err != nil {
		return
	}

	// Form data of the work order.
	err = connection.DB.Self.Model(&workOrder.TplData{}).
		Where("work_order = ?", workOrderId).
		Pluck("form_data", &h.workOrderData).Error
	if err != nil {
		return
	}

	// Circulation history, newest first; used for countersign and parallel-join checks.
	err = connection.DB.Self.Model(&workOrder.CirculationHistory{}).
		Where("work_order = ?", workOrderId).
		Order("id desc").
		Find(&h.cirHistoryList).Error
	if err != nil {
		return
	}

	// Make sure the current user is part of the related-person list.
	err = json.Unmarshal(h.workOrderDetails.RelatedPerson, &relatedPersonList)
	if err != nil {
		return
	}
	relatedPersonStatus := false
	for _, r := range relatedPersonList {
		if r == c.GetInt("userId") {
			relatedPersonStatus = true
			break
		}
	}
	if !relatedPersonStatus {
		relatedPersonList = append(relatedPersonList, c.GetInt("userId"))
	}
	relatedPersonValue, err = json.Marshal(relatedPersonList)
	if err != nil {
		return
	}
	h.updateValue = map[string]interface{}{
		"related_person": relatedPersonValue,
	}

	// Open the transaction; a failed Begin is surfaced on the returned handle's Error.
	h.tx = connection.DB.Self.Begin()
	if err = h.tx.Error; err != nil {
		return
	}
	txOpen = true

	stateValue := map[string]interface{}{
		"label": h.targetStateValue["label"].(string),
		"id":    h.targetStateValue["id"].(string),
	}

	switch h.targetStateValue["clazz"] {
	// Exclusive gateway: follow the first outgoing edge with a satisfied condition.
	case "exclusiveGateway":
		sourceEdges, err = h.processState.GetEdge(h.targetStateValue["id"].(string), "source")
		if err != nil {
			return
		}
	breakTag:
		for _, edge := range sourceEdges {
			edgeCondExpr := make([]map[string]interface{}, 0)
			err = json.Unmarshal([]byte(edge["conditionExpression"].(string)), &edgeCondExpr)
			if err != nil {
				return
			}
			for _, condExpr := range edgeCondExpr {
				// Evaluate the condition against the form data.
				condExprStatus, err = h.ConditionalJudgment(condExpr)
				if err != nil {
					return
				}
				if condExprStatus {
					// Jump to the node the matching edge points at.
					h.targetStateValue, err = h.processState.GetNode(edge["target"].(string))
					if err != nil {
						return
					}

					if h.targetStateValue["clazz"] == "userTask" || h.targetStateValue["clazz"] == "receiveTask" {
						if h.targetStateValue["assignValue"] == nil || h.targetStateValue["assignType"] == "" {
							err = errors.New("处理人不能为空")
							return
						}
					}

					h.updateValue["state"] = []map[string]interface{}{{
						"id":             h.targetStateValue["id"].(string),
						"label":          h.targetStateValue["label"],
						"processor":      h.targetStateValue["assignValue"],
						"process_method": h.targetStateValue["assignType"],
					}}
					err = h.commonProcessing(c)
					if err != nil {
						err = fmt.Errorf("流程流程跳转失败,%v", err.Error())
						return
					}

					break breakTag
				}
			}
		}
		if !condExprStatus {
			err = errors.New("所有流转均不符合条件,请确认。")
			return
		}

	// Parallel / join gateway.
	case "parallelGateway":
		// Edges that have the gateway as their source and as their target respectively.
		sourceEdges, err = h.processState.GetEdge(h.targetStateValue["id"].(string), "source")
		if err != nil {
			err = fmt.Errorf("查询流转信息失败,%v", err.Error())
			return
		}

		targetEdges, err = h.processState.GetEdge(h.targetStateValue["id"].(string), "target")
		if err != nil {
			err = fmt.Errorf("查询流转信息失败,%v", err.Error())
			return
		}

		if len(sourceEdges) > 0 {
			h.targetStateValue, err = h.processState.GetNode(sourceEdges[0]["target"].(string))
			if err != nil {
				return
			}
		} else {
			err = errors.New("并行网关流程不正确")
			return
		}

		if len(sourceEdges) > 1 && len(targetEdges) == 1 {
			// Fork: the work order enters every branch at once.
			h.updateValue["state"] = make([]map[string]interface{}, 0)
			for _, edge := range sourceEdges {
				branchNode, nodeErr := h.processState.GetNode(edge["target"].(string))
				if nodeErr != nil {
					return nodeErr
				}
				h.updateValue["state"] = append(h.updateValue["state"].([]map[string]interface{}), map[string]interface{}{
					"id":             edge["target"].(string),
					"label":          branchNode["label"],
					"processor":      branchNode["assignValue"],
					"process_method": branchNode["assignType"],
				})
			}
			err = h.circulation()
			if err != nil {
				err = fmt.Errorf("工单跳转失败,%v", err.Error())
				return
			}
		} else if len(sourceEdges) == 1 && len(targetEdges) > 1 {
			// Join: only move on once every other branch has arrived.
			parallelStatusOk, err = h.completeAllParallel(c, sourceEdges[0]["target"].(string))
			if err != nil {
				err = fmt.Errorf("并行检测失败,%v", err.Error())
				return
			}
			if parallelStatusOk {
				h.endHistory = true
				h.updateValue["state"] = []map[string]interface{}{{
					"id":             h.targetStateValue["id"].(string),
					"label":          h.targetStateValue["label"],
					"processor":      h.targetStateValue["assignValue"],
					"process_method": h.targetStateValue["assignType"],
				}}
				err = h.circulation()
				if err != nil {
					err = fmt.Errorf("工单跳转失败,%v", err.Error())
					return
				}
			} else {
				h.endHistory = false
			}
		} else {
			err = errors.New("并行网关流程不正确")
			return
		}

	// Inclusive gateway: not implemented. Nothing is written, and the deferred cleanup
	// rolls the (empty) transaction back.
	case "inclusiveGateway":
		fmt.Println("inclusiveGateway")
		return
	case "start":
		stateValue["processor"] = []int{h.workOrderDetails.Creator}
		stateValue["process_method"] = "person"
		h.updateValue["state"] = []interface{}{stateValue}
		err = h.circulation()
		if err != nil {
			return
		}
	case "userTask":
		stateValue["processor"] = h.targetStateValue["assignValue"].([]interface{})
		stateValue["process_method"] = h.targetStateValue["assignType"].(string)
		h.updateValue["state"] = []interface{}{stateValue}
		err = h.commonProcessing(c)
		if err != nil {
			return
		}
	case "receiveTask":
		stateValue["processor"] = h.targetStateValue["assignValue"].([]interface{})
		stateValue["process_method"] = h.targetStateValue["assignType"].(string)
		h.updateValue["state"] = []interface{}{stateValue}
		err = h.commonProcessing(c)
		if err != nil {
			return
		}
	case "scriptTask":
		// NOTE(review): the state is prepared but circulation is not called here — confirm this is intended.
		stateValue["processor"] = []int{}
		stateValue["process_method"] = ""
		h.updateValue["state"] = []interface{}{stateValue}
	case "end":
		stateValue["processor"] = []int{}
		stateValue["process_method"] = ""
		h.updateValue["state"] = []interface{}{stateValue}
		err = h.circulation()
		if err != nil {
			return
		}
		err = h.tx.Model(&workOrder.Info{}).
			Where("id = ?", h.workOrderId).
			Update("is_end", 1).Error
		if err != nil {
			return
		}
	}

	// Write the circulation history.
	// NOTE(review): Order is chained after Find, so it presumably does not affect the
	// order of the rows already fetched — confirm before relying on "create_time desc".
	err = connection.DB.Self.Model(&cirHistoryValue).
		Where("work_order = ?", workOrderId).
		Find(&cirHistoryValue).
		Order("create_time desc").Error
	if err != nil {
		return
	}
	// The last row whose source differs from the current node determines the cost duration.
	for _, t := range cirHistoryValue {
		if t.Source != h.stateValue["id"] {
			costDuration := time.Since(t.CreatedAt.Time)
			costDurationValue = fmtDuration(costDuration)
		}
	}

	cirHistoryData = workOrder.CirculationHistory{
		Model:        base.Model{},
		Title:        h.workOrderDetails.Title,
		WorkOrder:    h.workOrderDetails.Id,
		State:        h.stateValue["label"].(string),
		Source:       h.stateValue["id"].(string),
		Target:       h.targetStateValue["id"].(string),
		Circulation:  circulationValue,
		Processor:    c.GetString("nickname"),
		ProcessorId:  c.GetInt("userId"),
		CostDuration: costDurationValue,
	}

	err = h.tx.Create(&cirHistoryData).Error
	if err != nil {
		return
	}

	// When the target is an end node, also write the closing history row.
	if h.targetStateValue["clazz"] == "end" && h.endHistory {
		err = h.tx.Create(&workOrder.CirculationHistory{
			Model:       base.Model{},
			Title:       h.workOrderDetails.Title,
			WorkOrder:   h.workOrderDetails.Id,
			State:       h.targetStateValue["label"].(string),
			Source:      h.targetStateValue["id"].(string),
			Processor:   c.GetString("nickname"),
			ProcessorId: c.GetInt("userId"),
			Circulation: "结束",
		}).Error
		if err != nil {
			return
		}
	}

	// Commit. The transaction is finished after this call whether or not it succeeded,
	// so the deferred rollback is disarmed first; tasks only run after a clean commit.
	commitErr := h.tx.Commit().Error
	txOpen = false
	if commitErr != nil {
		err = commitErr
		return
	}

	// Run the process-wide tasks plus the tasks of the source node, each at most once.
	if h.stateValue["task"] != nil {
		for _, task := range h.stateValue["task"].([]interface{}) {
			tasks = append(tasks, task.(string))
		}
	}
continueTag:
	for _, task := range tasks {
		for _, t := range execTasks {
			if t == task {
				continue continueTag
			}
		}
		execTasks = append(execTasks, task)
	}
	go ExecTask(execTasks)

	return
}