Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

region: Add BenchmarkReceive #282

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions region/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase/compression/snappy"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
"github.com/tsuna/gohbase/test"
Expand Down Expand Up @@ -1318,3 +1319,247 @@ func BenchmarkSendScanRequest(b *testing.B) {
}
})
}

func makeResponse(callID uint32, response proto.Message, cellblocks []byte,
compressor *compressor) []byte {
b := make([]byte, 4) // Reserve 4 bytes for size
header := &pb.ResponseHeader{
CallId: proto.Uint32(callID),
}
if len(cellblocks) > 0 {
if compressor != nil {
cellblocks = compressor.compressCellblocks(
net.Buffers{cellblocks}, uint32(len(cellblocks)))
}

header.CellBlockMeta = &pb.CellBlockMeta{Length: proto.Uint32(uint32(len(cellblocks)))}
}

b = protowire.AppendVarint(b, uint64(proto.Size(header)))
var err error
b, err = proto.MarshalOptions{}.MarshalAppend(b, header)
if err != nil {
panic(err)
}

b = protowire.AppendVarint(b, uint64(proto.Size(response)))
b, err = proto.MarshalOptions{}.MarshalAppend(b, response)
if err != nil {
panic(err)
}

b = append(b, cellblocks...)

// Put the final size in the first 4 bytes
binary.BigEndian.PutUint32(b[:4], uint32(len(b)-4))
return b
}

func cellblockLen(rowLen, familyLen, qualifierLen, valueLen int) int {
keyLength := 2 + rowLen + 1 + familyLen + qualifierLen + 8 + 1
keyValueLength := 4 + 4 + keyLength + valueLen
return 4 + keyValueLength
}

// Copied from hrpc/mutate.go
func appendCellblock(row []byte, family, qualifier string, value []byte, ts uint64, typ byte,
cbs []byte) []byte {
// cellblock layout:
//
// Header:
// 4 byte length of key + value
// 4 byte length of key
// 4 byte length of value
//
// Key:
// 2 byte length of row
// <row>
// 1 byte length of row family
// <family>
// <qualifier>
// 8 byte timestamp
// 1 byte type
//
// Value:
// <value>
keylength := 2 + len(row) + 1 + len(family) + len(qualifier) + 8 + 1
valuelength := len(value)

keyvaluelength := 4 + 4 + keylength + valuelength
i := len(cbs)
cbs = append(cbs, make([]byte,
cellblockLen(len(row), len(family), len(qualifier), len(value)))...)

// Header:
binary.BigEndian.PutUint32(cbs[i:], uint32(keyvaluelength))
i += 4
binary.BigEndian.PutUint32(cbs[i:], uint32(keylength))
i += 4
binary.BigEndian.PutUint32(cbs[i:], uint32(valuelength))
i += 4

// Key:
binary.BigEndian.PutUint16(cbs[i:], uint16(len(row)))
i += 2
i += copy(cbs[i:], row)
cbs[i] = byte(len(family))
i++
i += copy(cbs[i:], family)
i += copy(cbs[i:], qualifier)
binary.BigEndian.PutUint64(cbs[i:], ts)
i += 8
cbs[i] = typ
i++

// Value:
copy(cbs[i:], value)

return cbs
}

type fakeConn struct {
net.Conn
}

func (fakeConn) SetReadDeadline(t time.Time) error { return nil }

func BenchmarkReceive(b *testing.B) {
benchmark := func(c *client, reader *bytes.Reader, resp []byte,
call hrpc.Call) func(*testing.B) {
return func(b *testing.B) {
b.ReportAllocs()
for range b.N {
// Set read buffer to the encoded response
reader.Reset(resp)
// Put the RPC in the sent map so that it can be found by
// receive
c.sent[1] = call
if err := c.receive(reader); err != nil {
b.Fatal(err)
}
// Consume the result so that this same request can be
// reused on the next iteration
<-call.ResultChan()
}
}
}

c := &client{
conn: fakeConn{},
sent: make(map[uint32]hrpc.Call),
compressor: &compressor{snappy.New()},
}
reader := bytes.NewReader(nil)

scan, err := hrpc.NewScanStr(context.Background(), "table")
if err != nil {
b.Fatal(err)
}

cell := appendCellblock(
bytes.Repeat([]byte("0123456789"), 5), // 50-byte key
"f",
string(bytes.Repeat([]byte("9876543210"), 2)), // 20-byte qualifier
bytes.Repeat([]byte("abcdefghij"), 20), // 200-byte value
17356887651735688765,
byte(pb.CellType_PUT),
nil)

scanResponse := &pb.ScanResponse{
// TODO: Should this be 1 result with cellCount cells, or
// cellCount results each with 1 cell?
CellsPerResult: []uint32{uint32(1)},
PartialFlagPerResult: []bool{false},
}
resp := makeResponse(1, scanResponse, cell, c.compressor)
b.Run("scanSmall", benchmark(c, reader, resp, scan))

const twoHunderedKiB = 200 * 1024
cellCount200KB := twoHunderedKiB / len(cell)
cells200KB := bytes.Repeat(cell, cellCount200KB)

b.Logf("200KB Cell Count: %d", cellCount200KB)

scanResponse200KB := &pb.ScanResponse{
// TODO: Should this be 1 result with cellCount cells, or
// cellCount results each with 1 cell?
CellsPerResult: []uint32{uint32(cellCount200KB)},
PartialFlagPerResult: []bool{false},
}
resp = makeResponse(1, scanResponse200KB, cells200KB, c.compressor)

b.Run("scanResult200KBCellBlocks", benchmark(c, reader, resp, scan))

const twoMiB = 2 * 1024 * 1024
cellCount2MB := twoMiB / len(cell)
cells2MB := bytes.Repeat(cell, cellCount2MB)

b.Logf("2MB Cell Count: %d", cellCount2MB)

scanResponse2MB := &pb.ScanResponse{
// TODO: Should this be 1 result with cellCount cells, or
// cellCount results each with 1 cell?
CellsPerResult: []uint32{uint32(cellCount2MB)},
PartialFlagPerResult: []bool{false},
}
resp = makeResponse(1, scanResponse2MB, cells2MB, c.compressor)

b.Run("scanResult2MBCellBlocks", benchmark(c, reader, resp, scan))

put, err := hrpc.NewPutStr(context.Background(), "table", "key",
map[string]map[string][]byte{"cf": {"a": []byte("1")}})
if err != nil {
b.Fatal(err)
}
// Simple puts have an empty response
resp = makeResponse(1, &pb.MutateResponse{}, nil, nil)

b.Run("mutate", benchmark(c, reader, resp, put))

multi := newMulti(100)
multiResp := &pb.MultiResponse{
RegionActionResult: []*pb.RegionActionResult{{}},
}
calls := make([]hrpc.Call, 0, 100)
for i := range 100 {
put, err := hrpc.NewPutStr(context.Background(), "table", "key",
map[string]map[string][]byte{"cf": {"a": []byte("1")}})
if err != nil {
b.Fatal(err)
}
calls = append(calls, put)

multiResp.RegionActionResult[0].ResultOrException = append(
multiResp.RegionActionResult[0].ResultOrException,
&pb.ResultOrException{
Index: proto.Uint32(uint32(i + 1)),
Result: &pb.Result{},
},
)
}

multi.add(calls)
resp = makeResponse(1, multiResp, nil, nil)

b.Run("multiMutate100", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
reader.Reset(resp)
if multi.len() != 100 {
b.Fatalf("unexpected len: %d", multi.len())
}
c.sent[1] = multi
if err := c.receive(reader); err != nil {
b.Fatal(err)
}
// Consume the results so that this same request can
// be reused on the next iteration
for _, c := range calls {
<-c.ResultChan()
}
// Need to do this on every iteration because
// returnResults on a multi resets the fields of multi
multi.add(calls)
}
})
}