Skip to content

Commit

Permalink
pkg spec: spec.WSStreamConn -> spec.Conn
Browse files Browse the repository at this point in the history
  • Loading branch information
rkonfj committed Jun 2, 2023
1 parent 18a108e commit 41e0bc9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
10 changes: 5 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,13 @@ func (c *TohClient) DialContext(ctx context.Context, network, address string) (n
if err != nil {
return nil, err
}
return spec.NewWSStreamConn(conn, addr), nil
return spec.NewConn(conn, addr), nil
case "udp", "udp4", "udp6":
conn, addr, err := c.dial(ctx, network, address)
if err != nil {
return nil, err
}
return &spec.PacketConnWrapper{Conn: spec.NewWSStreamConn(conn, addr)}, nil
return &spec.PacketConnWrapper{Conn: spec.NewConn(conn, addr)}, nil
default:
return nil, errors.New("unsupport network " + network)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func (c *TohClient) dnsExchange(dnServer string, query *dns.Msg) (resp *dns.Msg,
}

func (c *TohClient) dial(ctx context.Context, network, addr string) (
wsConn *wsConn, remoteAddr net.Addr, err error) {
conn spec.StreamConn, remoteAddr net.Addr, err error) {
handshake := http.Header{}
handshake.Add(spec.HeaderHandshakeKey, c.options.Key)
handshake.Add(spec.HeaderHandshakeNet, network)
Expand All @@ -238,13 +238,13 @@ func (c *TohClient) dial(ctx context.Context, network, addr string) (

t1 := time.Now()

wsConn, respHeader, err := c.dialWS(ctx, c.options.Server, handshake)
conn, respHeader, err := c.dialWS(ctx, c.options.Server, handshake)
if err != nil {
return
}
logrus.Debugf("%s://%s established successfully, toh latency %s", network, addr, time.Since(t1))

go c.newPingLoop(wsConn)
go c.newPingLoop(conn.(*wsConn))
estAddr := respHeader.Get(spec.HeaderEstablishAddr)
if len(estAddr) == 0 {
estAddr = "0.0.0.0:0"
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s TohServer) HandleUpgradeWebSocket(w http.ResponseWriter, r *http.Request
}

go func() {
lbc, rbc := s.pipe(spec.NewWSStreamConn(&wsConn{conn: conn}, conn.RemoteAddr()), netConn)
lbc, rbc := s.pipe(spec.NewConn(&wsConn{conn: conn}, conn.RemoteAddr()), netConn)
s.trafficEventChan <- &TrafficEvent{
In: lbc,
Out: rbc,
Expand All @@ -108,7 +108,7 @@ func (s TohServer) HandleUpgradeWebSocket(w http.ResponseWriter, r *http.Request
}()
}

func (s *TohServer) pipe(wsConn *spec.WSStreamConn, netConn net.Conn) (lbc, rbc int64) {
func (s *TohServer) pipe(wsConn *spec.Conn, netConn net.Conn) (lbc, rbc int64) {
if wsConn == nil || netConn == nil {
return
}
Expand Down
42 changes: 21 additions & 21 deletions spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,34 @@ type TohClient interface {
LookupIP6(host string) (ips []net.IP, err error)
}

// WSConn websocket connection which used to read, write and close data
type WSConn interface {
// StreamConn under layer transport connection. .i.e websocket
type StreamConn interface {
Read(ctx context.Context) ([]byte, error)
Write(ctx context.Context, p []byte) error
LocalAddr() net.Addr
Close(code int, reason string) error
}

// WSStreamConn tcp/udp connection based on websocket connection
type WSStreamConn struct {
wsConn WSConn
// Conn tcp/udp connection based on StreamConn connection
type Conn struct {
conn StreamConn
addr net.Addr
readDeadline *time.Time
writeDeadline *time.Time
buf []byte
}

func NewWSStreamConn(wsConn WSConn, addr net.Addr) *WSStreamConn {
return &WSStreamConn{
wsConn: wsConn,
addr: addr,
func NewConn(conn StreamConn, addr net.Addr) *Conn {
return &Conn{
conn: conn,
addr: addr,
}
}

// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
func (c *WSStreamConn) Read(b []byte) (n int, err error) {
func (c *Conn) Read(b []byte) (n int, err error) {
if c.buf != nil {
n = copy(b, c.buf)
if n < len(c.buf) {
Expand All @@ -65,7 +65,7 @@ func (c *WSStreamConn) Read(b []byte) (n int, err error) {
defer cancel()
}

wsb, err := c.wsConn.Read(ctx)
wsb, err := c.conn.Read(ctx)
if err != nil {
if strings.Contains(err.Error(), "StatusNormalClosure") ||
strings.Contains(err.Error(), "1000") {
Expand All @@ -84,31 +84,31 @@ func (c *WSStreamConn) Read(b []byte) (n int, err error) {
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
func (c *WSStreamConn) Write(b []byte) (n int, err error) {
func (c *Conn) Write(b []byte) (n int, err error) {
ctx := context.Background()
if c.writeDeadline != nil {
_ctx, cancel := context.WithDeadline(context.Background(), *c.writeDeadline)
ctx = _ctx
defer cancel()
}
n = len(b)
err = c.wsConn.Write(ctx, b)
err = c.conn.Write(ctx, b)
return
}

// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *WSStreamConn) Close() error {
return c.wsConn.Close(1000, "have read")
func (c *Conn) Close() error {
return c.conn.Close(1000, "StatusNormalClosure")
}

// LocalAddr returns the local network address, if known.
func (c *WSStreamConn) LocalAddr() net.Addr {
return c.wsConn.LocalAddr()
func (c *Conn) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}

// RemoteAddr returns the remote network address, if known.
func (c *WSStreamConn) RemoteAddr() net.Addr {
func (c *Conn) RemoteAddr() net.Addr {
return c.addr
}

Expand All @@ -133,7 +133,7 @@ func (c *WSStreamConn) RemoteAddr() net.Addr {
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (c *WSStreamConn) SetDeadline(t time.Time) error {
func (c *Conn) SetDeadline(t time.Time) error {
c.readDeadline = &t
c.writeDeadline = &t
return nil
Expand All @@ -142,7 +142,7 @@ func (c *WSStreamConn) SetDeadline(t time.Time) error {
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *WSStreamConn) SetReadDeadline(t time.Time) error {
func (c *Conn) SetReadDeadline(t time.Time) error {
c.readDeadline = &t
return nil
}
Expand All @@ -152,7 +152,7 @@ func (c *WSStreamConn) SetReadDeadline(t time.Time) error {
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (c *WSStreamConn) SetWriteDeadline(t time.Time) error {
func (c *Conn) SetWriteDeadline(t time.Time) error {
c.writeDeadline = &t
return nil
}
Expand Down

0 comments on commit 41e0bc9

Please sign in to comment.