Skip to content

Commit

Permalink
Add "schedule" annotation to NIF functions (#148)
Browse files Browse the repository at this point in the history
This is needed to classify most of the NIF functions as
possible "dirty".
For smaller sets, those functions should not take more
than 1 millisecond to perform. But for larger datasets,
it's probably going to take more time.

This follows the recommendations from:
https://www.erlang.org/doc/man/erl_nif.html#lengthy_work

Also see rusterlium/rustler#402
  • Loading branch information
philss authored Mar 31, 2022
1 parent f6c88ae commit 746b648
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 105 deletions.
66 changes: 33 additions & 33 deletions native/explorer/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ macro_rules! df_read_read {
};
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
#[allow(clippy::too_many_arguments)]
pub fn df_read_csv(
filename: &str,
Expand Down Expand Up @@ -96,14 +96,14 @@ fn dtype_from_str(dtype: &str) -> Result<DataType, ExplorerError> {
}
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_parquet(filename: &str) -> Result<ExDataFrame, ExplorerError> {
let f = File::open(filename)?;
let df = ParquetReader::new(f).finish()?;
Ok(ExDataFrame::new(df))
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_write_parquet(data: ExDataFrame, filename: &str) -> Result<(), ExplorerError> {
df_read!(data, df, {
let file = File::create(filename).expect("could not create file");
Expand All @@ -112,7 +112,7 @@ pub fn df_write_parquet(data: ExDataFrame, filename: &str) -> Result<(), Explore
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_to_csv(
data: ExDataFrame,
has_headers: bool,
Expand All @@ -130,7 +130,7 @@ pub fn df_to_csv(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_to_csv_file(
data: ExDataFrame,
filename: &str,
Expand All @@ -147,14 +147,14 @@ pub fn df_to_csv_file(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ipc(filename: &str) -> Result<ExDataFrame, ExplorerError> {
let f = File::open(filename)?;
let df = IpcReader::new(f).finish()?;
Ok(ExDataFrame::new(df))
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_write_ipc(data: ExDataFrame, filename: &str) -> Result<(), ExplorerError> {
df_read!(data, df, {
let mut file = File::create(filename).expect("could not create file");
Expand All @@ -163,7 +163,7 @@ pub fn df_write_ipc(data: ExDataFrame, filename: &str) -> Result<(), ExplorerErr
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ndjson(
filename: &str,
infer_schema_length: Option<usize>,
Expand All @@ -180,7 +180,7 @@ pub fn df_read_ndjson(
Ok(ExDataFrame::new(df))
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_write_ndjson(data: ExDataFrame, filename: &str) -> Result<(), ExplorerError> {
df_read!(data, df, {
let file = File::create(filename).expect("could not create file");
Expand All @@ -189,12 +189,12 @@ pub fn df_write_ndjson(data: ExDataFrame, filename: &str) -> Result<(), Explorer
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyIo")]
pub fn df_as_str(data: ExDataFrame) -> Result<String, ExplorerError> {
df_read!(data, df, { Ok(format!("{:?}", df)) })
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_from_map_rows(
rows: Vec<HashMap<Term, Option<ExAnyValue>>>,
) -> Result<ExDataFrame, ExplorerError> {
Expand Down Expand Up @@ -259,7 +259,7 @@ fn case_insensitive_sort(strings: &mut Vec<String>) {
strings.sort_by(|a, b| a.to_lowercase().cmp(&b.to_lowercase()))
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_from_keyword_rows(
rows: Vec<Vec<(Term, Option<ExAnyValue>)>>,
) -> Result<ExDataFrame, ExplorerError> {
Expand Down Expand Up @@ -315,7 +315,7 @@ pub fn df_fill_none(data: ExDataFrame, strategy: &str) -> Result<ExDataFrame, Ex
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_join(
data: ExDataFrame,
other: ExDataFrame,
Expand All @@ -342,7 +342,7 @@ pub fn df_join(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_get_columns(data: ExDataFrame) -> Result<Vec<ExSeries>, ExplorerError> {
df_read!(data, df, {
Ok(to_ex_series_collection(df.get_columns().clone()))
Expand Down Expand Up @@ -389,14 +389,14 @@ pub fn df_hstack(data: ExDataFrame, cols: Vec<ExSeries>) -> Result<ExDataFrame,
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_vstack(data: ExDataFrame, other: ExDataFrame) -> Result<ExDataFrame, ExplorerError> {
df_read_read!(data, other, df, df1, {
Ok(ExDataFrame::new(df.vstack(&df1.clone())?))
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_drop_nulls(
data: ExDataFrame,
subset: Option<Vec<String>>,
Expand All @@ -407,23 +407,23 @@ pub fn df_drop_nulls(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_drop(data: ExDataFrame, name: &str) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, {
let new_df = (&*df).drop(name)?;
Ok(ExDataFrame::new(new_df))
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_select_at_idx(data: ExDataFrame, idx: usize) -> Result<Option<ExSeries>, ExplorerError> {
df_read!(data, df, {
let result = df.select_at_idx(idx).map(|s| ExSeries::new(s.clone()));
Ok(result)
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_column(data: ExDataFrame, name: &str) -> Result<ExSeries, ExplorerError> {
df_read!(data, df, {
let series = df.column(name).map(|s| ExSeries::new(s.clone()))?;
Expand All @@ -439,7 +439,7 @@ pub fn df_select(data: ExDataFrame, selection: Vec<&str>) -> Result<ExDataFrame,
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_filter(data: ExDataFrame, mask: ExSeries) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, {
let filter_series = &mask.resource.0;
Expand All @@ -461,7 +461,7 @@ pub fn df_take(data: ExDataFrame, indices: Vec<u32>) -> Result<ExDataFrame, Expl
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_sort(
data: ExDataFrame,
by_column: &str,
Expand All @@ -473,7 +473,7 @@ pub fn df_sort(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_slice(
data: ExDataFrame,
offset: i64,
Expand All @@ -485,28 +485,28 @@ pub fn df_slice(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_head(data: ExDataFrame, length: Option<usize>) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, {
let new_df = df.head(length);
Ok(ExDataFrame::new(new_df))
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_tail(data: ExDataFrame, length: Option<usize>) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, {
let new_df = df.tail(length);
Ok(ExDataFrame::new(new_df))
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_clone(data: ExDataFrame) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, { Ok(ExDataFrame::new(df.clone())) })
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_melt(
data: ExDataFrame,
id_vars: Vec<&str>,
Expand All @@ -518,7 +518,7 @@ pub fn df_melt(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_drop_duplicates(
data: ExDataFrame,
maintain_order: bool,
Expand All @@ -533,15 +533,15 @@ pub fn df_drop_duplicates(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_to_dummies(data: ExDataFrame) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, {
let new_df = df.to_dummies()?;
Ok(ExDataFrame::new(new_df))
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_with_column(data: ExDataFrame, col: ExSeries) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, {
let mut new_df = df.clone();
Expand All @@ -557,7 +557,7 @@ pub fn df_new(cols: Vec<ExSeries>) -> Result<ExDataFrame, ExplorerError> {
Ok(ExDataFrame::new(df))
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_set_column_names(
data: ExDataFrame,
names: Vec<&str>,
Expand All @@ -569,15 +569,15 @@ pub fn df_set_column_names(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_groups(data: ExDataFrame, groups: Vec<&str>) -> Result<ExDataFrame, ExplorerError> {
df_read!(data, df, {
let groups = df.groupby(groups)?.groups()?;
Ok(ExDataFrame::new(groups))
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_groupby_agg(
data: ExDataFrame,
groups: Vec<&str>,
Expand All @@ -589,7 +589,7 @@ pub fn df_groupby_agg(
})
}

#[rustler::nif]
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_pivot_wider(
data: ExDataFrame,
id_cols: Vec<&str>,
Expand Down
Loading

0 comments on commit 746b648

Please sign in to comment.