Skip to content

Commit

Permalink
Merge branch 'green_coa' into 2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
Lunga001 committed Mar 26, 2024
2 parents 3135116 + e920adb commit fd0ecd5
Show file tree
Hide file tree
Showing 32 changed files with 6,512 additions and 900 deletions.
10 changes: 10 additions & 0 deletions src/bika/coa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@
# Copyright 2019 by it's authors.

import logging
from bika.lims.api import get_request
from bika.coa.interfaces import IBikaCOALayer
from zope.i18nmessageid import MessageFactory

PRODUCT_NAME = "bika.coa"
PROFILE_ID = "profile-{}:default".format(PRODUCT_NAME)
logger = logging.getLogger(PRODUCT_NAME)
_ = MessageFactory(PRODUCT_NAME)


def initialize(context):
"""Initializer called when used as a Zope 2 product."""

logger.info("*** Initializing BIKA.COA ***")


def is_installed():
"""Returns whether the product is installed or not"""
request = get_request()
return IBikaCOALayer.providedBy(request)
171 changes: 140 additions & 31 deletions src/bika/coa/ajax.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,22 @@ def ajax_save_reports(self):
sample = getAdapter(sample, ISuperModel)
samples.append(sample)


csv_reports = []
is_multi_template = self.is_multi_template(template)
if template == "bika.coa:MultiGeochemistryBatch.pt":
if template == "bika.coa:GeoangolBatchMulti.pt":
csv_report = self.create_geochemistry_csv_report(samples,coa_num)
csv_reports = [csv_report for i in range(len(pdf_reports))]
elif template == "bika.coa:MultiBatch.pt":
elif template == "bika.coa:AWTCBatchMulti.pt":
csv_report= self.create_batch_csv_reports(samples,coa_num)
csv_reports = [csv_report for i in range(len(pdf_reports))]
if template == "bika.coa:MultiSampleTransposed.pt":
elif template == "bika.coa:ZimlabsTransposedMulti.pt":
csv_report = self.create_zlabs_csv_report(samples,coa_num)
csv_reports = [csv_report for i in range(len(pdf_reports))]
elif template == 'bika.hydrochem:HydroChemSystemsMulti.pt':
csv_reports = [""]
elif template == "bika.cannabis:CannabisUSMulti.pt":
csv_report = self.create_csv_reports_cannabis(samples, coa_num)
csv_reports = [csv_report for i in range(len(pdf_reports))]
elif is_multi_template:
csv_report = self.create_csv_reports(samples)
csv_reports = [csv_report for i in range(len(pdf_reports))]
Expand Down Expand Up @@ -199,12 +204,19 @@ def create_batch_csv_reports(self,samples,coa_num):
if len(headers_line) != len(top_headers):
for i in top_headers[len(headers_line):]:
headers_line.append('')

sample_data, penultimate_field_analyses, penultimate_lab_analyses = self.create_sample_rows(group_cats,field_analyses,lab_analyses)
header_rows = self.merge_header_and_values(top_headers,headers_line)

final_field_analyses = self.sort_analyses_to_list(penultimate_field_analyses)
final_lab_analyses = self.sort_analyses_to_list(penultimate_lab_analyses)
sample_data = self.get_sample_header_data(samples)
sample_ids = sample_data[0]
penultimate_field_analyses = self.create_field_sample_rows(
group_cats, field_analyses, sample_ids)
penultimate_lab_analyses = self.create_lab_sample_rows(
group_cats, lab_analyses, sample_ids)

header_rows = self.merge_header_and_values(top_headers, headers_line)
final_field_analyses = self.sort_analyses_to_list(
penultimate_field_analyses)
final_lab_analyses = self.sort_analyses_to_list(
penultimate_lab_analyses)

for row in header_rows:
writer.writerow(row)
Expand Down Expand Up @@ -405,31 +417,40 @@ def sort_analyses_to_list(self,analyses):
key_sort = sorted(title_sort, key=lambda x:(x[1][-1] is None,x[1][-1]))
return key_sort

def create_sample_rows(self,grouped_analyses,field_analyses,lab_analyses):
sample_ids = ["Sample ID"]
sample_Points = ["Sample Points"]
sample_Types = ["Sample Types"]
def create_field_sample_rows(
self, grouped_analyses, field_analyses, sample_ids):

if grouped_analyses.get("field"):
for field_Analysis_service in grouped_analyses.get("field"):
if field_Analysis_service.get("sampleID") not in sample_ids:
sample_ids.append(field_Analysis_service.get("sampleID"))
sample_Points.append(field_Analysis_service.get("samplePoint"))
sample_Types.append(field_Analysis_service.get("sampleType"))
position_at_top = sample_ids.index(field_Analysis_service.get("sampleID")) - 1
title = field_Analysis_service.get('title')
field_analyses[title][position_at_top] = field_Analysis_service.get("result")
for field_AS in grouped_analyses.get("field"):
position_at_top = sample_ids.index(
field_AS.get("sampleID")) - 1
title = field_AS.get('title')
field_analyses[title][position_at_top] = field_AS.get(
"result")
return field_analyses

def create_lab_sample_rows(
self, grouped_analyses, lab_analyses, sample_ids):

if grouped_analyses.get("lab"):
for lab_Analysis_service in grouped_analyses.get("lab"):
if lab_Analysis_service.get("sampleID") not in sample_ids:
sample_ids.append(lab_Analysis_service.get("sampleID"))
sample_Points.append(lab_Analysis_service.get("samplePoint"))
sample_Types.append(lab_Analysis_service.get("sampleType"))
position_at_top = sample_ids.index(lab_Analysis_service.get("sampleID")) - 1
title = lab_Analysis_service.get('title')
for lab_AS in grouped_analyses.get("lab"):
position_at_top = sample_ids.index(
lab_AS.get("sampleID")) - 1
title = lab_AS.get('title')
if lab_analyses.get(title):
lab_analyses.get(title)[position_at_top] = lab_Analysis_service.get("result")
sample_headers = [sample_ids,sample_Points,sample_Types]
return sample_headers,field_analyses,lab_analyses
lab_analyses.get(title)[position_at_top] = lab_AS.get(
"result")
return lab_analyses

def get_sample_header_data(self, samples):
sample_ids = ["Sample ID"]
sample_points = ["Sample Points"]
sample_types = ["Sample Types"]
for sample in samples:
sample_ids.append(sample.Title())
sample_points.append(sample.getSamplePointTitle())
sample_types.append(sample.getSampleTypeTitle())
return [sample_ids, sample_points, sample_types]

def merge_header_and_values(self,headers,values):
"""Merge the headers and their values to make writing to CSV easier"""
Expand Down Expand Up @@ -611,6 +632,94 @@ def create_csv_report(self, sample):

return output.getvalue()

def create_csv_reports_cannabis(self, samples, coa_num):
analyses = []
output = StringIO.StringIO()
client_headers = [["COA ID", coa_num],
["Client", samples[0].Client.title],
["Client Contact", samples[0].Contact.title]]
headers = ["Sample ID", "Tracking ID", "Client Sample ID",
"Batch ID", "License", "Sample Type", "Strain",
"Lot", "Cultivation Batch ID", "Quantity",
"Date Sampled", "Date Received",
"Date Verified", "Date Published"]
body = []
for sample in samples:
date_received = sample.DateReceived
date_sampled = sample.DateSampled
if date_sampled:
date_sampled = date_sampled.strftime('%m-%d-%y')
if date_received:
date_received = date_received.strftime('%m-%d-%y')
date_verified = sample.getDateVerified()
if date_verified:
date_verified = date_verified.strftime('%m-%d-%y')
license_num = sample.License
if license_num:
license_num = license_num.license_number
strain = sample.Strain
if strain:
strain = strain.title
date_published = sample.DatePublished
if date_published:
date_published = date_published.strftime('%m-%d-%y')

writer = csv.writer(output)
line = [
sample.id,
sample.TrackingID,
sample.ClientSampleID,
sample.getBatchID(),
license_num,
sample.SampleTypeTitle,
strain,
sample.LotNumber,
sample.CultivationBatchID,
sample.Quantity,
date_sampled,
date_received,
date_verified,
date_published
]

analyses = sample.getAnalyses(full_objects=True)
group_cats = {}
for analysis in analyses:
analysis_info = {'title': analysis.Title(),
'result': analysis.getFormattedResult(html=False),
'unit': analysis.getService().getUnit()}
if analysis.getCategoryTitle() not in group_cats.keys():
group_cats[analysis.getCategoryTitle()] = []
group_cats[analysis.getCategoryTitle()].append(analysis_info)

for g_cat in sorted(group_cats.keys()):
for a_info in group_cats[g_cat]:
title = a_info['title']
result = a_info['result']
unit = a_info['unit']
title_unit = title
if unit:
title_unit = '{} ({})'.format(title, unit)
if len(line) != len(headers):
for i in headers[len(line):]:
line.append('')

if title_unit not in headers:
headers.append(title_unit)
line.append(result)
else:
line[headers.index(title_unit)] = result

body.append(line)

for header in client_headers:
writer.writerow(header)
writer.writerow(headers)
for rec in body:
writer.writerow(rec)

return output.getvalue()

def create_csv_reports(self, samples):
analyses = []
output = StringIO.StringIO()
Expand Down
105 changes: 0 additions & 105 deletions src/bika/coa/browser/analysisrequest/published_results.py

This file was deleted.

6 changes: 6 additions & 0 deletions src/bika/coa/browser/configure.zcml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<configure xmlns="http://namespaces.zope.org/zope"
xmlns:browser="http://namespaces.zope.org/browser">

<include package=".listingview" />

</configure>
Empty file.
18 changes: 18 additions & 0 deletions src/bika/coa/browser/listingview/configure.zcml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configure xmlns="http://namespaces.zope.org/zope"
xmlns:browser="http://namespaces.zope.org/browser">

<!-- main and batch samples lists -->
<subscriber
for="bika.lims.browser.analysisrequest.AnalysisRequestPublishedResults
zope.interface.Interface"
provides="senaite.app.listing.interfaces.IListingViewAdapter"
factory=".published_results.AnalysisRequestPublishedResults"/>

<!-- reports listing -->
<subscriber
for="bika.lims.browser.publish.reports_listing.ReportsListingView
zope.interface.Interface"
provides="senaite.app.listing.interfaces.IListingViewAdapter"
factory=".report_listing.ReportsListingViewAdapter"/>

</configure>
Loading

0 comments on commit fd0ecd5

Please sign in to comment.