From e031c501707ddfae52a1e6d9034df782353af1dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 11 Dec 2023 11:25:44 -0300 Subject: [PATCH] feat(databases): include CIHA database --- pysus/ftp/databases/ciha.py | 97 +++++++++++++++++++++++++++++++++++ pysus/ftp/databases/pni.py | 4 +- pysus/ftp/databases/sia.py | 4 +- pysus/ftp/databases/sih.py | 4 +- pysus/ftp/databases/sim.py | 4 +- pysus/ftp/databases/sinan.py | 7 +-- pysus/ftp/databases/sinasc.py | 4 +- 7 files changed, 111 insertions(+), 13 deletions(-) create mode 100644 pysus/ftp/databases/ciha.py diff --git a/pysus/ftp/databases/ciha.py b/pysus/ftp/databases/ciha.py new file mode 100644 index 0000000..6948f3c --- /dev/null +++ b/pysus/ftp/databases/ciha.py @@ -0,0 +1,97 @@ +from typing import List, Union, Optional + +from pysus.ftp import Database, Directory, File +from pysus.ftp.utils import zfill_year, to_list, parse_UFs, UFs, MONTHS + + +class CIHA(Database): + name = "CIHA" + paths = (Directory("/dissemin/publicos/CIHA/201101_/Dados")) + metadata = { + "long_name": "Comunicação de Internação Hospitalar e Ambulatorial", + "source": "http://ciha.datasus.gov.br/CIHA/index.php", + "description": ( + "A CIHA foi criada para ampliar o processo de planejamento, programação, " + "controle, avaliação e regulação da assistência à saúde permitindo um " + "conhecimento mais abrangente e profundo dos perfis nosológico e " + "epidemiológico da população brasileira, da capacidade instalada e do " + "potencial de produção de serviços do conjunto de estabelecimentos de saúde " + "do País. O sistema permite o acompanhamento das ações e serviços de saúde " + "custeados por: planos privados de assistência à saúde; planos públicos; " + "pagamento particular por pessoa física; pagamento particular por pessoa " + "jurídica; programas e projetos federais (PRONON, PRONAS, PROADI); recursos " + "próprios das secretarias municipais e estaduais de saúde; DPVAT; gratuidade " + "e, a partir da publicação da Portaria GM/MS nº 2.905/2022, consórcios públicos. " + "As informações registradas na CIHA servem como base para o processo de " + "Certificação de Entidades Beneficentes de Assistência Social em Saúde (CEBAS) " + "e para monitoramento dos programas PRONAS e PRONON." + ), + } + groups = { + "CIHA": "Comunicação de Internação Hospitalar e Ambulatorial", + } + + def describe(self, file: File): + if not isinstance(file, File): + return file + + if file.extension.upper() in [".DBC", ".DBF"]: + group, _uf, year, month = self.format(file) + + try: + uf = UFs[_uf] + except KeyError: + uf = _uf + + description = { + "name": str(file.basename), + "group": self.groups[group], + "uf": uf, + "month": MONTHS[int(month)], + "year": zfill_year(year), + "size": file.info["size"], + "last_update": file.info["modify"], + } + + return description + return file + + def format(self, file: File) -> tuple: + group, _uf = file.name[:4].upper(), file.name[4:6].upper() + year, month = file.name[-4:-2], file.name[-2:] + return group, _uf, zfill_year(year), month + + def get_files( + self, + uf: Optional[Union[List[str], str]] = None, + year: Optional[Union[list, str, int]] = None, + month: Optional[Union[list, str, int]] = None, + group: Union[List[str], str] = "CIHA", + ) -> List[File]: + files = list(filter( + lambda f: f.extension.upper() in [".DBC", ".DBF"], self.files + )) + + groups = [gr.upper() for gr in to_list(group)] + + if not all(gr in list(self.groups) for gr in groups): + raise ValueError( + "Unknown CIHA Group(s): " + f"{set(groups).difference(list(self.groups))}" + ) + + files = list(filter(lambda f: self.format(f)[0] in groups, files)) + + if uf: + ufs = parse_UFs(uf) + files = list(filter(lambda f: self.format(f)[1] in ufs, files)) + + if year or str(year) in ["0", "00"]: + years = [zfill_year(str(m)[-2:]) for m in to_list(year)] + files = list(filter(lambda f: self.format(f)[2] in years, files)) + + if month: + months = [str(y)[-2:].zfill(2) for y in to_list(month)] + files = list(filter(lambda f: self.format(f)[3] in months, files)) + + return files diff --git a/pysus/ftp/databases/pni.py b/pysus/ftp/databases/pni.py index c54cc29..b8656bf 100644 --- a/pysus/ftp/databases/pni.py +++ b/pysus/ftp/databases/pni.py @@ -6,9 +6,9 @@ class PNI(Database): name = "PNI" - paths = [ + paths = ( Directory("/dissemin/publicos/PNI/DADOS"), - ] + ) metadata = { "long_name": "Sistema de Informações do Programa Nacional de Imunizações", "source": ( diff --git a/pysus/ftp/databases/sia.py b/pysus/ftp/databases/sia.py index 5ee5389..a207bc2 100644 --- a/pysus/ftp/databases/sia.py +++ b/pysus/ftp/databases/sia.py @@ -6,10 +6,10 @@ class SIA(Database): name = "SIA" - paths = [ + paths = ( Directory("/dissemin/publicos/SIASUS/199407_200712/Dados"), Directory("/dissemin/publicos/SIASUS/200801_/Dados"), - ] + ) metadata = { "long_name": "Sistema de Informações Ambulatoriais", "source": "http://sia.datasus.gov.br/principal/index.php", diff --git a/pysus/ftp/databases/sih.py b/pysus/ftp/databases/sih.py index 52e005b..6e763b0 100644 --- a/pysus/ftp/databases/sih.py +++ b/pysus/ftp/databases/sih.py @@ -6,10 +6,10 @@ class SIH(Database): name = "SIH" - paths = [ + paths = ( Directory("/dissemin/publicos/SIHSUS/199201_200712/Dados"), Directory("/dissemin/publicos/SIHSUS/200801_/Dados"), - ] + ) metadata = { "long_name": "Sistema de Informações Hospitalares", "source": ( diff --git a/pysus/ftp/databases/sim.py b/pysus/ftp/databases/sim.py index 2f20f53..fb18972 100644 --- a/pysus/ftp/databases/sim.py +++ b/pysus/ftp/databases/sim.py @@ -6,10 +6,10 @@ class SIM(Database): name = "SIM" - paths = [ + paths = ( Directory("/dissemin/publicos/SIM/CID10/DORES"), Directory("/dissemin/publicos/SIM/CID9/DORES"), - ] + ) metadata = { "long_name": "Sistema de Informação sobre Mortalidade", "source": "http://sim.saude.gov.br", diff --git a/pysus/ftp/databases/sinan.py b/pysus/ftp/databases/sinan.py index ac8dc9c..4a825d1 100644 --- a/pysus/ftp/databases/sinan.py +++ b/pysus/ftp/databases/sinan.py @@ -6,10 +6,10 @@ class SINAN(Database): name = "SINAN" - paths = [ + paths = ( Directory("/dissemin/publicos/SINAN/DADOS/FINAIS"), Directory("/dissemin/publicos/SINAN/DADOS/PRELIM"), - ] + ) metadata = { "long_name": "Doenças e Agravos de Notificação", "source": "https://portalsinan.saude.gov.br/", @@ -127,7 +127,8 @@ def get_files( if codes and not all(code in self.diseases for code in codes): raise ValueError( - f"Unknown disease(s): {set(codes).difference(set(self.diseases))}" + f"Unknown disease(s): {set( + codes).difference(set(self.diseases))}" ) files = list(filter(lambda f: self.format(f)[0] in codes, files)) diff --git a/pysus/ftp/databases/sinasc.py b/pysus/ftp/databases/sinasc.py index 9d28dd4..2769c6c 100644 --- a/pysus/ftp/databases/sinasc.py +++ b/pysus/ftp/databases/sinasc.py @@ -6,10 +6,10 @@ class SINASC(Database): name = "SINASC" - paths = [ + paths = ( Directory("/dissemin/publicos/SINASC/NOV/DNRES"), Directory("/dissemin/publicos/SINASC/ANT/DNRES"), - ] + ) metadata = { "long_name": "Sistema de Informações sobre Nascidos Vivos", "source": "http://sinasc.saude.gov.br/",