-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WARNING: this class is currently experimental and does not have a stable API nor are all expected functions implemented.
- Loading branch information
Showing
3 changed files
with
129 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" | |
|
||
[project] | ||
name = "elody" | ||
version = "0.0.20" | ||
version = "0.0.21" | ||
description = "elody SDK for Python" | ||
readme = "README.md" | ||
authors = [{ name = "Inuits", email = "[email protected]" }] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
import csv | ||
import re | ||
|
||
from io import StringIO | ||
from elody.exceptions import ColumnNotFoundException, IncorrectTypeException | ||
|
||
|
||
class CSVParser: | ||
def __init__(self, csvstring): | ||
self.csvstring = csvstring | ||
self.reader = csv.DictReader(self.__csv_string_to_file_object()) | ||
|
||
def _get_metadata_object(self, key, value, lang="en"): | ||
return { | ||
"key": key, | ||
"value": value, | ||
"lang": lang, | ||
} | ||
|
||
def _get_relation_object(self, type, key): | ||
return { | ||
"type": type, | ||
"key": key, | ||
} | ||
|
||
def _is_relation_field(self, field): | ||
if re.fullmatch("(has|is)[A-Z][a-z]+", field): | ||
return True | ||
return False | ||
|
||
def __csv_string_to_file_object(self): | ||
return StringIO(self.csvstring) | ||
|
||
|
||
class CSVSingleObject(CSVParser): | ||
def __init__(self, csvstring, object_type="entity"): | ||
super().__init__(csvstring) | ||
self.identifiers = list() | ||
self.metadata = list() | ||
self.object_type = object_type | ||
self.objects = list() | ||
self.relations = list() | ||
self.__init_fields() | ||
|
||
def get_entity(self): | ||
return self.__get_type("entity") | ||
|
||
def get_mediafile(self): | ||
return self.__get_type("mediafile") | ||
|
||
def __fill_metadata(self, key, value): | ||
if value: | ||
self.metadata.append(self._get_metadata_object(key, value)) | ||
|
||
def __fill_relations(self, type, key): | ||
if key: | ||
self.relations.append(self._get_relation_object(type, key)) | ||
|
||
def __get_type(self, type): | ||
if self.object_type != type: | ||
raise IncorrectTypeException(f"Not a {type}!") | ||
object = dict() | ||
for property_name, property in { | ||
"metadata": self.metadata, | ||
"relations": self.relations, | ||
"identifiers": self.identifiers, | ||
}.items(): | ||
if property: | ||
object[property_name] = property | ||
return object | ||
|
||
def __init_fields(self): | ||
for row in self.reader: | ||
for key, value in row.items(): | ||
if self._is_relation_field(key): | ||
self.__fill_relations(key, value) | ||
else: | ||
self.__fill_metadata(key, value) | ||
|
||
|
||
class CSVMultiObject(CSVParser): | ||
def __init__(self, csvstring, index_mapping={"entities": "entity_id"}): | ||
super().__init__(csvstring) | ||
self.index_mapping = index_mapping | ||
self.objects = dict() | ||
self.__fill_objects_from_csv() | ||
|
||
def get_entities(self): | ||
return self.objects.get("entities", list()) | ||
|
||
def get_mediafiles(self): | ||
return self.objects.get("mediafiles", list()) | ||
|
||
def __fill_objects_from_csv(self): | ||
indexed_dict = dict() | ||
for row in self.reader: | ||
if not all(x in row.keys() for x in self.index_mapping.values()): | ||
raise ColumnNotFoundException( | ||
f"Not all identifying columns are present in CSV" | ||
) | ||
for type, identifying_column in self.index_mapping.items(): | ||
id = row[identifying_column] | ||
if type not in indexed_dict: | ||
indexed_dict[type] = dict() | ||
if id not in indexed_dict[type]: | ||
indexed_dict[type][id] = dict() | ||
for root_property in ["metadata", "relations", "identifiers"]: | ||
if root_property not in indexed_dict[type][id]: | ||
indexed_dict[type][id][root_property] = list() | ||
for key, value in row.items(): | ||
if self._is_relation_field(key) and value: | ||
indexed_dict[type][id]["relations"].append( | ||
self._get_relation_object(key, value) | ||
) | ||
elif key not in self.index_mapping.values() and value: | ||
indexed_dict[type][id]["metadata"].append( | ||
self._get_metadata_object(key, value) | ||
) | ||
for metadata_type, objects in indexed_dict.items(): | ||
self.objects[metadata_type] = list(objects.values()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters