From 8334ba32ec4a0f3fefb85d31f752200c07511215 Mon Sep 17 00:00:00 2001 From: YuleiLan Date: Fri, 21 Aug 2020 10:32:14 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=9E=E6=8E=A5redis=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/api/server.go | 7 +++++-- main.go | 4 ---- pkg/task/server.go | 7 +++++-- pkg/task/worker/worker.go | 10 ++++++++-- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/cmd/api/server.go b/cmd/api/server.go index 0eafabb..93b938a 100644 --- a/cmd/api/server.go +++ b/cmd/api/server.go @@ -5,6 +5,7 @@ import ( "ferry/database" "ferry/global/orm" "ferry/pkg/logger" + "ferry/pkg/task" "ferry/router" "ferry/tools" config2 "ferry/tools/config" @@ -52,10 +53,12 @@ func usage() { func setup() { - //1. 读取配置 + // 1. 读取配置 config2.ConfigSetup(config) - //2. 初始化数据库链接 + // 2. 初始化数据库链接 database.Setup() + // 3. 启动异步任务队列 + go task.Start() } diff --git a/main.go b/main.go index 284174b..cb33336 100644 --- a/main.go +++ b/main.go @@ -2,12 +2,8 @@ package main import ( "ferry/cmd" - "ferry/pkg/task" ) func main() { - // 启动异步任务队列 - go task.Start() - cmd.Execute() } diff --git a/pkg/task/server.go b/pkg/task/server.go index 0ff8812..3a12191 100644 --- a/pkg/task/server.go +++ b/pkg/task/server.go @@ -10,8 +10,11 @@ import ( ) func Start() { - // 启动异步任务框架 - taskWorker := worker.NewAsyncTaskWorker(0) + // 1. 启动服务,连接redis + worker.StartServer() + + // 2. 启动异步调度 + taskWorker := worker.NewAsyncTaskWorker(1) err := taskWorker.Launch() if err != nil { logger.Errorf("启动machinery失败,%v", err.Error()) diff --git a/pkg/task/worker/worker.go b/pkg/task/worker/worker.go index 9f07761..636b7e3 100644 --- a/pkg/task/worker/worker.go +++ b/pkg/task/worker/worker.go @@ -2,6 +2,9 @@ package worker import ( "ferry/pkg/logger" + "fmt" + + "github.com/spf13/viper" "github.com/RichardKnop/machinery/v1" taskConfig "github.com/RichardKnop/machinery/v1/config" @@ -10,7 +13,7 @@ import ( var AsyncTaskCenter *machinery.Server -func init() { +func StartServer() { tc, err := NewTaskCenter() if err != nil { panic(err) @@ -20,7 +23,10 @@ func init() { func NewTaskCenter() (*machinery.Server, error) { cnf := &taskConfig.Config{ - Broker: "redis://127.0.0.1:6379", + Broker: fmt.Sprintf("redis://%v:%v", + viper.GetString("settings.redis.host"), + viper.GetString("settings.redis.port"), + ), DefaultQueue: "ServerTasksQueue", ResultBackend: "eager", }