diff --git a/caboose.go b/caboose.go index a588981..3fca9b4 100644 --- a/caboose.go +++ b/caboose.go @@ -15,12 +15,18 @@ import ( type Config struct { OrchestratorEndpoint url.URL OrchestratorClient *http.Client - Client *http.Client - DoValidation bool - AffinityKey string - PoolRefresh time.Duration - PoolMaxSize int - MaxConcurrency int + + LoggingEndpoint url.URL + LoggingClient *http.Client + LoggingInterval time.Duration + + Client *http.Client + + DoValidation bool + AffinityKey string + PoolRefresh time.Duration + PoolMaxSize int + MaxConcurrency int } var ErrNotImplemented error = errors.New("not implemented") @@ -28,13 +34,16 @@ var ErrNotImplemented error = errors.New("not implemented") type Caboose struct { config *Config pool *pool + logger *logger } func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) { c := Caboose{ config: config, pool: newPool(config), + logger: newLogger(config), } + c.pool.logger = c.logger if c.config.Client == nil { c.config.Client = http.DefaultClient } @@ -43,6 +52,7 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) { func (c *Caboose) Close() { c.pool.Close() + c.logger.Close() } func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) { diff --git a/cmd/caboose/main.go b/cmd/caboose/main.go index 1c5f048..c4e3907 100644 --- a/cmd/caboose/main.go +++ b/cmd/caboose/main.go @@ -39,6 +39,7 @@ func main1() int { out := args.Get(1) oe, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby") + le, _ := url.Parse("https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/") saturnClient := http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{ @@ -50,9 +51,14 @@ func main1() int { cb, err := caboose.NewCaboose(&caboose.Config{ OrchestratorEndpoint: *oe, OrchestratorClient: http.DefaultClient, - DoValidation: true, - PoolRefresh: 5 * time.Minute, - Client: &saturnClient, + + LoggingEndpoint: *le, + LoggingClient: http.DefaultClient, + LoggingInterval: 5 * time.Second, + + DoValidation: true, + PoolRefresh: 5 * time.Minute, + Client: &saturnClient, }) if err != nil { return err diff --git a/go.mod b/go.mod index 7029252..e293a18 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/buraksezer/consistent v0.10.0 + github.com/google/uuid v1.3.0 github.com/ipfs/go-block-format v0.1.1 github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-ipfs-blockstore v1.2.0 @@ -18,7 +19,6 @@ require ( github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-blockservice v0.5.0 // indirect diff --git a/log.go b/log.go new file mode 100644 index 0000000..6dd66c6 --- /dev/null +++ b/log.go @@ -0,0 +1,82 @@ +package caboose + +import ( + "bytes" + "encoding/json" + "net/http" + "net/url" + "time" +) + +type logger struct { + queue chan log + freq time.Duration + client *http.Client + endpoint url.URL + done chan struct{} +} + +func newLogger(c *Config) *logger { + l := logger{ + queue: make(chan log, 5), + freq: c.LoggingInterval, + client: c.LoggingClient, + endpoint: c.LoggingEndpoint, + done: make(chan struct{}), + } + go l.background() + return &l +} + +func (l *logger) background() { + t := time.NewTimer(l.freq) + pending := make([]log, 0, 100) + for { + select { + case l := <-l.queue: + pending = append(pending, l) + case <-t.C: + if len(pending) > 0 { + //submit. + toSubmit := make([]log, len(pending)) + copy(toSubmit, pending) + pending = pending[:0] + go l.submit(toSubmit) + } + t.Reset(l.freq) + case <-l.done: + return + } + } +} + +func (l *logger) submit(logs []log) { + finalLogs := bytes.NewBuffer(nil) + enc := json.NewEncoder(finalLogs) + enc.Encode(logBatch{logs}) + l.client.Post(l.endpoint.String(), "application/json", finalLogs) +} + +func (l *logger) Close() { + close(l.done) +} + +type logBatch struct { + Logs []log `json:"bandwidthLogs"` +} + +type log struct { + CacheHit bool `json:"cacheHit"` + URL string `json:"url"` + LocalTime time.Time `json:"localTime"` + NumBytesSent int `json:"numBytesSent"` + RequestDuration float64 `json:"requestDuration"` // in seconds + RequestID string `json:"requestId"` + HTTPStatusCode int `json:"httpStatusCode"` + HTTPProtocol string `json:"httpProtocol"` + TTFBMS int `json:"ttfbMs"` + ClientAddress string `json:"clientAddress"` + Range string `json:"range"` + Referrer string `json:"referrer"` + UserAgent string `json:"userAgent"` +} diff --git a/pool.go b/pool.go index e4c6975..f237bdd 100644 --- a/pool.go +++ b/pool.go @@ -13,6 +13,7 @@ import ( "time" "github.com/buraksezer/consistent" + "github.com/google/uuid" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ) @@ -34,6 +35,7 @@ func (p *pool) loadPool() ([]string, error) { type pool struct { config *Config endpoints []Member + logger *logger c *consistent.Consistent lk sync.RWMutex started chan struct{} @@ -162,7 +164,32 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blocks.Bl var tmpl = "http://%s/ipfs/%s?format=raw" -func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid) (blocks.Block, error) { +func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid) (b blocks.Block, e error) { + start := time.Now() + fb := time.Now() + code := 0 + proto := "unknown" + respReq := &http.Request{} + received := 0 + defer func() { + p.logger.queue <- log{ + CacheHit: false, + URL: "", + LocalTime: start, + // TODO: does this include header sizes? + NumBytesSent: received, + RequestDuration: time.Now().Sub(start).Seconds(), + RequestID: uuid.NewString(), + HTTPStatusCode: code, + HTTPProtocol: proto, + TTFBMS: int(fb.Sub(start).Milliseconds()), + // my address + ClientAddress: "", + Range: "", + Referrer: respReq.Referer(), + UserAgent: respReq.UserAgent(), + } + }() u, err := url.Parse(fmt.Sprintf(tmpl, from, c)) if err != nil { return nil, err @@ -174,14 +201,19 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid) (blocks.Bloc "Accept": []string{"application/vnd.ipld.raw"}, }, }) + fb = time.Now() + code = resp.StatusCode + proto = resp.Proto if err != nil { return nil, err } + respReq = resp.Request defer resp.Body.Close() rb, err := io.ReadAll(resp.Body) if err != nil { return nil, err } + received = len(rb) if p.config.DoValidation { nc, err := c.Prefix().Sum(rb)