Skip to content

Commit

Permalink
Merge pull request #7 from databricks-industry-solutions/835
Browse files Browse the repository at this point in the history
835
  • Loading branch information
zavoraad authored Jul 25, 2024
2 parents 149e6bf + c791b5b commit 5679c8a
Show file tree
Hide file tree
Showing 14 changed files with 598 additions and 93 deletions.
46 changes: 41 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,37 @@ filename
from edi
) fgs
) trnx
) clms
) clms;

--Create a "Claims Header" table: one row per claim, flattening the nested
--claim structs in stg_claims into top-level columns.

drop table if exists claim_header;
create table claim_header as
select filename,
tax_id,
sender,
transaction_type,
--NOTE(review): the struct-star expansions below emit every struct field under
--its bare name; fields repeated across structs (e.g. name/street/city/zip)
--may collide with each other and duplicate the explicitly aliased
--patient_* / subscriber_* columns further down -- confirm against the
--stg_claims schema before relying on the unaliased columns.
clms.claim_header.*,
clms.diagnosis.*,
clms.patient.*,
clms.payer.*,
clms.providers.*,
clms.patient.name as patient_name,
clms.patient.patient_relationship_cd,
clms.patient.street as patient_street,
clms.patient.city as patient_city,
clms.patient.zip as patient_zip,
clms.patient.dob as patient_dob,
clms.patient.dob_format as patient_dob_format,
clms.patient.gender_cd as patient_gender_cd,
--NOTE(review): "subsciber_identifier" looks like a typo, but it presumably
--mirrors the field name in the source struct -- do not rename here without
--changing the upstream schema.
clms.subscriber.subsciber_identifier,
clms.subscriber.name as subscriber_name,
clms.subscriber.subscriber_relationship_cd,
clms.subscriber.street as subscriber_street,
clms.subscriber.city as subscriber_city,
clms.subscriber.zip as subscriber_zip,
clms.subscriber.dob as subscriber_dob,
clms.subscriber.dob_format as subscriber_dob_format,
clms.subscriber.gender_cd as subscriber_gender_cd
from stg_claims;

--Create a "Claim Line" table
create table claim_line as
Expand All @@ -97,6 +113,26 @@ from stg_claims
![image](images/claim_header.png?raw=true)
![image](images/claim_line.png?raw=true)

### 835 sample

```python
# Read each 835 file whole: wholetext=True yields one row per file so the EDI
# transaction is not split across rows.
# NOTE(review): the glob "*txt" matches any name merely ending in "txt";
# presumably "*.txt" was intended -- confirm against the sample data layout.
df = spark.read.text("sampledata/835/*txt", wholetext = True)

rdd = (
# input_file_name() tags each row with its source path (assumes it is
# imported from pyspark.sql.functions elsewhere in the doc -- TODO confirm).
df.withColumn("filename", input_file_name()).rdd
# Row -> (filename, raw EDI text)
.map(lambda x: (x.asDict().get("filename"),x.asDict().get("value")))
# Parse the raw text into an EDI object (project class; not shown here).
.map(lambda x: (x[0], EDI(x[1])))
# Merge the filename into the parsed-claim dict produced by hm.to_json
# (hm is presumably a parser/manager instance defined earlier -- verify).
.map(lambda x: { **{'filename': x[0]}, **hm.to_json(x[1])} )
# Serialize each dict so spark.read.json can infer a schema from the RDD.
.map(lambda x: json.dumps(x))
)
claims = spark.read.json(rdd)

#Create Claims tables from the EDI transactions
#...
```
![image](images/remittance.png?raw=true)


## Different EDI Formats

The default format is AnsiX12 (`*` as the element delimiter and `~` as the segment separator)
Expand Down
16 changes: 10 additions & 6 deletions databricksx12/edi.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,25 @@ def segment_count(self):
#
# Returns all segments matching segment_name.
#
# segment_name - name compared against each segment's segment_name()
# range_start  - inclusive lower bound on the segment index (default -1 = unbounded)
# range_end    - inclusive upper bound on the segment index; None = end of data
# data         - optional segment list to search; defaults to self.data
#
def segments_by_name(self, segment_name, range_start=-1, range_end=None, data = None):
    if data is None:
        data = self.data
    # "(range_end or len(data))" would treat an explicit range_end of 0 as
    # "no upper bound"; compare against None instead so 0 is honored.
    end = len(data) if range_end is None else range_end
    return [x for i, x in enumerate(data)
            if x.segment_name() == segment_name and range_start <= i <= end]

#
# Returns a tuple of all segments matching segment_name and their index.
#
# segment_name - name compared against each segment's segment_name()
# range_start  - inclusive lower bound on the segment index (default -1 = unbounded)
# range_end    - inclusive upper bound on the segment index; None = end of data
# data         - optional segment list to search; defaults to self.data
#
def segments_by_name_index(self, segment_name, range_start=-1, range_end = None, data = None):
    if data is None:
        data = self.data
    # "(range_end or len(data))" would treat an explicit range_end of 0 as
    # "no upper bound"; compare against None instead so 0 is honored.
    end = len(data) if range_end is None else range_end
    return [(i, x) for i, x in enumerate(data)
            if x.segment_name() == segment_name and range_start <= i <= end]

#
# Return the index of the first occurrence of segment_name within segments,
# searching at or after search_start_idx. Returns -1 when not found.
#
def index_of_segment(self, segments, segment_name, search_start_idx=0):
    # next() with a default replaces the old min()-inside-bare-except pattern,
    # which silently reported unrelated errors (e.g. AttributeError from a
    # malformed segment) as "not found".
    return next(
        (i for i, x in enumerate(segments)
         if x.segment_name() == segment_name and i >= search_start_idx),
        -1,
    )

Expand Down
Loading

0 comments on commit 5679c8a

Please sign in to comment.