From 5530eb3a7ff1781a17a3e5847938828b8043e8ff Mon Sep 17 00:00:00 2001 From: zoid Date: Tue, 2 Jul 2024 09:10:38 +0400 Subject: [PATCH] Revert "add subscription channel descriptor" This reverts commit 05485660823dbf2e9ccfa4285b92d7a5ffa3c44a. --- cdp/broker.go | 11 ++--------- cdp/transport.go | 4 ++-- main.go | 3 +-- session.go | 9 ++++----- 4 files changed, 9 insertions(+), 18 deletions(-) diff --git a/cdp/broker.go b/cdp/broker.go index b6d1137..d552cc3 100644 --- a/cdp/broker.go +++ b/cdp/broker.go @@ -1,7 +1,6 @@ package cdp import ( - "fmt" "sync" ) @@ -9,7 +8,6 @@ var BrokerChannelSize = 50000 type subscriber struct { sessionID string - desc string channel chan Message } @@ -57,18 +55,14 @@ func (b broker) run() { case message := <-b.messages: for _, subscriber := range value { if message.SessionID == "" || subscriber.sessionID == "" || message.SessionID == subscriber.sessionID { - select { - case subscriber.channel <- message: - default: - fmt.Println(subscriber.desc, "channel is full") - } + subscriber.channel <- message } } } } } -func (b broker) subscribe(sessionID, desc string) chan Message { +func (b broker) subscribe(sessionID string) chan Message { b.lock.Lock() defer b.lock.Unlock() @@ -78,7 +72,6 @@ func (b broker) subscribe(sessionID, desc string) chan Message { default: sub := subscriber{ sessionID: sessionID, - desc: desc, channel: make(chan Message, BrokerChannelSize), } b.sub <- sub diff --git a/cdp/transport.go b/cdp/transport.go index 74c97f4..7bde73a 100644 --- a/cdp/transport.go +++ b/cdp/transport.go @@ -96,8 +96,8 @@ func (t *Transport) gracefullyClose() { } } -func (t *Transport) Subscribe(sessionID, desc string) (chan Message, func()) { - channel := t.broker.subscribe(sessionID, desc) +func (t *Transport) Subscribe(sessionID string) (chan Message, func()) { + channel := t.broker.subscribe(sessionID) return channel, func() { if channel != nil { t.broker.unsubscribe(channel) diff --git a/main.go b/main.go index 8db9aa4..8e81d77 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "log/slog" "net/http" @@ -48,7 +47,7 @@ func TakeWithContext(ctx context.Context, logger *slog.Logger, chromeArgs ...str func Subscribe[T any](s *Session, method string, filter func(T) bool) cdp.Future[T] { var ( - channel, cancel = s.Subscribe(fmt.Sprintf("%s-listener", method)) + channel, cancel = s.Subscribe() ) callback := func(resolve func(T), reject func(error)) { for value := range channel { diff --git a/session.go b/session.go index 535a591..184d98d 100644 --- a/session.go +++ b/session.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "log/slog" "sync" "time" @@ -125,8 +124,8 @@ func (s *Session) Call(method string, send, recv any) error { return nil } -func (s *Session) Subscribe(desc string) (channel chan cdp.Message, cancel func()) { - return s.transport.Subscribe(s.sessionID, desc) +func (s *Session) Subscribe() (channel chan cdp.Message, cancel func()) { + return s.transport.Subscribe(s.sessionID) } func NewSession(transport *cdp.Transport, targetID target.TargetID) (*Session, error) { @@ -153,7 +152,7 @@ func NewSession(transport *cdp.Transport, targetID target.TargetID) (*Session, e return nil, err } session.sessionID = string(val.SessionId) - channel, unsubscribe := session.Subscribe("session-core-handler") + channel, unsubscribe := session.Subscribe() go func() { if err := session.handle(channel); err != nil { unsubscribe() @@ -229,7 +228,7 @@ func (s *Session) handle(channel chan cdp.Message) error { } func (s *Session) funcCalled(fn string) cdp.Future[runtime.BindingCalled] { - var channel, cancel = s.Subscribe(fmt.Sprintf("func-%s-called-listener", fn)) + var channel, cancel = s.Subscribe() callback := func(resolve func(runtime.BindingCalled), reject func(error)) { for value := range channel { if value.Method == "Runtime.bindingCalled" {