diff --git a/cmd/api/server.go b/cmd/api/server.go index 0eafabb..93b938a 100644 --- a/cmd/api/server.go +++ b/cmd/api/server.go @@ -5,6 +5,7 @@ import ( "ferry/database" "ferry/global/orm" "ferry/pkg/logger" + "ferry/pkg/task" "ferry/router" "ferry/tools" config2 "ferry/tools/config" @@ -52,10 +53,12 @@ func usage() { func setup() { - //1. 读取配置 + // 1. 读取配置 config2.ConfigSetup(config) - //2. 初始化数据库链接 + // 2. 初始化数据库链接 database.Setup() + // 3. 启动异步任务队列 + go task.Start() } diff --git a/main.go b/main.go index 284174b..cb33336 100644 --- a/main.go +++ b/main.go @@ -2,12 +2,8 @@ package main import ( "ferry/cmd" - "ferry/pkg/task" ) func main() { - // 启动异步任务队列 - go task.Start() - cmd.Execute() } diff --git a/pkg/task/server.go b/pkg/task/server.go index 0ff8812..3a12191 100644 --- a/pkg/task/server.go +++ b/pkg/task/server.go @@ -10,8 +10,11 @@ import ( ) func Start() { - // 启动异步任务框架 - taskWorker := worker.NewAsyncTaskWorker(0) + // 1. 启动服务,连接redis + worker.StartServer() + + // 2. 启动异步调度 + taskWorker := worker.NewAsyncTaskWorker(1) err := taskWorker.Launch() if err != nil { logger.Errorf("启动machinery失败,%v", err.Error()) diff --git a/pkg/task/worker/worker.go b/pkg/task/worker/worker.go index 9f07761..636b7e3 100644 --- a/pkg/task/worker/worker.go +++ b/pkg/task/worker/worker.go @@ -2,6 +2,9 @@ package worker import ( "ferry/pkg/logger" + "fmt" + + "github.com/spf13/viper" "github.com/RichardKnop/machinery/v1" taskConfig "github.com/RichardKnop/machinery/v1/config" @@ -10,7 +13,7 @@ import ( var AsyncTaskCenter *machinery.Server -func init() { +func StartServer() { tc, err := NewTaskCenter() if err != nil { panic(err) @@ -20,7 +23,10 @@ func init() { func NewTaskCenter() (*machinery.Server, error) { cnf := &taskConfig.Config{ - Broker: "redis://127.0.0.1:6379", + Broker: fmt.Sprintf("redis://%v:%v", + viper.GetString("settings.redis.host"), + viper.GetString("settings.redis.port"), + ), DefaultQueue: "ServerTasksQueue", ResultBackend: "eager", }