Skip to content

Commit

Permalink
feat: export service entrypoint
Browse files Browse the repository at this point in the history
Move the entrypoint logic into its own package, so it can be imported from other projects.

Signed-off-by: Utku Ozdemir <[email protected]>
  • Loading branch information
utkuozdemir committed May 30, 2024
1 parent 86e1317 commit 74bca2d
Show file tree
Hide file tree
Showing 7 changed files with 429 additions and 321 deletions.
2 changes: 1 addition & 1 deletion .kres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ spec:
baseSpecPath: /api
vtProtobufEnabled: true
specs:
- source: api/storage/storage.proto
- source: api/storage/discovery.proto
subdirectory: storage
genGateway: false
---
Expand Down
14 changes: 7 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-05-28T00:03:10Z by kres bcb280a.
# Generated on 2024-05-30T08:00:54Z by kres f249b6c.

ARG TOOLCHAIN

Expand All @@ -21,7 +21,7 @@ RUN markdownlint --ignore "CHANGELOG.md" --ignore "**/node_modules/**" --ignore

# collects proto specs
FROM scratch AS proto-specs
ADD api/storage/storage.proto /api/storage/
ADD api/storage/discovery.proto /api/storage/

# base toolchain image
FROM --platform=${BUILDPLATFORM} ${TOOLCHAIN} AS toolchain
Expand All @@ -37,6 +37,9 @@ ENV GOTOOLCHAIN ${GOTOOLCHAIN}
ARG GOEXPERIMENT
ENV GOEXPERIMENT ${GOEXPERIMENT}
ENV GOPATH /go
ARG GOIMPORTS_VERSION
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/tools/cmd/goimports@v${GOIMPORTS_VERSION}
RUN mv /go/bin/goimports /bin
ARG PROTOBUF_GO_VERSION
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install google.golang.org/protobuf/cmd/protoc-gen-go@v${PROTOBUF_GO_VERSION}
RUN mv /go/bin/protoc-gen-go /bin
Expand All @@ -46,9 +49,6 @@ RUN mv /go/bin/protoc-gen-go-grpc /bin
ARG GRPC_GATEWAY_VERSION
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v${GRPC_GATEWAY_VERSION}
RUN mv /go/bin/protoc-gen-grpc-gateway /bin
ARG GOIMPORTS_VERSION
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/tools/cmd/goimports@v${GOIMPORTS_VERSION}
RUN mv /go/bin/goimports /bin
ARG VTPROTOBUF_VERSION
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/planetscale/vtprotobuf/cmd/protoc-gen-go-vtproto@v${VTPROTOBUF_VERSION}
RUN mv /go/bin/protoc-gen-go-vtproto /bin
Expand Down Expand Up @@ -81,8 +81,8 @@ RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null
# runs protobuf compiler
FROM tools AS proto-compile
COPY --from=proto-specs / /
RUN protoc -I/api --go_out=paths=source_relative:/api --go-grpc_out=paths=source_relative:/api --go-vtproto_out=paths=source_relative:/api --go-vtproto_opt=features=marshal+unmarshal+size+equal+clone /api/storage/storage.proto
RUN rm /api/storage/storage.proto
RUN protoc -I/api --go_out=paths=source_relative:/api --go-grpc_out=paths=source_relative:/api --go-vtproto_out=paths=source_relative:/api --go-vtproto_opt=features=marshal+unmarshal+size+equal+clone /api/storage/discovery.proto
RUN rm /api/storage/discovery.proto
RUN goimports -w -local github.com/siderolabs/discovery-service /api
RUN gofumpt -w /api

Expand Down
155 changes: 78 additions & 77 deletions api/storage/storage.pb.go → api/storage/discovery.pb.go

Large diffs are not rendered by default.

File renamed without changes.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

259 changes: 24 additions & 235 deletions cmd/discovery-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,19 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/jonboulle/clockwork"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/siderolabs/discovery-api/api/v1alpha1/server/pb"
"github.com/prometheus/client_golang/prometheus"
"github.com/siderolabs/go-debug"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/experimental"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/siderolabs/discovery-service/internal/landing"
"github.com/siderolabs/discovery-service/internal/limiter"
_ "github.com/siderolabs/discovery-service/internal/proto"
"github.com/siderolabs/discovery-service/internal/state"
"github.com/siderolabs/discovery-service/internal/state/storage"
"github.com/siderolabs/discovery-service/pkg/limits"
"github.com/siderolabs/discovery-service/pkg/server"
"github.com/siderolabs/discovery-service/pkg/service"
)

var (
Expand Down Expand Up @@ -95,7 +74,28 @@ func main() {
zap.ReplaceGlobals(logger)
zap.RedirectStdLog(logger)

if err = signalHandler(context.Background(), logger, run); err != nil {
if err = signalHandler(context.Background(), logger, func(ctx context.Context, logger *zap.Logger) error {
return service.Run(ctx, service.Options{
SnapshotsEnabled: snapshotsEnabled,
SnapshotPath: snapshotPath,
SnapshotInterval: snapshotInterval,

RedirectEndpoint: redirectEndpoint,
ListenAddr: listenAddr,
GCInterval: gcInterval,

LandingServerEnabled: true,
LandingAddr: landingAddr,

DebugServerEnabled: true,
DebugAddr: debugAddr,

MetricsServerEnabled: true,
MetricsAddr: metricsAddr,

MetricsRegisterer: prometheus.DefaultRegisterer,
}, logger)
}); err != nil {
logger.Error("service failed", zap.Error(err))

os.Exit(1)
Expand All @@ -108,214 +108,3 @@ func signalHandler(ctx context.Context, logger *zap.Logger, f func(ctx context.C

return f(ctx, logger)
}

func recoveryHandler(logger *zap.Logger) grpc_recovery.RecoveryHandlerFunc {
return func(p interface{}) error {
if logger != nil {
logger.Error("grpc panic", zap.Any("panic", p), zap.Stack("stack"))
}

return status.Errorf(codes.Internal, "%v", p)
}
}

func interceptorLogger(l *zap.Logger) logging.Logger {
return logging.LoggerFunc(func(_ context.Context, lvl logging.Level, msg string, fields ...any) {
f := make([]zap.Field, 0, len(fields)/2)

for i := 0; i < len(fields); i += 2 {
key := fields[i].(string) //nolint:forcetypeassert,errcheck
value := fields[i+1]

switch v := value.(type) {
case string:
f = append(f, zap.String(key, v))
case int:
f = append(f, zap.Int(key, v))
case bool:
f = append(f, zap.Bool(key, v))
default:
f = append(f, zap.Any(key, v))
}
}

logger := l.WithOptions(zap.AddCallerSkip(1)).With(f...)

switch lvl {
case logging.LevelDebug:
logger.Debug(msg)
case logging.LevelInfo:
logger.Info(msg)
case logging.LevelWarn:
logger.Warn(msg)
case logging.LevelError:
logger.Error(msg)
default:
panic(fmt.Sprintf("unknown level %v", lvl))
}
})
}

func run(ctx context.Context, logger *zap.Logger) error {
logger.Info("service starting")

defer logger.Info("service shut down")

recoveryOpt := grpc_recovery.WithRecoveryHandler(recoveryHandler(logger))

limiter := limiter.NewIPRateLimiter(limits.IPRateRequestsPerSecondMax, limits.IPRateBurstSizeMax)

metrics := grpc_prometheus.NewServerMetrics(
grpc_prometheus.WithServerHandlingTimeHistogram(grpc_prometheus.WithHistogramBuckets([]float64{0.01, 0.1, 0.25, 0.5, 1.0, 2.5})),
)

loggingOpts := []logging.Option{
logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
logging.WithFieldsFromContext(logging.ExtractFields),
}

//nolint:contextcheck
serverOptions := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(
server.AddLoggingFieldsUnaryServerInterceptor(),
logging.UnaryServerInterceptor(interceptorLogger(logger), loggingOpts...),
server.RateLimitUnaryServerInterceptor(limiter),
metrics.UnaryServerInterceptor(),
grpc_recovery.UnaryServerInterceptor(recoveryOpt),
),
grpc.ChainStreamInterceptor(
server.AddLoggingFieldsStreamServerInterceptor(),
server.RateLimitStreamServerInterceptor(limiter),
logging.StreamServerInterceptor(interceptorLogger(logger), loggingOpts...),
metrics.StreamServerInterceptor(),
grpc_recovery.StreamServerInterceptor(recoveryOpt),
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 10 * time.Second,
}),
grpc.SharedWriteBuffer(true),
experimental.RecvBufferPool(grpc.NewSharedBufferPool()),
grpc.ReadBufferSize(16 * 1024),
grpc.WriteBufferSize(16 * 1024),
}

state := state.NewState(logger)
prom.MustRegister(state)

var stateStorage *storage.Storage

if snapshotsEnabled {
stateStorage = storage.New(snapshotPath, state, logger)
prom.MustRegister(stateStorage)

if err := stateStorage.Load(); err != nil {
logger.Warn("failed to load state from storage", zap.Error(err))
}
} else {
logger.Info("snapshots are disabled")
}

srv := server.NewClusterServer(state, ctx.Done(), redirectEndpoint)
prom.MustRegister(srv)

lis, err := net.Listen("tcp", listenAddr)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}

landingLis, err := net.Listen("tcp", landingAddr)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}

s := grpc.NewServer(serverOptions...)
pb.RegisterClusterServer(s, srv)

metrics.InitializeMetrics(s)

if err = prom.Register(metrics); err != nil {
return fmt.Errorf("failed to register metrics: %w", err)
}

var metricsMux http.ServeMux

metricsMux.Handle("/metrics", promhttp.Handler())

metricsServer := http.Server{
Addr: metricsAddr,
Handler: &metricsMux,
}

landingServer := http.Server{
Handler: landing.Handler(state, logger),
}

eg, ctx := errgroup.WithContext(ctx)

if snapshotsEnabled {
eg.Go(func() error {
return stateStorage.Start(ctx, clockwork.NewRealClock(), snapshotInterval)
})
}

eg.Go(func() error {
logger.Info("gRPC server starting", zap.Stringer("address", lis.Addr()))

if err := s.Serve(lis); err != nil {
return fmt.Errorf("failed to serve: %w", err)
}

return nil
})

eg.Go(func() error {
logger.Info("landing server starting", zap.Stringer("address", landingLis.Addr()))

if err := landingServer.Serve(landingLis); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("failed to serve: %w", err)
}

return nil
})

eg.Go(func() error {
logger.Info("metrics starting", zap.String("address", metricsServer.Addr))

if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}

return nil
})

eg.Go(func() error {
<-ctx.Done()

shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

s.GracefulStop()
landingServer.Shutdown(ctx) //nolint:errcheck
metricsServer.Shutdown(shutdownCtx) //nolint:errcheck,contextcheck

return nil
})

eg.Go(func() error {
state.RunGC(ctx, logger, gcInterval)

return nil
})

eg.Go(func() error {
limiter.RunGC(ctx)

return nil
})

eg.Go(func() error {
return debug.ListenAndServe(ctx, debugAddr, func(msg string) { logger.Info(msg) })
})

return eg.Wait()
}
Loading

0 comments on commit 74bca2d

Please sign in to comment.