Skip to content

Commit

Permalink
Merge pull request #331 from yarpc/awilhelm/release
Browse files Browse the repository at this point in the history
Release v0.19.0
  • Loading branch information
Alexandre Wilhelm authored Mar 25, 2021
2 parents 9d1b717 + 2dcde4c commit 204b64a
Show file tree
Hide file tree
Showing 51 changed files with 3,435 additions and 218 deletions.
2 changes: 2 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ coverage:
if_not_found: success # if parent is not found report status as success, error, or failure
if_ci_failed: error # if ci fails report status as success, error, or failure

ignore:
- /testdata/**/*
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@

/cover
/cover.out
/.idea
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Changelog
=========

# 0.19.0 (2021-03-24)
* Add support for gRPC streaming:
  - Supports both benchmark mode and a curl-like single-invocation mode
- Multiple requests can be passed through the CLI option `--request`, optionally delimited by space or comma
`--request='{"request": "1"} {"request": "2"}'` or `--request='{"request": "1"},{"request": "2"}'`
- Requests can be interactively passed from STDIN by setting CLI option `--request='-'`.
  - An interval between consecutive stream requests can be enforced by setting CLI option `--stream-interval='5s'`, which ensures there is at least a 5 second interval between requests.
* Add option `grpc-max-response-size` to set the maximum response size of gRPC response. Default to 4mb.

# 0.18.0 (2020-06-26)
* Add support for benchmark output in JSON format.
* Fix panic due to upcasting serializer to disable envelopes.
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ install:

.PHONY: test
test:
go test -cover -race ./...
go test -cover -timeout 30s -race ./...


.PHONY: docs
Expand All @@ -28,5 +28,6 @@ docs:

.PHONY: test_ci
test_ci: install build
go test -coverprofile=cover.out -coverpkg=./... ./...
go test -timeout 30s -race -coverprofile=cover_temp.out -coverpkg=./... ./...
grep -v "^github.com/yarpc/yab/testdata/*" cover_temp.out > cover.out
go tool cover -html=cover.out -o cover.html
92 changes: 90 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Request Options:
--multiplexed-thrift Enables the Thrift TMultiplexedProtocol used
by services that host multiple Thrift services
on a single endpoint.
--stream-interval= Interval between consecutive stream message sends, applicable separately to every stream request opened on a connection.
Transport Options:
-s, --service= The TChannel/Hyperbahn service name
Expand All @@ -94,6 +95,7 @@ Transport Options:
-T, --topt= Transport options for TChannel, protocol
headers for HTTP
--http-method= The HTTP method to use (default: POST)
--grpc-max-response-size= Maximum response size for gRPC requests. Default value is 4MB
Benchmark Options:
-n, --max-requests= The maximum number of requests to make. 0
Expand All @@ -119,7 +121,7 @@ Help Options:
```

### Making a single request

#### Thrift
The following examples assume that the Thrift service running looks like:
```thrift
service KeyValue {
Expand All @@ -134,17 +136,77 @@ make a call to the `get` method by running:
yab -t ~/keyvalue.thrift -p localhost:12345 keyvalue KeyValue::get -r '{"key": "hello"}'
```

This specifies a single `host:port` using `-p`, but you can also specify multiple peers
#### Proto
The following examples assume that the Proto service running looks like:
```proto
service KeyValue {
rpc GetValue(Request) returns (Response) {}
rpc GetValueStream(stream Request) returns (stream Response) {}
}
message Request {
required string key = 1;
}
message Response {
required string value = 1;
}
```

If a gRPC service was running with name `KeyValue` on `localhost:12345` with proto package name `pkg.keyvalue` and
binary file containing a compiled protobuf FileDescriptorSet as `keyValue.proto.bin`, you can
make a call to the `GetValue` method by running:

```bash
yab keyvalue pkg.keyvalue/GetValue --file-descriptor-set-bin=keyValue.proto.bin -r '{"key": "hello"}' -p localhost:12345
```

You can make a call to the bi-directional stream method `GetValueStream` with multiple requests by running:
```bash
yab keyvalue pkg.keyvalue/GetValueStream --file-descriptor-set-bin=keyValue.proto.bin -r '{"key": "hello1"} {"key": "hello2"}' -p localhost:12345
```

You can also interactively pass request data (JSON or YAML) on STDIN to the bi-directional stream method `GetValueStream` by setting option `--request='-'`:

```bash
yab keyvalue pkg.keyvalue/GetValueStream --file-descriptor-set-bin=keyValue.proto.bin -r '-' -p localhost:12345

{"key": "hello1"} // STDIN request-1
{...} //Response-1

{"key": "hello2"} // STDIN request-2
{...} //Response-2
```
Note: YAML requests on STDIN must be delimited by `---` and followed by a newline.

Protobuf FileDescriptorSet can be generated by running:
```bash
protoc --include_imports --descriptor_set_out=keyValue.proto.bin keyValue.proto
```
Note : If [Server Reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) is enabled which provides information about publicly-accessible gRPC services on a server, then there is no need to specify the FileDescriptorSet binary:

```bash
yab keyvalue pkg.keyvalue/GetValue -r '{"key": "hello"}' -p localhost:12345
```

#### Specifying Peers
A single `host:port` is specified using `-p`, but you can also specify multiple peers
by passing the `-p` flag multiple times:
```bash
yab -t ~/keyvalue.thrift -p localhost:12345 -p localhost:12346 keyvalue KeyValue::get -r '{"key": "hello"}'
```
```bash
yab keyvalue pkg.keyvalue/GetValue -r '{"key": "hello"}' -p localhost:12345 -p localhost:12346
```

If you have a file containing a list of host:ports (either JSON or new line separated), you can
specify the file using `-P`:
```bash
yab -t ~/keyvalue.thrift -P ~/hosts.json keyvalue KeyValue::get -r '{"key": "hello"}'
```
```bash
yab keyvalue pkg.keyvalue/GetValue -r '{"key": "hello"}' -P ~/hosts.json
```

`yab` also supports HTTP, instead of the peer being a single `host:port`, you would use a URL:
```bash
Expand All @@ -170,6 +232,32 @@ connection (`--concurrency`).
yab -t ~/keyvalue.thrift -p localhost:12345 keyvalue KeyValue::get -r '{"key": "hello"}' -d 5s --rps 100 --connections 4
```

In a gRPC stream method benchmark, a stream benchmark request is considered successful when a stream sends all the requests and receives response messages successfully. Example stream benchmark command and output:
```bash
> yab keyvalue pkg.keyvalue/GetValueStream -r '{"key": "hello1"} {"key": "hello2"}' -p localhost:12345 --duration=1s

Benchmark parameters:
CPUs: 12
Connections: 24
Concurrency: 1
Max requests: 10000
Max duration: 1s
Max RPS: 0
Latencies:
0.5000: 779.01µs
0.9000: 2.034852ms
0.9500: 2.932846ms
0.9900: 11.698821ms
0.9990: 15.839751ms
0.9995: 16.651223ms
1.0000: 17.198644ms
Elapsed time (seconds): 0.49
Total requests: 10000
RPS: 20190.25
Total stream messages sent: 20000
Total stream messages received: 20000
```

[releases]: https://github.com/yarpc/yab/releases
[ci-img]: https://travis-ci.com/yarpc/yab.svg?branch=master
[ci]: https://travis-ci.com/yarpc/yab
Expand Down
65 changes: 41 additions & 24 deletions bench_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,53 @@ type peerTransport struct {
peerID int
}

type benchmarkMethod struct {
serializer encoding.Serializer
req *transport.Request
// benchmarkCaller exposes methods to dispatch requests for benchmark.
// Implementations encapsulate the serialized request so the benchmark
// loop only needs a transport to issue a call.
type benchmarkCaller interface {
	// Call dispatches a request using the provided transport and returns
	// a report (at minimum the call latency) or an error on failure.
	Call(transport.Transport) (benchmarkCallReporter, error)

	// CallMethodType returns the type of the RPC method invoked by `Call`
	// (e.g. unary vs. streaming).
	CallMethodType() encoding.MethodType
}

// benchmarkCallReporter exposes methods to access a benchmark call report,
// such as the observed latency. It is the minimal report returned by every
// benchmarkCaller.Call invocation.
type benchmarkCallReporter interface {
	// Latency returns the time taken to send the request and receive the response.
	Latency() time.Duration
}

// benchmarkStreamCallReporter exposes methods to access a benchmark stream
// call report, such as the number of stream messages sent and received.
// Reports from streaming calls may implement this in addition to
// benchmarkCallReporter.
type benchmarkStreamCallReporter interface {
	// StreamMessagesSent returns the number of stream messages sent by the client.
	StreamMessagesSent() int

	// StreamMessagesReceived returns the number of stream messages received from the server.
	StreamMessagesReceived() int
}

type benchmarkCallLatencyReport struct {
latency time.Duration
}

func newBenchmarkCallLatencyReport(latency time.Duration) benchmarkCallLatencyReport {
return benchmarkCallLatencyReport{latency}
}

func (r benchmarkCallLatencyReport) Latency() time.Duration {
return r.latency
}

// WarmTransport warms up a transport and returns it. The transport is warmed
// warmTransport warms up a transport and returns it. The transport is warmed
// up by making some number of requests through it.
func (m benchmarkMethod) WarmTransport(opts TransportOptions, resolved resolvedProtocolEncoding, warmupRequests int) (transport.Transport, error) {
func warmTransport(b benchmarkCaller, opts TransportOptions, resolved resolvedProtocolEncoding, warmupRequests int) (transport.Transport, error) {
transport, err := getTransport(opts, resolved, opentracing.NoopTracer{})
if err != nil {
return nil, err
}

for i := 0; i < warmupRequests; i++ {
_, err := makeRequest(transport, m.req)
_, err := b.Call(transport)
if err != nil {
return nil, err
}
Expand All @@ -59,21 +91,6 @@ func (m benchmarkMethod) WarmTransport(opts TransportOptions, resolved resolvedP
return transport, nil
}

func (m benchmarkMethod) call(t transport.Transport) (time.Duration, error) {
start := time.Now()
res, err := makeRequest(t, m.req)
duration := time.Since(start)

if err == nil {
err = m.serializer.CheckSuccess(res)
}
return duration, err
}

func (m benchmarkMethod) Method() string {
return m.req.Method
}

func peerBalancer(peers []string) func(i int) (string, int) {
numPeers := len(peers)
startOffset := rand.Intn(numPeers)
Expand All @@ -83,9 +100,9 @@ func peerBalancer(peers []string) func(i int) (string, int) {
}
}

// WarmTransports returns n transports that have been warmed up.
// warmTransports returns n transports that have been warmed up.
// No requests may fail during the warmup period.
func (m benchmarkMethod) WarmTransports(n int, tOpts TransportOptions, resolved resolvedProtocolEncoding, warmupRequests int) ([]peerTransport, error) {
func warmTransports(b benchmarkCaller, n int, tOpts TransportOptions, resolved resolvedProtocolEncoding, warmupRequests int) ([]peerTransport, error) {
peerFor := peerBalancer(tOpts.Peers)
transports := make([]peerTransport, n)
errs := make([]error, n)
Expand All @@ -99,7 +116,7 @@ func (m benchmarkMethod) WarmTransports(n int, tOpts TransportOptions, resolved
peerHostPort, peerIndex := peerFor(i)
tOpts.Peers = []string{peerHostPort}

tp, err := m.WarmTransport(tOpts, resolved, warmupRequests)
tp, err := warmTransport(b, tOpts, resolved, warmupRequests)
transports[i] = peerTransport{tp, peerIndex}
errs[i] = err
}(i, tOpts)
Expand Down
10 changes: 10 additions & 0 deletions bench_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type benchmarkState struct {
totalSuccess int
totalRequests int
latencies []time.Duration

totalStreamMessagesSent int
totalStreamMessagesReceived int
}

func newBenchmarkState(statter statsd.Client) *benchmarkState {
Expand Down Expand Up @@ -69,6 +72,8 @@ func (s *benchmarkState) merge(other *benchmarkState) {
s.totalErrors += other.totalErrors
s.totalSuccess += other.totalSuccess
s.totalRequests += other.totalRequests
s.totalStreamMessagesReceived += other.totalStreamMessagesReceived
s.totalStreamMessagesSent += other.totalStreamMessagesSent
}

func (s *benchmarkState) recordLatency(d time.Duration) {
Expand All @@ -79,6 +84,11 @@ func (s *benchmarkState) recordLatency(d time.Duration) {
s.statter.Timing("latency", d)
}

// recordStreamMessages accumulates the per-call stream message counts
// into the benchmark state's running totals.
func (s *benchmarkState) recordStreamMessages(sent, received int) {
	s.totalStreamMessagesReceived += received
	s.totalStreamMessagesSent += sent
}

// Returns a mapping of quantiles to latency values
func (s *benchmarkState) getLatencies() map[float64]time.Duration {
sort.Sort(byDuration(s.latencies))
Expand Down
Loading

0 comments on commit 204b64a

Please sign in to comment.