diff --git a/src/conspiracies/docprocessing/docprocessor.py b/src/conspiracies/docprocessing/docprocessor.py
index 8573a8d..8873f18 100644
--- a/src/conspiracies/docprocessing/docprocessor.py
+++ b/src/conspiracies/docprocessing/docprocessor.py
@@ -23,7 +23,6 @@ def _build_coref_pipeline(self):
             "safe_fastcoref",
             config={
                 "enable_progress_bar": False,
-                "model_architecture": "LingMessCoref",
                 "device": (
                     "cuda"
                     if self.prefer_gpu_for_coref and torch.cuda.is_available()
@@ -127,41 +126,54 @@ def _set_user_data_on_docs(docs: Iterator[Tuple[Doc, Document]]) -> Iterator[Doc
             yield doc
 
     def _store_doc_bins(self, docs: Iterator[Doc], output_path: Path):
+        # FIXME: paths should be given elsewhere and not be inferred like this
         output_dir = Path(os.path.dirname(output_path)) / "spacy_docs"
         output_dir.mkdir(parents=True, exist_ok=True)
+        prev_doc_bins = glob(
+            (Path(os.path.dirname(output_path)) / "spacy_docs").as_posix() + "/*.bin",
+        )
+        start_from = (
+            max(int(os.path.basename(doc).replace(".bin", "")) for doc in prev_doc_bins)
+            if prev_doc_bins
+            else 0
+        )
+
         size = self.doc_bin_size
         doc_bin = DocBin(store_user_data=True)
-        for i, doc in enumerate(docs, start=1):
+        for i, doc in enumerate(docs, start=start_from + 1):
             doc_bin.add(doc)
             if i % size == 0:
-                with open(output_dir / f"{i // size}.bin", "wb") as f:
+                with open(output_dir / f"{i}.bin", "wb") as f:
                     f.write(doc_bin.to_bytes())
                 doc_bin = DocBin(store_user_data=True)
             yield doc
 
+    def _read_doc_bins(self, output_path: Path):
+        # FIXME: paths should be given elsewhere and not be inferred like this
+        count = 0
+        for bin_file in glob(
+            (Path(os.path.dirname(output_path)) / "spacy_docs").as_posix() + "/*.bin",
+        ):
+            with open(bin_file, "rb") as bytes_data:
+                doc_bin = DocBin().from_bytes(bytes_data.read())
+            for doc in doc_bin.get_docs(self.triplet_extraction_pipeline.vocab):
+                count += 1
+                yield doc
+        print(f"Read {count} previously processed docs.")
+
     def process_docs(
         self,
         docs: Iterable[Document],
         output_path: Path,
         continue_from_last=False,
     ):
-        if continue_from_last and os.path.exists(output_path):
-            already_processed = set()
-
-            # FIXME: paths should be given elsewhere and not be inferred like this
-            for bin_file in glob(
-                (Path(os.path.dirname(output_path)) / "spacy_docs").as_posix()
-                + "/*.bin",
-            ):
-                with open(bin_file, "rb") as bytes_data:
-                    doc_bin = DocBin().from_bytes(bytes_data.read())
-                for doc in doc_bin.get_docs(self.triplet_extraction_pipeline.vocab):
-                    id_ = doc.user_data["doc_metadata"]["id"]
-                    already_processed.add(id_)
-
-            print(f"Skipping {len(already_processed)} processed docs.")
-            docs = (doc for doc in docs if doc.id not in already_processed)
-
+
+        if continue_from_last:
+            print(
+                "Reading previously processed documents! "
+                "Disable 'continue_from_last' to avoid this.",
+            )
+            docs_to_jsonl(self._read_doc_bins(output_path), output_path)
 
         # The coreference pipeline tends to choke on too large batches because of an
         # extreme memory pressure, hence the small batch size
@@ -184,10 +196,13 @@ def process_docs(
 
         with_user_data = self._set_user_data_on_docs(with_triplets)
 
-        stored = self._store_doc_bins(with_user_data, output_path)
+        docs_to_output = tqdm(
+            self._store_doc_bins(with_user_data, output_path),
+            desc="Processing documents",
+        )
 
         docs_to_jsonl(
-            tqdm(stored),
+            docs_to_output,
             output_path,
             append=continue_from_last,
         )
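
Note on the docprocessor.py changes above: doc bins are now named by the
running document count rather than by bin index (with doc_bin_size = 25,
completed bins land as 25.bin, 50.bin, ...), so the highest basename doubles
as the number of docs already flushed, and processing resumes at
start_from + 1. Both _store_doc_bins and _read_doc_bins rely on spaCy's
DocBin carrying doc.user_data through the bytes round trip. A minimal,
self-contained sketch of that mechanism, not part of the patch (the blank
pipeline and the metadata value are invented, and store_user_data=True is
set on both ends):

import spacy
from spacy.tokens import DocBin

nlp = spacy.blank("en")
doc = nlp("a tiny example document")
doc.user_data["doc_metadata"] = {"id": "doc-1"}  # invented metadata

# Serialize with user data, as _store_doc_bins does.
doc_bin = DocBin(store_user_data=True)
doc_bin.add(doc)
data = doc_bin.to_bytes()

# Deserialize against a vocab, as _read_doc_bins does.
restored = list(DocBin(store_user_data=True).from_bytes(data).get_docs(nlp.vocab))
assert restored[0].user_data["doc_metadata"]["id"] == "doc-1"
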
diff --git a/src/conspiracies/run.py b/src/conspiracies/run.py
index b2d0c15..a747f50 100644
--- a/src/conspiracies/run.py
+++ b/src/conspiracies/run.py
@@ -61,4 +61,11 @@
 config = PipelineConfig.default_with_extra_config(cli_args)
 
 pipeline = Pipeline(config)
+
+logging.basicConfig(
+    level=logging.DEBUG,
+    filename=config.base.output_path + "/logfile",
+    filemode="w+",
+)
+
 pipeline.run()
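
Note on the run.py change: logging is configured only after Pipeline(config)
has been constructed, so anything logged during construction is not captured
in the logfile, and filemode="w+" truncates the logfile on every run. A
standalone sketch of the same setup, not part of the patch (the "output"
directory is an invented stand-in for config.base.output_path):

import logging
import os

output_path = "output"  # invented stand-in for config.base.output_path
os.makedirs(output_path, exist_ok=True)

logging.basicConfig(
    level=logging.DEBUG,  # the root logger captures DEBUG and above
    filename=output_path + "/logfile",
    filemode="w+",  # truncates any logfile left over from a previous run
)

logging.debug("this record ends up in %s/logfile", output_path)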