Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions example/stovepipe/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,17 @@ go_library(
importpath = "github.com/uber/submitqueue/example/stovepipe/orchestrator/server",
visibility = ["//visibility:private"],
deps = [
"//core/consumer",
"//core/errs",
"//core/errs/generic",
"//core/errs/mysql",
"//extension/messagequeue",
"//extension/messagequeue/mysql",
"//stovepipe/core/topickey",
"//stovepipe/orchestrator/controller",
"//stovepipe/orchestrator/controller/start",
"//stovepipe/orchestrator/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//reflection",
Expand Down
99 changes: 98 additions & 1 deletion example/stovepipe/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"database/sql"
"errors"
"fmt"
"net"
Expand All @@ -25,8 +26,17 @@ import (
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
genericerrs "github.com/uber/submitqueue/core/errs/generic"
mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
extqueue "github.com/uber/submitqueue/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
"github.com/uber/submitqueue/stovepipe/core/topickey"
"github.com/uber/submitqueue/stovepipe/orchestrator/controller"
"github.com/uber/submitqueue/stovepipe/orchestrator/controller/start"
pb "github.com/uber/submitqueue/stovepipe/orchestrator/protopb"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -102,6 +112,62 @@ func run() error {
metricsWgDone.Wait()
}()

queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
if queueDSN == "" {
return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required")
}
queueDB, err := sql.Open("mysql", queueDSN)
if err != nil {
return fmt.Errorf("failed to open queue database: %w", err)
}
defer queueDB.Close()

mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
DB: queueDB,
Logger: logger,
MetricsScope: scope.SubScope("queue"),
})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
defer mysqlQueue.Close()

logger.Info("initialized queue", zap.String("dsn", queueDSN))

subscriberName := os.Getenv("HOSTNAME")
if subscriberName == "" {
subscriberName = fmt.Sprintf("stovepipe-orchestrator-%d", time.Now().Unix())
}

registry, err := newTopicRegistry(mysqlQueue, subscriberName)
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
}

primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
errs.NewClassifierProcessor(
genericerrs.Classifier,
mysqlerrs.Classifier,
),
)

startController := start.NewController(start.Params{
Logger: logger.Sugar(),
Scope: scope,
Registry: registry,
TopicKey: topickey.TopicKeyStart,
ConsumerGroup: "orchestrator-start",
})
if err := primaryConsumer.Register(startController); err != nil {
return fmt.Errorf("failed to register start controller: %w", err)
}
logger.Info("controllers registered", zap.Int("primary", 1))

if err := primaryConsumer.Start(ctx); err != nil {
return fmt.Errorf("failed to start primary consumer: %w", err)
}
logger.Info("consumer started")

grpcServer := grpc.NewServer()

pingController := controller.NewPingController(logger, scope)
Expand Down Expand Up @@ -140,11 +206,42 @@ func run() error {
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down stovepipe orchestrator server due to critical GRPC server error...")
cancel()
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

primaryStopErr := primaryConsumer.Stop(30000)
if primaryStopErr != nil {
primaryStopErr = fmt.Errorf("failed to stop consumer: %w", primaryStopErr)
}

if primaryStopErr != nil || serverErr != nil {
err = errors.Join(primaryStopErr, serverErr)
}

return err
}

func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Key: topickey.TopicKeyStart,
Name: "start",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "orchestrator-start",
),
},
{
Key: topickey.TopicKeyValidate,
Name: "validate",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "orchestrator-validate",
),
},
})
}
Loading