Skip to content

Commit

Permalink
Merge pull request #437 from bookingcom/grzkv/redesign_and_refactoring
Browse files Browse the repository at this point in the history
Further redesign and refactoring
  • Loading branch information
grzkv authored Nov 30, 2022
2 parents ad02a9a + c22acee commit f6008a1
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 402 deletions.
122 changes: 116 additions & 6 deletions pkg/app/carbonapi/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package carbonapi

import (
"fmt"
"log"
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"time"
Expand All @@ -12,12 +16,14 @@ import (

"github.com/bookingcom/carbonapi/expr/functions"
"github.com/bookingcom/carbonapi/expr/functions/cairo/png"
"github.com/bookingcom/carbonapi/pkg/app/zipper"
"github.com/bookingcom/carbonapi/pkg/backend"
bnet "github.com/bookingcom/carbonapi/pkg/backend/net"
"github.com/bookingcom/carbonapi/pkg/blocker"
"github.com/bookingcom/carbonapi/pkg/cache"
"github.com/bookingcom/carbonapi/pkg/carbonapipb"
"github.com/bookingcom/carbonapi/pkg/cfg"
"github.com/bookingcom/carbonapi/pkg/parser"
"github.com/dgryski/go-expirecache"

"github.com/facebookgo/grace/gracehttp"
"github.com/facebookgo/pidfile"
Expand Down Expand Up @@ -46,7 +52,12 @@ type App struct {
ms PrometheusMetrics
Lg *zap.Logger

Zipper *zipper.App
ZipperConfig cfg.Zipper
ZipperBackends []backend.Backend
ZipperTopLevelDomainCache *expirecache.Cache
ZipperTLDPrefixes []TLDPrefix
ZipperMetrics *ZipperPrometheusMetrics
ZipperLg *zap.Logger
}

// New creates a new app
Expand All @@ -70,16 +81,15 @@ func New(config cfg.API, lg *zap.Logger, buildVersion string) (*App, error) {

setUpConfig(app, lg)

app.Zipper = zipper.Setup(config.ZipperConfig, BuildVersion, lg)
go zipper.ProbeTopLevelDomains(app.Zipper.TopLevelDomainCache, app.Zipper.TLDPrefixes, app.Zipper.Backends, app.Zipper.Config.InternalRoutingCache, app.Zipper.Metrics)
registerZipperMetrics(app.Zipper)
app.ZipperConfig, app.ZipperBackends, app.ZipperTopLevelDomainCache, app.ZipperTLDPrefixes, app.ZipperMetrics, app.ZipperLg = SetupZipper(config.ZipperConfig, BuildVersion, lg)
go ProbeTopLevelDomains(app.ZipperTopLevelDomainCache, app.ZipperTLDPrefixes, app.ZipperBackends, app.ZipperConfig.InternalRoutingCache, app.ZipperMetrics)

return app, nil
}

// Start starts the app: inits handlers, logger, starts HTTP server
func (app *App) Start(logger *zap.Logger) {
app.registerPrometheusMetrics()
registerPrometheusMetrics(&app.ms, app.ZipperMetrics)

handler := initHandlers(app, logger)
internalHandler := initHandlersInternal(app, logger)
Expand Down Expand Up @@ -281,3 +291,103 @@ func (app *App) bucketRequestTimes(req *http.Request, t time.Duration) {
app.ms.FindDurationLin.Observe(t.Seconds())
}
}

func InitBackends(config cfg.Zipper, ms *ZipperPrometheusMetrics, logger *zap.Logger) ([]backend.Backend, error) {
client := &http.Client{}
client.Transport = &http.Transport{
MaxIdleConnsPerHost: config.MaxIdleConnsPerHost,
IdleConnTimeout: 3 * time.Second,
DialContext: (&net.Dialer{
Timeout: config.Timeouts.Connect,
KeepAlive: config.KeepAliveInterval,
DualStack: true,
}).DialContext,
}

configBackendList := config.GetBackends()
backends := make([]backend.Backend, 0, len(configBackendList))
for _, host := range configBackendList {
if host.Http == "" {
return nil, fmt.Errorf("backend without http address was provided: %+v", host)
}
dc, cluster, _ := config.InfoOfBackend(host.Http)
var b backend.Backend
var err error

bConf := bnet.Config{
Address: host.Http,
DC: dc,
Cluster: cluster,
Client: client,
Timeout: config.Timeouts.AfterStarted,
PathCacheExpirySec: uint32(config.ExpireDelaySec),
QHist: ms.TimeInQueueSeconds,
Responses: ms.BackendResponses,
Logger: logger,
}
var be backend.BackendImpl
if host.Grpc != "" {
be, err = bnet.NewGrpc(bnet.GrpcConfig{
Config: bConf,
GrpcAddress: host.Grpc,
})
} else {
be, err = bnet.New(bConf)
}
b = backend.NewBackend(be,
config.BackendQueueSize,
config.ConcurrencyLimitPerServer,
ms.BackendRequestsInQueue,
ms.BackendSemaphoreSaturation,
ms.BackendTimeInQSec,
ms.BackendEnqueuedRequests)

if err != nil {
return backends, fmt.Errorf("Couldn't create backend for '%s'", host)
}

backends = append(backends, b)
}

return backends, nil
}

// Setup sets up the zipper for future lanuch.
func SetupZipper(configFile string, BuildVersion string, lg *zap.Logger) (cfg.Zipper, []backend.Backend, *expirecache.Cache,
[]TLDPrefix, *ZipperPrometheusMetrics, *zap.Logger) {
if configFile == "" {
log.Fatal("missing config file option")
}

fh, err := os.Open(configFile)
if err != nil {
log.Fatalf("unable to read config file: %s", err)
}

config, err := cfg.ParseZipperConfig(fh)
if err != nil {
log.Fatalf("failed to parse config at %s: %s", configFile, err)
}
fh.Close()

if config.MaxProcs != 0 {
runtime.GOMAXPROCS(config.MaxProcs)
}

if len(config.GetBackends()) == 0 {
log.Fatal("no backends loaded; exiting")
}

lg.Info("starting carbonzipper",
zap.String("build_version", BuildVersion),
zap.String("zipperConfig", fmt.Sprintf("%+v", config)),
)

ms := NewZipperPrometheusMetrics(config)
bs, err := InitBackends(config, ms, lg)
if err != nil {
lg.Fatal("failed to init backends", zap.Error(err))
}

return config, bs, expirecache.New(0), InitTLDPrefixes(lg, config.TLDCacheExtraPrefixes), ms, lg
}
8 changes: 4 additions & 4 deletions pkg/app/carbonapi/http_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/bookingcom/carbonapi/expr/interfaces"
"github.com/bookingcom/carbonapi/expr/metadata"
"github.com/bookingcom/carbonapi/expr/types"
"github.com/bookingcom/carbonapi/pkg/app/zipper"
"github.com/bookingcom/carbonapi/pkg/cache"
"github.com/bookingcom/carbonapi/pkg/carbonapipb"
"github.com/bookingcom/carbonapi/pkg/date"
Expand Down Expand Up @@ -554,7 +553,8 @@ func (app *App) sendRenderRequest(ctx context.Context, path string, from, until
var err error
var metrics []dataTypes.Metric
app.ms.UpstreamRequests.WithLabelValues("render").Inc()
metrics, err = zipper.Render(app.Zipper, ctx, path, int64(from), int64(until), app.Zipper.Metrics, app.Zipper.Lg)
metrics, err = Render(app.ZipperTopLevelDomainCache, app.ZipperTLDPrefixes, app.ZipperBackends,
app.ZipperConfig.RenderReplicaMismatchConfig, ctx, path, int64(from), int64(until), app.ZipperMetrics, app.ZipperLg)

// time in queue is converted to ms
app.ms.TimeInQueueExp.Observe(float64(request.Trace.Report()[2]) / 1000 / 1000)
Expand Down Expand Up @@ -771,7 +771,7 @@ func (app *App) resolveGlobs(ctx context.Context, metric string, useCache bool,

Trace(lg, "sending find request upstream")
app.ms.UpstreamRequests.WithLabelValues("find").Inc()
matches, err = zipper.Find(app.Zipper, ctx, request.Query, app.Zipper.Metrics, app.Zipper.Lg)
matches, err = Find(app.ZipperTopLevelDomainCache, app.ZipperTLDPrefixes, app.ZipperBackends, ctx, request.Query, app.ZipperMetrics, app.ZipperLg)

if err != nil {
Trace(lg, "upstream find request failed", zap.Error(err))
Expand Down Expand Up @@ -1091,7 +1091,7 @@ func (app *App) infoHandler(w http.ResponseWriter, r *http.Request, lg *zap.Logg
var infos []dataTypes.Info
var err error
app.ms.UpstreamRequests.WithLabelValues("info").Inc()
infos, err = zipper.Info(app.Zipper, ctx, query, app.Zipper.Metrics, app.Zipper.Lg)
infos, err = Info(app.ZipperTopLevelDomainCache, app.ZipperTLDPrefixes, app.ZipperBackends, ctx, query, app.ZipperMetrics, app.ZipperLg)

if err != nil {
var notFound dataTypes.ErrNotFound
Expand Down
Loading

0 comments on commit f6008a1

Please sign in to comment.