Pre release #65

Merged (15 commits) on Feb 12, 2024

208 changes: 105 additions & 103 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
@@ -71,146 +71,148 @@ func valueToStringArray(v pcommon.Value) ([]string, error) {
}

// Inserts a profile batch into the clickhouse server using columnar native protocol
func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) {
b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input")
if err != nil {
return fmt.Errorf("failed to prepare batch: %w", err)
}
return 0, fmt.Errorf("failed to prepare batch: %w", err)
}

// this implementation is tightly coupled to how pyroscope-java and pyroscopereceiver work
timestamp_ns := make([]uint64, 0)
typ := make([]string, 0)
service_name := make([]string, 0)
values_agg := make([][]tuple, 0)
sample_types_units := make([][]tuple, 0)
period_type := make([]string, 0)
period_unit := make([]string, 0)
tags := make([][]tuple, 0)
duration_ns := make([]uint64, 0)
payload_type := make([]string, 0)
payload := make([][]byte, 0)

// this implementation is tightly coupled to how pyroscope-java and pyroscopereciver work,
// specifically receiving a single profile at a time from the agent,
// and thus each batched resource logs slice contains a single log record
rl := ls.ResourceLogs()
sz := rl.Len()

timestamp_ns := make([]uint64, sz)
typ := make([]string, sz)
service_name := make([]string, sz)
values_agg := make([][]tuple, sz)
sample_types_units := make([][]tuple, sz)
period_type := make([]string, sz)
period_unit := make([]string, sz)
tags := make([][]tuple, sz)
duration_ns := make([]uint64, sz)
payload_type := make([]string, sz)
payload := make([][]byte, sz)

var (
r plog.LogRecord
m pcommon.Map
tmp pcommon.Value
tm map[string]any
lr plog.LogRecordSlice
r plog.LogRecord
m pcommon.Map
tmp pcommon.Value
tm map[string]any
offset int
s int
idx int
)
for i := 0; i < sz; i++ {
r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0)
m = r.Attributes()
timestamp_ns[i] = uint64(r.Timestamp())

tmp, _ = m.Get(columnType)
typ[i] = tmp.AsString()

tmp, _ = m.Get(columnServiceName)
service_name[i] = tmp.AsString()

sample_types, _ := m.Get("sample_types")

sample_units, _ := m.Get("sample_units")

sample_types_array, err := valueToStringArray(sample_types)
if err != nil {
return err
}

sample_units_array, err := valueToStringArray(sample_units)
if err != nil {
return err
}

values_agg_raw, ok := m.Get(columnValuesAgg)
if ok {
values_agg_tuple, err := valueAggToTuple(&values_agg_raw)
for i := 0; i < rl.Len(); i++ {
lr = rl.At(i).ScopeLogs().At(0).LogRecords()
for s = 0; s < lr.Len(); s++ {
r = lr.At(s)
m = r.Attributes()
timestamp_ns = append(timestamp_ns, uint64(r.Timestamp()))

tmp, _ = m.Get(columnType)
typ = append(typ, tmp.AsString())

tmp, _ = m.Get(columnServiceName)
service_name = append(service_name, tmp.AsString())

sample_types, _ := m.Get("sample_types")
sample_units, _ := m.Get("sample_units")
sample_types_array, err := valueToStringArray(sample_types)
if err != nil {
return err
return 0, err
}
values_agg[i] = append(values_agg[i], values_agg_tuple...)
}

sample_types_units_item := make([]tuple, len(sample_types_array))
for i, v := range sample_types_array {

sample_types_units_item[i] = tuple{v, sample_units_array[i]}
}
sample_types_units[i] = sample_types_units_item
tmp, _ = m.Get(columnPeriodType)
period_type[i] = tmp.AsString()

tmp, _ = m.Get(columnPeriodUnit)
period_unit[i] = tmp.AsString()

tmp, _ = m.Get(columnTags)
tm = tmp.Map().AsRaw()
tag, j := make([]tuple, len(tm)), 0
for k, v := range tm {
tag[j] = tuple{k, v.(string)}
j++
}
tags[i] = tag

tmp, _ = m.Get(columnDurationNs)
duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64)
sample_units_array, err := valueToStringArray(sample_units)
if err != nil {
return 0, err
}
values_agg_raw, ok := m.Get(columnValuesAgg)
if ok {
values_agg_tuple, err := valueAggToTuple(&values_agg_raw)
if err != nil {
return 0, err
}
values_agg = append(values_agg, values_agg_tuple)
}
sample_types_units_item := make([]tuple, len(sample_types_array))
for i, v := range sample_types_array {
sample_types_units_item[i] = tuple{v, sample_units_array[i]}
}
sample_types_units = append(sample_types_units, sample_types_units_item)

tmp, _ = m.Get(columnPayloadType)
payload_type[i] = tmp.AsString()
tmp, _ = m.Get(columnPeriodType)
period_type = append(period_type, tmp.AsString())

payload[i] = r.Body().Bytes().AsRaw()
tmp, _ = m.Get(columnPeriodUnit)
period_unit = append(period_unit, tmp.AsString())

ch.logger.Debug(
fmt.Sprintf("batch insert prepared row %d", i),
zap.Uint64(columnTimestampNs, timestamp_ns[i]),
zap.String(columnType, typ[i]),
zap.String(columnServiceName, service_name[i]),
zap.String(columnPeriodType, period_type[i]),
zap.String(columnPeriodUnit, period_unit[i]),
zap.String(columnPayloadType, payload_type[i]),
)
tmp, _ = m.Get(columnTags)
tm = tmp.Map().AsRaw()
tag, j := make([]tuple, len(tm)), 0
for k, v := range tm {
tag[j] = tuple{k, v.(string)}
j++
}
tags = append(tags, tag)

tmp, _ = m.Get(columnDurationNs)
dur, _ := strconv.ParseUint(tmp.Str(), 10, 64)
duration_ns = append(duration_ns, dur)

tmp, _ = m.Get(columnPayloadType)
payload_type = append(payload_type, tmp.AsString())

payload = append(payload, r.Body().Bytes().AsRaw())

idx = offset + s
ch.logger.Debug(
fmt.Sprintf("batch insert prepared row %d", idx),
zap.Uint64(columnTimestampNs, timestamp_ns[idx]),
zap.String(columnType, typ[idx]),
zap.String(columnServiceName, service_name[idx]),
zap.String(columnPeriodType, period_type[idx]),
zap.String(columnPeriodUnit, period_unit[idx]),
zap.Any(columnSampleTypesUnits, sample_types_units[idx]),
zap.String(columnPayloadType, payload_type[idx]),
)
}
offset += s
}

// column order here should match table column order
if err := b.Column(0).Append(timestamp_ns); err != nil {
return err
return 0, err
}
if err := b.Column(1).Append(typ); err != nil {
return err
return 0, err
}
if err := b.Column(2).Append(service_name); err != nil {
return err
return 0, err
}
if err := b.Column(3).Append(sample_types_units); err != nil {
return err
return 0, err
}
if err := b.Column(4).Append(period_type); err != nil {
return err
return 0, err
}
if err := b.Column(5).Append(period_unit); err != nil {
return err
return 0, err
}
if err := b.Column(6).Append(tags); err != nil {
return err
return 0, err
}
if err := b.Column(7).Append(duration_ns); err != nil {
return err
return 0, err
}
if err := b.Column(8).Append(payload_type); err != nil {
return err
return 0, err
}

if err := b.Column(9).Append(payload); err != nil {
return err
return 0, err
}
if err := b.Column(10).Append(values_agg); err != nil {
return err
return 0, err
}
return b.Send()
return offset, b.Send()
}

// Closes the clickhouse connection pool
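
For readers skimming the diff above, the behavioural change in InsertBatch is twofold: it no longer assumes a single log record per resource, and it now returns the number of rows it prepared alongside the error. A minimal standalone sketch of the new iteration shape, assuming only the standard pdata plog API (the helper name countProfileRows is made up for illustration):

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/plog"
)

// countProfileRows mirrors the new InsertBatch loop: walk every resource, take
// its first scope, and count every log record, instead of reading only the
// first record per resource as the old code did.
func countProfileRows(ls plog.Logs) int {
	rows := 0
	rl := ls.ResourceLogs()
	for i := 0; i < rl.Len(); i++ {
		lr := rl.At(i).ScopeLogs().At(0).LogRecords()
		rows += lr.Len()
	}
	return rows
}

func main() {
	ls := plog.NewLogs()
	sl := ls.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()
	sl.LogRecords().AppendEmpty()
	sl.LogRecords().AppendEmpty()
	fmt.Println(countProfileRows(ls)) // prints 2: both records become rows
}

Like the exporter code itself, the sketch only looks at scope index 0; resources carrying multiple scopes would need an extra loop.
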
7 changes: 4 additions & 3 deletions exporter/clickhouseprofileexporter/exporter.go
@@ -30,7 +30,7 @@ type clickhouseProfileExporter struct {

type clickhouseAccess interface {
// Inserts a profile batch into the clickhouse server
InsertBatch(profiles plog.Logs) error
InsertBatch(profiles plog.Logs) (int, error)

// Shuts down the clickhouse connection
Shutdown() error
@@ -63,13 +63,14 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti
// Sends the profiles to clickhouse server using the configured connection
func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error {
start := time.Now().UnixMilli()
if err := exp.ch.InsertBatch(logs); err != nil {
sz, err := exp.ch.InsertBatch(logs)
if err != nil {
otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError)))
exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}
otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess)))
exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len()))
exp.logger.Info("inserted batch", zap.Int("size", sz))
return nil
}

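Because clickhouseAccess is an interface, the widened (int, error) contract is easy to stub out in tests. A hedged sketch of such a test double (fakeAccess and its package clause are assumptions, not code from this PR):

package clickhouseprofileexporter

import "go.opentelemetry.io/collector/pdata/plog"

// fakeAccess is a hypothetical test double for the clickhouseAccess interface
// after this change: InsertBatch reports how many rows it claims to have prepared.
type fakeAccess struct{ rows int }

func (f *fakeAccess) InsertBatch(profiles plog.Logs) (int, error) {
	return f.rows, nil // pretend every row was inserted successfully
}

func (f *fakeAccess) Shutdown() error { return nil }

// compile-time assertion that fakeAccess satisfies clickhouseAccess
var _ clickhouseAccess = (*fakeAccess)(nil)
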
2 changes: 1 addition & 1 deletion receiver/pyroscopereceiver/jfrparser/parser.go
@@ -81,7 +81,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([
case pa.jfrParser.TypeMap.T_EXECUTION_SAMPLE:
values[0] = 1 * int64(period)
ts := pa.jfrParser.GetThreadState(pa.jfrParser.ExecutionSample.State)
if ts != nil && ts.Name == "STATE_RUNNABLE" {
if ts != nil && ts.Name != "STATE_SLEEPING" {
pa.addStacktrace(sampleTypeCpu, pa.jfrParser.ExecutionSample.StackTrace, values[:1])
}
// TODO: this code is from github/grafana/pyroscope, need to validate that the qryn.js query simulator handles this branch as expected for wall
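
The one-line change above widens which execution samples are attributed to CPU time: previously only STATE_RUNNABLE threads were kept, now every non-sleeping state is. Expressed as a standalone predicate (the helper name is illustrative, not from the PR):

// keepExecutionSample reflects the new filter: only samples from sleeping
// threads are dropped; runnable and all other non-sleeping states are kept.
func keepExecutionSample(threadState string) bool {
	return threadState != "STATE_SLEEPING"
}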
103 changes: 103 additions & 0 deletions receiver/pyroscopereceiver/pprofparser/parser.go
@@ -0,0 +1,103 @@
package pprofparser

import (
"bytes"
"fmt"

pprof_proto "github.com/google/pprof/profile"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

type sampleType uint8

const (
sampleTypeCpu sampleType = iota
sampleTypeCount
)

type profileWrapper struct {
pprof *pprof_proto.Profile
prof profile_types.ProfileIR
}

type pProfParser struct {
proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
}

// Creates a pprof parser that parses the accepted pprof buffer
func NewPprofParser() *pProfParser {
return &pProfParser{}
}

func (pa *pProfParser) Parse(data *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) {
// Parse pprof data
pProfData, err := pprof_proto.Parse(data)
if err != nil {
return nil, err
}

// Process pprof data and create SampleType slice
var sampleTypes []string
var sampleUnits []string
var valueAggregates []profile_types.SampleType

for i, st := range pProfData.SampleType {
sampleTypes = append(sampleTypes, pProfData.SampleType[i].Type)
sampleUnits = append(sampleUnits, pProfData.SampleType[i].Unit)
sum, count := calculateSumAndCount(pProfData, i)
valueAggregates = append(valueAggregates, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
}

var profiles []profile_types.ProfileIR
var profileType string

switch pProfData.PeriodType.Type {
case "cpu":
profileType = "process_cpu"
case "wall":
profileType = "wall"
case "mutex", "contentions":
profileType = "mutex"
case "goroutine":
profileType = "goroutines"
case "objects", "space", "alloc", "inuse":
profileType = "memory"
case "block":
profileType = "block"
}

profileTypeInfo := profile_types.ProfileType{
PeriodType: pProfData.PeriodType.Type,
PeriodUnit: pProfData.PeriodType.Unit,
SampleType: sampleTypes,
SampleUnit: sampleUnits,
Type: profileType,
}

// Create a new ProfileIR instance
profile := profile_types.ProfileIR{
ValueAggregation: valueAggregates,
Type: profileTypeInfo,
}
profile.Payload = new(bytes.Buffer)
pProfData.WriteUncompressed(profile.Payload)
// Append the profile to the result
profiles = append(profiles, profile)
return profiles, nil
}

func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) {
var sum int64
count := int32(len(samples.Sample))
for _, sample := range samples.Sample {
// Check if the sample has a value for the specified sample type
if sampleTypeIndex < len(sample.Value) {
// Accumulate the value for the specified sample type
sum += sample.Value[sampleTypeIndex]
}
}

return sum, count
}
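
A self-contained sketch of the round trip this new parser relies on, using only the public github.com/google/pprof/profile API. The sample values are made up, and the aggregation loop is an illustrative stand-in for what calculateSumAndCount computes per sample type:

package main

import (
	"bytes"
	"fmt"

	pprof_proto "github.com/google/pprof/profile"
)

func main() {
	// Build a tiny in-memory profile: one sample type, two samples.
	src := &pprof_proto.Profile{
		SampleType: []*pprof_proto.ValueType{{Type: "alloc_space", Unit: "bytes"}},
		PeriodType: &pprof_proto.ValueType{Type: "space", Unit: "bytes"},
		Period:     512 * 1024,
		Sample: []*pprof_proto.Sample{
			{Value: []int64{4096}},
			{Value: []int64{8192}},
		},
	}

	// Serialize uncompressed, as the parser does when filling ProfileIR.Payload ...
	var buf bytes.Buffer
	if err := src.WriteUncompressed(&buf); err != nil {
		panic(err)
	}

	// ... and parse it back, as pProfParser.Parse does with the request body.
	parsed, err := pprof_proto.Parse(&buf)
	if err != nil {
		panic(err)
	}

	// Aggregate sample type 0 the same way calculateSumAndCount does.
	var sum int64
	for _, s := range parsed.Sample {
		if len(s.Value) > 0 {
			sum += s.Value[0]
		}
	}
	fmt.Printf("%s:%s sum=%d count=%d\n",
		parsed.SampleType[0].Type, parsed.SampleType[0].Unit, sum, len(parsed.Sample))
	// expected output: alloc_space:bytes sum=12288 count=2
}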