Frequent Directions Modules #329

Open · wants to merge 61 commits into base: main

Commits (61)
f2f968a
Added frequent directions module
Jul 17, 2023
7a589c3
Checkpoint. Not sure what has been changed, but parallel FD should be…
Jul 18, 2023
0a1df81
Updated freqdir
Jul 19, 2023
8514877
Checkpoint
Jul 19, 2023
4d752b4
Merge branch 'lcls-users:main' into main
john-winnicki Jul 19, 2023
f11a31f
Fixed gather command.
Jul 20, 2023
3b1f4a8
Merge branch 'main' of https://github.com/john-winnicki/btx
Jul 20, 2023
be8d312
checkpoint
Jul 21, 2023
7b26c0e
Added rank adaptive and tree merge. Both have not been tested yet
Jul 21, 2023
b8270aa
Parallel Rank Adaptive and Merge Tree appear to run. Can't say for su…
Jul 21, 2023
d5b8abd
Fixed treemerge and parallel rank adaptive FD. Things seem to work. N…
Jul 21, 2023
c204c3f
Separated FreqDir to create Merge Tree and also projection module. Sa…
Jul 25, 2023
07d624e
Checkpoint
Jul 28, 2023
1f801ef
Cleaned up code
Jul 28, 2023
222ba0b
Checkpoint. I don't think any significant changes have been made
Jul 30, 2023
fb68d4b
Refactored code and addressed many of the code review comments.
Aug 3, 2023
c0f4a42
Addressed more pull comments, fixed mean bug, and added more document…
Aug 3, 2023
a05ca8b
Added additional documentation for MergeTree and ApplyCompression mod…
Aug 4, 2023
832226d
Checkpoint. Not sure what has been changed
Aug 8, 2023
2a298ba
Checkpoint
Aug 9, 2023
d321fab
Added priority sampling in previous commit. Addressed minor comments …
Aug 10, 2023
ed9e9ea
Checkpoint. Didn't really change anything other than initial steps to…
Aug 16, 2023
79894e5
Set up parent class for dimension reduction called DimRed. Shared fun…
Aug 20, 2023
2c57322
Cleaned up code and added documentation where appropriate.
Aug 23, 2023
a3a25ed
Added UMAP visualization and wrapper class to FD module. Set up paren…
Aug 28, 2023
b3b57b6
Changed data grabbing to only retrieve data once at the beginning. Al…
Aug 31, 2023
e75809c
124 hz officially achieved. Permission denied and h5 truncated, h5 …
Sep 2, 2023
697723e
Everything working and produces 120 hz processing speed. Added modula…
Sep 3, 2023
5a08eff
Fixed UMAP html
Sep 3, 2023
4692ae3
Fixed img range processed tracking and counting bug. Also moved data …
Sep 4, 2023
aa1b561
Cleaned up code. Produces 130hz with nice clustering.
Sep 4, 2023
7b182bd
Cleaned up code and made it so that you don't need to evenly divide e…
Sep 6, 2023
eb2c66d
Checkpoint. Not sure what changed
Sep 14, 2023
07c3ee8
disabling common mode correction in FredDir DataRetriever. Seems to b…
fredericpoitevin Sep 14, 2023
6f5fa51
typo
fredericpoitevin Sep 14, 2023
999a1f9
fixed cmpars behavior to disable common mode correction if requested
fredericpoitevin Sep 14, 2023
97a0af9
Merge pull request #1 from lcls-users/main-john
john-winnicki Sep 14, 2023
b4551fe
created FD sketch tasks and workflow.
fredericpoitevin Sep 14, 2023
71748da
created FD sketch tasks and workflow.
fredericpoitevin Sep 14, 2023
26531dd
Merge pull request #2 from lcls-users/main-john
john-winnicki Sep 15, 2023
d0b8545
Moved psana. Resolved cmpar bug. 430hz processing time not including …
Sep 15, 2023
8dd6ccc
Fixed reconstruction error. Removed double psana initialization. Move…
Sep 17, 2023
b8f9ab4
Checkpoint. I don't think there are any major changes.
Sep 24, 2023
0683896
Checkpoint. Runs well and synthetic data fixed.
Oct 3, 2023
9ccfb95
Separated visualization from sketching file
Oct 3, 2023
a64ccb0
Reverted separation changes.
Oct 3, 2023
260fe9a
Removing UMAP and HDBSCAN dependency from freqdir module. Also made d…
fredericpoitevin Oct 3, 2023
0bbad24
attempt at only having psana dependency where needed in freqdir
fredericpoitevin Oct 3, 2023
704a76e
Drafting FD tasks.
fredericpoitevin Oct 4, 2023
b4a7793
Fixed bug in importing btx and other libraries. Minor other changes I…
Oct 5, 2023
04195d8
Checkpoint
Oct 17, 2023
3c91bd0
added some logger INFO to ischeduler
fredericpoitevin Oct 20, 2023
c2b7ea5
when a conda environment is activated in write_dependencies (in ische…
fredericpoitevin Oct 20, 2023
5de1268
Some of these settings produced good visualization. I think you have …
Nov 8, 2023
64cabbf
Everything is working. It runs and produces beam profiles using scali…
Dec 29, 2023
e4ac86e
run, script, scalingscript, scalingrun. Fixed bug where images were n…
Dec 31, 2023
8f81c98
Checkpoint. This is working nice.
Jan 11, 2024
931de83
Pushing as a checkpoint. Not too sure what changed, but this code see…
Mar 12, 2024
c4abf71
Modified the script to allow for elog submission.
Mar 12, 2024
a254014
I don't think I changed anything.
Mar 12, 2024
1449dac
Not sure what changed
Jul 1, 2024
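
For orientation before the diffs: Frequent Directions maintains a small sketch matrix B such that B^T B approximates X^T X for a stream of data rows X; the modules in this PR parallelize that idea with tree merging and rank adaptivity. Below is a minimal serial NumPy rendition of the textbook algorithm (after Liberty, 2013). It is an illustrative sketch, not code from this PR, and it assumes wide data with d >= 2*ell.

import numpy as np

def frequent_directions(X, ell):
    """Return an (ell x d) sketch B whose B.T @ B approximates X.T @ X."""
    n, d = X.shape
    assert d >= 2 * ell, "this sketch assumes wide data (d >= 2*ell)"
    B = np.zeros((2 * ell, d))
    next_zero = 0

    def shrink(M):
        # SVD, then subtract the (ell+1)-th squared singular value from all
        # squared singular values; this zeroes at least the bottom ell rows.
        _, s, Vt = np.linalg.svd(M, full_matrices=False)
        s = np.sqrt(np.maximum(s**2 - s[ell] ** 2, 0.0))
        return s[:, None] * Vt

    for row in X:
        B[next_zero] = row
        next_zero += 1
        if next_zero == 2 * ell:  # buffer full: compress back to ell rows
            B = shrink(B)
            next_zero = ell
    return shrink(B)[:ell] if next_zero > ell else B[:ell]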
4 changes: 3 additions & 1 deletion .gitignore
@@ -5,4 +5,6 @@ adhoc/
__pycache__/

# cli -tmp yaml
tutorial/*-tmp.yaml
tutorial/*-tmp.yaml

*.h5
15 changes: 11 additions & 4 deletions btx/interfaces/ipsana.py
@@ -11,7 +11,8 @@ class PsanaInterface:

def __init__(self, exp, run, det_type,
event_receiver=None, event_code=None, event_logic=True,
ffb_mode=False, track_timestamps=False, calibdir=None):
ffb_mode=False, track_timestamps=False, calibdir=None,
no_cmod=False):
self.exp = exp # experiment name, str
self.hutch = exp[:3] # hutch name, str
self.run = run # run number, int
@@ -21,10 +22,10 @@ def __init__(self, exp, run, det_type,
self.event_receiver = event_receiver # 'evr0' or 'evr1', str
self.event_code = event_code # event code, int
self.event_logic = event_logic # bool, if True, retain events with event_code; if False, keep all other events
self.set_up(det_type, ffb_mode, calibdir)
self.set_up(det_type, ffb_mode, calibdir, no_cmod)
self.counter = 0

def set_up(self, det_type, ffb_mode, calibdir=None):
def set_up(self, det_type, ffb_mode, calibdir=None, no_cmod=False):
"""
Instantiate DataSource and Detector objects; use the run
functionality to retrieve all psana.EventTimes.
@@ -37,6 +38,8 @@ def set_up(self, det_type, ffb_mode, calibdir=None, no_cmod=False):
if True, set up in an FFB-compatible style
calibdir: str
directory to alternative calibration files
no_cmod: bool
if True, deactivate common mode detector correction
"""
ds_args=f'exp={self.exp}:run={self.run}:idx'
if ffb_mode:
@@ -52,6 +55,7 @@
if calibdir is not None:
setOption('psana.calib_dir', calibdir)
self._calib_data_available()
self.no_cmod = no_cmod

def _calib_data_available(self):
"""
@@ -361,7 +365,10 @@ def get_images(self, num_images, assemble=True):
img = self.det.image(evt=evt)
else:
if self.calibrate:
img = self.det.calib(evt=evt)
cmpars = None
if self.no_cmod:
cmpars = [0,0,0]
img = self.det.calib(evt=evt, cmpars=cmpars)
else:
img = self.det.raw(evt=evt)
if self.det_type == 'epix10k2M':
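
Usage note: a minimal sketch of the new flag in action. The experiment and run values below are placeholders, not taken from this PR; only no_cmod, get_images, and the det_type shown in the hunk above are grounded in the diff.

from btx.interfaces.ipsana import PsanaInterface

# With no_cmod=True, get_images() calls det.calib(evt, cmpars=[0,0,0]),
# overriding the detector's default common mode correction.
psi = PsanaInterface(exp="xpptut15", run=1, det_type="epix10k2M", no_cmod=True)
imgs = psi.get_images(10, assemble=False)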
13 changes: 12 additions & 1 deletion btx/interfaces/ischeduler.py
@@ -41,11 +41,15 @@ def _data_systems_management(self):

self.ana_conda_manage = f'{self.ana_conda_dir}conda1/manage/bin/'
self.ana_conda_bin = f'{self.ana_conda_dir}conda1/inst/envs/ana-4.0.47-py3/bin/'
self.pythonpath = None

def _find_python_path(self):
""" Determine the relevant python path. """
pythonpath=None
possible_paths = [f"{self.ana_conda_bin}python"]
if self.pythonpath is None:
possible_paths = [f"{self.ana_conda_bin}python"]
else:
possible_paths = [f"{self.pythonpath}"]

try:
pythonpath = os.environ['WHICHPYTHON']
@@ -118,12 +122,17 @@ def _write_dependencies(self, dependencies):
if "xgandalf" in dependencies:
dep_paths += "export PATH=/reg/g/cfel/crystfel/indexers/xgandalf/include/:$PATH\n"
dep_paths += "export PATH=/reg/g/cfel/crystfel/indexers/xgandalf/include/eigen3/Eigen/:$PATH"
if "fdviz" in dependencies:
dep_paths += f"conda activate /sdf/group/lcls/ds/tools/conda_envs/johnw-ana-4.0.48-py3"
self.pythonpath = "/sdf/group/lcls/ds/tools/conda_envs/johnw-ana-4.0.48-py3/bin/python"
dep_paths += "\n"

with open(self.jobfile, 'a') as jfile:
jfile.write(dep_paths)
logger.info(dep_paths)
if 'SIT_PSDM_DATA' in os.environ:
jfile.write(f"export SIT_PSDM_DATA={os.environ['SIT_PSDM_DATA']}\n")
logger.info(f"export SIT_PSDM_DATA={os.environ['SIT_PSDM_DATA']}\n")

def write_main(self, application, dependencies=[]):
""" Write application and source requested dependencies. """
@@ -133,6 +142,7 @@ def write_main(self, application, dependencies=[]):
pythonpath = self._find_python_path()
with open(self.jobfile, 'a') as jfile:
jfile.write(application.replace("python", pythonpath))
logger.info(application.replace("python", pythonpath))

def submit(self):
""" Submit to queue. """
@@ -143,3 +153,4 @@ def clean_up(self):
""" Add a line to delete submission file."""
with open(self.jobfile, 'a') as jfile:
jfile.write(f"if [ -f {self.jobfile} ]; then rm -f {self.jobfile}; fi")
logger.info(f"if [ -f {self.jobfile} ]; then rm -f {self.jobfile}; fi")
269 changes: 269 additions & 0 deletions btx/processing/dimRed.py
@@ -0,0 +1,269 @@
import os, csv, argparse

import numpy as np
from mpi4py import MPI

from matplotlib import pyplot as plt
from matplotlib import colors

import holoviews as hv
hv.extension('bokeh')
from holoviews.streams import Params

import panel as pn
import panel.widgets as pnw

from btx.misc.shortcuts import TaskTimer

from btx.interfaces.ipsana import (
PsanaInterface,
bin_data,
bin_pixel_index_map,
retrieve_pixel_index_map,
assemble_image_stack_batch,
)

class DimRed:

"""Dimension Reduction Parent Class."""

def __init__(
self,
exp,
run,
det_type,
start_offset=0,
num_images=10,
num_components=10,
batch_size=10,
priming=False,
downsample=False,
bin_factor=2,
output_dir="",
psi=None
):

self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()
self.size = self.comm.Get_size()

if psi is None:
self.psi = PsanaInterface(exp=exp, run=run, det_type=det_type)
self.psi.counter = start_offset
else:
self.psi = psi
self.start_offset = start_offset

self.priming = priming
self.downsample = downsample
self.bin_factor = bin_factor
self.output_dir = output_dir

(
self.num_images,
self.num_components,
self.batch_size,
self.num_features,
) = self.set_params(num_images, num_components, batch_size, bin_factor)

self.split_indices, self.split_counts = distribute_indices_over_ranks(
self.num_features, self.size
)

self.task_durations = dict({})

self.num_incorporated_images = 0
self.outliers, self.pc_data = [], []

def get_params(self):
"""
Method to retrieve dimension reduction parameters.

Returns
-------
num_incorporated_images : int
number of images used to build model
num_components : int
number of components maintained in model
batch_size : int
batch size used in model updates
num_features : int
dimensionality of incorporated images
"""
return (
self.num_incorporated_images,
self.num_components,
self.batch_size,
self.num_features,
)

def set_params(self, num_images, num_components, batch_size, bin_factor):
"""
Method to initialize dimension reduction parameters.

Parameters
----------
num_images : int
Desired number of images to incorporate into model.
num_components : int
Desired number of components for model to maintain.
batch_size : int
Desired size of image block to be incorporated into model at each update.
bin_factor : int
Factor to bin data by.

Returns
-------
num_images : int
Number of images to incorporate into model.
num_components : int
Number of components for model to maintain.
batch_size : int
Size of image block to be incorporated into model at each update.
num_features : int
Number of features (dimension) in each image.
"""
max_events = self.psi.max_events
downsample = self.downsample

num_images = min(num_images, max_events) if num_images != -1 else max_events
num_components = min(num_components, num_images)
batch_size = min(batch_size, num_images)

# set d
det_shape = self.psi.det.shape()
num_features = np.prod(det_shape).astype(int)

if downsample:
if det_shape[-1] % bin_factor or det_shape[-2] % bin_factor:
print("Invalid bin factor, toggled off downsampling.")
self.downsample = False
else:
num_features = int(num_features / bin_factor**2)

return num_images, num_components, batch_size, num_features

def display_dashboard(self):
"""
Displays a pipca dashboard with a PC plot and intensity heatmap.
"""

start_img = self.start_offset

# Create PC dictionary and widgets
PCs = {f'PC{i}' : v for i, v in enumerate(self.pc_data, start=1)}
PC_options = list(PCs)

PCx = pnw.Select(name='X-Axis', value='PC1', options=PC_options)
PCy = pnw.Select(name='Y-Axis', value='PC2', options=PC_options)
widgets_scatter = pn.WidgetBox(PCx, PCy, width=100)

tap_source = None
posxy = hv.streams.Tap(source=tap_source, x=0, y=0)

# Create PC scatter plot
@pn.depends(PCx.param.value, PCy.param.value)
def create_scatter(PCx, PCy):
img_index_arr = np.arange(start_img, start_img + len(PCs[PCx]))
scatter_data = {**PCs, 'Image': img_index_arr}

opts = dict(width=400, height=300, color='Image', cmap='rainbow',
colorbar=True, show_grid=True, toolbar='above', tools=['hover'])
scatter = hv.Points(scatter_data, kdims=[PCx, PCy], vdims=['Image'],
label="%s vs %s" % (PCx.title(), PCy.title())).opts(**opts)

posxy.source = scatter
return scatter

# Define function to compute heatmap based on tap location
def tap_heatmap(x, y, pcx, pcy):
# Finds the index of image closest to the tap location
img_source = None
min_diff = None
square_diff = None

for i, (xv, yv) in enumerate(zip(PCs[pcx], PCs[pcy])):
square_diff = (x - xv) ** 2 + (y - yv) ** 2
if (min_diff is None or square_diff < min_diff):
min_diff = square_diff
img_source = i

# Downsample so heatmap is at most 100 x 100
counter = self.psi.counter
self.psi.counter = start_img + img_source
img = self.psi.get_images(1)
_, x_pixels, y_pixels = img.shape
self.psi.counter = counter

max_pixels = 100
bin_factor_x = int(x_pixels / max_pixels)
bin_factor_y = int(y_pixels / max_pixels)

while x_pixels % bin_factor_x != 0:
bin_factor_x += 1
while y_pixels % bin_factor_y != 0:
bin_factor_y += 1

img = img.reshape((x_pixels, y_pixels))
binned_img = img.reshape(int(x_pixels / bin_factor_x),
bin_factor_x,
int(y_pixels / bin_factor_y),
bin_factor_y).mean(-1).mean(1)

# Creates hm_data array for heatmap
bin_x_pixels, bin_y_pixels = binned_img.shape
rows = np.tile(np.arange(bin_x_pixels).reshape((bin_x_pixels, 1)), bin_y_pixels).flatten()
cols = np.tile(np.arange(bin_y_pixels), bin_x_pixels)

hm_data = np.stack((rows, cols, binned_img.flatten()))
hm_data = hm_data.T.reshape((bin_x_pixels * bin_y_pixels, 3))

opts = dict(width=400, height=300, cmap='plasma', colorbar=True, toolbar='above')
heatmap = hv.HeatMap(hm_data, label="Image %s" % (start_img+img_source)).aggregate(function=np.mean).opts(**opts)

return heatmap

# Connect the Tap stream to the tap_heatmap callback
stream1 = [posxy]
stream2 = Params.from_params({'pcx': PCx.param.value, 'pcy': PCy.param.value})
tap_dmap = hv.DynamicMap(tap_heatmap, streams=stream1+stream2)

return pn.Row(widgets_scatter, create_scatter, tap_dmap).servable('Cross-selector')


def distribute_indices_over_ranks(d, size):
"""

Parameters
----------
d : int
total number of dimensions
size : int
number of ranks in world

Returns
-------
split_indices : ndarray, shape (size+1 x 1)
division indices between ranks
split_counts : ndarray, shape (size x 1)
number of dimensions allocated per rank
"""

total_indices = 0
split_indices, split_counts = [0], []

for r in range(size):
num_per_rank = d // size
if r < (d % size):
num_per_rank += 1

split_counts.append(num_per_rank)

total_indices += num_per_rank
split_indices.append(total_indices)

split_indices = np.array(split_indices)
split_counts = np.array(split_counts)

return split_indices, split_counts
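
A quick worked example of the helper above; the values follow directly from the code:

# 10 feature dimensions over 3 ranks: rank 0 absorbs the remainder.
split_indices, split_counts = distribute_indices_over_ranks(10, 3)
# split_counts  -> array([4, 3, 3])
# split_indices -> array([ 0,  4,  7, 10])
# Rank r owns feature columns split_indices[r]:split_indices[r+1].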
