-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: Stream.pmap, parallele map for streaming data #7
base: master
Are you sure you want to change the base?
Conversation
carriage/stream.py
Outdated
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] | ||
''' | ||
if processes is None: | ||
processes = os.cpu_count() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這應該是 mp.Pool 內建就會做的事。應該維持 None 即可
carriage/stream.py
Outdated
''' | ||
if processes is None: | ||
processes = os.cpu_count() | ||
pool_chunk_size = max(chunk_size // processes, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
看不懂爲何要除以 processes 數量。系統容許的 queue 大小應該跟 processes 個數無關。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我怕 queue 爆掉而已~
所以加了個每個 chunk 會是多少這件事~
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我改成就傳給 Pool
好了~
carriage/stream.py
Outdated
if not data: | ||
break | ||
yield from pool.imap(f, data, pool_chunk_size) | ||
return Stream(iter(gen())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
要用 @as_stream 包裝比較好。
@as_stream 處理了不少事情,包含 StreamTable 與 Stream 的相容。這樣可以保證 Stream 所有 method 都可以在 StreamTable使用。
另外 iter() 應該是多餘的。
@dboyliao ☝️ 👆 |
Orz 我現在才看到~ |
Testing script updated and pass