Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed self.lanes to CUR_LANES #201

Merged
merged 8 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 76 additions & 59 deletions extras/AFC.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def __init__(self, config):
self.VarFile = config.get('VarFile')
self.current = None
self.error_state = False
self.units = {}
self.lanes = {}
self.extruders = {}
self.stepper = {}
self.tool_cmds={}
self.afc_monitoring = False

Expand Down Expand Up @@ -157,19 +159,19 @@ def cmd_AFC_STATUS(self, gcmd):
"""
status_msg = ''

for UNIT in self.lanes.keys():
for UNIT in self.units.keys():
# Find the maximum length of lane names to determine the column width
max_lane_length = max(len(lane) for lane in self.lanes[UNIT].keys())
max_lane_length = max(len(lane) for lane in self.units[UNIT].keys())

status_msg += '<span class=info--text>{} Status</span>\n'.format(UNIT)

# Create a dynamic format string that adjusts based on lane name length
header_format = '{:<{}} | Prep | Load |\n'
status_msg += header_format.format("LANE", max_lane_length)

for LANE in self.lanes[UNIT].keys():
for LANE in self.units[UNIT].keys():
lane_msg = ''
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + LANE)
CUR_LANE = self.AFC.stepper[LANE]
CUR_HUB = self.printer.lookup_object('AFC_hub '+ UNIT)
CUR_EXTRUDER = self.printer.lookup_object('AFC_extruder ' + CUR_LANE.extruder_name)
if self.current != None:
Expand Down Expand Up @@ -229,7 +231,7 @@ def cmd_SET_BOWDEN_LENGTH(self, gcmd):

# If hub is not passed in try and get hub if a lane is currently loaded
if hub is None and self.current is not None:
CUR_LANE= self.printer.lookup_object('AFC_stepper ' + self.current)
CUR_LANE = self.AFC.stepper[self.current]
hub = CUR_LANE.unit
elif hub is None and self.current is None:
self.gcode.respond_info("A lane is not loaded please specify hub to adjust bowden length")
Expand Down Expand Up @@ -277,7 +279,10 @@ def cmd_LANE_MOVE(self, gcmd):
"""
lane = gcmd.get('LANE', None)
distance = gcmd.get_float('DISTANCE', 0)
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + lane)
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
CUR_LANE.move(distance, self.short_moves_speed, self.short_moves_accel, True)

def save_pos(self):
Expand Down Expand Up @@ -331,7 +336,8 @@ def save_vars(self):
make it more readable for users
"""
with open(self.VarFile+ '.unit', 'w') as f:
f.write(json.dumps(self.lanes, indent=4))
status = self.get_status(0)
f.write(json.dumps(status, indent=4))
with open(self.VarFile+ '.tool', 'w') as f:
f.write(json.dumps(self.extruders, indent=4))

Expand All @@ -355,7 +361,10 @@ def cmd_HUB_CUT_TEST(self, gcmd):
"""
lane = gcmd.get('LANE', None)
self.gcode.respond_info('Testing Hub Cut on Lane: ' + lane)
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + lane)
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
CUR_HUB = self.printer.lookup_object('AFC_hub ' + CUR_LANE.unit)
CUR_HUB.hub_cut(CUR_LANE)
self.gcode.respond_info('Hub cut Done!')
Expand Down Expand Up @@ -385,11 +394,10 @@ def cmd_TEST(self, gcmd):
self.ERROR.AFC_error('Must select LANE', False)
return
self.gcode.respond_info('TEST ROUTINE')
try:
CUR_LANE = self.printer.lookup_object('AFC_stepper '+lane)
except error:
self.ERROR.fix( 'could not find stepper {}'.format(lane), CUR_LANE ) #send to error handling
return
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
self.gcode.respond_info('Testing at full speed')
CUR_LANE.assist(-1)
self.reactor.pause(self.reactor.monotonic() + 1)
Expand Down Expand Up @@ -429,7 +437,10 @@ def cmd_HUB_LOAD(self, gcmd):
None
"""
lane = gcmd.get('LANE', None)
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + lane)
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
CUR_HUB = self.printer.lookup_object('AFC_hub '+ CUR_LANE.unit)
if CUR_LANE.prep_state == False: return

Expand All @@ -444,11 +455,9 @@ def cmd_HUB_LOAD(self, gcmd):
while CUR_HUB.state == True:
CUR_LANE.move(CUR_HUB.move_dis * -1, self.short_moves_speed, self.short_moves_accel)
CUR_LANE.status = ''
self.lanes[CUR_LANE.unit][CUR_LANE.name]['status']=CUR_LANE.status
self.save_vars()
CUR_LANE.do_enable(False)
CUR_LANE.hub_load = True
self.lanes[CUR_LANE.unit][CUR_LANE.name]['hub_loaded'] = CUR_LANE.hub_load
self.save_vars()

cmd_LANE_UNLOAD_help = "Unload lane from extruder"
Expand All @@ -469,14 +478,16 @@ def cmd_LANE_UNLOAD(self, gcmd):
None
"""
lane = gcmd.get('LANE', None)
CUR_LANE = self.printer.lookup_object('AFC_stepper '+ lane)
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
CUR_HUB = self.printer.lookup_object('AFC_hub '+ CUR_LANE.unit)
if CUR_LANE.name != self.current:
# Setting status as ejecting so if filament is removed and de-activates the prep sensor while
# extruder motors are still running it does not trigger infinite spool or pause logic
# once user removes filament lanes status will go to None
CUR_LANE.status = 'ejecting'
self.lanes[CUR_LANE.unit][CUR_LANE.name]['status']=CUR_LANE.status
self.save_vars()
CUR_LANE.do_enable(True)
if CUR_LANE.hub_load:
Expand All @@ -486,9 +497,7 @@ def cmd_LANE_UNLOAD(self, gcmd):
CUR_LANE.move( CUR_HUB.move_dis * -1, self.short_moves_speed, self.short_moves_accel, True)
CUR_LANE.move( CUR_HUB.move_dis * -5, self.short_moves_speed, self.short_moves_accel)
CUR_LANE.do_enable(False)
self.lanes[CUR_LANE.unit][CUR_LANE.name]['hub_loaded'] = CUR_LANE.hub_load
CUR_LANE.status = ''
self.lanes[CUR_LANE.unit][CUR_LANE.name]['status']=CUR_LANE.status
self.save_vars()

# Removing spool from vars since it was ejected
Expand Down Expand Up @@ -516,7 +525,10 @@ def cmd_TOOL_LOAD(self, gcmd):
None
"""
lane = gcmd.get('LANE', None)
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + lane)
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
self.TOOL_LOAD(CUR_LANE)

def TOOL_LOAD(self, CUR_LANE):
Expand Down Expand Up @@ -557,7 +569,6 @@ def TOOL_LOAD(self, CUR_LANE):

# Set the lane status to 'loading' and activate the loading LED.
CUR_LANE.status = 'loading'
self.lanes[CUR_LANE.unit][CUR_LANE.name]['status']=CUR_LANE.status
self.save_vars()
self.afc_led(self.led_loading, CUR_LANE.led_index)

Expand Down Expand Up @@ -608,7 +619,6 @@ def TOOL_LOAD(self, CUR_LANE):

# Synchronize lane's extruder stepper and finalize tool loading.
CUR_LANE.status = 'Tooled'
self.lanes[CUR_LANE.unit][CUR_LANE.name]['status']=CUR_LANE.status
self.save_vars()
CUR_LANE.extruder_stepper.sync_to_extruder(CUR_LANE.extruder_name)

Expand Down Expand Up @@ -636,8 +646,8 @@ def TOOL_LOAD(self, CUR_LANE):
break
CUR_LANE.extruder_stepper.sync_to_extruder(CUR_LANE.extruder_name)
# Update tool and lane status.
self.printer.lookup_object('AFC_stepper ' + CUR_LANE.name).status = 'tool'
self.lanes[CUR_LANE.unit][CUR_LANE.name]['tool_loaded'] = True
CUR_LANE.status = 'tool'
CUR_LANE.tool_loaded = True
self.current = CUR_LANE.name
CUR_EXTRUDER.enable_buffer()

Expand All @@ -653,9 +663,9 @@ def TOOL_LOAD(self, CUR_LANE):
self.gcode.run_script_from_command(self.wipe_cmd)

# Update lane and extruder state for tracking.
self.lanes[CUR_LANE.unit][CUR_LANE.name]['hub_loaded'] = True
CUR_LANE.hub_loaded = True
self.extruders[CUR_LANE.extruder_name]['lane_loaded'] = CUR_LANE.name
self.SPOOL.set_active_spool(self.lanes[CUR_LANE.unit][CUR_LANE.name]['spool_id'])
self.SPOOL.set_active_spool(CUR_LANE.spool_id)
self.afc_led(self.led_tool_loaded, CUR_LANE.led_index)
self.save_vars()
else:
Expand Down Expand Up @@ -692,7 +702,10 @@ def cmd_TOOL_UNLOAD(self, gcmd):
lane = gcmd.get('LANE', self.current)
if lane == None:
return
CUR_LANE = self.printer.lookup_object('AFC_stepper '+ lane)
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
self.TOOL_UNLOAD(CUR_LANE)

# User manually unloaded spool from toolhead, remove spool from active status
Expand Down Expand Up @@ -735,7 +748,6 @@ def TOOL_UNLOAD(self, CUR_LANE):
extruder = self.toolhead.get_extruder()
self.heater = extruder.get_heater()
CUR_LANE.status = 'unloading'
self.lanes[CUR_LANE.unit][CUR_LANE.name]['status']=CUR_LANE.status
self.save_vars()
# Disable the buffer if it's active.
CUR_EXTRUDER.disable_buffer()
Expand Down Expand Up @@ -820,8 +832,7 @@ def TOOL_UNLOAD(self, CUR_LANE):
CUR_LANE.move(CUR_HUB.afc_bowden_length * -1, self.long_moves_speed, self.long_moves_accel, True)

# Clear toolhead's loaded state for easier error handling later.
self.lanes[CUR_LANE.unit][CUR_LANE.name]['tool_loaded'] = False
self.lanes[CUR_LANE.unit][CUR_LANE.name]['hub_loaded'] = CUR_LANE.hub_load
CUR_LANE.tool_loaded = False
self.extruders[CUR_LANE.extruder_name]['lane_loaded'] = ''
self.save_vars()

Expand Down Expand Up @@ -859,7 +870,6 @@ def TOOL_UNLOAD(self, CUR_LANE):
CUR_LANE.hub_load = True
self.afc_led(self.led_ready, CUR_LANE.led_index)
CUR_LANE.status = None
self.lanes[CUR_LANE.unit][CUR_LANE.name]['status']=CUR_LANE.status
self.save_vars()
self.current = None
CUR_LANE.do_enable(False)
Expand Down Expand Up @@ -923,24 +933,32 @@ def cmd_CHANGE_TOOL(self, gcmd):
self.in_toolchange = True

# Lookup the lane object for the requested lane.
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + lane)
if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
# Check if the lane has completed the preparation process required for tool changes.
if CUR_LANE._afc_prep_done:
# Log the tool change operation for debugging or informational purposes.
self.gcode.respond_info("Tool Change - {} -> {}".format(self.current, lane))

# If a current lane is loaded, unload it first.
if self.current is not None:
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + self.current)
if self.current not in self.AFC.stepper:
self.AFC.gcode.respond_info(self.current + ' Unknown')
return
CUR_LANE = self.AFC.stepper[self.current]
if not self.TOOL_UNLOAD(CUR_LANE):
# Abort if the unloading process fails.
msg = (' UNLOAD ERROR NOT CLEARED')
self.ERROR.fix(msg, CUR_LANE) #send to error handling
return

# Switch to the new lane for loading.
CUR_LANE = self.printer.lookup_object('AFC_stepper ' + lane)

if lane not in self.AFC.stepper:
self.AFC.gcode.respond_info(lane + ' Unknown')
return
CUR_LANE = self.AFC.stepper[lane.name]
# Load the new lane and restore the toolhead position if successful.
if self.TOOL_LOAD(CUR_LANE) and not self.error_state:
self.gcode.respond_info("{} is now loaded in toolhead".format(lane))
Expand Down Expand Up @@ -976,30 +994,30 @@ def HexConvert(self,tmp):
def get_status(self, eventtime):
str = {}
numoflanes = 0
for UNIT in self.lanes.keys():
for UNIT in self.units.keys():
try:
screen_mac = self.printer.lookup_object('AFC_screen ' + UNIT).mac
except error:
screen_mac = 'None'
str[UNIT]={}
for NAME in self.lanes[UNIT].keys():
LANE=self.printer.lookup_object('AFC_stepper '+ NAME)
for NAME in self.units[UNIT].keys():
CUR_LANE=self.stepper[NAME]
str[UNIT][NAME]={}
str[UNIT][NAME]['LANE'] = LANE.index
str[UNIT][NAME]['map'] = LANE.map
str[UNIT][NAME]['load'] = bool(LANE.load_state)
str[UNIT][NAME]["prep"] =bool(LANE.prep_state)
str[UNIT][NAME]["tool_loaded"] = self.lanes[UNIT][NAME]['tool_loaded']
str[UNIT][NAME]["loaded_to_hub"] = self.lanes[UNIT][NAME]['hub_loaded']
str[UNIT][NAME]["material"]=self.lanes[UNIT][NAME]['material']
str[UNIT][NAME]["spool_id"]=self.lanes[UNIT][NAME]['spool_id']
str[UNIT][NAME]["color"]=self.lanes[UNIT][NAME]['color']
str[UNIT][NAME]["weight"]=self.lanes[UNIT][NAME]['weight']
str[UNIT][NAME]["runout_lane"]=self.lanes[LANE.unit][LANE.name]['runout_lane']
filiment_stat=self.get_filament_status(LANE).split(':')
str[UNIT][NAME]['LANE'] = CUR_LANE.index
str[UNIT][NAME]['map'] = CUR_LANE.map
str[UNIT][NAME]['load'] = bool(CUR_LANE.load_state)
str[UNIT][NAME]["prep"] =bool(CUR_LANE.prep_state)
str[UNIT][NAME]["tool_loaded"] = CUR_LANE.tool_loaded
str[UNIT][NAME]["loaded_to_hub"] = CUR_LANE.loaded_to_hub
str[UNIT][NAME]["material"]=CUR_LANE.material
str[UNIT][NAME]["spool_id"]=CUR_LANE.spool_id
str[UNIT][NAME]["color"]=CUR_LANE.color
str[UNIT][NAME]["weight"]=CUR_LANE.weight
str[UNIT][NAME]["runout_lane"]=CUR_LANE.runout_lane
filiment_stat=self.get_filament_status(CUR_LANE).split(':')
str[UNIT][NAME]['filament_status']=filiment_stat[0]
str[UNIT][NAME]['filament_status_led']=filiment_stat[1]
str[UNIT][NAME]['status'] = LANE.status if LANE.status is not None else ''
str[UNIT][NAME]['status'] = CUR_LANE.status if CUR_LANE.status is not None else ''
numoflanes +=1
str[UNIT]['system']={}
str[UNIT]['system']['type'] = self.printer.lookup_object('AFC_hub '+ UNIT).type
Expand All @@ -1009,17 +1027,17 @@ def get_status(self, eventtime):

str["system"]={}
str["system"]['current_load']= self.current
str["system"]['num_units'] = len(self.lanes)
str["system"]['num_units'] = len(self.units)
str["system"]['num_lanes'] = numoflanes
str["system"]['num_extruders'] = len(self.extruders)
str["system"]["extruders"]={}

for EXTRUDE in self.extruders.keys():
str["system"]["extruders"][EXTRUDE]={}
CUR_EXTRUDER = self.printer.lookup_object('AFC_extruder ' + EXTRUDE)
str["system"]["extruders"][EXTRUDE]['lane_loaded'] = self.extruders[LANE.extruder_name]['lane_loaded']
str["system"]["extruders"][EXTRUDE]['lane_loaded'] = self.extruders[CUR_LANE.extruder_name]['lane_loaded']
if CUR_EXTRUDER.tool_start == "buffer":
if self.extruders[LANE.extruder_name]['lane_loaded'] == '':
if self.extruders[CUR_LANE.extruder_name]['lane_loaded'] == '':
str ["system"]["extruders"][EXTRUDE]['tool_start_sensor'] = False
else:
str["system"]["extruders"][EXTRUDE]['tool_start_sensor'] = True
Expand Down Expand Up @@ -1070,14 +1088,13 @@ def TcmdAssign(self, CUR_LANE):
for x in range(99):
cmd = 'T'+str(x)
if cmd not in self.tool_cmds:
self.lanes[CUR_LANE.unit][CUR_LANE.name]['map'] = cmd
CUR_LANE.map = cmd
break
self.tool_cmds[self.lanes[CUR_LANE.unit][CUR_LANE.name]['map']]=CUR_LANE.name
self.tool_cmds[CUR_LANE.map]=CUR_LANE.name
try:
self.gcode.register_command(self.lanes[CUR_LANE.unit][CUR_LANE.name]['map'], self.cmd_CHANGE_TOOL, desc=self.cmd_CHANGE_TOOL_help)
self.gcode.register_command(CUR_LANE.map, self.cmd_CHANGE_TOOL, desc=self.cmd_CHANGE_TOOL_help)
except:
self.gcode.respond_info("Error trying to map lane {lane} to {tool_macro}, please make sure there are no macros already setup for {tool_macro}".format(lane=[CUR_LANE.name], tool_macro=self.lanes[CUR_LANE.unit][CUR_LANE.name]['map']), )
self.gcode.respond_info("Error trying to map lane {lane} to {tool_macro}, please make sure there are no macros already setup for {tool_macro}".format(lane=[CUR_LANE.name], tool_macro=CUR_LANE.map), )
self.save_vars()

def load_config(config):
Expand Down
Loading
Loading