-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: adding server for active components #526
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #526 +/- ##
==========================================
+ Coverage 81.21% 81.28% +0.06%
==========================================
Files 42 42
Lines 1826 1854 +28
Branches 1826 1854 +28
==========================================
+ Hits 1483 1507 +24
- Misses 269 270 +1
- Partials 74 77 +3 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 3 files reviewed, 1 unresolved discussion (waiting on @lev-starkware)
crates/mempool_infra/src/component_server/local_component_server.rs
line 189 at r1 (raw file):
// let component_future = async move {component.start().await }.boxed(); let component_future = async move { component.start().await }; let request_responce_future = self.request_response_loop();
got a typo here
Suggestion:
request_response_future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 3 files reviewed, 9 unresolved discussions (waiting on @lev-starkware and @uriel-starkware)
crates/mempool_infra/src/component_server/local_component_server.rs
line 187 at r1 (raw file):
async fn start(&mut self) { let mut component = self.component.clone(); // let component_future = async move {component.start().await }.boxed();
Please remove the comment.
Code quote:
// let component_future = async move {component.start().await }.boxed();
crates/mempool_infra/src/component_server/local_component_server.rs
line 199 at r1 (raw file):
} }; error!("Servers ended with unexpected Ok.");
Please change to singular phrasing (drop "s").
Code quote:
s
crates/mempool_infra/src/component_server/local_component_server.rs
line 211 at r1 (raw file):
tx.send(res).await.expect("Response connection should be open."); } }
Can this code be unified with the implementation of the "passive" server impl?
Code quote:
async fn request_response_loop(&mut self) {
while let Some(request_and_res_tx) = self.rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;
let res = self.component.handle_request(request).await;
tx.send(res).await.expect("Response connection should be open.");
}
}
crates/mempool_infra/tests/active_component_server_client_test.rs
line 21 at r1 (raw file):
pub type ValueC = i64; pub(crate) type ResultC = ClientResult<ValueC>;
Is pub
required here?
Code quote:
pub type ValueC = i64;
pub(crate) type ResultC = ClientResult<ValueC>;
crates/mempool_infra/tests/active_component_server_client_test.rs
line 57 at r1 (raw file):
Ok(()) } }
What does this chunk do? Why are there sleep
s here?
Code quote:
#[async_trait]
impl ComponentStarter for ComponentC {
async fn start(&mut self) -> Result<(), ComponentStartError> {
for _ in 0..10 {
sleep(Duration::from_millis(200)).await;
self.c_increment_value().await;
}
let val = self.c_get_value().await;
assert!(val >= 10);
let () = pending().await;
Ok(())
}
}
crates/mempool_infra/tests/active_component_server_client_test.rs
line 68 at r1 (raw file):
pub enum ComponentCResponse { Value(ValueC), }
The convention is to name both the request and the response variants after the function name. I understand there's a duplication in the response enum, but IMO that's preferred over breaking the convention.
You can also change the inc
function to not return the value.
Code quote:
#[derive(Serialize, Deserialize, Debug)]
pub enum ComponentCRequest {
CIncValue,
CGetValue,
}
#[derive(Serialize, Deserialize, Debug)]
pub enum ComponentCResponse {
Value(ValueC),
}
crates/mempool_infra/tests/active_component_server_client_test.rs
line 73 at r1 (raw file):
pub(crate) trait ComponentCClientTrait: Send + Sync { async fn c_increment_value(&self) -> ResultC; async fn c_get_value(&self) -> ResultC;
Please change the trait fn names to match the enum variant names. See the mempool trait and enums:
pub trait MempoolClient: Send + Sync {
async fn add_tx(&self, mempool_input: MempoolInput) -> MempoolClientResult<()>;
async fn get_txs(&self, n_txs: usize) -> MempoolClientResult<Vec<ThinTransaction>>;
}
#[derive(Debug)]
pub enum MempoolRequest {
AddTransaction(MempoolInput),
GetTransactions(usize),
}
#[derive(Debug)]
pub enum MempoolResponse {
AddTransaction(MempoolResult<()>),
GetTransactions(MempoolResult<Vec<ThinTransaction>>),
}
Code quote:
async fn c_increment_value(&self) -> ResultC;
async fn c_get_value(&self) -> ResultC;
crates/mempool_infra/tests/active_component_server_client_test.rs
line 170 at r1 (raw file):
// Wait for the components to finish incrementing of the ComponentC::value. sleep(Duration::from_millis(2100)).await;
Is there a way to avoid sleeping in tests? This results in a sub-optimal CI (sleep for too short time -- ci randomly fails, sleep for too long -- redundant time).
For example, can the test run until it receives the expected value?
@uriel-starkware has done something similar in a previous test IIRC.
Code quote:
// Wait for the components to finish incrementing of the ComponentC::value.
sleep(Duration::from_millis(2100)).await;
ce0b63d
to
735b962
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 3 files reviewed, 9 unresolved discussions (waiting on @Itay-Tsabary-Starkware and @uriel-starkware)
crates/mempool_infra/src/component_server/local_component_server.rs
line 187 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Please remove the comment.
Done.
crates/mempool_infra/src/component_server/local_component_server.rs
line 189 at r1 (raw file):
Previously, uriel-starkware wrote…
got a typo here
Done.
crates/mempool_infra/src/component_server/local_component_server.rs
line 199 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Please change to singular phrasing (drop "s").
Done.
crates/mempool_infra/src/component_server/local_component_server.rs
line 211 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Can this code be unified with the implementation of the "passive" server impl?
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 21 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Is
pub
required here?
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 57 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
What does this chunk do? Why are there
sleep
s here?
I want to change the value from both the component itself and the other component. I changed the test, so there is no sleep now.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 68 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
The convention is to name both the request and the response variants after the function name. I understand there's a duplication in the response enum, but IMO that's preferred over breaking the convention.
You can also change the
inc
function to not return the value.
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 73 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Please change the trait fn names to match the enum variant names. See the mempool trait and enums:
pub trait MempoolClient: Send + Sync { async fn add_tx(&self, mempool_input: MempoolInput) -> MempoolClientResult<()>; async fn get_txs(&self, n_txs: usize) -> MempoolClientResult<Vec<ThinTransaction>>; } #[derive(Debug)] pub enum MempoolRequest { AddTransaction(MempoolInput), GetTransactions(usize), } #[derive(Debug)] pub enum MempoolResponse { AddTransaction(MempoolResult<()>), GetTransactions(MempoolResult<Vec<ThinTransaction>>), }
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 170 at r1 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Is there a way to avoid sleeping in tests? This results in a sub-optimal CI (sleep for too short time -- ci randomly fails, sleep for too long -- redundant time).
For example, can the test run until it receives the expected value?
@uriel-starkware has done something similar in a previous test IIRC.
Changed test. No sleep anymore.
6f09c1e
to
62f0548
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 3 files at r2, all commit messages.
Reviewable status: 1 of 3 files reviewed, 9 unresolved discussions (waiting on @lev-starkware and @uriel-starkware)
crates/mempool_infra/tests/active_component_server_client_test.rs
line 35 at r3 (raw file):
value: Arc::new(Mutex::new(value)), test_end_count: Arc::new(Mutex::new(0)), test_loop_count,
Please rename this fields to be more indicative, IIUC:
test_end_count
-> max_iterations
test_loop_count
-> current_iteration
Code quote:
test_end_count: Arc::new(Mutex::new(0)),
test_loop_count,
crates/mempool_infra/tests/active_component_server_client_test.rs
line 54 at r3 (raw file):
*self.test_end_count.lock().await } }
I'm slightly confused regarding what this component does and what is being tested. If I understand correctly the main point is to test the loop runs successfully. If so, do we still need the value
variable? I'd prefer this test to be as simple as possible while still capturing the gist of things, i.e., the concurrency of a component.
Code quote:
impl ComponentC {
pub fn new(value: ValueC, test_loop_count: ValueC) -> Self {
Self {
value: Arc::new(Mutex::new(value)),
test_end_count: Arc::new(Mutex::new(0)),
test_loop_count,
}
}
pub async fn c_get_value(&self) -> ValueC {
*self.value.lock().await
}
pub async fn c_increment_value(&self) {
*self.value.lock().await += 1;
}
pub async fn increment_test_end_count(&self) {
*self.test_end_count.lock().await += 1;
}
pub async fn get_test_end_count(&self) -> ValueC {
*self.test_end_count.lock().await
}
}
crates/mempool_infra/tests/active_component_server_client_test.rs
line 60 at r3 (raw file):
async fn start(&mut self) -> Result<(), ComponentStartError> { for _ in 0..self.test_loop_count { task::yield_now().await;
Is explicitly yielding required? Doesn't the await
in the following line achieve the same purpose?
Code quote:
task::yield_now().await;
crates/mempool_infra/tests/active_component_server_client_test.rs
line 64 at r3 (raw file):
} let val = self.c_get_value().await; assert!(val >= self.test_loop_count);
Why is this assert needed? What does it test?
Code quote:
assert!(val >= self.test_loop_count);
crates/mempool_infra/tests/active_component_server_client_test.rs
line 127 at r3 (raw file):
assert!(val >= self.test_loop_count); self.d_send_test_end().await; let () = pending().await;
What does this do?
Code quote:
let () = pending().await;
crates/mempool_infra/tests/active_component_server_client_test.rs
line 194 at r3 (raw file):
let c_client = LocalComponentClient::new(tx_c); let start_time = Instant::now();
Why does the test / function have to verify that their run times are shorter than 2 seconds?
Code quote:
let start_time = Instant::now();
crates/mempool_infra/tests/active_component_server_client_test.rs
line 200 at r3 (raw file):
while start_time.elapsed() < time_out && test_end_count < 2 { test_end_count = c_client.c_get_test_end_count().await.unwrap(); task::yield_now().await;
Redundant?
Code quote:
task::yield_now().await;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 8 unresolved discussions (waiting on @Itay-Tsabary-Starkware and @lev-starkware)
crates/mempool_infra/tests/active_component_server_client_test.rs
line 60 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Is explicitly yielding required? Doesn't the
await
in the following line achieve the same purpose?
await does not == yield. It only means there's a chance of yielding in the interior code. The yielding happens when the program waits for an IO operation/lock/, in case of explicitly yields with the above command and when the task ends.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 9 unresolved discussions (waiting on @Itay-Tsabary-Starkware and @lev-starkware)
a discussion (no related file):
As a test that verifies concurrency without relying on sleeps and yields, I think this needs to be carefully considered.
I suggest a different approach that will capture this idea:
- Construct a channel connecting from the main test code to Component C that transfers a command to increment an inner member value
- Component C will have a while loop listening to this channel
- Component C is the server that returns the inner value
- Component D queries the Component C client for the value
- Do interleaving instructions of sending commands for increment through the special channel and querying with the D component for the value
- Expect each time to have a growing number and do it for X times (doesn't really matter how many, just don't do it less than 3, I think)
This is just an idea. you will not need any sleep nor yields and it will showcase that both tasks are running in concurrent and will be more succinct
b129b2f
to
93f4f2a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 9 unresolved discussions (waiting on @Itay-Tsabary-Starkware and @lev-starkware)
crates/mempool_infra/tests/active_component_server_client_test.rs
line 60 at r3 (raw file):
Previously, uriel-starkware wrote…
await does not == yield. It only means there's a chance of yielding in the interior code. The yielding happens when the program waits for an IO operation/lock/, in case of explicitly yields with the above command and when the task ends.
For some reason, parts of my comment were deleted and made nonsensical. If unclear, I'll explain but long story short .await
!= yield()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 9 unresolved discussions (waiting on @Itay-Tsabary-Starkware and @uriel-starkware)
crates/mempool_infra/tests/active_component_server_client_test.rs
line 35 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Please rename this fields to be more indicative, IIUC:
test_end_count
->max_iterations
test_loop_count
->current_iteration
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 54 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
I'm slightly confused regarding what this component does and what is being tested. If I understand correctly the main point is to test the loop runs successfully. If so, do we still need the
value
variable? I'd prefer this test to be as simple as possible while still capturing the gist of things, i.e., the concurrency of a component.
Explained.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 64 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Why is this assert needed? What does it test?
After incrementing the counter N times in this task and also incrementing it by another task the counter value should be at least N.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 127 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
What does this do?
Mimicking real start function.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 194 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Why does the test / function have to verify that their run times are shorter than 2 seconds?
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 200 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Redundant?
Changed to 1 milisec delay. In any case, it's done to lower CPU consumption by the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 3 files at r2, all commit messages.
Reviewable status: 2 of 3 files reviewed, 7 unresolved discussions (waiting on @lev-starkware and @uriel-starkware)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dismissed @uriel-starkware from a discussion.
Reviewable status: 2 of 3 files reviewed, 10 unresolved discussions (waiting on @lev-starkware and @uriel-starkware)
crates/mempool_infra/tests/active_component_server_client_test.rs
line 64 at r3 (raw file):
Previously, lev-starkware wrote…
After incrementing the counter N times in this task and also incrementing it by another task the counter value should be at least N.
But it's not part of the test, it's part of the component. Is it a sanity check you added for debugging? If so, please remove this, it's confusing imo.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 37 at r4 (raw file):
max_iterations, c_test_ended: Arc::new(Mutex::new(false)), d_test_ended: Arc::new(Mutex::new(false)),
I'd like all the names to be within the same "domain name":
test_counter
-> counter
or current_iteration
max_iterations
-> max_count
or max_iterations
I prefer counter
and max_count
, but current_iteration
and max_iterations
also work.
Also, please rename these to reflect their purpose
c_test_ended
-> internal_invocations_completed
d_test_ended
-> external_invocations_completed
Code quote:
test_counter: Arc::new(Mutex::new(init_value)),
max_iterations,
c_test_ended: Arc::new(Mutex::new(false)),
d_test_ended: Arc::new(Mutex::new(false)),
crates/mempool_infra/tests/active_component_server_client_test.rs
line 73 at r4 (raw file):
self.c_set_c_test_end().await; // Mimicing real start function that should not return.
Typo.
Code quote:
Mimicing
crates/mempool_infra/tests/active_component_server_client_test.rs
line 100 at r4 (raw file):
async fn c_get_value(&self) -> ResultC; async fn c_set_d_test_end(&self) -> ClientResult<()>; async fn c_test_end_check(&self) -> ClientResult<bool>;
Please be consistent with type definitions: Either wrap each return type with a designated type, or none of them. I think the latter is preferred here, since ResultC
is used only once.
Code quote:
async fn c_inc_value(&self) -> ClientResult<()>;
async fn c_get_value(&self) -> ResultC;
async fn c_set_d_test_end(&self) -> ClientResult<()>;
async fn c_test_end_check(&self) -> ClientResult<bool>;
crates/mempool_infra/tests/active_component_server_client_test.rs
line 100 at r4 (raw file):
async fn c_get_value(&self) -> ResultC; async fn c_set_d_test_end(&self) -> ClientResult<()>; async fn c_test_end_check(&self) -> ClientResult<bool>;
Please rename the functions / enum variants / inner variables consistently.
Code quote:
async fn c_inc_value(&self) -> ClientResult<()>;
async fn c_get_value(&self) -> ResultC;
async fn c_set_d_test_end(&self) -> ClientResult<()>;
async fn c_test_end_check(&self) -> ClientResult<bool>;
crates/mempool_infra/tests/active_component_server_client_test.rs
line 136 at r4 (raw file):
self.d_send_test_end().await; // Mimicing real start function that should not return.
Typo.
Code quote:
Mimicing
crates/mempool_infra/tests/active_component_server_client_test.rs
line 210 at r4 (raw file):
while !test_ended { test_ended = c_client.c_test_end_check().await.unwrap(); sleep(delay).await; // Lower CPU usage.
Please remove all artificial "delays" from the test.
The focus of this test is about the components making progress in some order, and not about forcing the scheduler / runtime to behave in a specific manner.
We will not have such delays in production code and as such should avoid relying on them in tests.
For (slightly) faster task swapping you can reduce the channel size from 32 to 1. This effectively blocks a task from sending a message until the previously sent one is read, resulting in more frequent yields. In theory, this helps the component behind acquire the runtime and finish its iterations, avoiding the potentially-wasteful "busy-wait" completion loop.
Also, you can reduce the iteration counter from 1024 to channel_size * 3, that should achieve the same effect with a faster execution time.
Code quote:
sleep(delay).await; // Lower CPU usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 2 of 3 files reviewed, 10 unresolved discussions (waiting on @lev-starkware and @uriel-starkware)
a discussion (no related file):
Previously, uriel-starkware wrote…
As a test that verifies concurrency without relying on sleeps and yields, I think this needs to be carefully considered.
I suggest a different approach that will capture this idea:
- Construct a channel connecting from the main test code to Component C that transfers a command to increment an inner member value
- Component C will have a while loop listening to this channel
- Component C is the server that returns the inner value
- Component D queries the Component C client for the value
- Do interleaving instructions of sending commands for increment through the special channel and querying with the D component for the value
- Expect each time to have a growing number and do it for X times (doesn't really matter how many, just don't do it less than 3, I think)
This is just an idea. you will not need any sleep nor yields and it will showcase that both tasks are running in concurrent and will be more succinct
Essentially you're suggesting that D only queries the value instead of also incrementing it. I think there's a subtle point that might be missed by this setting though: if there's a bug such that D is blocked until C finishes, then in the suggested setting the test will pass, while actually we'd like it to fail. In the current setting, where C queries itself, it is assured that D has completed and that they worked concurrently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 2 of 3 files reviewed, 11 unresolved discussions (waiting on @lev-starkware and @uriel-starkware)
crates/mempool_infra/tests/active_component_server_client_test.rs
line 20 at r4 (raw file):
use tokio::time::{sleep, Duration}; type ValueC = i64;
Re-reading this, IMO this type definition is redundant. It was originally added to the other test components to test that each component has its own value type. When's there only a single value at play it is just redundant.
Please remove ValueC
and use the actual type. It should also be an unsigned type, I think usize
fits best here as it is used a counter.
Code quote:
type ValueC = i64;
commit-id:fa8f56cc
93f4f2a
to
f943d3d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 2 of 3 files reviewed, 9 unresolved discussions (waiting on @Itay-Tsabary-Starkware and @uriel-starkware)
crates/mempool_infra/tests/active_component_server_client_test.rs
line 64 at r3 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
But it's not part of the test, it's part of the component. Is it a sanity check you added for debugging? If so, please remove this, it's confusing imo.
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 20 at r4 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Re-reading this, IMO this type definition is redundant. It was originally added to the other test components to test that each component has its own value type. When's there only a single value at play it is just redundant.
Please removeValueC
and use the actual type. It should also be an unsigned type, I thinkusize
fits best here as it is used a counter.
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 37 at r4 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
I'd like all the names to be within the same "domain name":
test_counter
->counter
orcurrent_iteration
max_iterations
->max_count
ormax_iterations
I prefer
counter
andmax_count
, butcurrent_iteration
andmax_iterations
also work.Also, please rename these to reflect their purpose
c_test_ended
->internal_invocations_completed
d_test_ended
->external_invocations_completed
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 100 at r4 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Please be consistent with type definitions: Either wrap each return type with a designated type, or none of them. I think the latter is preferred here, since
ResultC
is used only once.
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 100 at r4 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Please rename the functions / enum variants / inner variables consistently.
Done.
crates/mempool_infra/tests/active_component_server_client_test.rs
line 210 at r4 (raw file):
Previously, Itay-Tsabary-Starkware wrote…
Please remove all artificial "delays" from the test.
The focus of this test is about the components making progress in some order, and not about forcing the scheduler / runtime to behave in a specific manner.
We will not have such delays in production code and as such should avoid relying on them in tests.For (slightly) faster task swapping you can reduce the channel size from 32 to 1. This effectively blocks a task from sending a message until the previously sent one is read, resulting in more frequent yields. In theory, this helps the component behind acquire the runtime and finish its iterations, avoiding the potentially-wasteful "busy-wait" completion loop.
Also, you can reduce the iteration counter from 1024 to channel_size * 3, that should achieve the same effect with a faster execution time.
Done.
This change is