
[FEA]: Add parquet support to write_to_file_stage.py #980

Closed
2 tasks done
tgrunzweig-cpacket opened this issue Jun 9, 2023 · 3 comments · Fixed by #1937
Assignees
Labels
external This issue was filed by someone outside of the Morpheus team feature request New feature or request

Comments


tgrunzweig-cpacket commented Jun 9, 2023

Is this a new feature, an improvement, or a change to existing functionality?

New Feature

How would you describe the priority of this feature request

Medium

Please provide a clear description of problem this feature solves

I want Morpheus to write inference outputs in Parquet format.

The current implementation of write_to_file_stage.py has some JSON- and CSV-specific logic in _convert_to_strings(), and raises NotImplementedError for other file types.

The inference output files in DFP can end up being quite large and are likely to be processed further, so an efficient machine-readable format (like Parquet) would make deployment more efficient.

Describe your ideal solution

write_to_file_stage.py

    def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):

        # Ensure our directory exists
        os.makedirs(os.path.realpath(os.path.dirname(self._output_file)), exist_ok=True)

        def write_to_file(x: MessageMeta):

            # new
            if os.path.splitext(self._output_file)[1] == '.parquet':
                x.df.to_parquet(self._output_file)
            else:
                # Open up the file handle
                with open(self._output_file, "a") as out_file:
                    # end new
                    lines = self._convert_to_strings(x.df)
                    ...
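The sketch above can be made concrete as a small standalone function. This is a hedged illustration using pandas (in Morpheus the message typically carries a cuDF DataFrame, whose to_parquet() is analogous); the write_dataframe name and column names are hypothetical, not Morpheus API.

```python
import os
import pandas as pd

def write_dataframe(df: pd.DataFrame, output_file: str) -> None:
    # Ensure the output directory exists (mirrors the os.makedirs call above)
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(os.path.realpath(out_dir), exist_ok=True)

    if os.path.splitext(output_file)[1] == ".parquet":
        # Parquet branch: delegate to the DataFrame's own writer.
        # (Writing Parquet requires pyarrow or fastparquet to be installed.)
        df.to_parquet(output_file)
    else:
        # Fallback: append line-oriented text, standing in for the
        # existing _convert_to_strings() path.
        with open(output_file, "a") as out_file:
            out_file.write(df.to_csv(index=False))

# Hypothetical usage with an illustrative column name
write_dataframe(pd.DataFrame({"score": [0.1, 0.9]}), "out.csv")
```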

Describe any alternatives you have considered

No response

Additional context

No response

Code of Conduct

  • I agree to follow this project's Code of Conduct
  • I have searched the open feature requests and have found no duplicates for this feature request
@tgrunzweig-cpacket tgrunzweig-cpacket added the feature request New feature or request label Jun 9, 2023
@jarmak-nv jarmak-nv added the Needs Triage Need team to review and classify label Jun 9, 2023
@jarmak-nv
Contributor

Hi @Tzahi-cpacket!

Thanks for submitting this issue - our team has been notified and we'll get back to you as soon as we can!
In the meantime, feel free to add any relevant information to this issue.

@mdemoret-nv
Contributor

@Tzahi-cpacket After looking into this more, one issue is that our write_to_file_stage.py writes to the file iteratively as messages are received and keeps the file stream open for the duration of the stage. With the CSV and JSON-lines formats, this works well for offloading data to disk. However, the only library I found that supports appending to Parquet files is fastparquet. So this leaves a few options:

  1. Keep all messages in memory until pipeline shutdown and then concat them into a single DataFrame which can be written to disk
  2. Write to parquet using the fastparquet engine in pandas
    1. This would require converting every DataFrame from GPU->CPU before writing to disk which will have a performance penalty
  3. Write to a partitioned parquet dataset
    1. Each message would be treated as a different partition
    2. A rough outline of how this would work can be found here: https://docs.rapids.ai/api/cudf/legacy/api_docs/api/cudf.io.parquet.parquetdatasetwriter/

Are any of these options preferable over another?
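Option 1 above can be sketched as a small buffering helper. This is a hedged illustration using pandas; the class and method names (ParquetWriteBuffer, add, flush) are hypothetical, not Morpheus API, and in Morpheus the frames could be cuDF DataFrames concatenated with cudf.concat instead.

```python
import pandas as pd

class ParquetWriteBuffer:
    """Accumulate per-message DataFrames; write one Parquet file at shutdown."""

    def __init__(self, output_file: str):
        self._output_file = output_file
        self._frames: list = []

    def add(self, df: pd.DataFrame) -> None:
        # Called once per message. Everything stays in memory until
        # shutdown, which is the trade-off noted for this option.
        self._frames.append(df)

    def flush(self) -> pd.DataFrame:
        # Concatenate in arrival order and write a single file.
        # (to_parquet() needs pyarrow or fastparquet installed.)
        combined = pd.concat(self._frames, ignore_index=True)
        combined.to_parquet(self._output_file)
        return combined

# Hypothetical usage: one add() per message; on pipeline shutdown,
# flush() would be invoked once to write the combined DataFrame.
buf = ParquetWriteBuffer("inference_out.parquet")
buf.add(pd.DataFrame({"score": [0.1, 0.9]}))
buf.add(pd.DataFrame({"score": [0.5]}))
```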


tgrunzweig-cpacket commented Jul 5, 2023 via email

@jarmak-nv jarmak-nv added the external This issue was filed by someone outside of the Morpheus team label Sep 5, 2023
@mdemoret-nv mdemoret-nv removed the Needs Triage Need team to review and classify label Dec 13, 2023
@mdemoret-nv mdemoret-nv added this to the 24.10 - Release milestone Aug 18, 2024
@morpheus-bot-test morpheus-bot-test bot moved this from Todo to Review - Ready for Review in Morpheus Boards Oct 9, 2024
@rapids-bot rapids-bot bot closed this as completed in d8041ba Nov 22, 2024
@github-project-automation github-project-automation bot moved this from Review - Ready for Review to Done in Morpheus Boards Nov 22, 2024
Status: Done