This repository has been archived by the owner on Nov 23, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
instance.go
175 lines (145 loc) · 4.02 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Copyright © 2019 Developer Network, LLC
//
// This file is subject to the terms and conditions defined in
// file 'LICENSE', which is part of this source code package.
package engine
import (
"context"
"time"
"go.devnw.com/validator"
)
type instance struct {
electron *Electron
conductor Conductor
atom Atom
properties *Properties
ctx context.Context
cancel context.CancelFunc
// TODO: add an actions channel here that the monitor can keep
// an eye on for this bonded electron/atom combo
}
// bond bonds an instance of an electron with an instance of the
// corresponding atom in the atomizer registrations such that
// the execute method of the instance can properly exercise the
// Process method of the interface
func (i *instance) bond(atom Atom) (err error) {
if err = validator.Assert(
i.electron,
i.conductor,
atom,
); err != nil {
return &Error{
Event: &Event{
Message: "error while bonding atom instance",
AtomID: ID(atom),
},
Internal: err,
}
}
// register the atom internally because
// the instance is valid
i.atom = atom
return nil
}
// complete marks the completion of execution and pushes
// the results to the conductor
func (i *instance) complete() error {
// Set the end time and status in the properties
i.properties.End = time.Now()
if !validator.Valid(i.conductor) {
return &Error{
Event: &Event{
Message: "conductor validation failed",
AtomID: ID(i.atom),
ElectronID: i.electron.ID,
},
}
}
// Push the completed instance properties to the conductor
return i.conductor.Complete(i.ctx, i.properties)
}
// execute runs the process method on the bonded atom / electron pair
func (i *instance) execute(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil {
err = &Error{
Event: &Event{
Message: "panic in atomizer",
AtomID: ID(i.atom),
ElectronID: i.electron.ID,
},
Internal: ptoe(r),
}
return
}
// ensure that when this method exits the completion
// of this instance takes place and is pushed to the
// conductor
err = i.complete()
}()
// ensure the instance is valid before attempting
// to execute processing
if !validator.Valid(i) {
return &Error{
Event: &Event{
Message: "instance validation failed",
AtomID: ID(i.atom),
},
}
}
// Establish internal context
i.ctx, i.cancel = _ctxT(ctx, i.electron.Timeout)
i.properties = &Properties{
ElectronID: i.electron.ID,
AtomID: ID(i.atom),
Start: time.Now(),
}
// TODO: Setup with a heartbeat for monitoring processing of the
// bonded atom stream in from the process method
// Execute the process method of the atom
i.properties.Result, i.properties.Error = i.atom.Process(
i.ctx, i.conductor, i.electron)
// TODO: The processing has finished for this bonded atom and the
// results need to be calculated and the properties sent back to the
// conductor
// TODO: Ensure a is the proper thing to do here?? I think it needs
// to close a out at the conductor rather than here... unless the
// conductor overrode the call back
// TODO: Execute the callback with the notification here?
// TODO: determine if this is the correct location or if this is \
// something that should be handled purely by the conductor
// TODO: Handle this properly
// if inst.electron.Resp != nil {
// // Drop the return for this electron onto the channel
// // to be sent back to the requester
// inst.electron.Resp <- inst.properties
// }
return nil
}
// Validate ensures that the instance has the correct
// non-nil values internally so that it functions properly
func (i *instance) Validate() (valid bool) {
if i != nil {
if validator.Valid(
i.electron,
i.conductor,
i.atom) {
valid = true
}
}
return valid
}
func NewTime(ctx context.Context) <-chan time.Time {
tchan := make(chan time.Time)
go func(tchan chan<- time.Time) {
defer close(tchan)
for {
select {
case <-ctx.Done():
return
case tchan <- time.Now():
}
}
}(tchan)
return tchan
}