Skip to content

Commit

Permalink
add write timeout, in case peer stalled
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Dec 17, 2024
1 parent 27e5822 commit b180b17
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 4 deletions.
9 changes: 7 additions & 2 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type Options struct {
ReadMinBufferSizeToProcess int
TcpReadBufferSize int
TcpWriteBufferSize int

TlsConfig *tls.Config
WriteTimeout time.Duration
TlsConfig *tls.Config
}

func (o *Options) SetDefaults() {
Expand Down Expand Up @@ -119,6 +119,10 @@ func (o *Options) SetDefaults() {
o.ReadAccumTimeLimit = 5 * time.Second
}

if o.WriteTimeout == 0 {
o.WriteTimeout = 5 * time.Minute
}

o.Merger = &pebble.Merger{
Name: "CRDT",
Merge: func(key, value []byte) (pebble.ValueMerger, error) {
Expand Down Expand Up @@ -262,6 +266,7 @@ func Open(dirname string, opts Options) (*Chotki, error) {
BufferMinToProcess: cho.opts.ReadMinBufferSizeToProcess,
},
&protocol.TcpBufferSizeOpt{Read: cho.opts.TcpReadBufferSize, Write: cho.opts.TcpWriteBufferSize},
&protocol.NetWriteTimeoutOpt{Timeout: cho.opts.WriteTimeout},
)

if !exists {
Expand Down
10 changes: 10 additions & 0 deletions protocol/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Net struct {
readBufferTcpSize int
writeBufferTcpSize int
readAccumTimeLimit time.Duration
writeTimeout time.Duration
bufferMaxSize int
bufferMinToProcess int
}
Expand All @@ -65,6 +66,14 @@ type NetOpt interface {
Apply(*Net)
}

type NetWriteTimeoutOpt struct {
Timeout time.Duration
}

func (opt *NetWriteTimeoutOpt) Apply(n *Net) {
n.writeTimeout = opt.Timeout
}

type NetTlsConfigOpt struct {
Config *tls.Config
}
Expand Down Expand Up @@ -315,6 +324,7 @@ func (n *Net) keepPeer(name string, conn net.Conn) {
peer := &Peer{
inout: n.onInstall(name),
conn: conn,
writeTimeout: n.writeTimeout,
readAccumtTimeLimit: n.readAccumTimeLimit,
bufferMaxSize: n.bufferMaxSize,
bufferMinToProcess: n.bufferMinToProcess,
Expand Down
4 changes: 2 additions & 2 deletions protocol/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ func TestTCPDepot_Connect(t *testing.T) {
lCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
l := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{lCon}
}, func(_ string, t Traced) { lCon.Close() }, &NetTlsConfigOpt{tlsConfig("a.chotki.local")})
}, func(_ string, t Traced) { lCon.Close() }, &NetTlsConfigOpt{tlsConfig("a.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})

err := l.Listen(loop)
assert.Nil(t, err)

cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
c := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{cCon}
}, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")})
}, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})

err = c.Connect(loop)
assert.Nil(t, err)
Expand Down
4 changes: 4 additions & 0 deletions protocol/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Peer struct {
readAccumtTimeLimit time.Duration
bufferMaxSize int
bufferMinToProcess int
writeTimeout time.Duration
}

func (p *Peer) getReadTimeLimit() time.Duration {
Expand Down Expand Up @@ -148,6 +149,9 @@ func (p *Peer) keepWrite(ctx context.Context) error {
p.writeBatchSize.Add(float64(batchSize))

b := net.Buffers(recs)
if p.writeTimeout != 0 {
p.conn.SetWriteDeadline(time.Now().Add(p.writeTimeout))
}
for len(b) > 0 && err == nil {
if _, err = b.WriteTo(p.conn); err != nil {
return err
Expand Down

0 comments on commit b180b17

Please sign in to comment.