Skip to content

Commit

Permalink
Merge pull request #14 from splitio/enh/dead_socket_removal_attempt
Browse files Browse the repository at this point in the history
attempt to remove dead socket on startup
  • Loading branch information
mredolatti authored Nov 10, 2023
2 parents 1472745 + fec5246 commit 1608e5e
Show file tree
Hide file tree
Showing 16 changed files with 226 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.1.1 (Nov 10, 2023):
- Updated startup logic to remove dead sockets instead of failing immediately if the socket file exists
- Fix memory leak when accepting an incoming connection.

1.1.0 (Sep 19, 2023):
- Add support for Client/GetTreatment(s)WithConfig operations.
- Add support for Manager operations.
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ PLATFORM ?=
PLATFORM_STR := $(if $(PLATFORM),--platform=$(PLATFORM),)

VERSION := $(shell cat splitio/version.go | grep 'const Version' | sed 's/const Version = //' | tr -d '"')
COMMIT_SHA := $(shell git rev-parse --short HEAD)
COMMIT_SHA := $(shell bash -c '[ ! -z $${GITHUB_SHA} ] && echo $${GITHUB_SHA:0:7} || git rev-parse --short=7 HEAD')
COMMIT_SHA_FILE := splitio/commitsha.go

GO_FILES := $(shell find . -name "*.go" -not -name "$(COMMIT_SHA_FILE)") go.sum
Expand All @@ -37,6 +37,9 @@ clean:
## build binaries for this platform
build: splitd splitcli sdhelper

## print current commit SHA (from repo metadata if local, from env-var if GHA)
printsha:
@echo $(COMMIT_SHA)


## run all tests
Expand Down
16 changes: 16 additions & 0 deletions cmd/splitcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/splitio/splitd/splitio/conf"
"github.com/splitio/splitd/splitio/link"
"github.com/splitio/splitd/splitio/link/client/types"
"github.com/splitio/splitd/splitio/link/transfer"
"github.com/splitio/splitd/splitio/util"
)

Expand Down Expand Up @@ -38,6 +39,21 @@ func main() {
VerboseWriter: os.Stderr,
})

if args.Method == "ping" {
// no consumer is created (to avoid registering)
c, err := transfer.NewClientConn(logger, &linkOpts.Transfer)
if err != nil {
logger.Error("error connecting to socket: ", err.Error())
os.Exit(2)
}
if err = c.Shutdown(); err != nil {
logger.Error("error closing connection: ", err.Error())
os.Exit(2)
}
logger.Info("socket is accepting connections properly")
os.Exit(0)
}

c, err := link.Consumer(logger, linkOpts)
if err != nil {
logger.Error("error creating client wrapper: ", err)
Expand Down
11 changes: 11 additions & 0 deletions cmd/splitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/splitio/splitd/splitio/link"
"github.com/splitio/splitd/splitio/sdk"
"github.com/splitio/splitd/splitio/util"

"github.com/splitio/splitd/splitio/provisional/profiler"
)

func main() {
Expand Down Expand Up @@ -47,6 +49,15 @@ func main() {
})
defer shutdown.Wait()

if pc := cfg.Debug.Profiling; pc.Enable {
go func() {
p := profiler.New(pc.Host, pc.Port)
if err := p.ListenAndServe(); err != nil {
panic(err.Error())
}
}()
}

// Wait for connection to end (either gracefully or because of an error)
err = <-errc
exitOnErr("shutdown: ", err)
Expand Down
5 changes: 5 additions & 0 deletions infra/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ accum=$(yq '.sdk.apikey = env(SPLITD_APIKEY) | .link.address = env(SPLITD_LINK_A
# logger configs
[ ! -z ${SPLITD_LOG_LEVEL+x} ] && accum=$(echo "${accum}" | yq '.logging.level = env(SPLITD_LOG_LEVEL)')
[ ! -z ${SPLITD_LOG_OUTPUT+x} ] && accum=$(echo "${accum}" | yq '.logging.output = env(SPLITD_LOG_OUTPUT)')

# profiling configs
[ ! -z ${SPLITD_PROFILING_ENABLE+x} ] && accum=$(echo "${accum}" | yq '.debug.profiling.enable = env(SPLITD_PROFILING_ENABLE)')
[ ! -z ${SPLITD_PROFILING_HOST+x} ] && accum=$(echo "${accum}" | yq '.debug.profiling.host = env(SPLITD_PROFILING_HOST)')
[ ! -z ${SPLITD_PROFILING_PORT+x} ] && accum=$(echo "${accum}" | yq '.debug.profiling.port = env(SPLITD_PROFILING_PORT)')
# @}

# Ensure that the socket-file is read-writable by anyone
Expand Down
10 changes: 10 additions & 0 deletions infra/test/test_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ function testAllVars {
export SPLITD_EVENTS_REFRESH_SECS="11"
export SPLITD_EVENTS_QUEUE_SIZE="12"

export SPLITD_PROFILING_ENABLE="true"
export SPLITD_PROFILING_HOST="somehost"
export SPLITD_PROFILING_PORT="1234"


# Exec entrypoint
[ -f "./testcfg" ] && rm ./testcfg
Expand Down Expand Up @@ -97,6 +101,12 @@ function testAllVars {

# ---

assert_eq "true" $(echo "$conf_json" | jq '.Debug.Profiling.Enable') "incorrect profiling status"
assert_eq '"somehost"' $(echo "$conf_json" | jq '.Debug.Profiling.Host') "incorrect profiling host"
assert_eq "1234" $(echo "$conf_json" | jq '.Debug.Profiling.Port') "incorrect profiling port"

# ---

assert_eq '"WARNING"' $(echo "$conf_json" | jq '.Logger.Level') "incorrect log level"
assert_eq '"/dev/stderr"' $(echo "$conf_json" | jq '.Logger.Output') "incorrect log output"

Expand Down
5 changes: 5 additions & 0 deletions splitd.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ link:
serialization: msgpack
bufferSize: 1024
protocol: v1
debug:
profiling:
enable: false
host: localhost
port: 8888

2 changes: 1 addition & 1 deletion splitio/commitsha.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package splitio

const CommitSHA = "29ff22d"
const CommitSHA = "a41bede"
22 changes: 22 additions & 0 deletions splitio/conf/splitd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
Logger Logger `yaml:"logging"`
SDK SDK `yaml:"sdk"`
Link Link `yaml:"link"`
Debug Debug `yaml:"debug"`
}

func (c Config) String() string {
Expand Down Expand Up @@ -58,6 +59,7 @@ func (c *Config) PopulateWithDefaults() {
c.SDK.PopulateWithDefaults()
c.Link.PopulateWithDefaults()
c.Logger.PopulateWithDefaults()
c.Debug.PopulateWithDefaults()
}

type Link struct {
Expand Down Expand Up @@ -271,6 +273,26 @@ func (l *Logger) ToLoggerOptions() (*logging.LoggerOptions, error) {
return opts, nil
}

// Debug groups the debugging-related settings mapped to the `debug` key of the configuration file.
type Debug struct {
// Profiling holds the settings mapped to the nested `profiling` key.
Profiling Profiling `yaml:"profiling"`
}

// PopulateWithDefaults delegates to the nested Profiling section so that it is filled with its default values.
func (d *Debug) PopulateWithDefaults() {
d.Profiling.PopulateWithDefaults()
}

// Profiling describes the profiling settings mapped to the `profiling` key of the
// configuration file: whether profiling is enabled, plus a host and a port.
type Profiling struct {
	Enable bool   `yaml:"enable"`
	Host   string `yaml:"host"`
	Port   int    `yaml:"port"`
}

// PopulateWithDefaults overwrites every field with its default value:
// profiling disabled, host "localhost" and port 8888.
func (p *Profiling) PopulateWithDefaults() {
	*p = Profiling{
		Enable: false,
		Host:   "localhost",
		Port:   8888,
	}
}

func ReadConfig() (*Config, error) {
cfgFN := defaultConfigFN
if fromEnv := os.Getenv("SPLITD_CONF_FILE"); fromEnv != "" {
Expand Down
14 changes: 11 additions & 3 deletions splitio/link/service/v1/clientmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ func (m *ClientManager) handleClientInteractions() error {
rpc, err := m.fetchRPC()
if err != nil {
if errors.Is(err, io.EOF) { // connection ended, no error
m.logger.Debug(fmt.Sprintf("connection remotely closed for metadata=%+v", m.clientConfig.Metadata))
m.logger.Debug(fmt.Sprintf("connection remotely closed for metadata=%s", formatClientConfig(m.clientConfig)))
return nil
} else if errors.Is(err, os.ErrDeadlineExceeded) { // we waited for an RPC, got none, try again.
m.logger.Debug(fmt.Sprintf("read timeout/no RPC fetched. restarting loop for metadata=%+v", m.clientConfig))
m.logger.Debug(fmt.Sprintf("read timeout/no RPC fetched. restarting loop for metadata=%s", formatClientConfig(m.clientConfig)))
continue
} else {
m.logger.Error(fmt.Sprintf("unexpected error reading RPC: %s. Closing conn for metadata=%+v", err, m.clientConfig))
m.logger.Error(fmt.Sprintf("unexpected error reading RPC: %s. Closing conn for metadata=%s", err, formatClientConfig(m.clientConfig)))
return err
}
}
Expand Down Expand Up @@ -313,3 +313,11 @@ func (m *ClientManager) handleSplits(rpc *protov1.RPC) (interface{}, error) {

return response, nil
}

// formatClientConfig renders a client configuration for log messages.
// A nil pointer is rendered as the literal "<nil>"; anything else is formatted
// with the `%+v` verb.
func formatClientConfig(c *types.ClientConfig) string {
	if c != nil {
		return fmt.Sprintf("%+v", c)
	}
	return "<nil>"
}
10 changes: 7 additions & 3 deletions splitio/link/transfer/acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,13 @@ func (a *Acceptor) Start(onClientAttachedCallback OnClientAttachedCallback) (<-c
return
}

ctx, cancel := context.WithTimeout(context.Background(), a.maxWait)
defer cancel()
err = a.sem.Acquire(ctx, 1)
// try to acquire a semaphore slot (throughput limiting):
// to avoid leaks, the lifetime of the context/deadline is scoped to a func containing a defer statement
err = func() error {
ctx, cancel := context.WithTimeout(context.Background(), a.maxWait)
defer cancel()
return a.sem.Acquire(ctx, 1)
}()
if err != nil {
a.logger.Error(fmt.Sprintf("Incoming connection request timed out. If the current parallelism is expected, "+
"consider increasing `maxConcurrentConnections` (current=%d)", a.maxConns))
Expand Down
1 change: 0 additions & 1 deletion splitio/link/transfer/acceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,4 @@ func TestNewAcceptorInstantiation(t *testing.T) {
assert.Equal(t, opts.Address, acc.address.(*net.UnixAddr).Name)
assert.Equal(t, "unix", acc.address.(*net.UnixAddr).Network())
assert.Nil(t, acc.Shutdown())

}
41 changes: 40 additions & 1 deletion splitio/link/transfer/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"net"
"os"
"syscall"
"time"

"github.com/splitio/go-toolkit/v5/logging"
Expand All @@ -29,7 +31,8 @@ const (
)

var (
ErrInvalidConnType = errors.New("invalid conn type")
ErrInvalidConnType = errors.New("invalid conn type")
ErrServiceAddressInUse = errors.New("provided socket file / address is already in use")
)

func NewAcceptor(logger logging.LoggerInterface, o *Options, listenerConfig *AcceptorConfig) (*Acceptor, error) {
Expand All @@ -46,6 +49,10 @@ func NewAcceptor(logger logging.LoggerInterface, o *Options, listenerConfig *Acc
return nil, ErrInvalidConnType
}

if err := ensureAddressUsable(logger, address); err != nil {
return nil, err
}

cf := func(c net.Conn) RawConn { return newConnWrapper(c, ff, o) }
return newAcceptor(address, cf, logger, listenerConfig), nil
}
Expand Down Expand Up @@ -93,3 +100,35 @@ func DefaultOpts() Options {
// helpers

// lpFramerFromConn builds a length-prefix framer on top of the supplied connection.
func lpFramerFromConn(c net.Conn) framing.Interface { return framing.NewLengthPrefix(c) }

// ensureAddressUsable checks whether a listener can be created on `address`, removing
// a stale socket file when one is detected.
//
// Only addresses whose network is "unix" or "unixpacket" are inspected; for any other
// network nil is returned without further checks. For the inspected networks:
//   - if the socket file does not exist, nil is returned;
//   - if a connection to it succeeds, something is already serving on that address and
//     ErrServiceAddressInUse is returned;
//   - if the connection is refused (ECONNREFUSED), the file is considered a leftover of a
//     dead process and is removed (a removal failure is wrapped and returned);
//   - any other connection error is wrapped and returned, and the file is left untouched.
func ensureAddressUsable(logger logging.LoggerInterface, address net.Addr) error {
	network := address.Network()
	if network != "unix" && network != "unixpacket" {
		return nil
	}

	path := address.String()
	if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
		return nil // file doesn't exist, we're ok
	}

	logger.Warning("The socket file exists. Testing if it's currently accepting connections")
	conn, err := net.Dial(network, path)
	if err == nil {
		conn.Close()
		return ErrServiceAddressInUse
	}

	// Only a refused connection identifies a dead socket. Bail out on anything else
	// *before* announcing a removal that is not going to happen.
	if !errors.Is(err, syscall.ECONNREFUSED) {
		return fmt.Errorf("unknown error when testing for a dead socket: %w", err)
	}

	// the socket seems to be bound to a dead process, will try removing it
	// so that a listener can be created
	logger.Warning("The socket appears to be from a previous (dead) execution. Will try to remove it")
	if err := os.Remove(path); err != nil {
		return fmt.Errorf("error removing dead-socket file from a previous execution: %w", err)
	}

	logger.Warning("Dead socket file removed successfuly")
	return nil
}
54 changes: 53 additions & 1 deletion splitio/link/transfer/setup_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,64 @@
package transfer

import (
"github.com/stretchr/testify/assert"
"net"
"os"
"path/filepath"
"testing"
"time"

"github.com/splitio/go-toolkit/v5/logging"
"github.com/stretchr/testify/assert"
)

// TestConnType checks the string representation of the two known connection types
// and of an out-of-range value.
func TestConnType(t *testing.T) {
assert.Equal(t, "unix-seqpacket", ConnTypeUnixSeqPacket.String())
assert.Equal(t, "unix-stream", ConnTypeUnixStream.String())
assert.Equal(t, "invalid-socket-type", ConnType(123).String())
}

func TestEnsureAddressIsUsable(t *testing.T) {

logger := logging.NewLogger(nil)
assert.Nil(t, ensureAddressUsable(logger, &net.UDPAddr{}))
assert.Nil(t, ensureAddressUsable(logger, &net.TCPAddr{}))
assert.Nil(t, ensureAddressUsable(logger, &net.UnixAddr{Name: "/some/nonexistent/file"}))

// test unknown error (in this case trying to connect to a different socket type)
ready := make(chan struct{})
path := filepath.Join(os.TempDir(), "splitd_test_ensure_address_usable.sock")
os.Remove(path) // por las dudas
go func() {
l, err := net.ListenUnix("unix", &net.UnixAddr{Name: path, Net: "unix"})
assert.Nil(t, err)
defer l.Close()

l.SetDeadline(time.Now().Add(1 * time.Second))
go func() {
time.Sleep(100 * time.Millisecond)
ready <- struct{}{}
}()
l.Accept()
}()
<-ready
assert.ErrorContains(t, ensureAddressUsable(logger, &net.UnixAddr{Name: path, Net: "unixpacket"}), "unknown error when testing for a dead socket")

// test socket in use error
ready = make(chan struct{})
path = filepath.Join(os.TempDir(), "splitd_test_ensure_address_usable2.sock")
os.Remove(path) // por las dudas
go func() {
l, err := net.ListenUnix("unix", &net.UnixAddr{Name: path, Net: "unix"})
assert.Nil(t, err)
defer l.Close()

l.SetDeadline(time.Now().Add(1 * time.Second))
go func() {
time.Sleep(100 * time.Millisecond)
ready <- struct{}{}
}()
l.Accept()
}()
<-ready
assert.ErrorIs(t, ErrServiceAddressInUse, ensureAddressUsable(logger, &net.UnixAddr{Name: path, Net: "unix"}))
}
36 changes: 36 additions & 0 deletions splitio/provisional/profiler/profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package profiler

import (
	"fmt"
	"net"
	"net/http"
	"net/http/pprof"
)

// init replaces the process-wide default HTTP mux with a freshly created, empty one.
// NOTE(review): presumably this keeps handlers registered on the default mux by imported
// packages (e.g. net/http/pprof) from being served anywhere but the dedicated mux built
// in this package; confirm.
func init() {
	emptyMux := http.NewServeMux()
	http.DefaultServeMux = emptyMux
}

// HTTPProfileInterface serves the net/http/pprof handlers through its own HTTP server.
type HTTPProfileInterface struct {
	server http.Server
}

// New builds a profiling server bound to host:port. Nothing is listened on until
// ListenAndServe is called.
//
// The pprof index, cmdline, profile, symbol and trace handlers are registered under
// /debug/pprof/ on a mux private to the returned instance.
func New(host string, port int) *HTTPProfileInterface {
	mux := http.NewServeMux()

	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

	return &HTTPProfileInterface{
		server: http.Server{
			// JoinHostPort wraps IPv6 literals in brackets ("[::1]:8888"); a plain
			// "%s:%d" would produce an address that cannot be listened on.
			Addr:    net.JoinHostPort(host, fmt.Sprint(port)),
			Handler: mux,
		},
	}
}

// ListenAndServe starts the underlying HTTP server and returns whatever error it reports.
func (h *HTTPProfileInterface) ListenAndServe() error {
	return h.server.ListenAndServe()
}
Loading

0 comments on commit 1608e5e

Please sign in to comment.