This repository has been archived by the owner on Nov 23, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
atomizer_exported.go
143 lines (121 loc) · 3.53 KB
/
atomizer_exported.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright © 2019 Developer Network, LLC
//
// This file is subject to the terms and conditions defined in
// file 'LICENSE', which is part of this source code package.
package engine
import (
"context"
"fmt"
"go.devnw.com/event"
"go.devnw.com/validator"
)
// Atomizer interface implementation
type Atomizer interface {
Exec() error
Register(value ...interface{}) error
Wait()
Events(buffer int) event.EventStream
Errors(buffer int) event.ErrorStream
// private methods enforce only this
// package can return an atomizer
isAtomizer()
}
// Atomize initialize instance of the atomizer to start reading from
// conductors and execute bonded electrons/atoms
//
// NOTE: Registrations can be added through this method and OVERRIDE any
// existing registrations of the same Atom or Conductor.
func Atomize(
ctx context.Context,
registrations ...interface{},
) (Atomizer, error) {
err := Register(registrations...)
if err != nil {
return nil, err
}
ctx, cancel := _ctx(ctx)
return &atomizer{
ctx: ctx,
cancel: cancel,
electrons: make(chan instance),
bonded: make(chan instance),
registrations: make(chan interface{}),
atoms: make(map[string]chan<- instance),
publisher: event.NewPublisher(ctx),
}, nil
}
func (*atomizer) isAtomizer() {}
// Events returns an go.devnw.com/event.EventStream for the atomizer
func (a *atomizer) Events(buffer int) event.EventStream { return a.publisher.ReadEvents(buffer) }
// Errors returns an go.devnw.com/event.ErrorStream for the atomizer
func (a *atomizer) Errors(buffer int) event.ErrorStream { return a.publisher.ReadErrors(buffer) }
// Exec kicks off the processing of the atomizer by pulling in the
// pre-registrations through init calls on imported libraries and
// starts up the receivers for atoms and conductors
func (a *atomizer) Exec() (err error) {
// Execute on the atomizer should only ever be run once
a.execSyncOnce.Do(func() {
defer a.publisher.EventFunc(a.ctx, func() event.Event {
return makeEvent("pulling conductor and atom registrations")
})
// Initialize the registrations in the Atomizer package
for _, r := range Registrations() {
a.register(r)
}
// Start up the receivers
go a.receive()
// Setup the distribution loop for incoming electrons
// so that they can be properly fanned out to the
// atom receivers
go a.distribute()
// TODO: Setup the instance receivers for monitoring of
// individual instances as well as sending of outbound
// electrons
})
return err
}
// Register allows you to add additional type registrations to the atomizer
// (ie. Conductors and Atoms)
func (a *atomizer) Register(values ...interface{}) (err error) {
defer func() {
if r := recover(); r != nil {
err = &Error{
Event: &Event{
Message: "panic in atomizer",
},
Internal: ptoe(r),
}
}
}()
for _, value := range values {
if !validator.Valid(value) {
// TODO: create event here indicating that
// a value was invalid and not registered
continue
}
switch v := value.(type) {
case Conductor, Atom:
// Pass the value on the registrations
// channel to be received
select {
case <-a.ctx.Done():
return simple("context closed", nil)
case a.registrations <- v:
}
default:
return simple(
fmt.Sprintf(
"invalid value in registration %s",
ID(value),
),
nil,
)
}
}
return err
}
// Wait blocks on the context done channel to allow for the executable
// to block for the atomizer to finish processing
func (a *atomizer) Wait() {
<-a.ctx.Done()
}