From e279f39903771e450d0275066a79a10bdeb32438 Mon Sep 17 00:00:00 2001
From: Trevor McMaster
Date: Mon, 6 Nov 2023 14:46:58 -0700
Subject: [PATCH] Pewpew try hang (#177)

* Simplified the provider names in the example

* Fixed the try hang for both provider_chain and provider_spread
  - provider_loop still has issues with try scripts

* Added an example with provider collect

* Added additional logging to find try script issues with the provider_loop example

* Fixed cargo fmt
---
 examples/provider_collect.yaml           | 58 +++++++++++++++++++
 examples/provider_loop_with_counter.yaml |  5 +-
 examples/provider_spread.yaml            | 73 +++++++++++------------
 lib/channel/src/lib.rs                   | 59 +++++++++++++++----
 src/lib.rs                               |  1 +
 5 files changed, 143 insertions(+), 53 deletions(-)
 create mode 100644 examples/provider_collect.yaml

diff --git a/examples/provider_collect.yaml b/examples/provider_collect.yaml
new file mode 100644
index 00000000..3e5a0463
--- /dev/null
+++ b/examples/provider_collect.yaml
@@ -0,0 +1,58 @@
+# Run this file with the test-server, then specify the PORT as a parameter to this try or run script.
+
+vars:
+  port: "${PORT}"
+
+load_pattern:
+  - linear:
+      from: 100%
+      to: 100%
+      over: 5s
+
+providers:
+  a:
+    range: {}
+  b:
+    response: {}
+  c:
+    response: {}
+  d:
+    response: {}
+  e:
+    response: {}
+  f:
+    response: {}
+
+loggers:
+  test:
+    to: stderr
+
+endpoints:
+  - method: POST
+    url: http://localhost:${port}
+    body: '{"a": ${a}}'
+    provides:
+      b:
+        select: response.body.a
+    on_demand: true
+
+  - method: POST
+    declare:
+      c: collect(b, 10) # Collect 10 values from b into a list
+    url: http://localhost:${port}
+    body: '{"c": ${c}}'
+    provides:
+      d:
+        # select: response.body.c
+        select: for_each[0]
+        for_each:
+          - response.body.c
+    on_demand: true
+
+  - method: POST
+    url: http://localhost:${port}
+    body: '{"d": ${d}}'
+    peak_load: 1hps
+    logs:
+      test:
+        select: response.body.d
\ No newline at end of file
diff --git a/examples/provider_loop_with_counter.yaml b/examples/provider_loop_with_counter.yaml
index 4234007b..b1a854dd 100644
--- a/examples/provider_loop_with_counter.yaml
+++ b/examples/provider_loop_with_counter.yaml
@@ -17,8 +17,6 @@ providers:
     response: {}
   c:
     response: {}
-  d:
-    response: {}
 
 loggers:
   test:
@@ -43,8 +41,9 @@ endpoints:
 
   - method: POST
     url: http://localhost:${port}
-    body: '{"b": ${b.value}}'
+    body: '{"b": ${b.value},"counter": ${b.counter}}'
     peak_load: 5hps
+    # on_demand: true # We can't do on_demand due to a bug where it can't figure out that we provide for ourselves
     provides:
       b:
         select:
diff --git a/examples/provider_spread.yaml b/examples/provider_spread.yaml
index a29fe830..4d48d63c 100644
--- a/examples/provider_spread.yaml
+++ b/examples/provider_spread.yaml
@@ -2,7 +2,6 @@
 
 vars:
   port: "${PORT}"
-  createName: 'test:loadtest-'
   groupRate: 1
   imagesPerGroup: 10
 
@@ -13,23 +12,19 @@ load_pattern:
       over: 5s
 
 providers:
-  group_range: # Counter for creating groups
+  a: # Counter for creating groups
     range: {}
-  image_range: # counter for creating images to put in groups
+  x: # counter for creating images to put in groups
    range: {}
-  group_created: # to continue the group APIs
+  b: # to continue the group APIs
     response: {}
-  group_created_for_images: # to create images
+  b2: # to create images
     response: {}
-  image_created: # to continue the image APIs
+  y: # to continue the image APIs
     response: {}
-  group_create_data:
+  c:
     response: {}
-  image_create_data:
-    response: {}
-  group_update_data:
-    response: {}
-  image_update_data:
+  z:
     response: {}
 
 loggers:
@@ -42,15 +37,14 @@ endpoints:
     tags:
       type: create group
     body: '{
-      "id":"${createName}${start_pad(group_range, 6, "0")}",
-      "name":"TEST-GROUP"
+      "a":"${start_pad(a, 6, "0")}"
       }'
     provides:
-      group_created:
-        select: response.body.id
+      b:
+        select: response.body.a
         where: response.status == 200 || response.status == 409
-      group_created_for_images:
-        select: response.body.id
+      b2:
+        select: response.body.a
         for_each:
           - repeat(imagesPerGroup) # We need to create X copies so each image will have one
         where: response.status == 200 || response.status == 409
@@ -61,15 +55,14 @@ endpoints:
     tags:
      type: create image
     body: '{
-      "id":"${createName}${start_pad(image_range, 8, "0")}",
-      "groupId":"${group_created_for_images}",
-      "name":"TEST-IMAGE"
+      "x":"${start_pad(x, 8, "0")}",
+      "b":"${b2}"
     }'
     provides:
-      image_created:
+      y:
         select:
-          id: response.body.id
-          groupId: response.body.groupId
+          x: response.body.x
+          b: response.body.b
         where: response.status == 200
     peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups
 
@@ -78,14 +71,14 @@ endpoints:
     tags:
      type: create group data
    body: '{
-      "id":"${group_created}",
+      "b":"${b}",
       "data":{
-        "subdata":"TEST-DATA"
+        "subdata":"A-DATA"
       }
     }'
     provides:
-      group_create_data:
-        select: group_created
+      c:
+        select: response.body.b
         where: response.status == 200
     peak_load: ${groupRate}hps
 
@@ -94,41 +87,41 @@ endpoints:
     tags:
      type: create image
    body: '{
-      "id":"${image_created.id}",
-      "groupId":"${image_created.groupId}",
+      "x":"${y.x}",
+      "b":"${y.b}",
       "data":{
-        "subdata":"TEST-DATA"
+        "subdata":"X-DATA"
       }
     }'
     provides:
-      image_create_data:
-        select: image_created # Puts in the whole object (id and groupId)
+      z:
+        select: y # Puts in the whole object (x and b)
         where: response.status == 200
     peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups
 
   - method: PUT
     url: http://localhost:${port}
     body: '{
-      "id":"${group_create_data}",
+      "c":"${c}",
       "data":{
-        "subdata":"UPDATED-TEST-DATA"
+        "subdata":"UPDATED-A-DATA"
       }
     }'
     peak_load: ${groupRate}hps
     logs:
       test:
-        select: response.body.id
+        select: response.body.c
 
   - method: PUT
     url: http://localhost:${port}
     body: '{
-      "id":"${image_create_data.id}",
-      "groupId":"${image_create_data.groupId}",
+      "x":"${z.x}",
+      "b":"${z.b}",
       "data":{
-        "subdata":"UPDATED-TEST-DATA"
+        "subdata":"UPDATED-X-DATA"
       }
     }'
     peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups
     logs:
       test:
-        select: response.body.id
\ No newline at end of file
+        select: response.body.x
\ No newline at end of file
diff --git a/lib/channel/src/lib.rs b/lib/channel/src/lib.rs
index e9971f11..16efe5c8 100644
--- a/lib/channel/src/lib.rs
+++ b/lib/channel/src/lib.rs
@@ -181,6 +181,11 @@ impl Channel {
         self.on_demand_events.notify(1);
     }
 
+    /// notify all OnDemand receivers that have an event listener
+    fn notify_all_on_demand(&self) {
+        self.on_demand_events.notify(std::usize::MAX);
+    }
+
     /// create a listener so an OnDemand can get notice when demand has been requested
     /// (a receiver tried to receive but the queue was empty)
     fn on_demand_listen(&self) -> EventListener {
@@ -340,7 +345,13 @@ impl Sender {
 impl Clone for Sender {
     fn clone(&self) -> Self {
         let count = self.channel.increment_sender_count();
-        info!("Sender::clone: {} new count: {}", self.name(), count);
+        let on_demand_count = self.channel.on_demand_count();
+        info!(
+            "Sender::Clone channel {}, new count: {}, on_demand_count: {}",
+            self.name(),
+            count,
+            on_demand_count
+        );
         Sender {
             channel: self.channel.clone(),
             listener: None,
@@ -353,8 +364,15 @@ impl Clone for Sender {
 impl Drop for Sender {
     fn drop(&mut self) {
         let count = self.channel.decrement_sender_count();
-        info!("Sender::drop: {} new count: {}", self.name(), count);
+        let on_demand_count = self.channel.on_demand_count();
+        info!(
+            "Sender::Drop channel {}, new count: {}, on_demand_count: {}",
+            self.name(),
+            count,
+            on_demand_count
+        );
         if count == 0 {
+            info!("Sender::Drop channel {}, notify_all_receivers", self.name());
             self.channel.notify_all_receivers();
         }
     }
@@ -431,7 +449,7 @@ impl Sink for Sender {
         if self.no_receivers() {
             self.listener = None;
             debug!(
-                "Sink for Sender:poll_ready {} no_receivers, length: ${}",
+                "Sink for Sender:poll_ready {} no_receivers, length: {}",
                 self.name(),
                 self.len()
             );
@@ -531,9 +549,10 @@ pub struct Receiver {
 impl Clone for Receiver {
     fn clone(&self) -> Self {
         let count = self.channel.increment_receiver_count();
+        let on_demand_count = self.channel.on_demand_count();
         debug!(
-            "Receiver:Clone cloning channel {}, new count: {}",
-            self.channel.name, count
+            "Receiver:Clone channel {}, new count: {}, on_demand_count: {}",
+            self.channel.name, count, on_demand_count
         );
         Receiver {
             channel: self.channel.clone(),
@@ -547,9 +566,10 @@ impl Clone for Receiver {
 impl Drop for Receiver {
     fn drop(&mut self) {
         let new_count = self.channel.decrement_receiver_count();
+        let on_demand_count = self.channel.on_demand_count();
         debug!(
-            "Receiver:Drop channel {}, new count: {}",
-            self.channel.name, new_count
+            "Receiver:Drop channel {}, new count: {}, on_demand_count: {}",
+            self.channel.name, new_count, on_demand_count
         );
         if new_count == 0 {
             // notify all senders so they will see there are no more receivers
@@ -558,6 +578,8 @@ impl Drop for Receiver {
                 "Receiver:Drop channel {}, notify_all_senders",
                 self.channel.name
             );
+            // When there are no more receivers, we need to notify the on_demand listeners in addition to the normal senders
+            self.channel.notify_all_on_demand();
             self.channel.notify_all_senders();
         }
     }
@@ -570,13 +592,26 @@ impl Stream for Receiver {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         debug!(
             "Receiver:poll_next channel {}, length: {}",
-            self.channel.name, self.channel.name
+            self.channel.name,
+            self.channel.len()
         );
         loop {
             if let Some(listener) = self.listener.as_mut() {
                 match Pin::new(listener).poll(cx) {
-                    Poll::Ready(()) => self.listener = None,
-                    Poll::Pending => return Poll::Pending,
+                    Poll::Ready(()) => {
+                        debug!(
+                            "Receiver:poll_next channel {}, listener Poll::Ready, listener = None",
+                            self.channel.name
+                        );
+                        self.listener = None;
+                    }
+                    Poll::Pending => {
+                        debug!(
+                            "Receiver:poll_next channel {}, listener Poll::Pending",
+                            self.channel.name
+                        );
+                        return Poll::Pending;
+                    }
                 }
             }
 
@@ -674,6 +709,10 @@ impl Stream for OnDemandReceiver {
                 self.channel.len()
             );
             self.listener = None;
+            debug!(
+                "OnDemandReceiver::poll_next {} listener: None",
+                self.channel.name
+            );
             return Poll::Ready(None);
         }
 
diff --git a/src/lib.rs b/src/lib.rs
index 06612eae..7c1235b0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1013,6 +1013,7 @@ fn create_try_run_future(
     let mut test_ended_rx = BroadcastStream::new(test_ended_tx.subscribe());
 
     let mut left = try_join_all(endpoint_calls).map(move |r| {
+        debug!("create_try_run_future try_join_all finish {:?}", r);
        let _ = test_ended_tx.send(r.map(|_| TestEndReason::Completed));
     });
     let f = future::poll_fn(move |cx| match left.poll_unpin(cx) {
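
Note on why notify_all_on_demand uses notify(std::usize::MAX): the change above suggests the try-script hang came from OnDemand listeners that stayed parked after the last receiver was dropped, so they never observed that no receivers remained. The sketch below is a minimal, self-contained illustration of that wake-all pattern, assuming the event-listener 2.x crate (which on_demand_events appears to be built on); it is not part of the patch, and the names in it are illustrative only.

use event_listener::Event;

fn main() {
    // Stand-in for the channel's on_demand_events field.
    let on_demand_events = Event::new();

    // Two on-demand tasks park themselves and wait to be told there is demand.
    let first = on_demand_events.listen();
    let second = on_demand_events.listen();

    // notify(1) would wake only one parked listener. When the last receiver
    // drops, every parked on-demand future has to wake up and see that no
    // receivers remain, so the channel notifies all of them at once.
    on_demand_events.notify(usize::MAX);

    // Both listeners return immediately because the notification above
    // covered every listener that existed when it was sent.
    first.wait();
    second.wait();
    println!("all on-demand listeners woke up");
}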