Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to brute #18

Merged
merged 7 commits into from
Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions generator_brute.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@


channels = expand_clist(cfg["channel_range"])
channel_split = np.array_split(cfg["channel_range"],size)
n = len(channels) // size
channel_split = [channels[i:i+n] for i in range(0,len(channels),n)]
# Enforce 1:1 mapping of channels and tasks
assert(len(channel_split) == size)
# Channels this process is reading
Expand All @@ -53,7 +54,7 @@

# Get a data_loader
dobj = KstarEcei(shot=shotnr,data_path=datapath,clist=my_channel_range,verbose=False)
cfg.update({'TriggerTime':dobj.tt,'SampleRate':[dobj.fs/1e3],
cfg.update({'TriggerTime':dobj.tt.tolist(),'SampleRate':[dobj.fs/1e3],
'TFcurrent':dobj.itf/1e3,'Mode':dobj.mode,
'LoFreq':dobj.lo,'LensFocus':dobj.sf,'LensZoom':dobj.sz})

Expand All @@ -79,9 +80,12 @@
for i in range(nstep):
if(rank == 0):
print("Sending: {0:d} / {1:d}".format(i, nstep))
writer.BeginStep()
writer.put_data(varTime,np.array([tstarts[i],tstops[i]]))
writer.put_data(varData,data_all[i])
writer.EndStep()
t1 = time.time()
writer.writer.Close()

chunk_size = np.prod(data_arr.shape)*data_arr.itemsize/1024/1024
print("")
Expand Down
26 changes: 16 additions & 10 deletions generators/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from mpi4py import MPI
import adios2
import numpy as np
import pickle
import json

class writer_base():
def __init__(self, shotnr, id):
Expand Down Expand Up @@ -44,8 +44,8 @@ def DefineAttributes(self,attrsname,attrs):
atts, dict: Dictionary of key,value pairs to be put into attributes

"""
picklestr = pickle.dumps(attrs)
self.attrs = self.IO.DefineAttribute(attrsname,picklestr,"floats")
attrsstr = json.dumps(attrs)
self.attrs = self.IO.DefineAttribute(attrsname,attrsstr)

def Open(self):
"""Opens a new channel.
Expand All @@ -56,6 +56,13 @@ def Open(self):
if self.writer is None:
self.writer = self.IO.Open(self.channel_name, adios2.Mode.Write)

def BeginStep(self):
"""wrapper for writer.BeginStep()"""
return self.writer.BeginStep()

def EndStep(self):
"""wrapper for writer.EndStep()"""
return self.writer.EndStep()

def put_data(self, var, data):
"""Opens a new stream and send data through it
Expand All @@ -66,15 +73,14 @@ def put_data(self, var, data):
"""

if self.writer is not None:
self.writer.BeginStep()
self.writer.Put(var, data, adios2.Mode.Sync)
self.writer.EndStep()


def __del__(self):
"""Close the IO."""
if self.writer is not None:
self.writer.Close()
#RMC - I find relying on this gives segfaults in bp files.
#Better to explicitly close it in the main program
#def __del__(self):
# """Close the IO."""
# if self.writer is not None:
# self.writer.Close()



Expand Down
9 changes: 8 additions & 1 deletion processors/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from mpi4py import MPI
import adios2
import numpy as np
import json


class reader_base():
Expand Down Expand Up @@ -44,11 +45,17 @@ def get_data(self, varname):
"""Attempt to load `varname` from the opened stream"""

var = self.IO.InquireVariable(varname)
io_array = np.zeros(np.prod(var.Shape()), dtype=np.float)
io_array = np.zeros(var.Shape(), dtype=float)
self.reader.Get(var, io_array, adios2.Mode.Sync)

return(io_array)

def get_attrs(self, attrsname):
"""Get json string `attrsname` from the opened stream"""

attrs = self.IO.InquireAttribute(attrsname)
return json.loads(attrs.DataString()[0])


def CurrentStep(self):
"""Wrapper for IO.CurrentStep()"""
Expand Down
36 changes: 13 additions & 23 deletions receiver_brute.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import socket
import queue
import threading
import pickle

import sys
from fluctana import *
Expand Down Expand Up @@ -52,11 +51,6 @@
cfg = json.load(df)
df.close()

datapath = cfg["datapath"]
resultspath = cfg["resultspath"]
shot = cfg["shot"]
my_analysis = cfg["analysis"][0]
gen_id = 2203 #TODO: Not clear if this was

#TODO: Remove for non-debug
if args.debug:
Expand Down Expand Up @@ -85,7 +79,8 @@ def get_data(self,type_str):
return data

def BeginStep(self):
return (self.current_step<len(self.dataSplit))
if (self.current_step<len(self.dataSplit)):
return adios2.StepStatus.OK

def CurrentStep(self):
return self.current_step
Expand All @@ -95,7 +90,7 @@ def EndStep(self):


shot = 18431; nchunk=10000
reader = read_stream(shot=shot,nchunk=nchunk,data_path=datapath)
reader = read_stream(shot=shot,nchunk=nchunk,data_path=cfg["datapath"])
#merge into cfg dict
cfg.update({'shot':shot,'nfft':1000,'window':'hann','overlap':0.0,'detrend':1,
'TriggerTime':reader.dobj.tt,'SampleRate':[reader.dobj.fs/1e3],
Expand All @@ -105,15 +100,11 @@ def EndStep(self):
def save_spec(results,tstep):
#TODO: Determine how to use adios2 efficiently instead (and how to read in like normal, e.g. without steps?)
#np.savez(resultspath+'delta.'+str(tstep).zfill(4)+'.npz',**results)
with adios2.open(resultspath+'delta.'+str(tstep).zfill(4)+'.bp','w') as fw:
with adios2.open(cfg["resultspath"]+'delta.'+str(tstep).zfill(4)+'.bp','w') as fw:
for key in results.keys():
fw.write(key,results[key],results[key].shape,[0]*len(results[key].shape),results[key].shape)


#HARDCODED fluctana, does all channels
#number of vertical and radial channels
NV = 24
NR = 8
A = FluctAna(verbose=False)
#TODO: Modify so it can take in a cfg set
#dobjAll = KstarEcei(shot=shot,cfg=cfg,clist=['ECEI_L0101-2408'],verbose=False)
Expand All @@ -128,18 +119,18 @@ def perform_analysis(channel_data, cfg, tstep, trange):
"""
logging.info(f"\tWorker: do analysis: tstep = {tstep}, rank = {rank}")
t0 = time.time()
if(my_analysis["name"] == "all"):
if(cfg["analysis"][0]["name"] == "all"):
results = {}
dobjAll = KstarEcei(shot=shot,cfg=cfg,clist=cfg["channel_range"],verbose=False)
dobjAll = KstarEcei(shot=cfg["shotnr"],cfg=cfg,clist=cfg["channel_range"],verbose=False)
if len(A.Dlist)==0:
A.Dlist.append(dobjAll)
else:
A.Dlist[0] = dobjAll
A.Dlist[0].data = channel_data
A.Dlist[0].time,_,_,_,_ = A.Dlist[0].time_base(trange)
#this could be done on rank==0 as Ralph imagined
A.fftbins(nfft=cfg['nfft'],window=cfg['window'],
overlap=cfg['overlap'],detrend=cfg['detrend'],full=1,scipy=True)
A.fftbins(nfft=cfg['fft_params']['nfft'],window=cfg['fft_params']['window'],
overlap=cfg['fft_params']['overlap'],detrend=cfg['fft_params']['detrend'],full=1,scipy=True)
results['stft'] = A.Dlist[0].spdata

Nchannels = channel_data.shape[0]
Expand Down Expand Up @@ -216,7 +207,7 @@ def dispatch():
# Only the master thread will open a data stream.
# General reader: engine type and params can be changed with the config file
if not args.debug:
reader = reader_gen(shot, gen_id, cfg["engine"], cfg["params"])
reader = reader_gen(cfg["shotnr"], 0, cfg["engine"], cfg["params"])
reader.Open()
else:
reader.get_all_data()
Expand All @@ -229,16 +220,15 @@ def dispatch():
tstart = time.time()
while(True):
stepStatus = reader.BeginStep()
if stepStatus == True:#adios2.StepStatus.OK:
if stepStatus == adios2.StepStatus.OK:
currentStep = reader.CurrentStep()
logging.info(f"Step {currentStep} started")
trange = list(reader.get_data("trange"))
channel_data = reader.get_data("floats")
currentStep = reader.CurrentStep()
if not cfg_update:
picklestr = reader.get_attrs("cfg")
cfg.update(pickle.loads(picklestr))
cfg.update(reader.get_attrs("cfg"))
cfg_update = True
reader.EndStep()
#print("rank {0:d}: Step".format(rank), reader.CurrentStep(), ", io_array = ", io_array)
else:
logging.info(f"Receiver: end of stream, rank = {rank}")
break
Expand Down
7 changes: 6 additions & 1 deletion tests_performance/config.receiver.brute.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
{
"datapath": "/global/cscratch1/sd/rchurchi/kstar_streaming/",
"shot": 18431,
"shotnr": 18431,
"fft_params" : {"nfft": 1000, "window": "hann", "overlap": 0.5, "detrend" :1},
"engine": "BP4",
"params": { "IPAddress": "203.230.120.125",
"Timeout": "60",
"OneToOneMode": "TRUE",
"OpenTimeoutSecs": "600"},
"analysis": [{"name" : "all"}],
"resultspath":"/global/cscratch1/sd/rchurchi/kstar_streaming/results/"
}
5 changes: 1 addition & 4 deletions tests_performance/job.generator.brute.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,5 @@ printf "%s" "$(<$0)"
echo ""

module load python
#DEBUG: Run to test local analysis
#srun -n 192 -c 2 python receiver_brute_mpi.py --debug

#NORMAL: Run to test streaming with adios2 analysis
srun -n 1 python generator_brute.py
srun -n 1 -c 2 python ~/delta_rmchurch/generator_brute.py --config config.generator.brute.json
2 changes: 1 addition & 1 deletion tests_performance/job.receiver.brute.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ module load python
#srun -n 192 -c 2 python receiver_brute_mpi.py --debug

#NORMAL: Run to test streaming with adios2 analysis
srun -n 192 -c 2 python receiver_brute.py
srun -n 192 -c 2 python receiver_brute.py --config config.receiver.brute.json