Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workign on state machine debugging errors, test will fail #2

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 87 additions & 30 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,66 @@ const (
TRIGGER_STATE_NEW = "new"
)

type States[AC any, OC any, JC any] struct {
states []State[AC, OC, JC]
StateMap map[string]State[AC, OC, JC]
stateNames []string
}

func (s States[AC, OC, JC]) States() []State[AC, OC, JC] {
return s.states
}

func (s States[AC, OC, JC]) StateNames() []string {
return s.stateNames
}

func NewStates[AC any, OC any, JC any](states []State[AC, OC, JC], validateExitStates bool) (States[AC, OC, JC], error) {
r := States[AC, OC, JC]{
states: states,
}

// Make a map of triggers to states so we can easily reference it
r.StateMap = map[string]State[AC, OC, JC]{}
for _, s := range states {
r.StateMap[s.TriggerState] = s
}
// get a list of return state names for use
r.stateNames = make([]string, 0, len(r.StateMap))
for k := range r.StateMap {
r.stateNames = append(r.stateNames, k)
}

// validate the states
for _, s := range states {
if !s.Terminal && s.Exec == nil {
return States[AC, OC, JC]{}, fmt.Errorf("State %s is non-terminal but has no Exec function", s.TriggerState)
}
if validateExitStates {
if s.Terminal {
continue
}
if len(s.ExitStates) == 0 {
return States[AC, OC, JC]{}, fmt.Errorf("ValidateExitStates: invalid State machine, state %s is non-terminal but has no ExitStates", s.TriggerState)
}
otherStates := false
for _, exit := range s.ExitStates {
if s.TriggerState != exit {
otherStates = true
}
if _, ok := r.StateMap[exit]; !ok {
return States[AC, OC, JC]{}, fmt.Errorf("invalid exit state [%s] for state %s", exit, s.TriggerState)
}
}
if !otherStates {
return States[AC, OC, JC]{}, fmt.Errorf("ValidateExitStates: invalid State machine, state %s is non-terminal but has no ExitStates other than self", TRIGGER_STATE_NEW)
}
}
}

return r, nil
}

// State represents a state in a state machine for job processing.
// It defines the behavior and configuration for a particular state.
type State[AC any, OC any, JC any] struct {
Expand All @@ -38,6 +98,9 @@ type State[AC any, OC any, JC any] struct {

// RateLimit is an optional rate limiter for controlling the execution rate of this state. Useful when calling rate limited apis.
RateLimit *rate.Limiter

// A list of valid exit states for use when in validation mode on the processor
ExitStates []string
}

// KickRequest struct is a job context with a requested state that the
Expand All @@ -56,18 +119,17 @@ type StatusCount struct {

// Processor executes a job
type Processor[AC any, OC any, JC any] struct {
appContext AC
states []State[AC, OC, JC]
serializer Serializer[OC, JC]
statusListener StatusListener
initted bool
stateMap map[string]State[AC, OC, JC]
stateNames []string
stateChan map[string]chan Job[JC]
returnChan chan Return[JC]
appContext AC
states States[AC, OC, JC]
serializer Serializer[OC, JC]
statusListener StatusListener
initted bool
stateChan map[string]chan Job[JC]
returnChan chan Return[JC]
ValidateExitStates bool
}

func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] {
func NewProcessor[AC any, OC any, JC any](ac AC, states States[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] {
return &Processor[AC, OC, JC]{
appContext: ac,
states: states,
Expand All @@ -84,33 +146,24 @@ type Return[JC any] struct {
Error error
}

func (p *Processor[AC, OC, JC]) init() {
func (p *Processor[AC, OC, JC]) init() error {
if p.initted {
return
return nil
}

if p.serializer == nil {
p.serializer = &NilSerializer[OC, JC]{}
}
if p.statusListener == nil {
p.statusListener = &NilStatusListener{}
}
// Make a map of triggers to states so we can easily reference it
p.stateMap = map[string]State[AC, OC, JC]{}
for _, s := range p.states {
p.stateMap[s.TriggerState] = s
}
// get a list of return state names for use
p.stateNames = make([]string, 0, len(p.stateMap))
for k := range p.stateMap {
p.stateNames = append(p.stateNames, k)
}

// For each state, we need a channel of jobs
p.stateChan = map[string]chan Job[JC]{}

// Create the state chans
totalConcurrency := 0
for _, s := range p.states {
for _, s := range p.states.States() {
if s.Terminal {
continue
}
Expand All @@ -120,11 +173,15 @@ func (p *Processor[AC, OC, JC]) init() {

// When a job changes state, we send it to this channel to centrally manage and re-queue
p.returnChan = make(chan Return[JC], totalConcurrency*2) // make it the size of the total amount of in flight jobs we could have so that each worker can return a task
return nil
}

// Exec this big work function, this does all the crunching
func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error {
p.init()
err := p.init()
if err != nil {
return err
}

if p.allJobsAreTerminal(r) {
slog.Info("AllJobsTerminal")
Expand All @@ -134,7 +191,7 @@ func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error
wg := sync.WaitGroup{}

// create the workers
for _, s := range p.states {
for _, s := range p.states.States() {
// Terminal states don't need to recieve jobs, they're just done
if s.Terminal {
continue
Expand Down Expand Up @@ -233,7 +290,7 @@ func (p *Processor[AC, OC, JC]) updateStatusCounts(r *Run[OC, JC]) {

ret := []StatusCount{}

for _, state := range p.states {
for _, state := range p.states.States() {
if _, ok := counts[state.TriggerState]; !ok {
ret = append(ret, StatusCount{
State: state.TriggerState,
Expand All @@ -252,7 +309,7 @@ func (p *Processor[AC, OC, JC]) updateStatusCounts(r *Run[OC, JC]) {

func (p *Processor[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool {
c := r.StatusCounts()
for _, k := range p.states {
for _, k := range p.states.States() {
if k.Terminal {
continue
}
Expand All @@ -266,7 +323,7 @@ func (p *Processor[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool {
func (p *Processor[AC, OC, JC]) enqueueAllJobs(r *Run[OC, JC]) {
slog.Info("Enqueing Jobs", "jobCount", len(r.Jobs))
enqueued := 0
for _, state := range p.states {
for _, state := range p.states.States() {
enqueued += p.enqueueJobsForState(r, state) // mutates r.Jobs
}
slog.Info("All Queues Primed", "jobCount", len(r.Jobs), "enqueuedCount", enqueued)
Expand Down Expand Up @@ -319,7 +376,7 @@ func (p *Processor[AC, OC, JC]) kickJobs(rtn Return[JC], j Job[JC], r *Run[OC, J
}

// validate it
_, ok := p.stateMap[newJob.State]
_, ok := p.states.StateMap[newJob.State]
if !ok {
log.Fatal(p.invalidStateError(newJob.State))
}
Expand Down Expand Up @@ -379,5 +436,5 @@ func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, r *Run[OC, JC], s
}

func (p *Processor[AC, OC, JC]) invalidStateError(s string) error {
return fmt.Errorf("State [%s] has no executor, valid state names: %s", s, strings.Join(p.stateNames, ", "))
return fmt.Errorf("State [%s] has no executor, valid state names: %s", s, strings.Join(p.states.StateNames(), ", "))
}
Loading
Loading