Skip to content

Commit

Permalink
Merge pull request #10 from alob-mtc/event-loop
Browse files Browse the repository at this point in the history
added an event loop and reduced unnecessary goroutine creation
  • Loading branch information
alob-mtc authored May 1, 2023
2 parents 5421a67 + ec6635e commit 12b86c2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 49 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea/
.idea/
tmp/
35 changes: 31 additions & 4 deletions eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,38 @@ var (
GlobalEventLoop *eventLoop
)

func Init() {
func InitGlobalEventLoop() Future {
once.Do(func() {
queue := []func() bool{}
var lock sync.Mutex
// event loop
go func() {
for work := range eventBus {
lock.Lock()
queue = append(queue, *work)
lock.Unlock()
}
}()

go func() {
for {
n := len(queue)
retry := []func() bool{}
for i := 0; i < n; i++ {
currentWork := queue[i]

if done := currentWork(); !done {
retry = append(retry, currentWork)
}
}
lock.Lock()
queue = queue[n:]
queue = append(queue, retry...)
lock.Unlock()
}
}()
GlobalEventLoop = &eventLoop{promiseQueue: make([]*Promise, 0)}
})
}

func GetGlobalEventLoop() Future {
return GlobalEventLoop
}

Expand All @@ -39,6 +64,7 @@ func (e *eventLoop) Await(currentP *Promise) (interface{}, error) {
defer currentP.Done()
currentP.RegisterHandler()
select {

case err := <-currentP.errChan:
return nil, err
case rev := <-currentP.rev:
Expand Down Expand Up @@ -108,6 +134,7 @@ outer:
<-p.done
}
if currentN := int(atomic.LoadInt64(&e.size)); i == 0 && !(currentN > n) {
close(eventBus)
break outer
}
}
Expand Down
91 changes: 47 additions & 44 deletions promise.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"sync/atomic"
)

var eventBus = make(chan *func() bool)

type Promise struct {
id uint64
handler bool
rev <-chan interface{}
errChan chan error
err bool
done chan struct{}
id uint64
handler bool
rev <-chan interface{}
errChan chan error
doneFlag bool
done chan struct{}
}

func (e *eventLoop) newPromise(rev <-chan interface{}, errChan chan error) *Promise {
Expand All @@ -22,7 +24,7 @@ func (e *eventLoop) newPromise(rev <-chan interface{}, errChan chan error) *Prom
return currentP
}

func (p *Promise) Done() {
func (p *Promise) Done() {
close(p.done)
}

Expand All @@ -32,51 +34,52 @@ func (p *Promise) RegisterHandler() {

func (p *Promise) Then(fn func(interface{})) *Promise {
p.RegisterHandler()
go func() {
for {
select {
default:
if p.err {
return
}
case val := <-p.rev:
func() {
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case error:
p.errChan <- x
default:
p.errChan <- fmt.Errorf("%v", x)
}
} else {
p.err = true
p.Done()
work := func() bool {
select {
default:
if p.doneFlag {
return true
}
return false
case val := <-p.rev:
go func() {
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case error:
p.errChan <- x
default:
p.errChan <- fmt.Errorf("%v", x)
}
}()
fn(val)
} else {
p.doneFlag = true
p.Done()
}
}()
return
}
fn(val)
}()
return true
}
}()
}
eventBus <- &work
return p
}

func (p *Promise) Catch(fn func(err error)) {
p.RegisterHandler()
go func() {
for {
select {
default:
if p.err {
return
}
case err := <-p.errChan:
fn(err)
p.err = true
p.Done()
work := func() bool {
select {
default:
if p.doneFlag {
return true
}
return false
case err := <-p.errChan:
p.doneFlag = true
go fn(err)
p.Done()
return true
}
}()
}
eventBus <- &work
}

0 comments on commit 12b86c2

Please sign in to comment.