From c540ffbaa4b0f3a44437a47c2bc89490eed8924f Mon Sep 17 00:00:00 2001 From: Akinlua Bolamigbe <41027615+alob-mtc@users.noreply.github.com> Date: Mon, 1 May 2023 10:00:21 +0000 Subject: [PATCH 1/2] added an event loop and reduced unnecessary goroutine creation --- .gitignore | 3 +- eventloop.go | 30 +++++++++++++++++ promise.go | 91 +++++++++++++++++++++++++++------------------------- 3 files changed, 79 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index 62c8935..f498092 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -.idea/ \ No newline at end of file +.idea/ +tmp/ \ No newline at end of file diff --git a/eventloop.go b/eventloop.go index 9f9b50a..64c94cc 100644 --- a/eventloop.go +++ b/eventloop.go @@ -14,9 +14,37 @@ var ( ) func Init() { + queue := []func() bool{} + var lock sync.Mutex once.Do(func() { GlobalEventLoop = &eventLoop{promiseQueue: make([]*Promise, 0)} }) + // TODO: add evnt loop + go func() { + for work := range eventBus { + lock.Lock() + queue = append(queue, *work) + lock.Unlock() + } + }() + + go func() { + for { + n := len(queue) + retry := []func() bool{} + for i := 0; i < n; i++ { + currentWork := queue[i] + + if done := currentWork(); !done { + retry = append(retry, currentWork) + } + } + lock.Lock() + queue = queue[n:] + queue = append(queue, retry...) + lock.Unlock() + } + }() } func GetGlobalEventLoop() Future { @@ -39,6 +67,7 @@ func (e *eventLoop) Await(currentP *Promise) (interface{}, error) { defer currentP.Done() currentP.RegisterHandler() select { + case err := <-currentP.errChan: return nil, err case rev := <-currentP.rev: @@ -108,6 +137,7 @@ outer: <-p.done } if currentN := int(atomic.LoadInt64(&e.size)); i == 0 && !(currentN > n) { + close(eventBus) break outer } } diff --git a/promise.go b/promise.go index 68d6576..23329f2 100644 --- a/promise.go +++ b/promise.go @@ -5,13 +5,15 @@ import ( "sync/atomic" ) +var eventBus = make(chan *func() bool) + type Promise struct { - id uint64 - handler bool - rev <-chan interface{} - errChan chan error - err bool - done chan struct{} + id uint64 + handler bool + rev <-chan interface{} + errChan chan error + doneFlag bool + done chan struct{} } func (e *eventLoop) newPromise(rev <-chan interface{}, errChan chan error) *Promise { @@ -22,7 +24,7 @@ func (e *eventLoop) newPromise(rev <-chan interface{}, errChan chan error) *Prom return currentP } -func (p *Promise) Done() { +func (p *Promise) Done() { close(p.done) } @@ -32,51 +34,52 @@ func (p *Promise) RegisterHandler() { func (p *Promise) Then(fn func(interface{})) *Promise { p.RegisterHandler() - go func() { - for { - select { - default: - if p.err { - return - } - case val := <-p.rev: - func() { - defer func() { - if r := recover(); r != nil { - switch x := r.(type) { - case error: - p.errChan <- x - default: - p.errChan <- fmt.Errorf("%v", x) - } - } else { - p.err = true - p.Done() + work := func() bool { + select { + default: + if p.doneFlag { + return true + } + return false + case val := <-p.rev: + go func() { + defer func() { + if r := recover(); r != nil { + switch x := r.(type) { + case error: + p.errChan <- x + default: + p.errChan <- fmt.Errorf("%v", x) } - }() - fn(val) + } else { + p.doneFlag = true + p.Done() + } }() - return - } + fn(val) + }() + return true } - }() + } + eventBus <- &work return p } func (p *Promise) Catch(fn func(err error)) { p.RegisterHandler() - go func() { - for { - select { - default: - if p.err { - return - } - case err := <-p.errChan: - fn(err) - p.err = true - p.Done() + work := func() bool { + select { + default: + if p.doneFlag { + return true } + return false + case err := <-p.errChan: + p.doneFlag = true + go fn(err) + p.Done() + return true } - }() + } + eventBus <- &work } From ec6635ea7862300cc739ae19e0adde180a9f6ee5 Mon Sep 17 00:00:00 2001 From: Akinlua Bolamigbe <41027615+alob-mtc@users.noreply.github.com> Date: Mon, 1 May 2023 11:22:21 +0000 Subject: [PATCH 2/2] clean up fuion signature --- eventloop.go | 57 +++++++++++++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/eventloop.go b/eventloop.go index 64c94cc..7e63b6d 100644 --- a/eventloop.go +++ b/eventloop.go @@ -13,41 +13,38 @@ var ( GlobalEventLoop *eventLoop ) -func Init() { - queue := []func() bool{} - var lock sync.Mutex +func InitGlobalEventLoop() Future { once.Do(func() { - GlobalEventLoop = &eventLoop{promiseQueue: make([]*Promise, 0)} - }) - // TODO: add evnt loop - go func() { - for work := range eventBus { - lock.Lock() - queue = append(queue, *work) - lock.Unlock() - } - }() + queue := []func() bool{} + var lock sync.Mutex + // event loop + go func() { + for work := range eventBus { + lock.Lock() + queue = append(queue, *work) + lock.Unlock() + } + }() - go func() { - for { - n := len(queue) - retry := []func() bool{} - for i := 0; i < n; i++ { - currentWork := queue[i] + go func() { + for { + n := len(queue) + retry := []func() bool{} + for i := 0; i < n; i++ { + currentWork := queue[i] - if done := currentWork(); !done { - retry = append(retry, currentWork) + if done := currentWork(); !done { + retry = append(retry, currentWork) + } } + lock.Lock() + queue = queue[n:] + queue = append(queue, retry...) + lock.Unlock() } - lock.Lock() - queue = queue[n:] - queue = append(queue, retry...) - lock.Unlock() - } - }() -} - -func GetGlobalEventLoop() Future { + }() + GlobalEventLoop = &eventLoop{promiseQueue: make([]*Promise, 0)} + }) return GlobalEventLoop }