commit
750861f380
@ -31,7 +31,12 @@
|
||||
|
||||
演示demo: [http://fdevops.com:8001/#/dashboard](http://fdevops.com:8001/#/dashboard)
|
||||
|
||||
账号密码:admin/123456
|
||||
```
|
||||
账号:admin
|
||||
密码:123456
|
||||
|
||||
演示demo登陆需要取消ldap验证,就是登陆页面取消ldap的打勾。
|
||||
```
|
||||
|
||||
文档: [https://www.fdevops.com/docs/ferry](https://www.fdevops.com/docs/ferry-tutorial-document/introduction)
|
||||
|
||||
|
@ -109,6 +109,8 @@ func UpdateProcess(c *gin.Context) {
|
||||
"classify": processValue.Classify,
|
||||
"task": processValue.Task,
|
||||
"notice": processValue.Notice,
|
||||
"icon": processValue.Icon,
|
||||
"remarks": processValue.Remarks,
|
||||
}).Error
|
||||
if err != nil {
|
||||
app.Error(c, -1, err, fmt.Sprintf("更新流程信息失败,%v", err.Error()))
|
||||
@ -193,7 +195,7 @@ func ClassifyProcessList(c *gin.Context) {
|
||||
for _, item := range classifyList {
|
||||
err = orm.Eloquent.Model(&process2.Info{}).
|
||||
Where("classify = ? and name LIKE ?", item.Id, fmt.Sprintf("%%%v%%", processName)).
|
||||
Select("id, create_time, update_time, name").
|
||||
Select("id, create_time, update_time, name, icon, remarks").
|
||||
Find(&item.ProcessList).Error
|
||||
if err != nil {
|
||||
app.Error(c, -1, err, fmt.Sprintf("获取流程失败,%v", err.Error()))
|
||||
|
@ -221,8 +221,11 @@ func TaskDetails(c *gin.Context) {
|
||||
)
|
||||
|
||||
fileName = c.DefaultQuery("file_name", "")
|
||||
if fileName == "" {
|
||||
app.Error(c, -1, errors.New("参数不正确,请确认file_name参数是否存在"), "")
|
||||
if fileName == "" ||
|
||||
strings.HasPrefix(fileName, ".") ||
|
||||
strings.HasPrefix(fileName, "/") ||
|
||||
strings.HasPrefix(fileName, "\\") {
|
||||
app.Error(c, -1, errors.New("file_name参数不正确,请确认"), "")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -71,6 +71,12 @@ func CreateWorkOrder(c *gin.Context) {
|
||||
Tasks json.RawMessage `json:"tasks"`
|
||||
Source string `json:"source"`
|
||||
}
|
||||
paramsValue struct {
|
||||
Id int `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Priority int `json:"priority"`
|
||||
FormData []interface{} `json:"form_data"`
|
||||
}
|
||||
)
|
||||
|
||||
err := c.ShouldBind(&workOrderValue)
|
||||
@ -248,7 +254,18 @@ func CreateWorkOrder(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
if len(taskList) > 0 {
|
||||
go service.ExecTask(taskList)
|
||||
paramsValue.Id = workOrderInfo.Id
|
||||
paramsValue.Title = workOrderInfo.Title
|
||||
paramsValue.Priority = workOrderInfo.Priority
|
||||
paramsValue.FormData = workOrderValue.Tpls["form_data"]
|
||||
|
||||
params, err := json.Marshal(paramsValue)
|
||||
if err != nil {
|
||||
app.Error(c, -1, err, "")
|
||||
return
|
||||
}
|
||||
|
||||
go service.ExecTask(taskList, string(params))
|
||||
}
|
||||
|
||||
app.OK(c, "", "成功提交工单申请")
|
||||
|
@ -33,6 +33,7 @@ func UploadFile(c *gin.Context) {
|
||||
fileType string
|
||||
saveFilePath string
|
||||
err error
|
||||
protocol string = "http"
|
||||
)
|
||||
tag, _ = c.GetPostForm("type")
|
||||
fileType = c.DefaultQuery("file_type", "images")
|
||||
@ -42,13 +43,17 @@ func UploadFile(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if strings.HasPrefix(c.Request.Header.Get("Origin"), "https") {
|
||||
protocol = "https"
|
||||
}
|
||||
|
||||
if viper.GetBool("settings.domain.getHost") {
|
||||
urlPrefix = fmt.Sprintf("http://%s/", c.Request.Host)
|
||||
urlPrefix = fmt.Sprintf("%s://%s/", protocol, c.Request.Host)
|
||||
} else {
|
||||
if strings.HasSuffix(viper.GetString("settings.domain.url"), "/") {
|
||||
urlPrefix = viper.GetString("settings.domain.url")
|
||||
} else {
|
||||
urlPrefix = fmt.Sprintf("http://%s/", viper.GetString("settings.domain.url"))
|
||||
urlPrefix = fmt.Sprintf("%s://%s/", protocol, viper.GetString("settings.domain.url"))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,8 +53,7 @@ settings:
|
||||
maxsize: 10240
|
||||
path: ./logs/ferry.log
|
||||
redis:
|
||||
host: 127.0.0.1
|
||||
port: 6379
|
||||
url: redis://ferry123456@127.0.0.1:6379
|
||||
ssl:
|
||||
key: keystring
|
||||
pem: temp/pem.pem
|
||||
|
@ -53,8 +53,7 @@ settings:
|
||||
maxsize: 10240
|
||||
path: ./logs/ferry.log
|
||||
redis:
|
||||
host: 127.0.0.1
|
||||
port: 6379
|
||||
url: redis://ferry123456@127.0.0.1:6379
|
||||
ssl:
|
||||
key: keystring
|
||||
pem: temp/pem.pem
|
||||
|
@ -13,13 +13,15 @@ import (
|
||||
type Info struct {
|
||||
base.Model
|
||||
Name string `gorm:"column:name; type:varchar(128)" json:"name" form:"name"` // 流程名称
|
||||
Icon string `gorm:"column:icon; type:varchar(128)" json:"icon" form:"icon"` // 流程标签
|
||||
Structure json.RawMessage `gorm:"column:structure; type:json" json:"structure" form:"structure"` // 流程结构
|
||||
Classify int `gorm:"column:classify; type:int(11)" json:"classify" form:"classify"` // 分类ID
|
||||
Tpls json.RawMessage `gorm:"column:tpls; type:json" json:"tpls" form:"tpls"` // 模版
|
||||
Task json.RawMessage `gorm:"column:task; type:json" json:"task" form:"task"` // 任务ID, array, 可执行多个任务,可以当成通知任务,每个节点都会去执行
|
||||
SubmitCount int `gorm:"column:submit_count; type:int(11); default:0" json:"submit_count" form:"submit_count"` // 提交统计
|
||||
Creator int `gorm:"column:creator; type:int(11)" json:"creator" form:"creator"` // 创建者
|
||||
Notice json.RawMessage `gorm:"column:notice; type:json" json:"notice" form:"notice"` // TODO:绑定通知
|
||||
Notice json.RawMessage `gorm:"column:notice; type:json" json:"notice" form:"notice"` // 绑定通知
|
||||
Remarks string `gorm:"column:remarks; type:varchar(1024)" json:"remarks" form:"remarks"` // 流程备注
|
||||
}
|
||||
|
||||
func (Info) TableName() string {
|
||||
|
@ -348,6 +348,12 @@ func (h *Handle) HandleWorkOrder(
|
||||
noticeList []int
|
||||
sendSubject string = "您有一条待办工单,请及时处理"
|
||||
sendDescription string = "您有一条待办工单请及时处理,工单描述如下"
|
||||
paramsValue struct {
|
||||
Id int `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Priority int `json:"priority"`
|
||||
FormData []interface{} `json:"form_data"`
|
||||
}
|
||||
)
|
||||
|
||||
defer func() {
|
||||
@ -638,6 +644,8 @@ func (h *Handle) HandleWorkOrder(
|
||||
return
|
||||
}
|
||||
|
||||
paramsValue.FormData = append(paramsValue.FormData, t["tplValue"])
|
||||
|
||||
// 是否可写,只有可写的模版可以更新数据
|
||||
updateStatus := false
|
||||
if writeTplList, writeOK := h.stateValue["writeTpls"]; writeOK {
|
||||
@ -823,7 +831,15 @@ continueTag:
|
||||
}
|
||||
execTasks = append(execTasks, task)
|
||||
}
|
||||
go ExecTask(execTasks)
|
||||
|
||||
paramsValue.Id = h.workOrderDetails.Id
|
||||
paramsValue.Title = h.workOrderDetails.Title
|
||||
paramsValue.Priority = h.workOrderDetails.Priority
|
||||
params, err := json.Marshal(paramsValue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go ExecTask(execTasks, string(params))
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -12,13 +12,13 @@ import (
|
||||
@Author : lanyulei
|
||||
*/
|
||||
|
||||
func ExecTask(taskList []string) {
|
||||
func ExecTask(taskList []string, params string) {
|
||||
for _, taskName := range taskList {
|
||||
filePath := fmt.Sprintf("%v/%v", viper.GetString("script.path"), taskName)
|
||||
if strings.HasSuffix(filePath, ".py") {
|
||||
task.Send("python", filePath)
|
||||
task.Send("python", filePath, params)
|
||||
} else if strings.HasSuffix(filePath, ".sh") {
|
||||
task.Send("shell", filePath)
|
||||
task.Send("shell", filePath, params)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,6 @@ import (
|
||||
"ferry/pkg/task/worker"
|
||||
)
|
||||
|
||||
func Send(classify string, scriptPath string) {
|
||||
worker.SendTask(context.Background(), classify, scriptPath)
|
||||
func Send(classify string, scriptPath string, params string) {
|
||||
worker.SendTask(context.Background(), classify, scriptPath, params)
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ func Start() {
|
||||
worker.StartServer()
|
||||
|
||||
// 2. 启动异步调度
|
||||
taskWorker := worker.NewAsyncTaskWorker(1)
|
||||
taskWorker := worker.NewAsyncTaskWorker(10)
|
||||
err := taskWorker.Launch()
|
||||
if err != nil {
|
||||
logger.Errorf("启动machinery失败,%v", err.Error())
|
||||
|
@ -2,6 +2,7 @@ package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"ferry/pkg/logger"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
@ -11,41 +12,41 @@ import (
|
||||
|
||||
var asyncTaskMap map[string]interface{}
|
||||
|
||||
func executeTaskBase(scriptPath string) {
|
||||
command := exec.Command("/bin/bash", "-c", scriptPath) //初始化Cmd
|
||||
err := command.Start() //运行脚本
|
||||
if nil != err {
|
||||
func executeTaskBase(scriptPath string, params string) (err error) {
|
||||
command := exec.Command(scriptPath, params) //初始化Cmd
|
||||
out, err := command.CombinedOutput()
|
||||
if err != nil {
|
||||
logger.Errorf("task exec failed,%v", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Process PID:", command.Process.Pid)
|
||||
|
||||
err = command.Wait() //等待执行完成
|
||||
if nil != err {
|
||||
logger.Errorf("task exec failed,%v", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Output: ", string(out))
|
||||
logger.Info("ProcessState PID: ", command.ProcessState.Pid())
|
||||
logger.Info("Exit Code ", command.ProcessState.Sys().(syscall.WaitStatus).ExitStatus())
|
||||
return
|
||||
}
|
||||
|
||||
// ExecCommand 异步任务
|
||||
func ExecCommand(classify string, scriptPath string) error {
|
||||
func ExecCommand(classify string, scriptPath string, params string) (err error) {
|
||||
if classify == "shell" {
|
||||
logger.Info("start exec shell...", scriptPath)
|
||||
executeTaskBase(scriptPath)
|
||||
return nil
|
||||
} else if classify == "python" {
|
||||
logger.Info("start exec python...", scriptPath)
|
||||
executeTaskBase(scriptPath)
|
||||
return nil
|
||||
logger.Info("start exec shell - ", scriptPath)
|
||||
err = executeTaskBase(scriptPath, params)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return nil
|
||||
} else if classify == "python" {
|
||||
logger.Info("start exec python - ", scriptPath)
|
||||
err = executeTaskBase(scriptPath, params)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = errors.New("目前仅支持Python与Shell脚本的执行,请知悉。")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func SendTask(ctx context.Context, classify string, scriptPath string) {
|
||||
func SendTask(ctx context.Context, classify string, scriptPath string, params string) {
|
||||
args := make([]tasks.Arg, 0)
|
||||
args = append(args, tasks.Arg{
|
||||
Name: "classify",
|
||||
@ -57,6 +58,11 @@ func SendTask(ctx context.Context, classify string, scriptPath string) {
|
||||
Type: "string",
|
||||
Value: scriptPath,
|
||||
})
|
||||
args = append(args, tasks.Arg{
|
||||
Name: "params",
|
||||
Type: "string",
|
||||
Value: params,
|
||||
})
|
||||
task, _ := tasks.NewSignature("ExecCommandTask", args)
|
||||
task.RetryCount = 5
|
||||
_, err := AsyncTaskCenter.SendTaskWithContext(ctx, task)
|
||||
|
@ -2,7 +2,6 @@ package worker
|
||||
|
||||
import (
|
||||
"ferry/pkg/logger"
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
|
||||
@ -23,10 +22,7 @@ func StartServer() {
|
||||
|
||||
func NewTaskCenter() (*machinery.Server, error) {
|
||||
cnf := &taskConfig.Config{
|
||||
Broker: fmt.Sprintf("redis://%v:%v",
|
||||
viper.GetString("settings.redis.host"),
|
||||
viper.GetString("settings.redis.port"),
|
||||
),
|
||||
Broker: viper.GetString("settings.redis.url"),
|
||||
DefaultQueue: "ServerTasksQueue",
|
||||
ResultBackend: "eager",
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user