Skip to content

Commit

Permalink
Merge pull request #60 from metrico/pre-release
Browse files Browse the repository at this point in the history
Pre release
  • Loading branch information
akvlad authored Feb 8, 2024
2 parents c1c0373 + ecaa579 commit 145e994
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 14 deletions.
4 changes: 2 additions & 2 deletions bench/pyroscope_pipeline_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type request struct {
}

// Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse).
// Adjust collectorAddr to bench a your target if needed.
// Example: GOMAXPROCS=1 go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6
// Adjust collectorAddr to bench your target if needed.
// Example: go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/bench -benchtime 10s -count 6 -cpu 1
func BenchmarkPyroscopePipeline(b *testing.B) {
dist := []request{
{
Expand Down
4 changes: 3 additions & 1 deletion config/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ exporters:
max_interval: 30s
max_elapsed_time: 300s
clickhouseprofileexporter:
dsn: tcp://0.0.0.0:9000/qryn
dsn: clickhouse://0.0.0.0:9000/qryn
timeout: 10s
sending_queue:
queue_size: 100
Expand All @@ -81,6 +81,8 @@ exporters:
extensions:
health_check:
pprof:
mutex_profile_fraction: 100
block_profile_fraction: 100
zpages:
memory_ballast:
size_mib: 1000
Expand Down
12 changes: 11 additions & 1 deletion exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,20 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
tmp, _ = m.Get(columnDurationNs)
duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64)

tmp, _ = m.Get(columnPeriodType)
tmp, _ = m.Get(columnPayloadType)
payload_type[i] = tmp.AsString()

payload[i] = r.Body().Bytes().AsRaw()

ch.logger.Debug(
fmt.Sprintf("batch insert prepared row %d", i),
zap.Uint64(columnTimestampNs, timestamp_ns[i]),
zap.String(columnType, typ[i]),
zap.String(columnServiceName, service_name[i]),
zap.String(columnPeriodType, period_type[i]),
zap.String(columnPeriodUnit, period_unit[i]),
zap.String(columnPayloadType, payload_type[i]),
)
}

// column order here should match table column order
Expand Down
91 changes: 91 additions & 0 deletions loadtest/pyroscope_pipeline_k6_load_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* A simple k6 load test for pyroscope pipeline.
*
* Usage:
* - Average load test:
* k6 run pyroscope_pipeline_k6_load_test.js
* - Breakpoint test:
* k6 run pyroscope_pipeline_k6_load_test.js -e WORKLOAD=breakpoint
* - Smoke test:
* k6 run pyroscope_pipeline_k6_load_test.js -e WORKLOAD=smoke
*/
import http from 'k6/http';
import { check } from 'k6';

var averageWorkload = {
    // Steady-state profile: ramp up, hold, ramp down.
    // Sized for ~30k pods uploading every 5m, assuming a uniform random
    // distribution: average rps = 30,000 / (5 * 60) = 100.
    load_avg: {
        executor: "ramping-vus",
        stages: [
            { duration: "10s", target: 100 }, // warm up
            { duration: "50s", target: 100 }, // hold steady load
            { duration: "5s", target: 0 },    // ramp down
        ],
    },
};

var breakpointWorkload = {
    // Breakpoint profile: establish a short baseline, then keep raising
    // the VU count in steps to find where the pipeline starts to degrade.
    load_avg: {
        executor: "ramping-vus",
        stages: [
            { duration: "10s", target: 20 },  // ramp to baseline
            { duration: "30s", target: 20 },  // hold baseline
            { duration: "30s", target: 100 },
            { duration: "30s", target: 120 },
            { duration: "30s", target: 150 },
            { duration: "30s", target: 180 },
            { duration: "30s", target: 200 },
        ],
    },
};

// k6 run options. The smoke workload runs with k6 defaults (no thresholds,
// no scenarios); otherwise the scenario is picked by WORKLOAD and the run
// is gated by error-rate and latency thresholds.
export const options = 'smoke' === __ENV.WORKLOAD ? null : {
    thresholds: {
        // abort the whole run as soon as any request fails
        http_req_failed: [{ threshold: "rate<0.000000001", abortOnFail: true }],
        // BUG FIX: duplicate keys in a JS object literal silently override
        // each other, so the original four http_req_duration entries kept
        // only the last one ("max<3000"); all duration thresholds must be
        // listed in a single array under one key.
        http_req_duration: ["med<100", "p(95)<300", "p(99)<1000", "max<3000"],
    },
    scenarios: 'breakpoint' === __ENV.WORKLOAD ? breakpointWorkload : averageWorkload,
};

// Request distribution for the load test: each entry pairs pyroscope
// /ingest query parameters with a gzipped JFR fixture payload.
// NOTE(review): fixtures are read here at module level — presumably so k6
// loads them once in the init context and every VU reuses the bytes;
// confirm against k6 open() semantics.
const dist = [
    {
        urlParams: {
            // application name with pyroscope-style labels
            "name": "com.example.Test{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
            "from": "1700332322",   // profiled window start (unix seconds)
            "until": "1700332329",  // profiled window end (unix seconds)
            "format": "jfr",
            "sampleRate": "100",
        },
        // "b" mode reads the fixture as binary
        jfrgz: open("../receiver/pyroscopereceiver/testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz"/*unix-only*/, "b"),
    },
    {
        urlParams: {
            "name": "com.example.Test{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
            "from": "1700332322",
            "until": "1700332329",
            "format": "jfr",
            // no sampleRate here — this fixture exercises the memory profile path
        },
        jfrgz: open("../receiver/pyroscopereceiver/testdata/memory_alloc_live_example.jfr.gz"/*unix-only*/, "b"),
    },
]
// Collector ingest endpoint under test.
const collectorAddr = "http://0.0.0.0:8062"

// Round-robin index into dist; advanced once per iteration.
let j = 0

/**
 * One VU iteration: posts the next gzipped JFR fixture (round-robin over
 * dist) to the collector's /ingest endpoint as a multipart "jfr" file,
 * and checks for a 204 response.
 */
export default function () {
    const req = dist[j];
    j = (j + 1) % dist.length;

    const query = Object.keys(req.urlParams)
        .map((key) => `${key}=${req.urlParams[key]}`)
        .join("&");
    const body = {
        jfr: http.file(req.jfrgz, "jfr"),
    };

    const res = http.post(`${collectorAddr}/ingest?${encodeURI(query)}`, body);
    check(res, {
        "response code was 204": (res) => res.status == 204,
    });
}
3 changes: 2 additions & 1 deletion receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ func (pa *jfrPprofParser) appendLocation(sampleType sampleType, prof *pprof_prot

// append new location with a single line referencing the new function, ignoring inlining without a line number
newl := &pprof_proto.Location{
ID: uint64(len(prof.Location)) + 1, // starts with 1 not 0
ID: uint64(len(prof.Location)) + 1, // starts with 1 not 0
// TODO: parse line numbers like https://github.com/grafana/jfr-parser/pull/27/files
Line: []pprof_proto.Line{{Function: newf}},
}

Expand Down
30 changes: 21 additions & 9 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,9 @@ func newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *at
func acquireBuf(p *sync.Pool) *bytes.Buffer {
v := p.Get()
if v == nil {
v = new(bytes.Buffer)
return new(bytes.Buffer)
}
buf := v.(*bytes.Buffer)
return buf
return v.(*bytes.Buffer)
}

func releaseBuf(p *sync.Pool, buf *bytes.Buffer) {
Expand All @@ -240,6 +239,8 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
)
logs := plog.NewLogs()

recv.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery))

qs := req.URL.Query()
if tmp, ok = qs["format"]; ok && tmp[0] == "jfr" {
pa = jfrparser.NewJfrPprofParser()
Expand Down Expand Up @@ -284,9 +285,10 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque

sz := 0
rs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
for _, pr := range ps {
for i, pr := range ps {
r := rs.AppendEmpty()
r.SetTimestamp(pcommon.Timestamp(ns(pm.start)))
timestampNs := ns(pm.start)
r.SetTimestamp(pcommon.Timestamp(timestampNs))
m := r.Attributes()
m.PutStr("duration_ns", fmt.Sprint(ns(pm.end-pm.start)))
m.PutStr("service_name", pm.name)
Expand All @@ -300,6 +302,17 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
}
r.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes())
sz += pr.Payload.Len()
recv.logger.Debug(
fmt.Sprintf("parsed profile %d", i),
zap.Uint64("timestamp_ns", timestampNs),
zap.String("type", pr.Type.Type),
zap.String("service_name", pm.name),
zap.String("period_type", pr.Type.PeriodType),
zap.String("period_unit", pr.Type.PeriodUnit),
zap.String("sample_types", strings.Join(pr.Type.SampleType, ",")),
zap.String("sample_units", strings.Join(pr.Type.SampleUnit, ",")),
zap.Uint8("payload_type", uint8(pr.PayloadType)),
)
}
// sz may be 0 and it will be recorded
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, "")))
Expand Down Expand Up @@ -367,8 +380,8 @@ func entitiesToStrings(entities []profile_types.SampleType) []any {

func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
m.PutStr("type", prof.Type.Type)
s := m.PutEmptySlice("sample_types")

s := m.PutEmptySlice("sample_types")
err := s.FromRaw(stringToAnyArray(prof.Type.SampleType))
if err != nil {
return err
Expand All @@ -379,19 +392,18 @@ func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
if err != nil {
return err
}

// Correct type assertion for []profile.SampleType
result := prof.ValueAggregation.([]profile_types.SampleType)
s = m.PutEmptySlice("values_agg")

err = s.FromRaw(entitiesToStrings(result))

if err != nil {
return err
}

m.PutStr("period_type", prof.Type.PeriodType)
m.PutStr("period_unit", prof.Type.PeriodUnit)
m.PutStr("payload_type", fmt.Sprint(prof.PayloadType))

return nil
}

Expand Down
Binary file not shown.
Binary file not shown.

0 comments on commit 145e994

Please sign in to comment.