Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try hang scripting #178

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions examples/provider_collect.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Run this file with the test-server, then specify the PORT as a parameter to this try or run script.

vars:
port: ${e:PORT}
load_pattern:
- !linear
from: 100%
to: 100%
over: 5s

providers:
a: !range
b: !response
buffer: 10
c: !response
d: !response
e: !response
f: !response

loggers:
test:
to: stderr

endpoints:
- method: POST
url: http://localhost:${v:port}
body: !str '{"a": ${p:a}}'
provides:
b:
query:
select: response.body.a
send: block
# on_demand: true

- method: POST
declare:
c: !c
collects:
- take: 10
from: ${p:b}
as: _c
then: ${p:_c}
url: http://localhost:${v:port}
body: !str '{"c": ${p:c}}'
# peak_load: 1hps
provides:
d:
query:
# select: response.body.c
select: for_each[0]
for_each:
- response.body.c
send: block
on_demand: true

- method: POST
url: http://localhost:${v:port}
body: !str '{"d": ${p:d}}'
peak_load: 1hps
logs:
test:
select: response.body.d
4 changes: 2 additions & 2 deletions examples/provider_loop_with_counter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ providers:
a: !range
b: !response
c: !response
d: !response

loggers:
test:
Expand All @@ -41,8 +40,9 @@ endpoints:

- method: POST
url: http://localhost:${v:port}
body: !str '{"b": ${x:${p:b}.value}}'
body: !str '{"b": ${x:${p:b}.value}, "counter": ${x:${p:b}.counter}}'
peak_load: 5hps
# on_demand: true # We can't do on_demand due to a bug where it can't figure out that we provider for ourselves
provides:
b:
query:
Expand Down
71 changes: 33 additions & 38 deletions examples/provider_spread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

vars:
port: "${e:PORT}"
createName: 'test:loadtest-'
groupRate: 1
imagesPerGroup: 10

Expand All @@ -13,20 +12,18 @@ load_pattern:
over: 5s

providers:
group_range: # Counter for creating groups
a: # Counter for creating groups
!range
image_range: # counter for creating images to put in groups
x: # counter for creating images to put in groups
!range
group_created: # to continue the group APIs
b: # to continue the group APIs
!response
group_created_for_images: # to create images
b2: # to create images
!response
image_created: # to continue the image APIs
y: # to continue the image APIs
!response
group_create_data: !response
image_create_data: !response
group_update_data: !response
image_update_data: !response
c: !response
z: !response

loggers:
test:
Expand All @@ -38,18 +35,17 @@ endpoints:
tags:
type: create group
body: !str '{
"id":"${v:createName}${x:start_pad(${p:group_range}, 6, "0")}",
"name":"TEST-GROUP"
"a":"${x:start_pad(${p:a}, 6, "0")}"
}'
provides:
group_created:
b:
query:
select: response.body.id
select: response.body.a
where: response.status == 200 || response.status == 409
send: if_not_full
group_created_for_images:
b2:
query:
select: response.body.id
select: response.body.a
for_each:
- repeat(_v.imagesPerGroup) # We need to create X copies so each image will have one
where: response.status == 200 || response.status == 409
Expand All @@ -61,16 +57,15 @@ endpoints:
tags:
type: create image
body: !str '{
"id":"${v:createName}${x:start_pad(${p:image_range}, 8, "0")}",
"groupId":"${p:group_created_for_images}",
"name":"TEST-IMAGE"
"x":"${x:start_pad(${p:x}, 8, "0")}",
"b":"${p:b2}"
}'
provides:
image_created:
y:
query:
select:
id: response.body.id
groupId: response.body.groupId
x: response.body.x
b: response.body.b
where: response.status == 200 || response.status == 409
send: if_not_full
peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups
Expand All @@ -80,15 +75,15 @@ endpoints:
tags:
type: create group data
body: !str '{
"id":"${p:group_created}",
"b":"${p:b}",
"data":{
"subdata":"TEST-DATA"
"subdata":"A-DATA"
}
}'
provides:
group_create_data:
c:
query:
select: group_created
select: response.body.b
where: response.status == 200
send: if_not_full
peak_load: ${v:groupRate}hps
Expand All @@ -98,43 +93,43 @@ endpoints:
tags:
type: create image
body: !str '{
"id":"${x:${p:image_created}.id}",
"groupId":"${x:${p:image_created}.groupId}",
"x":"${x:${p:y}.x}",
"b":"${x:${p:y}.b}",
"data":{
"subdata":"TEST-DATA"
"subdata":"X-DATA"
}
}'
provides:
image_create_data:
z:
query:
select: image_created # Puts in the whole object (id and groupId)
select: y # Puts in the whole object (id and groupId)
where: response.status == 200
send: if_not_full
peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups

- method: PUT
url: http://localhost:${v:port}
body: !str '{
"id":"${p:group_create_data}",
"c":"${p:c}",
"data":{
"subdata":"UPDATED-TEST-DATA"
"subdata":"UPDATED-A-DATA"
}
}'
peak_load: ${v:groupRate}hps
logs:
test:
select: response.body.id
select: response.body.c

- method: PUT
url: http://localhost:${v:port}
body: !str '{
"id":"${x:${p:image_create_data}.id}",
"groupId":"${x:${p:image_create_data}.groupId}",
"x":"${x:${p:z}.x}",
"b":"${x:${p:z}.b}",
"data":{
"subdata":"UPDATED-TEST-DATA"
"subdata":"UPDATED-X-DATA"
}
}'
peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups
logs:
test:
select: response.body.id
select: response.body.x
56 changes: 47 additions & 9 deletions lib/channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl<T: Serialize> Channel<T> {
self.on_demand_events.notify(1);
}

/// notify all OnDemand with an event listener
fn notify_all_on_demand(&self) {
self.on_demand_events.notify(std::usize::MAX);
}

/// create a listener so an OnDemand can get notice when demand has been requested
/// (a receiver tried to receive but the queue was empty)
fn on_demand_listen(&self) -> EventListener {
Expand Down Expand Up @@ -340,7 +345,13 @@ impl<T: Serialize> Sender<T> {
impl<T: Serialize> Clone for Sender<T> {
fn clone(&self) -> Self {
let count = self.channel.increment_sender_count();
info!("Sender::clone: {} new count: {}", self.name(), count);
let on_demand_count = self.channel.on_demand_count();
info!(
"Sender::Clone channel {}, new count: {}, on_demand_count: {}",
self.name(),
count,
on_demand_count
);
Sender {
channel: self.channel.clone(),
listener: None,
Expand All @@ -353,8 +364,15 @@ impl<T: Serialize> Clone for Sender<T> {
impl<T: Serialize> Drop for Sender<T> {
fn drop(&mut self) {
let count = self.channel.decrement_sender_count();
info!("Sender::drop: {} new count: {}", self.name(), count);
let on_demand_count = self.channel.on_demand_count();
info!(
"Sender::Drop channel {}, new count: {}, on_demand_count: {}",
self.name(),
count,
on_demand_count
);
if count == 0 {
info!("Sender::Drop channel {}, notify_all_receivers", self.name());
self.channel.notify_all_receivers();
}
}
Expand Down Expand Up @@ -431,7 +449,7 @@ impl<T: Serialize> Sink<T> for Sender<T> {
if self.no_receivers() {
self.listener = None;
debug!(
"Sink for Sender:poll_ready {} no_receivers, length: ${}",
"Sink for Sender:poll_ready {} no_receivers, length: {}",
self.name(),
self.len()
);
Expand Down Expand Up @@ -531,9 +549,10 @@ pub struct Receiver<T: Serialize> {
impl<T: Serialize> Clone for Receiver<T> {
fn clone(&self) -> Self {
let count = self.channel.increment_receiver_count();
let on_demand_count = self.channel.on_demand_count();
debug!(
"Receiver:Clone cloning channel {}, new count: {}",
self.channel.name, count
"Receiver:Clone channel {}, new count: {}, on_demand_count: {}",
self.channel.name, count, on_demand_count
);
Receiver {
channel: self.channel.clone(),
Expand All @@ -547,9 +566,10 @@ impl<T: Serialize> Clone for Receiver<T> {
impl<T: Serialize> Drop for Receiver<T> {
fn drop(&mut self) {
let new_count = self.channel.decrement_receiver_count();
let on_demand_count = self.channel.on_demand_count();
debug!(
"Receiver:Drop channel {}, new count: {}",
self.channel.name, new_count
"Receiver:Drop channel {}, new count: {}, on_demand_count: {}",
self.channel.name, new_count, on_demand_count
);
if new_count == 0 {
// notify all senders so they will see there are no more receivers
Expand All @@ -558,6 +578,8 @@ impl<T: Serialize> Drop for Receiver<T> {
"Receiver:Drop channel {}, notify_all_senders",
self.channel.name
);
// When there are no more receivers we need to notify the on_demand in addition to the normal senders
self.channel.notify_all_on_demand();
self.channel.notify_all_senders();
}
}
Expand All @@ -576,8 +598,20 @@ impl<T: Serialize> Stream for Receiver<T> {
loop {
if let Some(listener) = self.listener.as_mut() {
match Pin::new(listener).poll(cx) {
Poll::Ready(()) => self.listener = None,
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {
debug!(
"Receiver:poll_next channel {}, listener Poll::Ready, listener = None",
self.channel.name
);
self.listener = None;
}
Poll::Pending => {
debug!(
"Receiver:poll_next channel {}, listener Poll::Pending",
self.channel.name
);
return Poll::Pending;
}
}
}

Expand Down Expand Up @@ -675,6 +709,10 @@ impl<T: Serialize + Send + 'static> Stream for OnDemandReceiver<T> {
self.channel.len()
);
self.listener = None;
debug!(
"OnDemandReceiver::poll_next {} listener: None",
self.channel.name
);
return Poll::Ready(None);
}

Expand Down
Loading
Loading