From a0e34565f36b35043909e126b891b10b93ff3ab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Wed, 11 Dec 2024 16:12:14 -0500 Subject: [PATCH] Add support for returning measurement_uid as part of oonimkall API (#1673) Implements: https://github.com/ooni/probe/issues/2521 --- cmd/ooniprobe/internal/nettests/nettests.go | 2 +- internal/cmd/oonireport/oonireport.go | 2 +- internal/engine/experiment.go | 4 +- .../engine/experiment_integration_test.go | 6 +-- internal/mocks/experiment.go | 4 +- internal/mocks/experiment_test.go | 6 +-- internal/mocks/submitter.go | 4 +- internal/mocks/submitter_test.go | 6 +-- internal/model/experiment.go | 4 +- internal/oonirun/experiment.go | 7 ++-- internal/oonirun/experiment_test.go | 4 +- internal/oonirun/inputprocessor.go | 6 +-- internal/oonirun/inputprocessor_test.go | 4 +- internal/oonirun/submitter.go | 6 +-- internal/oonirun/submitter_test.go | 9 ++-- internal/oonirun/v2_test.go | 2 +- internal/probeservices/collector.go | 14 +++---- internal/probeservices/collector_test.go | 42 ++++++++++++------- pkg/oonimkall/session.go | 7 +++- pkg/oonimkall/taskmodel.go | 1 + pkg/oonimkall/taskrunner.go | 3 +- pkg/oonimkall/taskrunner_test.go | 8 ++-- 22 files changed, 85 insertions(+), 66 deletions(-) diff --git a/cmd/ooniprobe/internal/nettests/nettests.go b/cmd/ooniprobe/internal/nettests/nettests.go index e3cf272510..847b905791 100644 --- a/cmd/ooniprobe/internal/nettests/nettests.go +++ b/cmd/ooniprobe/internal/nettests/nettests.go @@ -215,7 +215,7 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []model.Experim // Implementation note: SubmitMeasurement will fail here if we did fail // to open the report but we still want to continue. There will be a // bit of a spew in the logs, perhaps, but stopping seems less efficient. - if err := exp.SubmitAndUpdateMeasurementContext(context.Background(), measurement); err != nil { + if _, err := exp.SubmitAndUpdateMeasurementContext(context.Background(), measurement); err != nil { log.Debug(color.RedString("failure.measurement_submission")) if err := db.UploadFailed(c.msmts[idx64], err.Error()); err != nil { return errors.Wrap(err, "failed to mark upload as failed") diff --git a/internal/cmd/oonireport/oonireport.go b/internal/cmd/oonireport/oonireport.go index 8aa4ae3e72..d17953fc99 100644 --- a/internal/cmd/oonireport/oonireport.go +++ b/internal/cmd/oonireport/oonireport.go @@ -106,7 +106,7 @@ func submitAll(ctx context.Context, lines []string, subm model.Submitter) (int, for _, line := range lines { mm := toMeasurement(line) // submit the measurement - err := subm.Submit(ctx, mm) + _, err := subm.Submit(ctx, mm) if err != nil { return submitted, err } diff --git a/internal/engine/experiment.go b/internal/engine/experiment.go index 42a2373579..32b747637d 100644 --- a/internal/engine/experiment.go +++ b/internal/engine/experiment.go @@ -104,10 +104,10 @@ func (e *experiment) ReportID() string { // SubmitAndUpdateMeasurementContext implements [model.Experiment]. func (e *experiment) SubmitAndUpdateMeasurementContext( - ctx context.Context, measurement *model.Measurement) error { + ctx context.Context, measurement *model.Measurement) (string, error) { report := e.mrep.Get() if report == nil { - return errors.New("report is not open") + return "", errors.New("report is not open") } return report.SubmitMeasurement(ctx, measurement) } diff --git a/internal/engine/experiment_integration_test.go b/internal/engine/experiment_integration_test.go index 26a37509b4..a95a74f584 100644 --- a/internal/engine/experiment_integration_test.go +++ b/internal/engine/experiment_integration_test.go @@ -280,7 +280,7 @@ func runexperimentflow(t *testing.T, experiment model.Experiment, input string) } filename := tempfile.Name() tempfile.Close() - err = experiment.SubmitAndUpdateMeasurementContext(ctx, measurement) + _, err = experiment.SubmitAndUpdateMeasurementContext(ctx, measurement) if err != nil { t.Fatal(err) } @@ -323,7 +323,7 @@ func TestOpenReportIdempotent(t *testing.T) { t.Fatal("unexpected initial report ID") } ctx := context.Background() - if err := exp.SubmitAndUpdateMeasurementContext(ctx, &model.Measurement{}); err == nil { + if _, err := exp.SubmitAndUpdateMeasurementContext(ctx, &model.Measurement{}); err == nil { t.Fatal("we should not be able to submit before OpenReport") } err = exp.OpenReportContext(ctx) @@ -403,7 +403,7 @@ func TestSubmitAndUpdateMeasurementWithClosedReport(t *testing.T) { } exp := builder.NewExperiment() m := new(model.Measurement) - err = exp.SubmitAndUpdateMeasurementContext(context.Background(), m) + _, err = exp.SubmitAndUpdateMeasurementContext(context.Background(), m) if err == nil { t.Fatal("expected an error here") } diff --git a/internal/mocks/experiment.go b/internal/mocks/experiment.go index 3063ab3ae1..c12a7d4b48 100644 --- a/internal/mocks/experiment.go +++ b/internal/mocks/experiment.go @@ -22,7 +22,7 @@ type Experiment struct { MockSaveMeasurement func(measurement *model.Measurement, filePath string) error MockSubmitAndUpdateMeasurementContext func( - ctx context.Context, measurement *model.Measurement) error + ctx context.Context, measurement *model.Measurement) (string, error) MockOpenReportContext func(ctx context.Context) error } @@ -53,7 +53,7 @@ func (e *Experiment) SaveMeasurement(measurement *model.Measurement, filePath st } func (e *Experiment) SubmitAndUpdateMeasurementContext( - ctx context.Context, measurement *model.Measurement) error { + ctx context.Context, measurement *model.Measurement) (string, error) { return e.MockSubmitAndUpdateMeasurementContext(ctx, measurement) } diff --git a/internal/mocks/experiment_test.go b/internal/mocks/experiment_test.go index ecd1f157b9..d5e3a35831 100644 --- a/internal/mocks/experiment_test.go +++ b/internal/mocks/experiment_test.go @@ -93,11 +93,11 @@ func TestExperiment(t *testing.T) { t.Run("SubmitAndUpdateMeasurementContext", func(t *testing.T) { expected := errors.New("mocked err") e := &Experiment{ - MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) error { - return expected + MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) (string, error) { + return "", expected }, } - err := e.SubmitAndUpdateMeasurementContext(context.Background(), &model.Measurement{}) + _, err := e.SubmitAndUpdateMeasurementContext(context.Background(), &model.Measurement{}) if !errors.Is(err, expected) { t.Fatal("unexpected err", err) } diff --git a/internal/mocks/submitter.go b/internal/mocks/submitter.go index e6af6460ba..a01371ef10 100644 --- a/internal/mocks/submitter.go +++ b/internal/mocks/submitter.go @@ -8,10 +8,10 @@ import ( // Submitter mocks model.Submitter. type Submitter struct { - MockSubmit func(ctx context.Context, m *model.Measurement) error + MockSubmit func(ctx context.Context, m *model.Measurement) (string, error) } // Submit calls MockSubmit -func (s *Submitter) Submit(ctx context.Context, m *model.Measurement) error { +func (s *Submitter) Submit(ctx context.Context, m *model.Measurement) (string, error) { return s.MockSubmit(ctx, m) } diff --git a/internal/mocks/submitter_test.go b/internal/mocks/submitter_test.go index a7928e9ff3..4dda2857ec 100644 --- a/internal/mocks/submitter_test.go +++ b/internal/mocks/submitter_test.go @@ -12,11 +12,11 @@ func TestSubmitter(t *testing.T) { t.Run("Submit", func(t *testing.T) { expect := errors.New("mocked error") s := &Submitter{ - MockSubmit: func(ctx context.Context, m *model.Measurement) error { - return expect + MockSubmit: func(ctx context.Context, m *model.Measurement) (string, error) { + return "", expect }, } - err := s.Submit(context.Background(), &model.Measurement{}) + _, err := s.Submit(context.Background(), &model.Measurement{}) if !errors.Is(err, expect) { t.Fatal("unexpected err", err) } diff --git a/internal/model/experiment.go b/internal/model/experiment.go index 31b80f01fb..319fe599d5 100644 --- a/internal/model/experiment.go +++ b/internal/model/experiment.go @@ -186,7 +186,7 @@ type Experiment interface { // SubmitAndUpdateMeasurementContext submits a measurement and updates the // fields whose value has changed as part of the submission. SubmitAndUpdateMeasurementContext( - ctx context.Context, measurement *Measurement) error + ctx context.Context, measurement *Measurement) (string, error) // OpenReportContext will open a report using the given context // to possibly limit the lifetime of this operation. @@ -322,7 +322,7 @@ type ExperimentTargetLoader interface { type Submitter interface { // Submit submits the measurement and updates its // report ID field in case of success. - Submit(ctx context.Context, m *Measurement) error + Submit(ctx context.Context, m *Measurement) (string, error) } // Saver saves a measurement on some persistent storage. diff --git a/internal/oonirun/experiment.go b/internal/oonirun/experiment.go index 0435b21d15..c28f451d71 100644 --- a/internal/oonirun/experiment.go +++ b/internal/oonirun/experiment.go @@ -263,10 +263,11 @@ type experimentSubmitterWrapper struct { logger model.Logger } -func (sw *experimentSubmitterWrapper) Submit(ctx context.Context, idx int, m *model.Measurement) error { - if err := sw.child.Submit(ctx, idx, m); err != nil { +func (sw *experimentSubmitterWrapper) Submit(ctx context.Context, idx int, m *model.Measurement) (string, error) { + mstUID, err := sw.child.Submit(ctx, idx, m) + if err != nil { sw.logger.Warnf("submitting measurement failed: %s", err.Error()) } // policy: we do not stop the loop if measurement submission fails - return nil + return mstUID, nil } diff --git a/internal/oonirun/experiment_test.go b/internal/oonirun/experiment_test.go index c840d7a175..3c841c907e 100644 --- a/internal/oonirun/experiment_test.go +++ b/internal/oonirun/experiment_test.go @@ -93,9 +93,9 @@ func TestExperimentRunWithFailureToSubmitAndShuffle(t *testing.T) { newTargetLoaderFn: nil, newSubmitterFn: func(ctx context.Context) (model.Submitter, error) { subm := &mocks.Submitter{ - MockSubmit: func(ctx context.Context, m *model.Measurement) error { + MockSubmit: func(ctx context.Context, m *model.Measurement) (string, error) { failedToSubmit++ - return errors.New("mocked error") + return "", errors.New("mocked error") }, } return subm, nil diff --git a/internal/oonirun/inputprocessor.go b/internal/oonirun/inputprocessor.go index 57b4f44898..d44a175568 100644 --- a/internal/oonirun/inputprocessor.go +++ b/internal/oonirun/inputprocessor.go @@ -87,7 +87,7 @@ func (ipsw inputProcessorSaverWrapper) SaveMeasurement( // InputProcessorSubmitterWrapper is InputProcessor's // wrapper for a Submitter implementation. type InputProcessorSubmitterWrapper interface { - Submit(ctx context.Context, idx int, m *model.Measurement) error + Submit(ctx context.Context, idx int, m *model.Measurement) (string, error) } type inputProcessorSubmitterWrapper struct { @@ -101,7 +101,7 @@ func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmit } func (ipsw inputProcessorSubmitterWrapper) Submit( - ctx context.Context, idx int, m *model.Measurement) error { + ctx context.Context, idx int, m *model.Measurement) (string, error) { return ipsw.submitter.Submit(ctx, m) } @@ -141,7 +141,7 @@ func (ip *InputProcessor) run(ctx context.Context) (int, error) { return 0, err } meas.AddAnnotations(ip.Annotations) - err = ip.Submitter.Submit(ctx, idx, meas) + _, err = ip.Submitter.Submit(ctx, idx, meas) if err != nil { // TODO(bassosimone): when re-reading this code, I find it confusing that // we return on error because I am always like "wait, this is not the right diff --git a/internal/oonirun/inputprocessor_test.go b/internal/oonirun/inputprocessor_test.go index 1c8ee6ca6b..9cfb53ce9c 100644 --- a/internal/oonirun/inputprocessor_test.go +++ b/internal/oonirun/inputprocessor_test.go @@ -55,9 +55,9 @@ type FakeInputProcessorSubmitter struct { } func (fips *FakeInputProcessorSubmitter) Submit( - ctx context.Context, m *model.Measurement) error { + ctx context.Context, m *model.Measurement) (string, error) { fips.M = append(fips.M, m) - return fips.Err + return "", fips.Err } func TestInputProcessorSubmissionFailed(t *testing.T) { diff --git a/internal/oonirun/submitter.go b/internal/oonirun/submitter.go index ee0ddc7565..326619ec91 100644 --- a/internal/oonirun/submitter.go +++ b/internal/oonirun/submitter.go @@ -46,8 +46,8 @@ func NewSubmitter(ctx context.Context, config SubmitterConfig) (Submitter, error type stubSubmitter struct{} -func (stubSubmitter) Submit(ctx context.Context, m *model.Measurement) error { - return nil +func (stubSubmitter) Submit(ctx context.Context, m *model.Measurement) (string, error) { + return "", nil } var _ Submitter = stubSubmitter{} @@ -57,7 +57,7 @@ type realSubmitter struct { logger model.Logger } -func (rs realSubmitter) Submit(ctx context.Context, m *model.Measurement) error { +func (rs realSubmitter) Submit(ctx context.Context, m *model.Measurement) (string, error) { rs.logger.Info("submitting measurement to OONI collector; please be patient...") return rs.subm.Submit(ctx, m) } diff --git a/internal/oonirun/submitter_test.go b/internal/oonirun/submitter_test.go index e1217fa207..133545e0d6 100644 --- a/internal/oonirun/submitter_test.go +++ b/internal/oonirun/submitter_test.go @@ -22,7 +22,8 @@ func TestSubmitterNotEnabled(t *testing.T) { t.Fatal("we did not get a stubSubmitter instance") } m := new(model.Measurement) - if err := submitter.Submit(ctx, m); err != nil { + _, err = submitter.Submit(ctx, m) + if err != nil { t.Fatal(err) } } @@ -32,11 +33,11 @@ type FakeSubmitter struct { Error error } -func (fs *FakeSubmitter) Submit(ctx context.Context, m *model.Measurement) error { +func (fs *FakeSubmitter) Submit(ctx context.Context, m *model.Measurement) (string, error) { if fs.Calls != nil { fs.Calls.Add(1) } - return fs.Error + return "", fs.Error } var _ Submitter = &FakeSubmitter{} @@ -83,7 +84,7 @@ func TestNewSubmitterWithFailedSubmission(t *testing.T) { t.Fatal(err) } m := new(model.Measurement) - err = submitter.Submit(context.Background(), m) + _, err = submitter.Submit(context.Background(), m) if !errors.Is(err, expected) { t.Fatalf("not the error we expected: %+v", err) } diff --git a/internal/oonirun/v2_test.go b/internal/oonirun/v2_test.go index 63d3075af0..d1f11214d0 100644 --- a/internal/oonirun/v2_test.go +++ b/internal/oonirun/v2_test.go @@ -486,7 +486,7 @@ func TestV2MeasureDescriptor(t *testing.T) { // represents a fundamental failure in setting up the experiment sess.MockNewSubmitter = func(ctx context.Context) (model.Submitter, error) { subm := &mocks.Submitter{ - MockSubmit: func(ctx context.Context, m *model.Measurement) error { + MockSubmit: func(ctx context.Context, m *model.Measurement) (string, error) { panic("should not be called") }, } diff --git a/internal/probeservices/collector.go b/internal/probeservices/collector.go index 907c749b01..312d25b159 100644 --- a/internal/probeservices/collector.go +++ b/internal/probeservices/collector.go @@ -101,7 +101,7 @@ func (r reportChan) CanSubmit(m *model.Measurement) bool { // such that it contains the report ID for which it has been // submitted. Otherwise, we'll set the report ID to the empty // string, so that you know which measurements weren't submitted. -func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) error { +func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) (string, error) { // TODO(bassosimone): do we need to prevent measurement submission // if the measurement isn't consistent with the orig template? @@ -109,7 +109,7 @@ func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) URL, err := urlx.ResolveReference(r.client.BaseURL, fmt.Sprintf("/report/%s", r.ID), "") if err != nil { - return err + return "", err } apiReq := model.OOAPICollectorUpdateRequest{ @@ -131,13 +131,13 @@ func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) if err != nil { m.ReportID = "" - return err + return "", err } // TODO(bassosimone): we should use the session logger here but for now this stopgap // solution will allow observing the measurement URL for CLI users. log.Printf("Measurement URL: https://explorer.ooni.org/m/%s", updateResponse.MeasurementUID) - return nil + return updateResponse.MeasurementUID, nil } // ReportID returns the report ID. @@ -150,7 +150,7 @@ func (r reportChan) ReportID() string { type ReportChannel interface { CanSubmit(m *model.Measurement) bool ReportID() string - SubmitMeasurement(ctx context.Context, m *model.Measurement) error + SubmitMeasurement(ctx context.Context, m *model.Measurement) (string, error) } var _ ReportChannel = &reportChan{} @@ -182,14 +182,14 @@ func NewSubmitter(opener ReportOpener, logger model.Logger) *Submitter { // Submit submits the current measurement to the OONI backend created using // the ReportOpener passed to the constructor. -func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) error { +func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) (string, error) { var err error sub.mu.Lock() defer sub.mu.Unlock() if sub.channel == nil || !sub.channel.CanSubmit(m) { sub.channel, err = sub.opener.OpenReport(ctx, NewReportTemplate(m)) if err != nil { - return err + return "", err } sub.logger.Infof("New reportID: %s", sub.channel.ReportID()) } diff --git a/internal/probeservices/collector_test.go b/internal/probeservices/collector_test.go index ff814f9089..3e8ea9604e 100644 --- a/internal/probeservices/collector_test.go +++ b/internal/probeservices/collector_test.go @@ -108,7 +108,8 @@ func TestReportLifecycle(t *testing.T) { // attempt to submit the measurement to the backend, which should succeed // since we've just opened a report for it - if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil { + _, err = report.SubmitMeasurement(context.Background(), &measurement) + if err != nil { t.Fatal(err) } @@ -168,7 +169,8 @@ func TestReportLifecycle(t *testing.T) { // attempt to submit the measurement to the backend, which should succeed // since we've just opened a report for it - if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil { + _, err = report.SubmitMeasurement(context.Background(), &measurement) + if err != nil { t.Fatal(err) } @@ -231,7 +233,8 @@ func TestReportLifecycle(t *testing.T) { // attempt to submit the measurement to the backend, which should succeed // since we've just opened a report for it - if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil { + _, err = report.SubmitMeasurement(context.Background(), &measurement) + if err != nil { t.Fatal(err) } @@ -376,7 +379,7 @@ func TestReportLifecycle(t *testing.T) { } // update the report - err := rc.SubmitMeasurement(context.Background(), &model.Measurement{}) + _, err := rc.SubmitMeasurement(context.Background(), &model.Measurement{}) // we do expect an error if !errors.Is(err, netxlite.ECONNRESET) { @@ -418,7 +421,7 @@ func TestReportLifecycle(t *testing.T) { } // update the report - err := rc.SubmitMeasurement(context.Background(), &model.Measurement{}) + _, err := rc.SubmitMeasurement(context.Background(), &model.Measurement{}) // we do expect an error if err == nil || err.Error() != "unexpected end of JSON input" { @@ -444,7 +447,7 @@ func TestReportLifecycle(t *testing.T) { } // update the report - err := rc.SubmitMeasurement(context.Background(), &model.Measurement{}) + _, err := rc.SubmitMeasurement(context.Background(), &model.Measurement{}) // we do expect an error if err == nil || err.Error() != `parse "\t\t\t": net/url: invalid control character in URL` { @@ -665,7 +668,8 @@ func TestReportLifecycle(t *testing.T) { // attempt to submit the measurement to the backend, which should succeed // since we've just opened a report for it - if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil { + _, err = report.SubmitMeasurement(context.Background(), &measurement) + if err != nil { t.Fatal(err) } @@ -687,14 +691,14 @@ func (rrc *RecordingReportChannel) CanSubmit(m *model.Measurement) bool { return reflect.DeepEqual(NewReportTemplate(m), rrc.tmpl) } -func (rrc *RecordingReportChannel) SubmitMeasurement(ctx context.Context, m *model.Measurement) error { +func (rrc *RecordingReportChannel) SubmitMeasurement(ctx context.Context, m *model.Measurement) (string, error) { if ctx.Err() != nil { - return ctx.Err() + return "", ctx.Err() } rrc.mu.Lock() defer rrc.mu.Unlock() rrc.m = append(rrc.m, m) - return nil + return "", nil } func (rrc *RecordingReportChannel) Close(ctx context.Context) error { @@ -755,15 +759,18 @@ func TestSubmitterLifecyle(t *testing.T) { submitter := NewSubmitter(rro, log.Log) ctx := context.Background() m1 := makeMeasurementWithoutTemplate("example") - if err := submitter.Submit(ctx, m1); err != nil { + _, err := submitter.Submit(ctx, m1) + if err != nil { t.Fatal(err) } m2 := makeMeasurementWithoutTemplate("example") - if err := submitter.Submit(ctx, m2); err != nil { + _, err = submitter.Submit(ctx, m2) + if err != nil { t.Fatal(err) } m3 := makeMeasurementWithoutTemplate("example_extended") - if err := submitter.Submit(ctx, m3); err != nil { + _, err = submitter.Submit(ctx, m3) + if err != nil { t.Fatal(err) } if len(rro.channels) != 2 { @@ -783,15 +790,18 @@ func TestSubmitterCannotOpenNewChannel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // fail immediately m1 := makeMeasurementWithoutTemplate("example") - if err := submitter.Submit(ctx, m1); !errors.Is(err, context.Canceled) { + _, err := submitter.Submit(ctx, m1) + if !errors.Is(err, context.Canceled) { t.Fatal("not the error we expected") } m2 := makeMeasurementWithoutTemplate("example") - if err := submitter.Submit(ctx, m2); !errors.Is(err, context.Canceled) { + _, err = submitter.Submit(ctx, m2) + if !errors.Is(err, context.Canceled) { t.Fatal(err) } m3 := makeMeasurementWithoutTemplate("example_extended") - if err := submitter.Submit(ctx, m3); !errors.Is(err, context.Canceled) { + _, err = submitter.Submit(ctx, m3) + if !errors.Is(err, context.Canceled) { t.Fatal(err) } if len(rro.channels) != 0 { diff --git a/pkg/oonimkall/session.go b/pkg/oonimkall/session.go index f557aeb94e..4b21316ac6 100644 --- a/pkg/oonimkall/session.go +++ b/pkg/oonimkall/session.go @@ -302,6 +302,9 @@ type SubmitMeasurementResults struct { // UpdatedReportID is the report ID used for the measurement. UpdatedReportID string + + // MeasurementUID is the measurement unique identifier returned from the backend + MeasurementUID string } // Submit submits the given measurement and returns the results. @@ -322,7 +325,8 @@ func (sess *Session) Submit(ctx *Context, measurement string) (*SubmitMeasuremen if err := json.Unmarshal([]byte(measurement), &mm); err != nil { return nil, err } - if err := sess.submitter.Submit(ctx.ctx, &mm); err != nil { + muid, err := sess.submitter.Submit(ctx.ctx, &mm) + if err != nil { return nil, err } data, err := json.Marshal(mm) @@ -330,6 +334,7 @@ func (sess *Session) Submit(ctx *Context, measurement string) (*SubmitMeasuremen return &SubmitMeasurementResults{ UpdatedMeasurement: string(data), UpdatedReportID: mm.ReportID, + MeasurementUID: muid, }, nil } diff --git a/pkg/oonimkall/taskmodel.go b/pkg/oonimkall/taskmodel.go index 41e5b19ea6..cd12340053 100644 --- a/pkg/oonimkall/taskmodel.go +++ b/pkg/oonimkall/taskmodel.go @@ -112,6 +112,7 @@ type eventMeasurementGeneric struct { Idx int64 `json:"idx"` Input string `json:"input"` JSONStr string `json:"json_str,omitempty"` + MeasurementUID string `json:"measurement_uid,omitempty"` } type eventStatusEnd struct { diff --git a/pkg/oonimkall/taskrunner.go b/pkg/oonimkall/taskrunner.go index 498bbc1fc8..094ff10c30 100644 --- a/pkg/oonimkall/taskrunner.go +++ b/pkg/oonimkall/taskrunner.go @@ -352,13 +352,14 @@ func (r *runnerForTask) Run(rootCtx context.Context) { // if possible, submit the measurement to the OONI backend if !r.settings.Options.NoCollector { logger.Info("Submitting measurement... please, be patient") - err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m) + muid, err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m) warnOnFailure(logger, "cannot submit measurement", err) r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{ Idx: int64(idx), Input: target.Input(), JSONStr: string(data), Failure: measurementSubmissionFailure(err), + MeasurementUID: muid, }) } diff --git a/pkg/oonimkall/taskrunner_test.go b/pkg/oonimkall/taskrunner_test.go index 432a8b3073..afceb58034 100644 --- a/pkg/oonimkall/taskrunner_test.go +++ b/pkg/oonimkall/taskrunner_test.go @@ -228,8 +228,8 @@ func TestTaskRunnerRun(t *testing.T) { MockMeasureWithContext: func(ctx context.Context, target model.ExperimentTarget) (*model.Measurement, error) { return &model.Measurement{}, nil }, - MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) error { - return nil + MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) (string, error) { + return "", nil }, }, @@ -667,8 +667,8 @@ func TestTaskRunnerRun(t *testing.T) { }, } } - fake.Experiment.MockSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) error { - return errors.New("cannot submit") + fake.Experiment.MockSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) (string, error) { + return "", errors.New("cannot submit") } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter)