Skip to content

Commit

Permalink
Revert "add subscription channel descriptor"
Browse files Browse the repository at this point in the history
This reverts commit 0548566.
  • Loading branch information
retrozoid committed Jul 2, 2024
1 parent 0548566 commit 5530eb3
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 18 deletions.
11 changes: 2 additions & 9 deletions cdp/broker.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package cdp

import (
"fmt"
"sync"
)

var BrokerChannelSize = 50000

type subscriber struct {
sessionID string
desc string
channel chan Message
}

Expand Down Expand Up @@ -57,18 +55,14 @@ func (b broker) run() {
case message := <-b.messages:
for _, subscriber := range value {
if message.SessionID == "" || subscriber.sessionID == "" || message.SessionID == subscriber.sessionID {
select {
case subscriber.channel <- message:
default:
fmt.Println(subscriber.desc, "channel is full")
}
subscriber.channel <- message
}
}
}
}
}

func (b broker) subscribe(sessionID, desc string) chan Message {
func (b broker) subscribe(sessionID string) chan Message {
b.lock.Lock()
defer b.lock.Unlock()

Expand All @@ -78,7 +72,6 @@ func (b broker) subscribe(sessionID, desc string) chan Message {
default:
sub := subscriber{
sessionID: sessionID,
desc: desc,
channel: make(chan Message, BrokerChannelSize),
}
b.sub <- sub
Expand Down
4 changes: 2 additions & 2 deletions cdp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (t *Transport) gracefullyClose() {
}
}

func (t *Transport) Subscribe(sessionID, desc string) (chan Message, func()) {
channel := t.broker.subscribe(sessionID, desc)
func (t *Transport) Subscribe(sessionID string) (chan Message, func()) {
channel := t.broker.subscribe(sessionID)
return channel, func() {
if channel != nil {
t.broker.unsubscribe(channel)
Expand Down
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"

Expand Down Expand Up @@ -48,7 +47,7 @@ func TakeWithContext(ctx context.Context, logger *slog.Logger, chromeArgs ...str

func Subscribe[T any](s *Session, method string, filter func(T) bool) cdp.Future[T] {
var (
channel, cancel = s.Subscribe(fmt.Sprintf("%s-listener", method))
channel, cancel = s.Subscribe()
)
callback := func(resolve func(T), reject func(error)) {
for value := range channel {
Expand Down
9 changes: 4 additions & 5 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -125,8 +124,8 @@ func (s *Session) Call(method string, send, recv any) error {
return nil
}

func (s *Session) Subscribe(desc string) (channel chan cdp.Message, cancel func()) {
return s.transport.Subscribe(s.sessionID, desc)
func (s *Session) Subscribe() (channel chan cdp.Message, cancel func()) {
return s.transport.Subscribe(s.sessionID)
}

func NewSession(transport *cdp.Transport, targetID target.TargetID) (*Session, error) {
Expand All @@ -153,7 +152,7 @@ func NewSession(transport *cdp.Transport, targetID target.TargetID) (*Session, e
return nil, err
}
session.sessionID = string(val.SessionId)
channel, unsubscribe := session.Subscribe("session-core-handler")
channel, unsubscribe := session.Subscribe()
go func() {
if err := session.handle(channel); err != nil {
unsubscribe()
Expand Down Expand Up @@ -229,7 +228,7 @@ func (s *Session) handle(channel chan cdp.Message) error {
}

func (s *Session) funcCalled(fn string) cdp.Future[runtime.BindingCalled] {
var channel, cancel = s.Subscribe(fmt.Sprintf("func-%s-called-listener", fn))
var channel, cancel = s.Subscribe()
callback := func(resolve func(runtime.BindingCalled), reject func(error)) {
for value := range channel {
if value.Method == "Runtime.bindingCalled" {
Expand Down

0 comments on commit 5530eb3

Please sign in to comment.