Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCE-M3: Hook to extract VPC Endpoint ID #5

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion diam/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ func readerBufferSlice(buf *bytes.Buffer, l int) []byte {

// ReadMessage reads a binary stream from the reader and uses the given
// dictionary to parse it.
func ReadMessage(reader io.Reader, dictionary *dict.Parser) (*Message, error) {
func ReadMessage(reader io.Reader, dictionary *dict.Parser, hook func(*Message) error) (*Message, error) {
buf := newReaderBuffer()
defer putReaderBuffer(buf)
m := &Message{dictionary: dictionary}

if err := hook(m); err != nil {
return nil, err
}

cmd, stream, err := m.readHeader(reader, buf)
if err != nil {
return nil, err
Expand Down Expand Up @@ -98,6 +103,7 @@ func (m *Message) readHeader(r io.Reader, buf *bytes.Buffer) (cmd *dict.Command,
if err != nil {
return nil, stream, err
}

m.Header, err = DecodeHeader(b)
if err != nil {
return nil, stream, err
Expand Down
32 changes: 22 additions & 10 deletions diam/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,21 @@ func (c *conn) readMessage() (m *Message, err error) {
if c.server.ReadTimeout > 0 {
c.rwc.SetReadDeadline(time.Now().Add(c.server.ReadTimeout))
}

wrappedMethod := func(m *Message) error {
if c.server.ReadMessageHook == nil {
return nil
}

return c.server.ReadMessageHook(c.writer, m)
}

if msc, isMulti := c.rwc.(MultistreamConn); isMulti {
// If it's a multi-stream association - reset the stream to "undefined" prior to reading next message
msc.ResetCurrentStream()
m, err = ReadMessage(msc, c.dictionary()) // MultistreamConn has it's own buffering
m, err = ReadMessage(msc, c.dictionary(), wrappedMethod) // MultistreamConn has it's own buffering
} else {
m, err = ReadMessage(c.buf.Reader, c.dictionary())
m, err = ReadMessage(c.buf.Reader, c.dictionary(), wrappedMethod)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -557,16 +566,19 @@ func Serve(l net.Listener, handler Handler) error {
return srv.Serve(l)
}

type ReadMessageHook = func(Conn, *Message) error

// A Server defines parameters for running a diameter server.
type Server struct {
Network string // network of the address - empty string defaults to tcp
Addr string // address to listen on, ":3868" if empty
Handler Handler // handler to invoke, DefaultServeMux if nil
Dict *dict.Parser // diameter dictionaries for this server
ReadTimeout time.Duration // maximum duration before timing out read of the request
WriteTimeout time.Duration // maximum duration before timing out write of the response
TLSConfig *tls.Config // optional TLS config, used by ListenAndServeTLS
LocalAddr net.Addr // optional Local Address to bind dailer's (Dail...) socket to
Network string // network of the address - empty string defaults to tcp
Addr string // address to listen on, ":3868" if empty
Handler Handler // handler to invoke, DefaultServeMux if nil
Dict *dict.Parser // diameter dictionaries for this server
ReadTimeout time.Duration // maximum duration before timing out read of the request
WriteTimeout time.Duration // maximum duration before timing out write of the response
TLSConfig *tls.Config // optional TLS config, used by ListenAndServeTLS
LocalAddr net.Addr // optional Local Address to bind dailer's (Dail...) socket to
ReadMessageHook ReadMessageHook // optional Called right before ReadMessage method.
}

// serverHandler delegates to either the server's Handler or DefaultServeMux.
Expand Down