Skip to content

Commit

Permalink
refactored download function
Browse files Browse the repository at this point in the history
  • Loading branch information
gokulprathin8 committed Aug 14, 2024
1 parent e915839 commit 47b02ce
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 175 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@
</properties>

<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/gw/database/HistoryRepository.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.gw.database;

import com.gw.jpa.History;
import com.gw.jpa.HistoryDTO;

import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import javax.transaction.Transactional;

Expand Down Expand Up @@ -163,4 +164,11 @@ public interface HistoryRepository extends JpaRepository<History, String> {
@Query(value = "SELECT * FROM history, gwprocess WHERE history.history_id = ?1 AND history.history_process = gwprocess.id", nativeQuery = true)
List<Object[]> findOneHistoryofProcess(String history_id);


@Query(
value = "SELECT * FROM history WHERE history_process IN (:processIds)",
nativeQuery = true)
List<History> findByProcessIds(@Param("processIds") List<String> processIds);


}
9 changes: 9 additions & 0 deletions src/main/java/com/gw/database/ProcessRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.gw.jpa.GWProcess;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
Expand Down Expand Up @@ -77,4 +80,10 @@ public interface ProcessRepository extends CrudRepository<GWProcess, String> {
*/
@Query(value = "select * from gwprocess where lang = 'jupyter'", nativeQuery = true)
Collection<GWProcess> findNotebookProcess();


@Query(
value = "SELECT * FROM gwprocess WHERE id IN :ids",
nativeQuery = true)
List<GWProcess> findProcessesByIds(@Param("ids") List<String> ids);
}
12 changes: 6 additions & 6 deletions src/main/java/com/gw/tools/ProcessTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.*;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -744,6 +741,9 @@ public String all_history(String pid, String mode) {
return all_history(pid, false, mode);

}




public List<GWProcess> getProcessesByIds(List<String> processIds) {
return processrepository.findProcessesByIds(processIds);
}
}
227 changes: 60 additions & 167 deletions src/main/java/com/gw/tools/WorkflowTool.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.gw.tools;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.gw.database.CheckpointRepository;
import com.gw.database.HistoryRepository;
import com.gw.database.ProcessRepository;
import com.gw.database.WorkflowRepository;
import com.gw.jpa.ExecutionStatus;
import com.gw.jpa.GWProcess;
Expand All @@ -14,13 +16,9 @@
import com.gw.utils.RandomString;
import java.io.File;
import java.nio.file.FileSystems;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.log4j.Logger;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
Expand All @@ -43,6 +41,9 @@ public class WorkflowTool {

@Autowired WorkflowRepository workflowrepository;

@Autowired
ProcessRepository processRepository;

@Autowired HistoryRepository historyrepository;

@Autowired CheckpointRepository checkpointrepository;
Expand Down Expand Up @@ -433,7 +434,7 @@ public String all_active_process() {
/**
* show the history of every execution of the workflow
*
* @param string
* @param workflow_id
* @return
*/
public String all_history(String workflow_id) {
Expand Down Expand Up @@ -587,198 +588,90 @@ public String getExportModeById(int mode_no) {
* @return
* @throws ParseException
*/
public String download(String wid, String option) throws ParseException {

public String download(String wid, String option) throws ParseException {
Workflow wf = this.getById(wid);

String fileurl = "download/temp/" + wf.getId() + ".zip";

String savefilepath =
bt.getFileTransferFolder() + wf.getId() + FileSystems.getDefault().getSeparator();
String savefilepath = bt.getFileTransferFolder() + wf.getId() + FileSystems.getDefault().getSeparator();

File tf = new File(savefilepath);
if (tf.exists()) {
bt.deleteDirectory(tf);
}
tf.mkdirs();

bt.deleteDirectory(tf);

if (!tf.exists()) tf.mkdirs();

String workflowstring = bt.toJSON(wf);
Gson gson = new Gson();

bt.writeString2File(workflowstring, savefilepath + "workflow.json");
// Write workflow to JSON
String workflowJson = gson.toJson(wf);
bt.writeString2File(workflowJson, savefilepath + "workflow.json");

if (option.contains("processcode")) {

JSONParser jsonParser = new JSONParser();

JSONArray arrayobj = (JSONArray) jsonParser.parse(wf.getNodes());

String codesavefile = savefilepath + "code" + FileSystems.getDefault().getSeparator();

File codef = new File(codesavefile);
if (!codef.exists()) {
codef.mkdirs();
}

if (!codef.exists()) codef.mkdirs();

StringBuffer processjson = new StringBuffer("[");

String prefix = "";

for (int i = 0; i < arrayobj.size(); i++) {

try {

JSONObject jsonObj = (JSONObject) arrayobj.get(i);

String process_workflow_id = (String) jsonObj.get("id");

String process_id = process_workflow_id.split("-")[0];

String targetsourcefile = codesavefile + pt.getProcessFileName(process_id);

if (new File(targetsourcefile).exists()) continue;

GWProcess p = pt.getProcessById(process_id);

bt.writeString2File(p.getCode(), targetsourcefile);

processjson.append(prefix);

prefix = ",";

processjson.append(pt.toJSON(p));

} catch (Exception e) {

e.printStackTrace();
}
List<String> processIds = new ArrayList<>();
for (Object obj : arrayobj) {
JSONObject jsonObj = (JSONObject) obj;
String process_workflow_id = (String) jsonObj.get("id");
String process_id = process_workflow_id.split("-")[0];
processIds.add(process_id);
}

processjson.append("]");
// Fetch all processes at once
List<GWProcess> processes = pt.getProcessesByIds(processIds);
String processJson = gson.toJson(processes);

bt.writeString2File(processJson, codesavefile + "process.json");

bt.writeString2File(processjson.toString(), codesavefile + "process.json");
// Write individual process code files
processes.parallelStream().forEach(process -> {
String targetSourceFile = codesavefile + pt.getProcessFileName(process.getId());
bt.writeString2File(process.getCode(), targetSourceFile);
});
}

if (option.contains("history")) {
String wfhistorysavefile = savefilepath + "history" + FileSystems.getDefault().getSeparator() + wid + ".json";

String wfhistorysavefile =
savefilepath + "history" + FileSystems.getDefault().getSeparator() + wid + ".json";

// first save all history of the workflow

// Fetch workflow history
List<History> histlist = historyrepository.findByWorkflowId(wid);
String workflowHistoryJson = gson.toJson(histlist);
bt.writeString2File(workflowHistoryJson, wfhistorysavefile);

StringBuffer workflowhistory = new StringBuffer("[");

String prefix = "";

for (History h : histlist) {

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(h.getIndicator())) {

continue;
}

String historystr = bt.toJSON(h);

workflowhistory.append(prefix);

prefix = ",";

workflowhistory.append(historystr);
}
;

workflowhistory.append("]");

bt.writeString2File(workflowhistory.toString(), wfhistorysavefile);

// second, save process history of one workflow execution into a file
HashSet<String> process_id_set = new HashSet<>();

// Collect process IDs and fetch process histories
HashSet<String> processIdSet = new HashSet<>();
for (History h : histlist) {

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(h.getIndicator())) {

if ("workflowwithprocesscodegoodhistory".equals(option) && !ExecutionStatus.DONE.equals(h.getIndicator())) {
continue;
}

String[] processhistorylist = h.getHistory_output().split(";");

prefix = "";

String processhistorysavefile =
savefilepath
+ "history"
+ FileSystems.getDefault().getSeparator()
+ h.getHistory_id()
+ ".json"; // all the process history of one workflow run

StringBuffer processhistorybuffer = new StringBuffer("[");

for (String processhitoryid : processhistorylist) {

Optional<History> hisop = historyrepository.findById(processhitoryid);

if (hisop.isPresent()) {

History hist = hisop.get();

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(hist.getIndicator())) {

continue;
}

processhistorybuffer.append(prefix);

prefix = ",";

processhistorybuffer.append(bt.toJSON(hist));

if (!process_id_set.contains(hist.getHistory_process()))
process_id_set.add(hist.getHistory_process());
}
}

processhistorybuffer.append("]");

bt.writeString2File(processhistorybuffer.toString(), processhistorysavefile);
String[] processHistoryList = h.getHistory_output().split(";");
Collections.addAll(processIdSet, processHistoryList);
}

// if need all the history of the involved processes, go into this if
if (option.contains("allhistory") || "workflowwithprocesscodegoodhistory".equals(option)) {

for (String history_process_id : process_id_set) {

histlist = historyrepository.findByProcessIdFull(history_process_id);
if (!processIdSet.isEmpty()) {
// Fetch all process histories at once
List<String> processIdList = new ArrayList<>(processIdSet);
List<History> allProcessHistories = historyrepository.findByProcessIds(processIdList);
Map<String, List<History>> processHistoriesMap = allProcessHistories.stream()
.collect(Collectors.groupingBy(History::getHistory_process));

StringBuffer allprocesshistorybuffer = new StringBuffer("[");
// Write each process history to its respective file
processIdSet.parallelStream().forEach(processId -> {
List<History> processHistories = processHistoriesMap.get(processId);
String processHistoryJson = gson.toJson(processHistories);

// every process has a history file
String allprocesshistorysavefile =
savefilepath
+ "history"
+ FileSystems.getDefault().getSeparator()
+ "process_"
+ history_process_id
+ ".json";

for (History hist : histlist) {

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(hist.getIndicator())) {

continue;
}

allprocesshistorybuffer.append(bt.toJSON(hist)).append(",");
}

allprocesshistorybuffer.append("]");

bt.writeString2File(allprocesshistorybuffer.toString(), allprocesshistorysavefile);
}
String processHistorySaveFile = savefilepath + "history" + FileSystems.getDefault().getSeparator() + "process_" + processId + ".json";
bt.writeString2File(processHistoryJson, processHistorySaveFile);
});
}
}

Expand Down

0 comments on commit 47b02ce

Please sign in to comment.