Skip to content

Commit

Permalink
Merge branch 'master' into tryHangScripting
Browse files Browse the repository at this point in the history
  • Loading branch information
tkmcmaster committed Nov 6, 2023
2 parents d914a5e + e279f39 commit 0f4bc8a
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 92 deletions.
62 changes: 62 additions & 0 deletions examples/provider_collect.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Run this file with the test-server, then specify the PORT as a parameter to this try or run script.

vars:
port: ${e:PORT}
load_pattern:
- !linear
from: 100%
to: 100%
over: 5s

providers:
a: !range
b: !response
buffer: 10
c: !response
d: !response
e: !response
f: !response

loggers:
test:
to: stderr

endpoints:
- method: POST
url: http://localhost:${v:port}
body: !str '{"a": ${p:a}}'
provides:
b:
query:
select: response.body.a
send: block
# on_demand: true

- method: POST
declare:
c: !c
collects:
- take: 10
from: ${p:b}
as: _c
then: ${p:_c}
url: http://localhost:${v:port}
body: !str '{"c": ${p:c}}'
# peak_load: 1hps
provides:
d:
query:
# select: response.body.c
select: for_each[0]
for_each:
- response.body.c
send: block
on_demand: true

- method: POST
url: http://localhost:${v:port}
body: !str '{"d": ${p:d}}'
peak_load: 1hps
logs:
test:
select: response.body.d
4 changes: 2 additions & 2 deletions examples/provider_loop_with_counter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ providers:
a: !range
b: !response
c: !response
d: !response

loggers:
test:
Expand All @@ -41,8 +40,9 @@ endpoints:

- method: POST
url: http://localhost:${v:port}
body: !str '{"b": ${x:${p:b}.value}}'
body: !str '{"b": ${x:${p:b}.value}, "counter": ${x:${p:b}.counter}}'
peak_load: 5hps
# on_demand: true # We can't do on_demand due to a bug where it can't figure out that we provider for ourselves
provides:
b:
query:
Expand Down
71 changes: 33 additions & 38 deletions examples/provider_spread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

vars:
port: "${e:PORT}"
createName: 'test:loadtest-'
groupRate: 1
imagesPerGroup: 10

Expand All @@ -13,20 +12,18 @@ load_pattern:
over: 5s

providers:
group_range: # Counter for creating groups
a: # Counter for creating groups
!range
image_range: # counter for creating images to put in groups
x: # counter for creating images to put in groups
!range
group_created: # to continue the group APIs
b: # to continue the group APIs
!response
group_created_for_images: # to create images
b2: # to create images
!response
image_created: # to continue the image APIs
y: # to continue the image APIs
!response
group_create_data: !response
image_create_data: !response
group_update_data: !response
image_update_data: !response
c: !response
z: !response

loggers:
test:
Expand All @@ -38,18 +35,17 @@ endpoints:
tags:
type: create group
body: !str '{
"id":"${v:createName}${x:start_pad(${p:group_range}, 6, "0")}",
"name":"TEST-GROUP"
"a":"${x:start_pad(${p:a}, 6, "0")}"
}'
provides:
group_created:
b:
query:
select: response.body.id
select: response.body.a
where: response.status == 200 || response.status == 409
send: if_not_full
group_created_for_images:
b2:
query:
select: response.body.id
select: response.body.a
for_each:
- repeat(_v.imagesPerGroup) # We need to create X copies so each image will have one
where: response.status == 200 || response.status == 409
Expand All @@ -61,16 +57,15 @@ endpoints:
tags:
type: create image
body: !str '{
"id":"${v:createName}${x:start_pad(${p:image_range}, 8, "0")}",
"groupId":"${p:group_created_for_images}",
"name":"TEST-IMAGE"
"x":"${x:start_pad(${p:x}, 8, "0")}",
"b":"${p:b2}"
}'
provides:
image_created:
y:
query:
select:
id: response.body.id
groupId: response.body.groupId
x: response.body.x
b: response.body.b
where: response.status == 200 || response.status == 409
send: if_not_full
peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups
Expand All @@ -80,15 +75,15 @@ endpoints:
tags:
type: create group data
body: !str '{
"id":"${p:group_created}",
"b":"${p:b}",
"data":{
"subdata":"TEST-DATA"
"subdata":"A-DATA"
}
}'
provides:
group_create_data:
c:
query:
select: group_created
select: response.body.b
where: response.status == 200
send: if_not_full
peak_load: ${v:groupRate}hps
Expand All @@ -98,43 +93,43 @@ endpoints:
tags:
type: create image
body: !str '{
"id":"${x:${p:image_created}.id}",
"groupId":"${x:${p:image_created}.groupId}",
"x":"${x:${p:y}.x}",
"b":"${x:${p:y}.b}",
"data":{
"subdata":"TEST-DATA"
"subdata":"X-DATA"
}
}'
provides:
image_create_data:
z:
query:
select: image_created # Puts in the whole object (id and groupId)
select: y # Puts in the whole object (id and groupId)
where: response.status == 200
send: if_not_full
peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups

- method: PUT
url: http://localhost:${v:port}
body: !str '{
"id":"${p:group_create_data}",
"c":"${p:c}",
"data":{
"subdata":"UPDATED-TEST-DATA"
"subdata":"UPDATED-A-DATA"
}
}'
peak_load: ${v:groupRate}hps
logs:
test:
select: response.body.id
select: response.body.c

- method: PUT
url: http://localhost:${v:port}
body: !str '{
"id":"${x:${p:image_create_data}.id}",
"groupId":"${x:${p:image_create_data}.groupId}",
"x":"${x:${p:z}.x}",
"b":"${x:${p:z}.b}",
"data":{
"subdata":"UPDATED-TEST-DATA"
"subdata":"UPDATED-X-DATA"
}
}'
peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups
logs:
test:
select: response.body.id
select: response.body.x
56 changes: 47 additions & 9 deletions lib/channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl<T: Serialize> Channel<T> {
self.on_demand_events.notify(1);
}

/// notify all OnDemand with an event listener
fn notify_all_on_demand(&self) {
self.on_demand_events.notify(std::usize::MAX);
}

/// create a listener so an OnDemand can get notice when demand has been requested
/// (a receiver tried to receive but the queue was empty)
fn on_demand_listen(&self) -> EventListener {
Expand Down Expand Up @@ -340,7 +345,13 @@ impl<T: Serialize> Sender<T> {
impl<T: Serialize> Clone for Sender<T> {
fn clone(&self) -> Self {
let count = self.channel.increment_sender_count();
info!("Sender::clone: {} new count: {}", self.name(), count);
let on_demand_count = self.channel.on_demand_count();
info!(
"Sender::Clone channel {}, new count: {}, on_demand_count: {}",
self.name(),
count,
on_demand_count
);
Sender {
channel: self.channel.clone(),
listener: None,
Expand All @@ -353,8 +364,15 @@ impl<T: Serialize> Clone for Sender<T> {
impl<T: Serialize> Drop for Sender<T> {
fn drop(&mut self) {
let count = self.channel.decrement_sender_count();
info!("Sender::drop: {} new count: {}", self.name(), count);
let on_demand_count = self.channel.on_demand_count();
info!(
"Sender::Drop channel {}, new count: {}, on_demand_count: {}",
self.name(),
count,
on_demand_count
);
if count == 0 {
info!("Sender::Drop channel {}, notify_all_receivers", self.name());
self.channel.notify_all_receivers();
}
}
Expand Down Expand Up @@ -431,7 +449,7 @@ impl<T: Serialize> Sink<T> for Sender<T> {
if self.no_receivers() {
self.listener = None;
debug!(
"Sink for Sender:poll_ready {} no_receivers, length: ${}",
"Sink for Sender:poll_ready {} no_receivers, length: {}",
self.name(),
self.len()
);
Expand Down Expand Up @@ -531,9 +549,10 @@ pub struct Receiver<T: Serialize> {
impl<T: Serialize> Clone for Receiver<T> {
fn clone(&self) -> Self {
let count = self.channel.increment_receiver_count();
let on_demand_count = self.channel.on_demand_count();
debug!(
"Receiver:Clone cloning channel {}, new count: {}",
self.channel.name, count
"Receiver:Clone channel {}, new count: {}, on_demand_count: {}",
self.channel.name, count, on_demand_count
);
Receiver {
channel: self.channel.clone(),
Expand All @@ -547,9 +566,10 @@ impl<T: Serialize> Clone for Receiver<T> {
impl<T: Serialize> Drop for Receiver<T> {
fn drop(&mut self) {
let new_count = self.channel.decrement_receiver_count();
let on_demand_count = self.channel.on_demand_count();
debug!(
"Receiver:Drop channel {}, new count: {}",
self.channel.name, new_count
"Receiver:Drop channel {}, new count: {}, on_demand_count: {}",
self.channel.name, new_count, on_demand_count
);
if new_count == 0 {
// notify all senders so they will see there are no more receivers
Expand All @@ -558,6 +578,8 @@ impl<T: Serialize> Drop for Receiver<T> {
"Receiver:Drop channel {}, notify_all_senders",
self.channel.name
);
// When there are no more receivers we need to notify the on_demand in addition to the normal senders
self.channel.notify_all_on_demand();
self.channel.notify_all_senders();
}
}
Expand All @@ -576,8 +598,20 @@ impl<T: Serialize> Stream for Receiver<T> {
loop {
if let Some(listener) = self.listener.as_mut() {
match Pin::new(listener).poll(cx) {
Poll::Ready(()) => self.listener = None,
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {
debug!(
"Receiver:poll_next channel {}, listener Poll::Ready, listener = None",
self.channel.name
);
self.listener = None;
}
Poll::Pending => {
debug!(
"Receiver:poll_next channel {}, listener Poll::Pending",
self.channel.name
);
return Poll::Pending;
}
}
}

Expand Down Expand Up @@ -675,6 +709,10 @@ impl<T: Serialize + Send + 'static> Stream for OnDemandReceiver<T> {
self.channel.len()
);
self.listener = None;
debug!(
"OnDemandReceiver::poll_next {} listener: None",
self.channel.name
);
return Poll::Ready(None);
}

Expand Down
Loading

0 comments on commit 0f4bc8a

Please sign in to comment.