Skip to content

Commit

Permalink
Remove proto.NewBuffer usages
Browse files Browse the repository at this point in the history
  • Loading branch information
torsm committed Aug 25, 2023
1 parent 6c57302 commit 21965de
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 36 deletions.
32 changes: 23 additions & 9 deletions fleetspeak/src/client/https/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"time"

log "github.com/golang/glog"
oldproto "github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"

"github.com/google/fleetspeak/fleetspeak/src/client/comms"
Expand Down Expand Up @@ -241,10 +240,13 @@ func (c *StreamingCommunicator) connect(ctx context.Context, host string, maxLif
return nil, err
}
ret.processed = pm
buf := oldproto.NewBuffer(make([]byte, 0, 1024))
if err := buf.EncodeMessage(wcd); err != nil {

buf, err := proto.Marshal(wcd)
if err != nil {
return nil, err
}
sizeBuf := make([]byte, 0, 16)
sizeBuf = binary.AppendUvarint(sizeBuf, uint64(len(buf)))

br, bw := io.Pipe()

Expand Down Expand Up @@ -280,7 +282,8 @@ func (c *StreamingCommunicator) connect(ctx context.Context, host string, maxLif
// executes.
go func() {
binary.Write(bw, binary.LittleEndian, magic)
bw.Write(buf.Bytes())
bw.Write(sizeBuf)
bw.Write(buf)
}()
resp, err := c.hc.Do(req)
close(ok)
Expand Down Expand Up @@ -419,7 +422,6 @@ func (c *connection) writeLoop(bw *io.PipeWriter) {
log.V(2).Infof("<-%p: writeLoop stopped", c)
}()

buf := oldproto.NewBuffer(make([]byte, 0, 1024))
cnt := 1
var lastRate float64 // speed of last large-ish write, in bytes/sec
for {
Expand Down Expand Up @@ -454,25 +456,37 @@ func (c *connection) writeLoop(bw *io.PipeWriter) {
log.Errorf("Error creating streaming contact data: %v", err)
return
}
if err := buf.EncodeMessage(wcd); err != nil {

buf, err := proto.Marshal(wcd)
if err != nil {
log.Errorf("Error encoding streaming contact data: %v", err)
return
}
log.V(2).Infof("<-Starting write of %d bytes", len(buf.Bytes()))
sizeBuf := make([]byte, 0, 16)
sizeBuf = binary.AppendUvarint(sizeBuf, uint64(len(buf)))

log.V(2).Infof("<-Starting write of %d bytes", len(buf))
start := time.Now()
s, err := bw.Write(buf.Bytes())
sizeWritten, err := bw.Write(sizeBuf)
if err != nil {
if c.ctx.Err() == nil {
log.Errorf("Error writing streaming contact data: %v", err)
}
return
}
bufWritten, err := bw.Write(buf)
if err != nil {
if c.ctx.Err() == nil {
log.Errorf("Error writing streaming contact data: %v", err)
}
return
}
s := sizeWritten + bufWritten
delta := time.Since(start)
log.V(2).Infof("<-Wrote streaming ContactData of %d messages, and %d bytes in %v", len(fsmsgs), s, delta)
if s > minSendBytesThreshold {
lastRate = float64(s) / (float64(delta) / float64(time.Second))
}
buf.Reset()
}
}

Expand Down
34 changes: 25 additions & 9 deletions fleetspeak/src/client/https/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@ import (
"time"

log "github.com/golang/glog"
"google.golang.org/protobuf/proto"
oldproto "github.com/golang/protobuf/proto"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"

"github.com/google/fleetspeak/fleetspeak/src/client"
"github.com/google/fleetspeak/fleetspeak/src/client/config"
"github.com/google/fleetspeak/fleetspeak/src/client/service"
"github.com/google/fleetspeak/fleetspeak/src/common"
"github.com/google/fleetspeak/fleetspeak/src/comtesting"

anypb "google.golang.org/protobuf/types/known/anypb"
clpb "github.com/google/fleetspeak/fleetspeak/src/client/proto/fleetspeak_client"
fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak"
anypb "google.golang.org/protobuf/types/known/anypb"
)

func randBytes(n int) []byte {
Expand Down Expand Up @@ -148,17 +147,25 @@ func (s *streamingTestServer) Start() {
return
}
s.received <- &rcd
cd := fspb.ContactData{
cd := &fspb.ContactData{
AckIndex: cnt,
}
cnt++
out := oldproto.NewBuffer(make([]byte, 0, 1024))
if err := out.EncodeMessage(&cd); err != nil {

outBuf, err := proto.Marshal(cd)
if err != nil {
s.t.Errorf("Unable to encode response: %v", err)
return
}
sizeBuf := make([]byte, 0, 16)
sizeBuf = binary.AppendUvarint(sizeBuf, uint64(len(outBuf)))

writeLock.Lock()
if _, err := res.Write(out.Bytes()); err != nil {
if _, err := res.Write(sizeBuf); err != nil {
s.t.Errorf("Unable to write response: %v", err)
return
}
if _, err := res.Write(outBuf); err != nil {
s.t.Errorf("Unable to write response: %v", err)
return
}
Expand All @@ -171,12 +178,21 @@ func (s *streamingTestServer) Start() {
for _, m := range cd.Messages {
m.Destination.ClientId = cid.Bytes()
}
if err := out.EncodeMessage(cd); err != nil {

outBuf, err := proto.Marshal(cd)
if err != nil {
s.t.Errorf("Unable to encode response: %v", err)
return
}
sizeBuf := make([]byte, 0, 16)
sizeBuf = binary.AppendUvarint(sizeBuf, uint64(len(outBuf)))

writeLock.Lock()
if _, err := res.Write(out.Bytes()); err != nil {
if _, err := res.Write(sizeBuf); err != nil {
s.t.Errorf("Unable to write response: %v", err)
return
}
if _, err := res.Write(outBuf); err != nil {
s.t.Errorf("Unable to write response: %v", err)
return
}
Expand Down
17 changes: 10 additions & 7 deletions fleetspeak/src/server/https/https_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"time"

log "github.com/golang/glog"
oldproto "github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"

"github.com/google/fleetspeak/fleetspeak/src/common"
Expand Down Expand Up @@ -200,22 +199,26 @@ func TestFile(t *testing.T) {
}

func makeWrapped() []byte {
cd := fspb.ContactData{
cd := &fspb.ContactData{
ClientClock: db.NowProto(),
}
b, err := proto.Marshal(&cd)
b, err := proto.Marshal(cd)
if err != nil {
log.Fatal(err)
}
wcd := fspb.WrappedContactData{
wcd := &fspb.WrappedContactData{
ContactData: b,
ClientLabels: []string{"linux", "test"},
}
buf := oldproto.NewBuffer(make([]byte, 0, 1024))
if err := buf.EncodeMessage(&wcd); err != nil {

buf, err := proto.Marshal(wcd)
if err != nil {
log.Fatal(err)
}
return buf.Bytes()
sizeBuf := make([]byte, 0, 16)
sizeBuf = binary.AppendUvarint(sizeBuf, uint64(len(buf)))

return append(sizeBuf, buf...)
}

func readContact(body *bufio.Reader) (*fspb.ContactData, error) {
Expand Down
32 changes: 22 additions & 10 deletions fleetspeak/src/server/https/streaming_message_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"time"

log "github.com/golang/glog"
oldproto "github.com/golang/protobuf/proto"
"github.com/google/fleetspeak/fleetspeak/src/common"
"github.com/google/fleetspeak/fleetspeak/src/server/comms"
"github.com/google/fleetspeak/fleetspeak/src/server/db"
Expand Down Expand Up @@ -242,14 +241,21 @@ func (s streamingMessageServer) initialPoll(ctx context.Context, addr net.Addr,
}
pi.CacheHit = info.Client.Cached

out := oldproto.NewBuffer(make([]byte, 0, 1024))
// EncodeMessage prepends the size as a Varint.
if err := out.EncodeMessage(toSend); err != nil {
outBuf, err := proto.Marshal(toSend)
if err != nil {
info.Fin()
return nil, false, makeError(fmt.Sprintf("error preparing messages: %v", err), http.StatusInternalServerError)
}
sizeBuf := make([]byte, 0, 16)
sizeBuf = binary.AppendUvarint(sizeBuf, uint64(len(outBuf)))

st = time.Now()
sz, err := res.Write(out.Bytes())
sizeWritten, err := res.Write(sizeBuf)
if err != nil {
info.Fin()
return nil, false, makeError(fmt.Sprintf("error writing body: %v", err), http.StatusInternalServerError)
}
bufWritten, err := res.Write(outBuf)
if err != nil {
info.Fin()
return nil, false, makeError(fmt.Sprintf("error writing body: %v", err), http.StatusInternalServerError)
Expand All @@ -258,7 +264,7 @@ func (s streamingMessageServer) initialPoll(ctx context.Context, addr net.Addr,

pi.WriteTime = time.Since(st)
pi.End = time.Now()
pi.WriteBytes = sz
pi.WriteBytes = sizeWritten + bufWritten
pi.Status = http.StatusOK
return info, more, nil
}
Expand Down Expand Up @@ -573,19 +579,25 @@ func (m *streamManager) writeOne(cd *fspb.ContactData) (stats.PollInfo, error) {
pi.End = db.Now()
}()

buf := oldproto.NewBuffer(make([]byte, 0, 1024))
if err := buf.EncodeMessage(cd); err != nil {
buf, err := proto.Marshal(cd)
if err != nil {
return pi, err
}
sizeBuf := make([]byte, 0, 16)
sizeBuf = binary.AppendUvarint(sizeBuf, uint64(len(buf)))

sw := time.Now()
s, err := m.res.Write(buf.Bytes())
sizeWritten, err := m.res.Write(sizeBuf)
if err != nil {
return pi, err
}
bufWritten, err := m.res.Write(buf)
if err != nil {
return pi, err
}
m.res.Flush()
pi.WriteTime = time.Since(sw)
pi.WriteBytes = s
pi.WriteBytes = sizeWritten + bufWritten
pi.Status = http.StatusOK

return pi, nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/Microsoft/go-winio v0.4.12
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.5.2
github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95
github.com/mattn/go-sqlite3 v1.11.0
github.com/pires/go-proxyproto v0.6.2
Expand All @@ -23,6 +22,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.10.0 // indirect
Expand Down

0 comments on commit 21965de

Please sign in to comment.