Skip to content

Commit

Permalink
add subscription channel descriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
retrozoid committed Jul 1, 2024
1 parent 68e146b commit 0548566
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
11 changes: 9 additions & 2 deletions cdp/broker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package cdp

import (
"fmt"
"sync"
)

var BrokerChannelSize = 50000

type subscriber struct {
sessionID string
desc string
channel chan Message
}

Expand Down Expand Up @@ -55,14 +57,18 @@ func (b broker) run() {
case message := <-b.messages:
for _, subscriber := range value {
if message.SessionID == "" || subscriber.sessionID == "" || message.SessionID == subscriber.sessionID {
subscriber.channel <- message
select {
case subscriber.channel <- message:
default:
fmt.Println(subscriber.desc, "channel is full")
}
}
}
}
}
}

func (b broker) subscribe(sessionID string) chan Message {
func (b broker) subscribe(sessionID, desc string) chan Message {
b.lock.Lock()
defer b.lock.Unlock()

Expand All @@ -72,6 +78,7 @@ func (b broker) subscribe(sessionID string) chan Message {
default:
sub := subscriber{
sessionID: sessionID,
desc: desc,
channel: make(chan Message, BrokerChannelSize),
}
b.sub <- sub
Expand Down
4 changes: 2 additions & 2 deletions cdp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (t *Transport) gracefullyClose() {
}
}

func (t *Transport) Subscribe(sessionID string) (chan Message, func()) {
channel := t.broker.subscribe(sessionID)
func (t *Transport) Subscribe(sessionID, desc string) (chan Message, func()) {
channel := t.broker.subscribe(sessionID, desc)
return channel, func() {
if channel != nil {
t.broker.unsubscribe(channel)
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"

Expand Down Expand Up @@ -47,7 +48,7 @@ func TakeWithContext(ctx context.Context, logger *slog.Logger, chromeArgs ...str

func Subscribe[T any](s *Session, method string, filter func(T) bool) cdp.Future[T] {
var (
channel, cancel = s.Subscribe()
channel, cancel = s.Subscribe(fmt.Sprintf("%s-listener", method))
)
callback := func(resolve func(T), reject func(error)) {
for value := range channel {
Expand Down
9 changes: 5 additions & 4 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -124,8 +125,8 @@ func (s *Session) Call(method string, send, recv any) error {
return nil
}

func (s *Session) Subscribe() (channel chan cdp.Message, cancel func()) {
return s.transport.Subscribe(s.sessionID)
func (s *Session) Subscribe(desc string) (channel chan cdp.Message, cancel func()) {
return s.transport.Subscribe(s.sessionID, desc)
}

func NewSession(transport *cdp.Transport, targetID target.TargetID) (*Session, error) {
Expand All @@ -152,7 +153,7 @@ func NewSession(transport *cdp.Transport, targetID target.TargetID) (*Session, e
return nil, err
}
session.sessionID = string(val.SessionId)
channel, unsubscribe := session.Subscribe()
channel, unsubscribe := session.Subscribe("session-core-handler")
go func() {
if err := session.handle(channel); err != nil {
unsubscribe()
Expand Down Expand Up @@ -228,7 +229,7 @@ func (s *Session) handle(channel chan cdp.Message) error {
}

func (s *Session) funcCalled(fn string) cdp.Future[runtime.BindingCalled] {
var channel, cancel = s.Subscribe()
var channel, cancel = s.Subscribe(fmt.Sprintf("func-%s-called-listener", fn))
callback := func(resolve func(runtime.BindingCalled), reject func(error)) {
for value := range channel {
if value.Method == "Runtime.bindingCalled" {
Expand Down

0 comments on commit 0548566

Please sign in to comment.