Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
add logging of requests
Browse files Browse the repository at this point in the history
  • Loading branch information
willscott committed Feb 2, 2023
1 parent e177cca commit ec33a82
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 11 deletions.
22 changes: 16 additions & 6 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,35 @@ import (
type Config struct {
OrchestratorEndpoint url.URL
OrchestratorClient *http.Client
Client *http.Client
DoValidation bool
AffinityKey string
PoolRefresh time.Duration
PoolMaxSize int
MaxConcurrency int

LoggingEndpoint url.URL
LoggingClient *http.Client
LoggingInterval time.Duration

Client *http.Client

DoValidation bool
AffinityKey string
PoolRefresh time.Duration
PoolMaxSize int
MaxConcurrency int
}

var ErrNotImplemented error = errors.New("not implemented")

type Caboose struct {
config *Config
pool *pool
logger *logger
}

func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) {
c := Caboose{
config: config,
pool: newPool(config),
logger: newLogger(config),
}
c.pool.logger = c.logger
if c.config.Client == nil {
c.config.Client = http.DefaultClient
}
Expand All @@ -43,6 +52,7 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) {

func (c *Caboose) Close() {
c.pool.Close()
c.logger.Close()
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
Expand Down
12 changes: 9 additions & 3 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func main1() int {
out := args.Get(1)

oe, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby")
le, _ := url.Parse("https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/")
saturnClient := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Expand All @@ -50,9 +51,14 @@ func main1() int {
cb, err := caboose.NewCaboose(&caboose.Config{
OrchestratorEndpoint: *oe,
OrchestratorClient: http.DefaultClient,
DoValidation: true,
PoolRefresh: 5 * time.Minute,
Client: &saturnClient,

LoggingEndpoint: *le,
LoggingClient: http.DefaultClient,
LoggingInterval: 5 * time.Second,

DoValidation: true,
PoolRefresh: 5 * time.Minute,
Client: &saturnClient,
})
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/buraksezer/consistent v0.10.0
github.com/google/uuid v1.3.0
github.com/ipfs/go-block-format v0.1.1
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-ipfs-blockstore v1.2.0
Expand All @@ -18,7 +19,6 @@ require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-blockservice v0.5.0 // indirect
Expand Down
82 changes: 82 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package caboose

import (
"bytes"
"encoding/json"
"net/http"
"net/url"
"time"
)

type logger struct {
queue chan log
freq time.Duration
client *http.Client
endpoint url.URL
done chan struct{}
}

func newLogger(c *Config) *logger {
l := logger{
queue: make(chan log, 5),
freq: c.LoggingInterval,
client: c.LoggingClient,
endpoint: c.LoggingEndpoint,
done: make(chan struct{}),
}
go l.background()
return &l
}

func (l *logger) background() {
t := time.NewTimer(l.freq)
pending := make([]log, 0, 100)
for {
select {
case l := <-l.queue:
pending = append(pending, l)
case <-t.C:
if len(pending) > 0 {
//submit.
toSubmit := make([]log, len(pending))
copy(toSubmit, pending)
pending = pending[:0]
go l.submit(toSubmit)
}
t.Reset(l.freq)
case <-l.done:
return
}
}
}

func (l *logger) submit(logs []log) {
finalLogs := bytes.NewBuffer(nil)
enc := json.NewEncoder(finalLogs)
enc.Encode(logBatch{logs})
l.client.Post(l.endpoint.String(), "application/json", finalLogs)
}

func (l *logger) Close() {
close(l.done)
}

type logBatch struct {
Logs []log `json:"bandwidthLogs"`
}

type log struct {
CacheHit bool `json:"cacheHit"`
URL string `json:"url"`
LocalTime time.Time `json:"localTime"`
NumBytesSent int `json:"numBytesSent"`
RequestDuration float64 `json:"requestDuration"` // in seconds
RequestID string `json:"requestId"`
HTTPStatusCode int `json:"httpStatusCode"`
HTTPProtocol string `json:"httpProtocol"`
TTFBMS int `json:"ttfbMs"`
ClientAddress string `json:"clientAddress"`
Range string `json:"range"`
Referrer string `json:"referrer"`
UserAgent string `json:"userAgent"`
}
34 changes: 33 additions & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/buraksezer/consistent"
"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
Expand All @@ -34,6 +35,7 @@ func (p *pool) loadPool() ([]string, error) {
type pool struct {
config *Config
endpoints []Member
logger *logger
c *consistent.Consistent
lk sync.RWMutex
started chan struct{}
Expand Down Expand Up @@ -162,7 +164,32 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blocks.Bl

var tmpl = "http://%s/ipfs/%s?format=raw"

func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid) (blocks.Block, error) {
func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid) (b blocks.Block, e error) {
start := time.Now()
fb := time.Now()
code := 0
proto := "unknown"
respReq := &http.Request{}
received := 0
defer func() {
p.logger.queue <- log{
CacheHit: false,
URL: "",
LocalTime: start,
// TODO: does this include header sizes?
NumBytesSent: received,
RequestDuration: time.Now().Sub(start).Seconds(),
RequestID: uuid.NewString(),
HTTPStatusCode: code,
HTTPProtocol: proto,
TTFBMS: int(fb.Sub(start).Milliseconds()),
// my address
ClientAddress: "",
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
}
}()
u, err := url.Parse(fmt.Sprintf(tmpl, from, c))
if err != nil {
return nil, err
Expand All @@ -174,14 +201,19 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid) (blocks.Bloc
"Accept": []string{"application/vnd.ipld.raw"},
},
})
fb = time.Now()
code = resp.StatusCode
proto = resp.Proto
if err != nil {
return nil, err
}
respReq = resp.Request
defer resp.Body.Close()
rb, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
received = len(rb)

if p.config.DoValidation {
nc, err := c.Prefix().Sum(rb)
Expand Down

0 comments on commit ec33a82

Please sign in to comment.