From 8784f3e1b40ea2db02544bff1d356a968a2a286b Mon Sep 17 00:00:00 2001 From: Michiel De Backker Date: Sat, 20 Jun 2020 21:41:21 +0200 Subject: [PATCH 1/2] Added DialAssociation and ListenAssociation These are helpers to easily run SCTP over UDP. Example adapted. I added the Association postfix to keep space for a potential Dial/Listen implementation that directly gives you a Stream. Relates to #74 --- association.go | 13 +++++++ examples/ping-pong/Makefile | 4 +-- examples/ping-pong/conn.go | 72 ------------------------------------- examples/ping-pong/ping.go | 11 ++---- examples/ping-pong/pong.go | 24 ++++++------- go.mod | 1 + go.sum | 2 ++ listener.go | 59 ++++++++++++++++++++++++++++++ 8 files changed, 91 insertions(+), 95 deletions(-) delete mode 100644 examples/ping-pong/conn.go create mode 100644 listener.go diff --git a/association.go b/association.go index b0b5e4f9..2a3dcc17 100644 --- a/association.go +++ b/association.go @@ -212,6 +212,19 @@ func Server(config Config) (*Association, error) { } } +// DialAssociation connects to the given network address and establishes a +// SCTP association on top. The net.Conn in the config is ignored. +func DialAssociation(network string, raddr *net.UDPAddr, config Config) (*Association, error) { + pConn, err := net.DialUDP(network, nil, raddr) + if err != nil { + return nil, err + } + + config.NetConn = pConn + + return Client(config) +} + // Client opens a SCTP stream over a conn func Client(config Config) (*Association, error) { a := createAssociation(config) diff --git a/examples/ping-pong/Makefile b/examples/ping-pong/Makefile index 5ab5a615..09c40676 100644 --- a/examples/ping-pong/Makefile +++ b/examples/ping-pong/Makefile @@ -1,7 +1,7 @@ all: ping pong -ping: ping.go conn.go +ping: ping.go go build -o $@ -pong: pong.go conn.go +pong: pong.go go build -o $@ -tags $@ \ No newline at end of file diff --git a/examples/ping-pong/conn.go b/examples/ping-pong/conn.go deleted file mode 100644 index 4b5d5e88..00000000 --- a/examples/ping-pong/conn.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -import ( - "net" - "sync" - "time" -) - -// Reference: https://github.com/pion/sctp/blob/master/association_test.go -// Since UDP is connectionless, as a server, it doesn't know how to reply -// simply using the `Write` method. So, to make it work, `disconnectedPacketConn` -// will infer the last packet that it reads as the reply address for `Write` - -type disconnectedPacketConn struct { - mu sync.RWMutex - rAddr net.Addr - pConn net.PacketConn -} - -// Read -func (c *disconnectedPacketConn) Read(p []byte) (int, error) { - i, rAddr, err := c.pConn.ReadFrom(p) - if err != nil { - return 0, err - } - - c.mu.Lock() - c.rAddr = rAddr - c.mu.Unlock() - - return i, err -} - -// Write writes len(p) bytes from p to the DTLS connection -func (c *disconnectedPacketConn) Write(p []byte) (n int, err error) { - return c.pConn.WriteTo(p, c.RemoteAddr()) -} - -// Close closes the conn and releases any Read calls -func (c *disconnectedPacketConn) Close() error { - return c.pConn.Close() -} - -// LocalAddr is a stub -func (c *disconnectedPacketConn) LocalAddr() net.Addr { - if c.pConn != nil { - return c.pConn.LocalAddr() - } - return nil -} - -// RemoteAddr is a stub -func (c *disconnectedPacketConn) RemoteAddr() net.Addr { - c.mu.RLock() - defer c.mu.RUnlock() - return c.rAddr -} - -// SetDeadline is a stub -func (c *disconnectedPacketConn) SetDeadline(t time.Time) error { - return nil -} - -// SetReadDeadline is a stub -func (c *disconnectedPacketConn) SetReadDeadline(t time.Time) error { - return nil -} - -// SetWriteDeadline is a stub -func (c *disconnectedPacketConn) SetWriteDeadline(t time.Time) error { - return nil -} diff --git a/examples/ping-pong/ping.go b/examples/ping-pong/ping.go index aef6bc02..ea09604c 100644 --- a/examples/ping-pong/ping.go +++ b/examples/ping-pong/ping.go @@ -12,18 +12,13 @@ import ( ) func main() { - conn, err := net.Dial("udp", "127.0.0.1:5678") - if err != nil { - log.Fatal(err) - } - defer conn.Close() - fmt.Println("dialed udp ponger") + // Prepare the IP to connect to + addr := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5678} config := sctp.Config{ - NetConn: conn, LoggerFactory: logging.NewDefaultLoggerFactory(), } - a, err := sctp.Client(config) + a, err := sctp.DialAssociation("udp", addr, config) if err != nil { log.Fatal(err) } diff --git a/examples/ping-pong/pong.go b/examples/ping-pong/pong.go index cf63e2c9..6eaaf1e5 100644 --- a/examples/ping-pong/pong.go +++ b/examples/ping-pong/pong.go @@ -13,29 +13,27 @@ import ( ) func main() { - addr := net.UDPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 5678, - } + addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5678} - conn, err := net.ListenUDP("udp", &addr) + config := sctp.Config{ + LoggerFactory: logging.NewDefaultLoggerFactory(), + } + l, err := sctp.ListenAssociation("udp", addr, config) if err != nil { log.Fatal(err) } - defer conn.Close() - fmt.Println("created a udp listener") + defer l.Close() + fmt.Println("created a listener") - config := sctp.Config{ - NetConn: &disconnectedPacketConn{pConn: conn}, - LoggerFactory: logging.NewDefaultLoggerFactory(), - } - a, err := sctp.Server(config) + // Note: You should accept all incoming associations in a loop. + a, err := l.Accept() if err != nil { log.Fatal(err) } defer a.Close() - fmt.Println("created a server") + fmt.Println("accepted an association") + // Note: You should accept all incoming streams in a loop. stream, err := a.AcceptStream() if err != nil { log.Fatal(err) diff --git a/go.mod b/go.mod index 02a0b813..94c2ccd6 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ require ( github.com/kr/pretty v0.1.0 // indirect github.com/pion/logging v0.2.2 github.com/pion/transport v0.10.0 + github.com/pion/udp v0.1.0 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.5.1 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index 94dbde20..368021da 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/transport v0.10.0 h1:9M12BSneJm6ggGhJyWpDveFOstJsTiQjkLf4M44rm80= github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE= +github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI= +github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/listener.go b/listener.go new file mode 100644 index 00000000..f7d6ed35 --- /dev/null +++ b/listener.go @@ -0,0 +1,59 @@ +package sctp + +import ( + "net" + + "github.com/pion/udp" +) + +// ListenAssociation creates a SCTP association listener +func ListenAssociation(network string, laddr *net.UDPAddr, config Config) (*AssociationListener, error) { + lc := udp.ListenConfig{} + parent, err := lc.Listen(network, laddr) + if err != nil { + return nil, err + } + return &AssociationListener{ + config: config, + parent: parent, + }, nil +} + +// NewAssociationListener creates a SCTP association listener +// which accepts connections from an inner Listener. +// The net.Conn in the config is ignored. +func NewAssociationListener(inner net.Listener, config Config) (*AssociationListener, error) { + return &AssociationListener{ + config: config, + parent: inner, + }, nil +} + +// AssociationListener represents a SCTP association listener +type AssociationListener struct { + config Config + parent net.Listener +} + +// Accept waits for and returns the next association to the listener. +// You have to either close or read on all connection that are created. +func (l *AssociationListener) Accept() (*Association, error) { + c, err := l.parent.Accept() + if err != nil { + return nil, err + } + l.config.NetConn = c + return Server(l.config) +} + +// Close closes the listener. +// Any blocked Accept operations will be unblocked and return errors. +// Already Accepted connections are not closed. +func (l *AssociationListener) Close() error { + return l.parent.Close() +} + +// Addr returns the listener's network address. +func (l *AssociationListener) Addr() net.Addr { + return l.parent.Addr() +} From 0dee1d0484b0a1699e011e4a73bd87ecfbad272a Mon Sep 17 00:00:00 2001 From: Michiel De Backker Date: Sun, 21 Jun 2020 18:11:09 +0200 Subject: [PATCH 2/2] Added Dial and Listen Dial & Listen with sane but over-writable defaults. Relates to #74 --- chunk_payload_data.go | 2 + dialer.go | 44 ++++++++++++++ examples/ping-pong/ping.go | 14 +---- examples/ping-pong/pong.go | 21 +------ listener.go | 116 +++++++++++++++++++++++++++++++++++-- stream.go | 27 +++++++++ 6 files changed, 187 insertions(+), 37 deletions(-) create mode 100644 dialer.go diff --git a/chunk_payload_data.go b/chunk_payload_data.go index 3e5e3468..4220c1c5 100644 --- a/chunk_payload_data.go +++ b/chunk_payload_data.go @@ -88,6 +88,8 @@ type PayloadProtocolIdentifier uint32 // PayloadProtocolIdentifier enums // https://www.iana.org/assignments/sctp-parameters/sctp-parameters.xhtml#sctp-parameters-25 const ( + PayloadTypeUnspecified PayloadProtocolIdentifier = 0 + PayloadTypeWebRTCDCEP PayloadProtocolIdentifier = 50 PayloadTypeWebRTCString PayloadProtocolIdentifier = 51 PayloadTypeWebRTCBinary PayloadProtocolIdentifier = 53 diff --git a/dialer.go b/dialer.go new file mode 100644 index 00000000..fb7e7fd2 --- /dev/null +++ b/dialer.go @@ -0,0 +1,44 @@ +package sctp + +import ( + "net" + + "github.com/pion/logging" +) + +// Dial connects to the given network address and establishes a +// SCTP stream on top. For more control use DialAssociation. +func Dial(network string, raddr *net.UDPAddr, streamIdentifier uint16) (*Stream, error) { + return (&Dialer{}).Dial(network, raddr, streamIdentifier) +} + +// A Dialer contains options for connecting to an address. +// +// The zero value for each field is equivalent to dialing without that option. +// Dialing with the zero value of Dialer is therefore equivalent +// to just calling the Dial function. +// +// The net.Conn in the config is ignored. +type Dialer struct { + // PayloadType determines the PayloadProtocolIdentifier used + PayloadType PayloadProtocolIdentifier + + // Config holds common config + Config *Config +} + +// Dial connects to the given network address and establishes a +// SCTP stream on top. The net.Conn in the config is ignored. +func (d *Dialer) Dial(network string, raddr *net.UDPAddr, streamIdentifier uint16) (*Stream, error) { + if d.Config == nil { + d.Config = &Config{ + LoggerFactory: logging.NewDefaultLoggerFactory(), + } + } + a, err := DialAssociation(network, raddr, *d.Config) + if err != nil { + return nil, err + } + + return a.OpenStream(streamIdentifier, d.PayloadType) +} diff --git a/examples/ping-pong/ping.go b/examples/ping-pong/ping.go index ea09604c..72333ceb 100644 --- a/examples/ping-pong/ping.go +++ b/examples/ping-pong/ping.go @@ -7,7 +7,6 @@ import ( "log" "net" - "github.com/pion/logging" "github.com/pion/sctp" ) @@ -15,17 +14,8 @@ func main() { // Prepare the IP to connect to addr := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5678} - config := sctp.Config{ - LoggerFactory: logging.NewDefaultLoggerFactory(), - } - a, err := sctp.DialAssociation("udp", addr, config) - if err != nil { - log.Fatal(err) - } - defer a.Close() - fmt.Println("created a client") - - stream, err := a.OpenStream(0, sctp.PayloadTypeWebRTCString) + // Open SCTP stream + stream, err := sctp.Dial("udp", addr, 0) if err != nil { log.Fatal(err) } diff --git a/examples/ping-pong/pong.go b/examples/ping-pong/pong.go index 6eaaf1e5..84be19d9 100644 --- a/examples/ping-pong/pong.go +++ b/examples/ping-pong/pong.go @@ -6,19 +6,14 @@ import ( "fmt" "log" "net" - "time" - "github.com/pion/logging" "github.com/pion/sctp" ) func main() { addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5678} - config := sctp.Config{ - LoggerFactory: logging.NewDefaultLoggerFactory(), - } - l, err := sctp.ListenAssociation("udp", addr, config) + l, err := sctp.Listen("udp", addr) if err != nil { log.Fatal(err) } @@ -26,23 +21,13 @@ func main() { fmt.Println("created a listener") // Note: You should accept all incoming associations in a loop. - a, err := l.Accept() - if err != nil { - log.Fatal(err) - } - defer a.Close() - fmt.Println("accepted an association") - - // Note: You should accept all incoming streams in a loop. - stream, err := a.AcceptStream() + stream, err := l.Accept() if err != nil { log.Fatal(err) } defer stream.Close() fmt.Println("accepted a stream") - // set unordered = true and 10ms treshold for dropping packets - stream.SetReliabilityParams(true, sctp.ReliabilityTypeTimed, 10) var pongSeqNum int for { buff := make([]byte, 1024) @@ -60,7 +45,5 @@ func main() { log.Fatal(err) } fmt.Println("sent:", pongMsg) - - time.Sleep(time.Second) } } diff --git a/listener.go b/listener.go index f7d6ed35..f9195d8a 100644 --- a/listener.go +++ b/listener.go @@ -3,25 +3,35 @@ package sctp import ( "net" + "github.com/pion/logging" "github.com/pion/udp" + "github.com/pkg/errors" ) -// ListenAssociation creates a SCTP association listener -func ListenAssociation(network string, laddr *net.UDPAddr, config Config) (*AssociationListener, error) { - lc := udp.ListenConfig{} - parent, err := lc.Listen(network, laddr) +// ListenAssociation creates a SCTP association listener. +func ListenAssociation(network string, laddr *net.UDPAddr) (*AssociationListener, error) { + return (&ListenConfig{}).ListenAssociation(network, laddr) +} + +// ListenAssociation creates a SCTP association listener. +func (lc *ListenConfig) ListenAssociation(network string, laddr *net.UDPAddr) (*AssociationListener, error) { + parent, err := (&udp.ListenConfig{}).Listen(network, laddr) if err != nil { return nil, err } + if lc.Config == nil { + lc.Config = &Config{ + LoggerFactory: logging.NewDefaultLoggerFactory(), + } + } return &AssociationListener{ - config: config, + config: *lc.Config, parent: parent, }, nil } // NewAssociationListener creates a SCTP association listener // which accepts connections from an inner Listener. -// The net.Conn in the config is ignored. func NewAssociationListener(inner net.Listener, config Config) (*AssociationListener, error) { return &AssociationListener{ config: config, @@ -29,6 +39,12 @@ func NewAssociationListener(inner net.Listener, config Config) (*AssociationList }, nil } +// ListenConfig stores options for listening to an address. +// The net.Conn in the config is ignored. +type ListenConfig struct { + Config *Config +} + // AssociationListener represents a SCTP association listener type AssociationListener struct { config Config @@ -57,3 +73,91 @@ func (l *AssociationListener) Close() error { func (l *AssociationListener) Addr() net.Addr { return l.parent.Addr() } + +// Listen creates a basic SCTP stream listener. +// For more control check out the AssociationListener. +func Listen(network string, laddr *net.UDPAddr) (net.Listener, error) { + return (&ListenConfig{}).Listen(network, laddr) +} + +// Listen creates a basic SCTP stream listener. +// For more control check out the AssociationListener. +func (lc *ListenConfig) Listen(network string, laddr *net.UDPAddr) (net.Listener, error) { + parent, err := ListenAssociation(network, laddr) + if err != nil { + return nil, err + } + if lc.Config == nil { + lc.Config = &Config{ + LoggerFactory: logging.NewDefaultLoggerFactory(), + } + } + l := &listener{ + config: *lc.Config, + parent: parent, + acceptCh: make(chan *Stream), + } + + go l.acceptAssociationLoop() + + return l, nil +} + +func (l *listener) acceptAssociationLoop() { + // TODO: Shutdown + for { + // TODO: Cleanup association when closing the last stream and/or listener. + a, err := l.parent.Accept() + if err != nil { + // TODO: Error handling + return + } + + go l.acceptStreamLoop(a) + } +} + +func (l *listener) acceptStreamLoop(a *Association) { + // TODO: Shutdown + for { + s, err := a.AcceptStream() + if err != nil { + // TODO: Error handling + return + } + + l.acceptCh <- s + } +} + +type listener struct { + config Config + parent *AssociationListener + + acceptCh chan *Stream +} + +var _ net.Listener = (*listener)(nil) + +// Accept waits for and returns the next stream to the listener. +// You have to either close or read on all connection that are created. +func (l *listener) Accept() (net.Conn, error) { + s, ok := <-l.acceptCh + + if !ok { + return nil, errors.Errorf("listener closed") + } + return s, nil +} + +// Close closes the listener. +// Any blocked Accept operations will be unblocked and return errors. +// Already Accepted connections are not closed. +func (l *listener) Close() error { + return l.parent.Close() +} + +// Addr returns the listener's network address. +func (l *listener) Addr() net.Addr { + return l.parent.Addr() +} diff --git a/stream.go b/stream.go index 17933812..55fdf909 100644 --- a/stream.go +++ b/stream.go @@ -3,7 +3,9 @@ package sctp import ( "io" "math" + "net" "sync" + "time" "github.com/pion/logging" "github.com/pkg/errors" @@ -352,3 +354,28 @@ func (s *Stream) getNumBytesInReassemblyQueue() int { // No lock is required as it reads the size with atomic load function. return s.reassemblyQueue.getNumBytes() } + +// LocalAddr implements net.Conn.LocalAddr +func (s *Stream) LocalAddr() net.Addr { + return s.association.netConn.LocalAddr() +} + +// RemoteAddr implements net.Conn.RemoteAddr +func (s *Stream) RemoteAddr() net.Addr { + return s.association.netConn.RemoteAddr() +} + +// SetDeadline is a stub +func (s *Stream) SetDeadline(t time.Time) error { + return nil +} + +// SetReadDeadline is a stub +func (s *Stream) SetReadDeadline(t time.Time) error { + return nil +} + +// SetWriteDeadline is a stub +func (s *Stream) SetWriteDeadline(t time.Time) error { + return nil +}