diff --git a/.gitignore b/.gitignore
index 1706b30..f078f2c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,6 @@ emit_sds_l1a.egg-info
 __pycache__
 .coverage
-.DS_Store
\ No newline at end of file
+.DS_Store
+*.log
+tmp
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 06fbca3..d14723d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,10 +4,22 @@ All notable changes to this project will be documented in this file. Dates are d
 
 Generated by [`auto-changelog`](https://github.com/CookPete/auto-changelog).
 
+#### [v1.5.0](https://github.com/emit-sds/emit-sds-l1a/compare/v1.4.1...v1.5.0)
+
+> 14 October 2022
+
+- Corrupt frame handling in reassembly [`#9`](https://github.com/emit-sds/emit-sds-l1a/pull/9)
+- Merge IOC hotfixes into develop [`#8`](https://github.com/emit-sds/emit-sds-l1a/pull/8)
+- Save corrupt frames with '9' as acquisition status. During reassembly only use these frames if data is not compressed. [`8ce2620`](https://github.com/emit-sds/emit-sds-l1a/commit/8ce26200972cb2d3ebca109977b7c37abe9da578)
+- Fill in missing gps times in line timestamps file [`e42477c`](https://github.com/emit-sds/emit-sds-l1a/commit/e42477cfb9372bdffb03fea655ba7870907214fb)
+- Add util for checking packet order in HOSC data. [`f2e0519`](https://github.com/emit-sds/emit-sds-l1a/commit/f2e0519b8f8dc3e7d19ce2326990b08b9a5eeb53)
+
 #### [v1.4.1](https://github.com/emit-sds/emit-sds-l1a/compare/v1.4.0...v1.4.1)
 
 > 26 July 2022
 
+- Merge develop into main for v1.4.1 [`#7`](https://github.com/emit-sds/emit-sds-l1a/pull/7)
+- Update change log [`bfc5933`](https://github.com/emit-sds/emit-sds-l1a/commit/bfc59333caa4f8aefd211211748feca0886081e6)
 - Update version to 1.4.1 [`69d7df0`](https://github.com/emit-sds/emit-sds-l1a/commit/69d7df0bb6b9695a0ee11dd0b8ff94738f3ef6b3)
 - Use 7 digit default orbit id [`e073e9e`](https://github.com/emit-sds/emit-sds-l1a/commit/e073e9e3e50428477797de4533e017b2d03cd5a4)
diff --git a/depacketize_science_frames.py b/depacketize_science_frames.py
index 3464be2..6ba7a87 100644
--- a/depacketize_science_frames.py
+++ b/depacketize_science_frames.py
@@ -86,7 +86,10 @@ def main():
         try:
             frame_binary = processor.read_frame()
             frame = Frame(frame_binary)
-            frame.save(frames_dir)
+            if frame.corrupt_name in processor.corrupt_frames:
+                frame.save(frames_dir, corrupt=True)
+            else:
+                frame.save(frames_dir, corrupt=False)
             frame_count += 1
         except EOFError:
             break
diff --git a/emit_sds_l1a/ccsds_packet.py b/emit_sds_l1a/ccsds_packet.py
index 4138728..8919ba0 100644
--- a/emit_sds_l1a/ccsds_packet.py
+++ b/emit_sds_l1a/ccsds_packet.py
@@ -346,11 +346,8 @@ def truncated_frame(self):
         self._stats["truncated_frame_errors"] += 1
 
     def corrupt_frame(self, frame):
-        name = "_".join([str(frame.dcid).zfill(10), frame.start_time.strftime("%Y%m%dt%H%M%S"),
-                         str(frame.frame_count_in_acq).zfill(5), str(frame.planned_num_frames).zfill(5),
-                         str(frame.acq_status), str(frame.processed_flag)])
-        if name not in self._stats["corrupt_frames"]:
-            self._stats["corrupt_frames"].append(name)
+        if frame.corrupt_name not in self._stats["corrupt_frames"]:
+            self._stats["corrupt_frames"].append(frame.corrupt_name)
 
     def get_data_bytes_read(self):
         return self._stats["data_bytes_read"]
@@ -395,6 +392,7 @@ def __init__(self, stream_path, pkt_format="1.3"):
         logger.debug(f"Initializing SciencePacketProcessor from path {stream_path} using FSW v{pkt_format}")
         self.stream = open(stream_path, "rb")
         self.pkt_format = pkt_format
+        self.corrupt_frames = set()
         self._cur_psc = -1
         self._cur_coarse = -1
         self._cur_fine = -1
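The new corrupt_frames set above is how the depacketizer learns which frames were reconstructed with garbage fill: packet reassembly records each corrupted frame's corrupt_name, and depacketize_science_frames.py routes the save by membership. A minimal sketch of that flow, using a hypothetical frame name:

    # Hypothetical corrupt_name recorded during packet reassembly.
    corrupt_frames = set()
    corrupt_frames.add("0000001234_20221014t010203_00007_00320_9_1")

    # At save time, set membership decides the output name; a set keeps this
    # lookup constant-time no matter how many corrupt frames were seen.
    if "0000001234_20221014t010203_00007_00320_9_1" in corrupt_frames:
        print("save with corrupt=True")  # cf. frame.save(frames_dir, corrupt=True)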
@@ -425,7 +423,7 @@ def _read_next_packet(self):
             pkt = ScienceDataPacket(stream=self.stream, pkt_format=self.pkt_format)
             logger.debug(pkt)
             self._stats.ccsds_read(pkt)
-            pkt_hash = str(pkt.coarse_time) + str(pkt.fine_time) + str(pkt.pkt_seq_cnt)
+            pkt_hash = "_".join([str(pkt.coarse_time), str(pkt.fine_time), str(pkt.pkt_seq_cnt)])
 
             # Handle case where packet is not valid
             if not pkt.is_valid:
@@ -657,6 +655,7 @@ def _read_pkt_parts(self, start_pkt):
                     logger.info(f"Inserted garbage packet with {pkt.MAX_DATA_LEN} bytes of data. Accum data is "
                                 f"now {data_accum_len}")
                     self._stats.corrupt_frame(frame)
+                    self.corrupt_frames.add(frame.corrupt_name)
                 elif 0 < remaining_data_len < pkt.MAX_DATA_LEN:
                     if self.pkt_format == "1.2.1":
                         if pkt.pad_byte_flag == 0:
@@ -674,6 +673,7 @@ def _read_pkt_parts(self, start_pkt):
                     logger.info(f"Inserted garbage packet with {remaining_data_len} bytes of data. Accum data is "
                                 f"now {data_accum_len}")
                     self._stats.corrupt_frame(frame)
+                    self.corrupt_frames.add(frame.corrupt_name)
 
             pkt_parts.append(pkt)
             data_accum_len += len(pkt.data)
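The pkt_hash change above swaps bare string concatenation for an underscore join. Concatenation can map distinct (coarse, fine, psc) tuples to the same key, so duplicate detection could misfire; a minimal sketch with made-up values:

    # Hypothetical time/sequence values, for illustration only.
    # Bare concatenation collides: both expressions produce "123456".
    assert str(123) + str(45) + str(6) == str(12) + str(345) + str(6)

    # Joining on "_" preserves the field boundaries, so the keys stay distinct.
    assert "_".join(["123", "45", "6"]) != "_".join(["12", "345", "6"])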
diff --git a/emit_sds_l1a/frame.py b/emit_sds_l1a/frame.py
index 5d964e5..2211158 100644
--- a/emit_sds_l1a/frame.py
+++ b/emit_sds_l1a/frame.py
@@ -158,6 +158,14 @@ def __init__(self, frame_binary):
         self.instrument_mode_desc = "No match" if self.instrument_mode == "no_match" else \
             INSTRUMENT_MODES[self.instrument_mode]["desc"]
 
+        # Frame name and also corrupt name if needed
+        self.name = "_".join([str(self.dcid).zfill(10), self.start_time.strftime("%Y%m%dt%H%M%S"),
+                              str(self.frame_count_in_acq).zfill(5), str(self.planned_num_frames).zfill(5),
+                              str(self.acq_status), str(self.processed_flag)])
+        self.corrupt_name = "_".join([str(self.dcid).zfill(10), self.start_time.strftime("%Y%m%dt%H%M%S"),
+                                      str(self.frame_count_in_acq).zfill(5), str(self.planned_num_frames).zfill(5),
+                                      str(9), str(self.processed_flag)])
+
         logger.debug(f"Initialized frame: {self}")
 
     def __repr__(self):
@@ -218,12 +226,11 @@ def is_valid(self):
                          f"Computed checksum: {self._compute_hdr_checksum()}")
         return is_valid
 
-    def save(self, out_dir):
-        fname = "_".join([str(self.dcid).zfill(10), self.start_time.strftime("%Y%m%dt%H%M%S"),
-                          str(self.frame_count_in_acq).zfill(5), str(self.planned_num_frames).zfill(5),
-                          str(self.acq_status), str(self.processed_flag)])
-
-        out_path = os.path.join(out_dir, fname)
+    def save(self, out_dir, corrupt=False):
+        if corrupt:
+            out_path = os.path.join(out_dir, self.corrupt_name)
+        else:
+            out_path = os.path.join(out_dir, self.name)
 
         logger.info("Writing frame to path %s" % out_path)
         logger.debug("data length is %s" % len(self.data))
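The precomputed name and corrupt_name above differ only in the fifth field, the acquisition status, which corrupt_name pins to 9. A sketch of the convention with hypothetical field values:

    from datetime import datetime

    # Hypothetical frame fields, for illustration only.
    dcid, count_in_acq, planned, acq_status, processed = 1234, 7, 320, 0, 1
    start_time = datetime(2022, 10, 14, 1, 2, 3)

    name = "_".join([str(dcid).zfill(10), start_time.strftime("%Y%m%dt%H%M%S"),
                     str(count_in_acq).zfill(5), str(planned).zfill(5),
                     str(acq_status), str(processed)])
    print(name)          # 0000001234_20221014t010203_00007_00320_0_1

    # corrupt_name keeps every field but forces the acquisition status to 9.
    corrupt_name = "_".join(name.split("_")[:4] + ["9", str(processed)])
    print(corrupt_name)  # 0000001234_20221014t010203_00007_00320_9_1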
diff --git a/reassemble_raw_cube.py b/reassemble_raw_cube.py
index 6e8ff86..7664b85 100644
--- a/reassemble_raw_cube.py
+++ b/reassemble_raw_cube.py
@@ -15,6 +15,7 @@
 from ait.core import dmc
 from argparse import RawTextHelpFormatter
+from collections import OrderedDict
 
 import numpy as np
 import spectral.io.envi as envi
@@ -106,6 +107,25 @@ def generate_line_count_lookup(line_headers, num_lines, increment, frame_num_str
 
     return lc_lookup
 
+def interpolate_missing_gps_times(lt_rows):
+    # Populate x, y from available gps times
+    x = []
+    y = []
+    for row in lt_rows:
+        if int(row[1]) >= 0:
+            x.append(int(row[0]))
+            y.append(int(row[1]))
+    x = np.array(x)
+    y = np.array(y)
+    m, b = np.polyfit(x, y, 1)
+
+    for row in lt_rows:
+        if int(row[1]) == -1:
+            row[1] = str(int(m * int(row[0]) + b)).zfill(19)
+
+    return lt_rows
+
+
 def calculate_nanoseconds_since_gps_epoch(line_timestamp, os_time_timestamp, os_time):
     # Need to adjust line timestamp in the case where the clock rolls over (which happens about every 12 hours)
     if line_timestamp < os_time_timestamp:
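interpolate_missing_gps_times fits a degree-1 polynomial (a least-squares line) through the rows that do carry a GPS time, keyed on line number, then evaluates that line for every row marked -1. A self-contained sketch with made-up rows, where only the first two columns matter:

    import numpy as np

    # Rows are [line_num, gps_time_ns, utc_str, line_timestamp, line_count];
    # -1 in column 1 marks a missing gps time.
    lt_rows = [["000000", "1000", "t0", "0000000010", "0000000001"],
               ["000001", str(-1).zfill(19), "t1", "0000000020", "0000000002"],
               ["000002", "3000", "t2", "0000000030", "0000000003"]]

    x = np.array([int(r[0]) for r in lt_rows if int(r[1]) >= 0])  # [0, 2]
    y = np.array([int(r[1]) for r in lt_rows if int(r[1]) >= 0])  # [1000, 3000]
    m, b = np.polyfit(x, y, 1)  # slope ~1000, intercept ~1000

    for row in lt_rows:
        if int(row[1]) == -1:
            row[1] = str(int(m * int(row[0]) + b)).zfill(19)

    print(int(lt_rows[1][1]))  # ~2000, the midpoint of the two valid times

A line fit needs at least two valid points, which is presumably why the call site added later in this file guards with num_valid_lines >= 2.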
@@ -126,8 +146,9 @@ def get_utc_time_from_gps(gps_time):
 
 def reassemble_acquisition(acq_data_paths, start_index, stop_index, start_time, stop_time, timing_info, processed_flag,
-                           coadd_mode, num_bands, num_lines, image_dir, report_text, failed_decompression_list,
-                           uncompressed_list, missing_frame_nums, logger):
+                           compression_flag, coadd_mode, num_bands, num_lines, image_dir, report_text,
+                           corrupt_frames_list, failed_decompression_list, uncompressed_list, missing_frame_nums,
+                           logger):
     # Reassemble frames into ENVI image cube filling in missing and cloudy data with data flags
     # First create acquisition_id from frame start_time
     # Assume acquisitions are at least 1 second long
@@ -158,18 +179,20 @@ def reassemble_acquisition(acq_data_paths, start_index, stop_index, start_time,
     cloudy_frame_nums = []
 
     line = 0
-    lt_file = open(line_timestamps_path, "w")
+    lt_rows = []
     lc_increment = 2 if processed_flag == 1 and coadd_mode == 1 else 1
     lc_lookup = None
     corrupt_lines = []
+    frame_corrupt_line_map = OrderedDict()
     num_valid_lines = 0
     for path in acq_data_paths:
         frame_num_str = os.path.basename(path).split(".")[0].split("_")[2]
         status = int(os.path.basename(path).split(".")[0].split("_")[4])
         start_line_in_frame = (int(frame_num_str) - start_index) * num_lines
         logger.info(f"Adding frame {path}")
-        # Non-cloudy frames
-        if status in (0, 1):
+        frame_corrupt_line_map[os.path.basename(path)] = []
+        # If the data is compressed and not cloudy, OR if the data is uncompressed and not cloudy or corrupt
+        if (compression_flag == 1 and status in (0, 1)) or (compression_flag == 0 and status in (0, 1, 9)):
             num_valid_lines += num_lines
             # Write frame to output array
             frame = np.memmap(path, shape=(num_lines, int(hdr["bands"]), int(hdr["samples"])), dtype=np.int16,
                               mode="r")
@@ -178,6 +201,8 @@ def reassemble_acquisition(acq_data_paths, start_index, stop_index, start_time,
             # DN values are in the range from 0 to 16384
             if processed_flag == 1:
                 output[line:line + num_lines, 1:, :] = output[line:line + num_lines, 1:, :] + 8192
+            else:
+                output[line:line + num_lines, 1:, :] = np.int16((np.int64(output[line:line + num_lines, 1:, :]) + 32768) >> 2)
 
             # Read line headers and process below
             line_headers = frame[:, 0, :]
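For unprocessed frames, the new else branch appears to rescale raw signed 16-bit DNs into the same 0-16383 range the processed path lands in: the data is widened to int64 so the +32768 offset cannot overflow, shifted into 0..65535, then divided by 4 via the right shift. A worked example of that mapping:

    import numpy as np

    raw = np.array([-32768, -1, 0, 32767], dtype=np.int16)
    # +32768 maps -32768..32767 onto 0..65535; >> 2 divides by 4 -> 0..16383.
    scaled = np.int16((np.int64(raw) + 32768) >> 2)
    print(scaled)  # [    0  8191  8192 16383]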
@@ -193,6 +218,9 @@
                                f"Assuming that all lines are corrupt.")
                 corrupt_lines += list(range(start_line_in_frame, start_line_in_frame + num_lines))
                 num_valid_lines -= num_lines
+                frame_corrupt_line_map[os.path.basename(path)] = list(range(num_lines))
+                # Set all values in these lines to corrupt flag value
+                output[line:line + num_lines, 1:, :] = CORRUPT_LINE_FLAG
             else:
                 logger.info(f"Found a good line count in frame {frame_num_str} and generated a line count lookup.")
@@ -211,13 +239,37 @@
                     os_time=timing_info[int(frame_num_str)]['os_time']
                 )
                 utc_time_str = get_utc_time_from_gps(nanosecs_since_gps).strftime("%Y-%m-%dT%H:%M:%S.%f")
-                lt_file.write(f"{str(start_line_in_frame + i).zfill(6)} {str(nanosecs_since_gps).zfill(19)} "
-                              f"{utc_time_str} {str(line_timestamp).zfill(10)} {str(line_count).zfill(10)}\n")
+
+                # Write to timestamps file, but must insert -1 gps time for corrupt lines
+                if lc_lookup is None:
+                    # This line is corrupt since all lines in frame are corrupt in this case
+                    lt_rows.append([str(start_line_in_frame + i).zfill(6), str(-1).zfill(19), utc_time_str,
+                                    str(line_timestamp).zfill(10), str(line_count).zfill(10)])
                 if lc_lookup is not None and lc_lookup[start_line_in_frame + i] != line_count:
-                    logger.warning(f"Found corrupt line at line number {start_line_in_frame + i}")
-                    corrupt_lines.append(start_line_in_frame + i)
-                    num_valid_lines -= 1
+                    logger.warning(f"Found corrupt line header at line number {start_line_in_frame + i}")
+                    corrupt_line_num = start_line_in_frame + i
+                    prev_corrupt_line_num = corrupt_line_num - 1
+                    if prev_corrupt_line_num >= 0 and prev_corrupt_line_num not in corrupt_lines:
+                        logger.warning(f"Setting previous line at index {prev_corrupt_line_num} as corrupt")
+                        corrupt_lines.append(prev_corrupt_line_num)
+                        num_valid_lines -= 1
+                        frame_corrupt_line_map[os.path.basename(path)].append(i - 1)
+                        output[prev_corrupt_line_num, 1:, :] = CORRUPT_LINE_FLAG
+                    if corrupt_line_num not in corrupt_lines:
+                        logger.warning(f"Setting line at index {corrupt_line_num} as corrupt")
+                        corrupt_lines.append(corrupt_line_num)
+                        num_valid_lines -= 1
+                        frame_corrupt_line_map[os.path.basename(path)].append(i)
+                        output[corrupt_line_num, 1:, :] = CORRUPT_LINE_FLAG
+                    # Since this line header is corrupt, use -1 for gps time
+                    lt_rows.append([str(start_line_in_frame + i).zfill(6), str(-1).zfill(19), utc_time_str,
+                                    str(line_timestamp).zfill(10), str(line_count).zfill(10)])
+
+                else:
+                    # Not corrupt, so write all the values
+                    lt_rows.append([str(start_line_in_frame + i).zfill(6), str(nanosecs_since_gps).zfill(19),
+                                    utc_time_str, str(line_timestamp).zfill(10), str(line_count).zfill(10)])
 
         # Cloudy frames
         if status in (4, 5):
@@ -226,26 +278,44 @@
                              dtype=np.int16)
             output[line:line + num_lines, :, :] = frame[:, :, :].copy()
             for i in range(num_lines):
-                lt_file.write(f"{str(start_line_in_frame + i).zfill(6)} {str(-1).zfill(19)} 0000-00-00T00:00:00.000000 "
-                              f"{str(-1).zfill(10)} {str(-1).zfill(10)}\n")
+                lt_rows.append([str(start_line_in_frame + i).zfill(6), str(-1).zfill(19), "0000-00-00T00:00:00.000000",
+                                str(-1).zfill(10), str(-1).zfill(10)])
 
         # Missing frames
         if status == 6:
             frame = np.full(shape=(num_lines, int(hdr["bands"]), int(hdr["samples"])), fill_value=MISSING_FRAME_FLAG,
                             dtype=np.int16)
             output[line:line + num_lines, :, :] = frame[:, :, :].copy()
             for i in range(num_lines):
-                lt_file.write(f"{str(start_line_in_frame + i).zfill(6)} {str(-1).zfill(19)} 0000-00-00T00:00:00.000000 "
-                              f"{str(-1).zfill(10)} {str(-1).zfill(10)}\n")
+                lt_rows.append([str(start_line_in_frame + i).zfill(6), str(-1).zfill(19), "0000-00-00T00:00:00.000000",
+                                str(-1).zfill(10), str(-1).zfill(10)])
 
         # Failed decompression
         if status == 7:
             frame = np.full(shape=(num_lines, int(hdr["bands"]), int(hdr["samples"])), fill_value=CORRUPT_FRAME_FLAG,
                             dtype=np.int16)
             output[line:line + num_lines, :, :] = frame[:, :, :].copy()
             for i in range(num_lines):
-                lt_file.write(f"{str(start_line_in_frame + i).zfill(6)} {str(-1).zfill(19)} 0000-00-00T00:00:00.000000 "
-                              f"{str(-1).zfill(10)} {str(-1).zfill(10)}\n")
+                lt_rows.append([str(start_line_in_frame + i).zfill(6), str(-1).zfill(19), "0000-00-00T00:00:00.000000",
+                                str(-1).zfill(10), str(-1).zfill(10)])
+
+        # Corrupt and compressed
+        if compression_flag == 1 and status == 9:
+            frame = np.full(shape=(num_lines, int(hdr["bands"]), int(hdr["samples"])),
+                            fill_value=CORRUPT_FRAME_FLAG, dtype=np.int16)
+            output[line:line + num_lines, :, :] = frame[:, :, :].copy()
+            for i in range(num_lines):
+                lt_rows.append(
+                    [str(start_line_in_frame + i).zfill(6), str(-1).zfill(19), "0000-00-00T00:00:00.000000",
+                     str(-1).zfill(10), str(-1).zfill(10)])
+
         line += num_lines
 
     del output
+
+    # Generate gpstime_lookup and replace -1 gps times with interpolated values
+    if num_valid_lines >= 2:
+        lt_rows = interpolate_missing_gps_times(lt_rows)
+    lt_file = open(line_timestamps_path, "w")
+    for row in lt_rows:
+        lt_file.write(" ".join(row) + "\n")
     lt_file.close()
 
     # Create a reassembly report
@@ -300,7 +370,17 @@
             f.write("\n".join(i for i in uncompressed_in_acq) + "\n")
         f.write("\n")
 
-        # Report on corrupt frames (frames that failed decompression)
+        # Report on corrupt frames
+        corrupt_frames_in_acq = list(set(acquisition_frame_nums) & set(corrupt_frames_list))
+        corrupt_frames_in_acq.sort()
+
+        f.write(f"Total corrupt frames encountered in this acquisition: {len(corrupt_frames_in_acq)}\n")
+        f.write("List of corrupt frame numbers (if any):\n")
+        if len(corrupt_frames_in_acq) > 0:
+            f.write("\n".join(i for i in corrupt_frames_in_acq) + "\n")
+        f.write("\n")
+
+        # Report on frames that failed decompression
         failed_decompression_in_acq = list(set(acquisition_frame_nums) & set(failed_decompression_list))
         failed_decompression_in_acq.sort()
@@ -333,13 +413,10 @@
         f.write(f"List of corrupt lines (if any):\n")
         if len(corrupt_lines) > 0:
             for i, line_num in enumerate(corrupt_lines):
-                if i > 100:
-                    f.write(f"More than 100 corrupt lines. See line timestamp file.\n")
-                    break
                 f.write(f"{str(line_num).zfill(6)}\n")
         f.write("\n")
 
-    result = {"corrupt_lines": corrupt_lines}
+    result = {"corrupt_lines": corrupt_lines, "frame_corrupt_line_map": frame_corrupt_line_map}
     return result
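The per-acquisition report sections above share one pattern: intersect a data-collection-wide list of frame numbers with this acquisition's frame numbers, then sort for stable output. A small sketch with hypothetical zero-padded frame numbers:

    # Hypothetical frame-number strings, for illustration only.
    acquisition_frame_nums = ["00003", "00004", "00005", "00006"]
    corrupt_frames_list = ["00001", "00004", "00006", "00009"]

    corrupt_frames_in_acq = sorted(set(acquisition_frame_nums) & set(corrupt_frames_list))
    print(corrupt_frames_in_acq)  # ['00004', '00006']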
@@ -423,8 +500,10 @@ def main():
     num_bands_list = []
     num_lines_list = []
     processed_flag_list = []
+    compression_flag_list = []
     coadd_mode_list = []
     instrument_mode_list = []
+    corrupt_frames_list = []
     failed_decompression_list = []
     uncompressed_list = []
     line_counts = [None] * int(expected_frame_num_str)
@@ -439,12 +518,19 @@
             frame_binary = f.read()
         frame = Frame(frame_binary)
         uncomp_frame_path = os.path.join(image_dir, os.path.basename(path) + ".xio.decomp")
+        compression_flag_list.append(frame.compression_flag)
 
         # Check frame checksum
         logger.debug(f"Frame is valid: {frame.is_valid()}")
 
-        # Decompress if compression flag is set and frame is not cloudy, otherwise, just copy file
-        if frame.compression_flag == 1 and frame.cloudy_flag == 0:
+        # Check if frame is corrupt
+        is_corrupt = False
+        if os.path.basename(path).split(".")[0].split("_")[4] == "9":
+            is_corrupt = True
+            corrupt_frames_list.append(os.path.basename(path).split(".")[0].split("_")[2])
+
+        # Decompress if compression flag is set and frame is not cloudy and not corrupt, otherwise, just copy file
+        if frame.compression_flag == 1 and frame.cloudy_flag == 0 and not is_corrupt:
             # Decompress frame
             interleave_arg = "--" + args.interleave
             cmd = [args.flexcodec_exe, path, "-a", args.constants_path, "-i", args.init_data_path, "-v",
@@ -465,7 +551,11 @@
                 logger.error(f"Removing {uncomp_frame_path}")
                 os.remove(uncomp_frame_path)
                 continue
-                # raise RuntimeError(output.stderr.decode("utf-8"))
+
+        elif frame.compression_flag == 1 and is_corrupt:
+            # Do nothing - this frame will fail decompression so skip it
+            logger.info(f"Found compressed and corrupt frame at {path}. Not attempting to decompress or preserve.")
+            continue
 
         else:
             # Just copy the uncompressed frame and rename it
@@ -512,6 +602,14 @@ def main():
     # Update report with decompression stats
     failed_decompression_list.sort()
     uncompressed_list.sort()
+    corrupt_frames_list.sort()
+
+    # Abort if there is a mix of compressed and non-compressed
+    compression_flag_list.sort()
+    for i in range(len(compression_flag_list)):
+        if compression_flag_list[i] != compression_flag_list[0]:
+            raise RuntimeError(f"Not all frames have the same compression flag: {compression_flag_list}")
+    compression_flag = compression_flag_list[0]
 
     # Check all frames have same number of bands
     # num_bands_list.sort()
@@ -569,7 +667,12 @@
     missing_frame_nums.sort()
     # Now remove failed decompression frame nums from missing frame nums list
     missing_frame_nums = list(set(missing_frame_nums) - set(failed_decompression_list))
+    missing_frame_nums.sort()
+    # Also remove the corrupt frames which are purposely missing
+    missing_frame_nums = list(set(missing_frame_nums) - set(corrupt_frames_list))
+    missing_frame_nums.sort()
 
+    logger.debug(f"List of corrupt frame numbers (if any): {corrupt_frames_list}")
     logger.debug(f"List of failed decompression frame numbers (if any): {failed_decompression_list}")
     logger.debug(f"List of missing frame numbers (if any): {missing_frame_nums}")
@@ -579,11 +682,19 @@
             os.path.join(image_dir, "_".join([dcid, start_stop_times[int(num)][0].strftime("%Y%m%dt%H%M%S"),
                                               num, expected_frame_num_str, "6"])))
 
-    # Add failed decompressions into frame_data_paths list with acquisition status of "" to indicate failed.
+    # Add failed decompressions into frame_data_paths list with acquisition status of "7" to indicate failed.
     for num in failed_decompression_list:
         frame_data_paths.append(
             os.path.join(image_dir, "_".join([dcid, start_stop_times[int(num)][0].strftime("%Y%m%dt%H%M%S"),
                                               num, expected_frame_num_str, "7"])))
+
+    # If compressed, add corrupt paths into frame_data_paths list with acquisition status of "9" to indicate corrupt.
+    if compression_flag == 1:
+        for num in corrupt_frames_list:
+            frame_data_paths.append(
+                os.path.join(image_dir, "_".join([dcid, start_stop_times[int(num)][0].strftime("%Y%m%dt%H%M%S"),
+                                                  num, expected_frame_num_str, "9"])))
+
     frame_data_paths.sort(key=lambda x: os.path.basename(x).split("_")[2])
 
     # Update report based on frames
@@ -603,6 +714,7 @@
         f"{frame_chunksize} frames\n\n"
     logger.info(f"Using frame chunksize of {frame_chunksize} to split data collection into acquisitions.")
     total_corrupt_lines = 0
+    combined_frame_corrupt_line_map = OrderedDict()
     # Only do the chunking if there is enough left over for another full chunk
     while i + (2 * frame_chunksize) <= num_frames:
         acq_data_paths = frame_data_paths[i: i + frame_chunksize]
@@ -613,17 +725,21 @@
                                          stop_time=start_stop_times[i + frame_chunksize - 1][1],
                                          timing_info=timing_info,
                                          processed_flag=processed_flag,
+                                         compression_flag=compression_flag,
                                          coadd_mode=coadd_mode,
                                          num_bands=num_bands,
                                          num_lines=num_lines,
                                          image_dir=image_dir,
                                          report_text=report_txt,
+                                         corrupt_frames_list=corrupt_frames_list,
                                          failed_decompression_list=failed_decompression_list,
                                          uncompressed_list=uncompressed_list,
                                          missing_frame_nums=missing_frame_nums,
                                          logger=logger)
         i += frame_chunksize
         total_corrupt_lines += len(result["corrupt_lines"])
+        for key, value in result["frame_corrupt_line_map"].items():
+            combined_frame_corrupt_line_map[key] = value
 
     # There will be one left over at the end that is the frame_chunksize + remaining frames
     acq_data_paths = frame_data_paths[i:]
@@ -634,21 +750,31 @@
                                      stop_time=start_stop_times[num_frames - 1][1],
                                      timing_info=timing_info,
                                      processed_flag=processed_flag,
+                                     compression_flag=compression_flag,
                                      coadd_mode=coadd_mode,
                                      num_bands=num_bands,
                                      num_lines=num_lines,
                                      image_dir=image_dir,
                                      report_text=report_txt,
+                                     corrupt_frames_list=corrupt_frames_list,
                                      failed_decompression_list=failed_decompression_list,
                                      uncompressed_list=uncompressed_list,
                                      missing_frame_nums=missing_frame_nums,
                                      logger=logger)
     total_corrupt_lines += len(result["corrupt_lines"])
+    for key, value in result["frame_corrupt_line_map"].items():
+        combined_frame_corrupt_line_map[key] = value
 
     # Write out a report for the data collection as a whole
     dcid_report_path = os.path.join(args.work_dir, f"{dcid}_reassembly_report.txt")
     with open(dcid_report_path, "w") as f:
         f.write(report_txt)
+        # Corrupt frames
+        f.write(f"Total corrupt frames in this data collection: {len(corrupt_frames_list)}\n")
+        f.write("List of corrupt frame numbers (if any):\n")
+        if len(corrupt_frames_list) > 0:
+            f.write("\n".join(i for i in corrupt_frames_list) + "\n")
+        f.write("\n")
         # Decompression errors
         f.write(f"Total decompression errors in this data collection: {len(failed_decompression_list)}\n")
         f.write("List of frame numbers that failed decompression (if any):\n")
@@ -675,6 +801,11 @@
         f.write("\n")
         # Corrupt Lines
         f.write(f"Total corrupt lines (line count mismatches) in this data collection: {total_corrupt_lines}\n")
+        f.write(f"List of corrupt lines (if any):\n")
+        for frame, line_nums in combined_frame_corrupt_line_map.items():
+            if len(line_nums) > 0:
+                line_nums.sort()
+                f.write(f"{frame}: {line_nums}\n")
 
     logger.info("Done")
diff --git a/setup.py b/setup.py
index be51fcd..65c9c92 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
 setuptools.setup(
     name="emit_sds_l1a",
-    version="1.4.1",
+    version="1.5.0",
     author="Winston Olson-Duvall",
     author_email="winston.olson-duvall@jpl.nasa.gov",
     description="""
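In the find_all_sync_words.py changes below, every sync word hit re-parses a Frame from the next 1280 bytes (presumably the frame header length, given the size check) and verifies that consecutive sync words are separated by the previous frame's data size plus that header. A sketch of the spacing check with hypothetical offsets:

    # Hypothetical (byte offset, data_size) pairs, for illustration only.
    hits = [(0, 10240), (11520, 10240), (23040, 8192)]

    last_index, last_size = 0, 0
    for index, data_size in hits:
        gap = index - last_index
        # A well-formed stream spaces sync words by header (1280) + prior data size.
        print(f"Index {index}: gap {gap}, correct size: {last_size + 1280 == gap}")
        last_index, last_size = index, data_size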
diff --git a/util/find_all_sync_words.py b/util/find_all_sync_words.py
index 3da87a4..abb58aa 100644
--- a/util/find_all_sync_words.py
+++ b/util/find_all_sync_words.py
@@ -5,14 +5,15 @@
 import itertools
 
 from emit_sds_l1a.ccsds_packet import ScienceDataPacket
+from emit_sds_l1a.frame import Frame
 
 PRIMARY_HDR_LEN = 6
 HEADER_SYNC_WORD = bytes.fromhex("81FFFF81")
 
 parser = argparse.ArgumentParser()
 parser.add_argument("infile")
-parser.add_argument("method", type=int, default=1)
-parser.add_argument("pkt_format", default="1.3")
+parser.add_argument("--method", type=int, default=2)
+parser.add_argument("--pkt_format", default="1.3")
 args = parser.parse_args()
 
 in_file = open(args.infile, "rb")
@@ -45,10 +46,23 @@
         indices.append(i)
 else:
     print("Not using itertools...")
+    last_index = 0
+    last_size = 0
     for i in range(len(data) - len(HEADER_SYNC_WORD)):
         if data[i: i + len(HEADER_SYNC_WORD)] == HEADER_SYNC_WORD:
             indices.append(i)
+            frame = Frame(data[i: i + 1280])
+            fname = "_".join([str(frame.dcid).zfill(10), frame.start_time.strftime("%Y%m%dt%H%M%S"),
+                              str(frame.frame_count_in_acq).zfill(5), str(frame.planned_num_frames).zfill(5),
+                              str(frame.acq_status), str(frame.processed_flag)])
+            print(f"- Size since last index: {i - last_index}")
+            print(f"- Correct size: {True if last_size + 1280 == i - last_index else False}")
+            print(f"Index: {i}, Frame: {fname}, Valid: {frame.is_valid()}, Data Size: {frame.data_size}")
+            last_index = i
+            last_size = frame.data_size
+            # if len(indices) > 2:
+            #     break
 
-print(indices)
+# print(indices)
 print(f"Total sync words found: {len(indices)}")
 print(datetime.datetime.now())
diff --git a/util/packet_order_check.py b/util/packet_order_check.py
new file mode 100644
index 0000000..0eb70d1
--- /dev/null
+++ b/util/packet_order_check.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+
+import argparse
+
+from emit_sds_l1a.ccsds_packet import ScienceDataPacket
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument("infile")
+args = parser.parse_args()
+
+in_file = open(args.infile, "rb")
+
+prev_pkt = None
+print_next_pkt = False
+count = 0
+
+while True:
+    try:
+        pkt = ScienceDataPacket(in_file)
+        if print_next_pkt:
+            print(f"Next pkt: coarse={pkt.coarse_time}, fine={pkt.fine_time}, psc={pkt.pkt_seq_cnt}")
+            print_next_pkt = False
+        if prev_pkt is not None:
+            if pkt.coarse_time - prev_pkt.coarse_time < -1:
+                print(f"Found out of order coarse times with difference of {pkt.coarse_time - prev_pkt.coarse_time}")
+                print(f"Prev pkt: coarse={prev_pkt.coarse_time}, fine={prev_pkt.fine_time}, psc={prev_pkt.pkt_seq_cnt}")
+                print(f"Curr pkt: coarse={pkt.coarse_time}, fine={pkt.fine_time}, psc={pkt.pkt_seq_cnt}")
+                print_next_pkt = True
+        count += 1
+        prev_pkt = pkt
+
+    except EOFError:
+        break
+
+print(f"Total packets: {count}")
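packet_order_check.py flags a packet only when its coarse time steps backward by more than one second (the < -1 test), which presumably tolerates a one-second regression at clock-tick boundaries while still catching genuinely out-of-order HOSC data. A worked sketch of that test with hypothetical coarse times:

    # Hypothetical (prev, curr) coarse-time pairs, for illustration only.
    for prev_coarse, curr_coarse in [(100, 101), (101, 100), (101, 99)]:
        out_of_order = (curr_coarse - prev_coarse) < -1
        print(f"{prev_coarse} -> {curr_coarse}: out of order = {out_of_order}")
    # 100 -> 101: False; 101 -> 100: False (1-second step back tolerated); 101 -> 99: True

A hypothetical invocation: python util/packet_order_check.py /path/to/hosc_stream.bin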