diff --git a/examples/provider_collect.yaml b/examples/provider_collect.yaml new file mode 100644 index 00000000..aca9c3e2 --- /dev/null +++ b/examples/provider_collect.yaml @@ -0,0 +1,62 @@ +# Run this file with the test-server, then specify the PORT as a parameter to this try or run script. + +vars: + port: ${e:PORT} +load_pattern: + - !linear + from: 100% + to: 100% + over: 5s + +providers: + a: !range + b: !response + buffer: 10 + c: !response + d: !response + e: !response + f: !response + +loggers: + test: + to: stderr + +endpoints: + - method: POST + url: http://localhost:${v:port} + body: !str '{"a": ${p:a}}' + provides: + b: + query: + select: response.body.a + send: block + # on_demand: true + + - method: POST + declare: + c: !c + collects: + - take: 10 + from: ${p:b} + as: _c + then: ${p:_c} + url: http://localhost:${v:port} + body: !str '{"c": ${p:c}}' + # peak_load: 1hps + provides: + d: + query: + # select: response.body.c + select: for_each[0] + for_each: + - response.body.c + send: block + on_demand: true + + - method: POST + url: http://localhost:${v:port} + body: !str '{"d": ${p:d}}' + peak_load: 1hps + logs: + test: + select: response.body.d \ No newline at end of file diff --git a/examples/provider_loop_with_counter.yaml b/examples/provider_loop_with_counter.yaml index e0a3786c..1ee6a421 100644 --- a/examples/provider_loop_with_counter.yaml +++ b/examples/provider_loop_with_counter.yaml @@ -14,7 +14,6 @@ providers: a: !range b: !response c: !response - d: !response loggers: test: @@ -41,8 +40,9 @@ endpoints: - method: POST url: http://localhost:${v:port} - body: !str '{"b": ${x:${p:b}.value}}' + body: !str '{"b": ${x:${p:b}.value}, "counter": ${x:${p:b}.counter}}' peak_load: 5hps + # on_demand: true # We can't do on_demand due to a bug where it can't figure out that we provider for ourselves provides: b: query: diff --git a/examples/provider_spread.yaml b/examples/provider_spread.yaml index ab363d79..c48bc073 100644 --- a/examples/provider_spread.yaml +++ b/examples/provider_spread.yaml @@ -2,7 +2,6 @@ vars: port: "${e:PORT}" - createName: 'test:loadtest-' groupRate: 1 imagesPerGroup: 10 @@ -13,20 +12,18 @@ load_pattern: over: 5s providers: - group_range: # Counter for creating groups + a: # Counter for creating groups !range - image_range: # counter for creating images to put in groups + x: # counter for creating images to put in groups !range - group_created: # to continue the group APIs + b: # to continue the group APIs !response - group_created_for_images: # to create images + b2: # to create images !response - image_created: # to continue the image APIs + y: # to continue the image APIs !response - group_create_data: !response - image_create_data: !response - group_update_data: !response - image_update_data: !response + c: !response + z: !response loggers: test: @@ -38,18 +35,17 @@ endpoints: tags: type: create group body: !str '{ - "id":"${v:createName}${x:start_pad(${p:group_range}, 6, "0")}", - "name":"TEST-GROUP" + "a":"${x:start_pad(${p:a}, 6, "0")}" }' provides: - group_created: + b: query: - select: response.body.id + select: response.body.a where: response.status == 200 || response.status == 409 send: if_not_full - group_created_for_images: + b2: query: - select: response.body.id + select: response.body.a for_each: - repeat(_v.imagesPerGroup) # We need to create X copies so each image will have one where: response.status == 200 || response.status == 409 @@ -61,16 +57,15 @@ endpoints: tags: type: create image body: !str '{ - "id":"${v:createName}${x:start_pad(${p:image_range}, 8, "0")}", - "groupId":"${p:group_created_for_images}", - "name":"TEST-IMAGE" + "x":"${x:start_pad(${p:x}, 8, "0")}", + "b":"${p:b2}" }' provides: - image_created: + y: query: select: - id: response.body.id - groupId: response.body.groupId + x: response.body.x + b: response.body.b where: response.status == 200 || response.status == 409 send: if_not_full peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups @@ -80,15 +75,15 @@ endpoints: tags: type: create group data body: !str '{ - "id":"${p:group_created}", + "b":"${p:b}", "data":{ - "subdata":"TEST-DATA" + "subdata":"A-DATA" } }' provides: - group_create_data: + c: query: - select: group_created + select: response.body.b where: response.status == 200 send: if_not_full peak_load: ${v:groupRate}hps @@ -98,16 +93,16 @@ endpoints: tags: type: create image body: !str '{ - "id":"${x:${p:image_created}.id}", - "groupId":"${x:${p:image_created}.groupId}", + "x":"${x:${p:y}.x}", + "b":"${x:${p:y}.b}", "data":{ - "subdata":"TEST-DATA" + "subdata":"X-DATA" } }' provides: - image_create_data: + z: query: - select: image_created # Puts in the whole object (id and groupId) + select: y # Puts in the whole object (id and groupId) where: response.status == 200 send: if_not_full peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups @@ -115,26 +110,26 @@ endpoints: - method: PUT url: http://localhost:${v:port} body: !str '{ - "id":"${p:group_create_data}", + "c":"${p:c}", "data":{ - "subdata":"UPDATED-TEST-DATA" + "subdata":"UPDATED-A-DATA" } }' peak_load: ${v:groupRate}hps logs: test: - select: response.body.id + select: response.body.c - method: PUT url: http://localhost:${v:port} body: !str '{ - "id":"${x:${p:image_create_data}.id}", - "groupId":"${x:${p:image_create_data}.groupId}", + "x":"${x:${p:z}.x}", + "b":"${x:${p:z}.b}", "data":{ - "subdata":"UPDATED-TEST-DATA" + "subdata":"UPDATED-X-DATA" } }' peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups logs: test: - select: response.body.id + select: response.body.x diff --git a/lib/channel/src/lib.rs b/lib/channel/src/lib.rs index 6363725e..3ab6be72 100644 --- a/lib/channel/src/lib.rs +++ b/lib/channel/src/lib.rs @@ -181,6 +181,11 @@ impl Channel { self.on_demand_events.notify(1); } + /// notify all OnDemand with an event listener + fn notify_all_on_demand(&self) { + self.on_demand_events.notify(std::usize::MAX); + } + /// create a listener so an OnDemand can get notice when demand has been requested /// (a receiver tried to receive but the queue was empty) fn on_demand_listen(&self) -> EventListener { @@ -340,7 +345,13 @@ impl Sender { impl Clone for Sender { fn clone(&self) -> Self { let count = self.channel.increment_sender_count(); - info!("Sender::clone: {} new count: {}", self.name(), count); + let on_demand_count = self.channel.on_demand_count(); + info!( + "Sender::Clone channel {}, new count: {}, on_demand_count: {}", + self.name(), + count, + on_demand_count + ); Sender { channel: self.channel.clone(), listener: None, @@ -353,8 +364,15 @@ impl Clone for Sender { impl Drop for Sender { fn drop(&mut self) { let count = self.channel.decrement_sender_count(); - info!("Sender::drop: {} new count: {}", self.name(), count); + let on_demand_count = self.channel.on_demand_count(); + info!( + "Sender::Drop channel {}, new count: {}, on_demand_count: {}", + self.name(), + count, + on_demand_count + ); if count == 0 { + info!("Sender::Drop channel {}, notify_all_receivers", self.name()); self.channel.notify_all_receivers(); } } @@ -431,7 +449,7 @@ impl Sink for Sender { if self.no_receivers() { self.listener = None; debug!( - "Sink for Sender:poll_ready {} no_receivers, length: ${}", + "Sink for Sender:poll_ready {} no_receivers, length: {}", self.name(), self.len() ); @@ -531,9 +549,10 @@ pub struct Receiver { impl Clone for Receiver { fn clone(&self) -> Self { let count = self.channel.increment_receiver_count(); + let on_demand_count = self.channel.on_demand_count(); debug!( - "Receiver:Clone cloning channel {}, new count: {}", - self.channel.name, count + "Receiver:Clone channel {}, new count: {}, on_demand_count: {}", + self.channel.name, count, on_demand_count ); Receiver { channel: self.channel.clone(), @@ -547,9 +566,10 @@ impl Clone for Receiver { impl Drop for Receiver { fn drop(&mut self) { let new_count = self.channel.decrement_receiver_count(); + let on_demand_count = self.channel.on_demand_count(); debug!( - "Receiver:Drop channel {}, new count: {}", - self.channel.name, new_count + "Receiver:Drop channel {}, new count: {}, on_demand_count: {}", + self.channel.name, new_count, on_demand_count ); if new_count == 0 { // notify all senders so they will see there are no more receivers @@ -558,6 +578,8 @@ impl Drop for Receiver { "Receiver:Drop channel {}, notify_all_senders", self.channel.name ); + // When there are no more receivers we need to notify the on_demand in addition to the normal senders + self.channel.notify_all_on_demand(); self.channel.notify_all_senders(); } } @@ -576,8 +598,20 @@ impl Stream for Receiver { loop { if let Some(listener) = self.listener.as_mut() { match Pin::new(listener).poll(cx) { - Poll::Ready(()) => self.listener = None, - Poll::Pending => return Poll::Pending, + Poll::Ready(()) => { + debug!( + "Receiver:poll_next channel {}, listener Poll::Ready, listener = None", + self.channel.name + ); + self.listener = None; + } + Poll::Pending => { + debug!( + "Receiver:poll_next channel {}, listener Poll::Pending", + self.channel.name + ); + return Poll::Pending; + } } } @@ -675,6 +709,10 @@ impl Stream for OnDemandReceiver { self.channel.len() ); self.listener = None; + debug!( + "OnDemandReceiver::poll_next {} listener: None", + self.channel.name + ); return Poll::Ready(None); } diff --git a/lib/config-wasm/tests/legacy_examples/provider_collect.yaml b/lib/config-wasm/tests/legacy_examples/provider_collect.yaml new file mode 100644 index 00000000..3e5a0463 --- /dev/null +++ b/lib/config-wasm/tests/legacy_examples/provider_collect.yaml @@ -0,0 +1,58 @@ +# Run this file with the test-server, then specify the PORT as a parameter to this try or run script. + +vars: + port: "${PORT}" + +load_pattern: + - linear: + from: 100% + to: 100% + over: 5s + +providers: + a: + range: {} + b: + response: {} + c: + response: {} + d: + response: {} + e: + response: {} + f: + response: {} + +loggers: + test: + to: stderr + +endpoints: + - method: POST + url: http://localhost:${port} + body: '{"a": ${a}}' + provides: + b: + select: response.body.a + on_demand: true + + - method: POST + declare: + c: collect(b, 10) # Get 10 + url: http://localhost:${port} + body: '{"c": ${c}}' + provides: + d: + # select: response.body.c + select: for_each[0] + for_each: + - response.body.c + on_demand: true + + - method: POST + url: http://localhost:${port} + body: '{"d": ${d}}' + peak_load: 1hps + logs: + test: + select: response.body.d \ No newline at end of file diff --git a/lib/config-wasm/tests/legacy_examples/provider_loop_with_counter.yaml b/lib/config-wasm/tests/legacy_examples/provider_loop_with_counter.yaml index 4234007b..b1a854dd 100644 --- a/lib/config-wasm/tests/legacy_examples/provider_loop_with_counter.yaml +++ b/lib/config-wasm/tests/legacy_examples/provider_loop_with_counter.yaml @@ -17,8 +17,6 @@ providers: response: {} c: response: {} - d: - response: {} loggers: test: @@ -43,8 +41,9 @@ endpoints: - method: POST url: http://localhost:${port} - body: '{"b": ${b.value}}' + body: '{"b": ${b.value},"counter": ${b.counter}}' peak_load: 5hps + # on_demand: true # We can't do on_demand due to a bug where it can't figure out that we provider for ourselves provides: b: select: diff --git a/lib/config-wasm/tests/legacy_examples/provider_spread.yaml b/lib/config-wasm/tests/legacy_examples/provider_spread.yaml index a29fe830..4d48d63c 100644 --- a/lib/config-wasm/tests/legacy_examples/provider_spread.yaml +++ b/lib/config-wasm/tests/legacy_examples/provider_spread.yaml @@ -2,7 +2,6 @@ vars: port: "${PORT}" - createName: 'test:loadtest-' groupRate: 1 imagesPerGroup: 10 @@ -13,23 +12,19 @@ load_pattern: over: 5s providers: - group_range: # Counter for creating groups + a: # Counter for creating groups range: {} - image_range: # counter for creating images to put in groups + x: # counter for creating images to put in groups range: {} - group_created: # to continue the group APIs + b: # to continue the group APIs response: {} - group_created_for_images: # to create images + b2: # to create images response: {} - image_created: # to continue the image APIs + y: # to continue the image APIs response: {} - group_create_data: + c: response: {} - image_create_data: - response: {} - group_update_data: - response: {} - image_update_data: + z: response: {} loggers: @@ -42,15 +37,14 @@ endpoints: tags: type: create group body: '{ - "id":"${createName}${start_pad(group_range, 6, "0")}", - "name":"TEST-GROUP" + "a":"${start_pad(a, 6, "0")}" }' provides: - group_created: - select: response.body.id + b: + select: response.body.a where: response.status == 200 || response.status == 409 - group_created_for_images: - select: response.body.id + b2: + select: response.body.a for_each: - repeat(imagesPerGroup) # We need to create X copies so each image will have one where: response.status == 200 || response.status == 409 @@ -61,15 +55,14 @@ endpoints: tags: type: create image body: '{ - "id":"${createName}${start_pad(image_range, 8, "0")}", - "groupId":"${group_created_for_images}", - "name":"TEST-IMAGE" + "x":"${start_pad(x, 8, "0")}", + "b":"${b2}" }' provides: - image_created: + y: select: - id: response.body.id - groupId: response.body.groupId + x: response.body.x + b: response.body.b where: response.status == 200 || response.status == 409 peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups @@ -78,14 +71,14 @@ endpoints: tags: type: create group data body: '{ - "id":"${group_created}", + "b":"${b}", "data":{ - "subdata":"TEST-DATA" + "subdata":"A-DATA" } }' provides: - group_create_data: - select: group_created + c: + select: response.body.b where: response.status == 200 peak_load: ${groupRate}hps @@ -94,41 +87,41 @@ endpoints: tags: type: create image body: '{ - "id":"${image_created.id}", - "groupId":"${image_created.groupId}", + "x":"${y.x}", + "b":"${y.b}", "data":{ - "subdata":"TEST-DATA" + "subdata":"X-DATA" } }' provides: - image_create_data: - select: image_created # Puts in the whole object (id and groupId) + z: + select: y # Puts in the whole object (id and groupId) where: response.status == 200 peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups - method: PUT url: http://localhost:${port} body: '{ - "id":"${group_create_data}", + "c":"${c}", "data":{ - "subdata":"UPDATED-TEST-DATA" + "subdata":"UPDATED-A-DATA" } }' peak_load: ${groupRate}hps logs: test: - select: response.body.id + select: response.body.c - method: PUT url: http://localhost:${port} body: '{ - "id":"${image_create_data.id}", - "groupId":"${image_create_data.groupId}", + "x":"${z.x}", + "b":"${z.b}", "data":{ - "subdata":"UPDATED-TEST-DATA" + "subdata":"UPDATED-X-DATA" } }' peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups logs: test: - select: response.body.id \ No newline at end of file + select: response.body.x \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index e4d1530b..50ece6eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1055,6 +1055,7 @@ ${{join(response.headers_all, '\n', ': ')}}\n\ let mut test_ended_rx = BroadcastStream::new(test_ended_tx.subscribe()); let mut left = try_join_all(endpoint_calls).map(move |r| { + debug!("create_try_run_future try_join_all finish {:?}", r); let _ = test_ended_tx.send(r.map(|_| TestEndReason::Completed)); }); let f = future::poll_fn(move |cx| match left.poll_unpin(cx) {