Skip to content

Commit

Permalink
fix(conf): disable auto_flush when set to off in config (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
sklarsa authored Aug 12, 2024
1 parent 1532472 commit f7f593f
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 56 deletions.
11 changes: 6 additions & 5 deletions conf_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type configData struct {
}

func confFromStr(conf string) (*lineSenderConfig, error) {
senderConf := &lineSenderConfig{}
var senderConf *lineSenderConfig

data, err := parseConfigStr(conf)
if err != nil {
Expand All @@ -46,14 +46,14 @@ func confFromStr(conf string) (*lineSenderConfig, error) {

switch data.Schema {
case "http":
senderConf.senderType = httpSenderType
senderConf = newLineSenderConfig(httpSenderType)
case "https":
senderConf.senderType = httpSenderType
senderConf = newLineSenderConfig(httpSenderType)
senderConf.tlsMode = tlsEnabled
case "tcp":
senderConf.senderType = tcpSenderType
senderConf = newLineSenderConfig(tcpSenderType)
case "tcps":
senderConf.senderType = tcpSenderType
senderConf = newLineSenderConfig(tcpSenderType)
senderConf.tlsMode = tlsEnabled
default:
return nil, fmt.Errorf("invalid schema: %s", data.Schema)
Expand Down Expand Up @@ -90,6 +90,7 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
case "token_y":
// Some clients require public key.
// But since Go sender doesn't need it, we ignore the values.
continue
case "auto_flush":
if v == "off" {
senderConf.autoFlushRows = 0
Expand Down
10 changes: 9 additions & 1 deletion conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,15 @@ func TestHappyCasesFromConf(t *testing.T) {
actual, err := qdb.ConfFromStr(tc.config)
assert.NoError(t, err)

expected := &qdb.LineSenderConfig{}
var expected *qdb.LineSenderConfig
switch tc.config[0] {
case 'h':
expected = qdb.NewLineSenderConfig(qdb.HttpSenderType)
case 't':
expected = qdb.NewLineSenderConfig(qdb.TcpSenderType)
default:
assert.FailNow(t, "happy case configs must start with either 'http' or 'tcp'")
}
for _, opt := range tc.expectedOpts {
opt(expected)
}
Expand Down
12 changes: 11 additions & 1 deletion export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ type (
ConfigData = configData
TcpLineSender = tcpLineSender
LineSenderConfig = lineSenderConfig
SenderType = senderType
)

var (
GlobalTransport = globalTransport
GlobalTransport = globalTransport
NoSenderType SenderType = noSenderType
HttpSenderType SenderType = httpSenderType
TcpSenderType SenderType = tcpSenderType
DefaultAutoFlushInterval = defaultAutoFlushInterval
DefaultAutoFlushRows = defaultAutoFlushRows
)

func NewBuffer(initBufSize int, maxBufSize int, fileNameLimit int) Buffer {
Expand Down Expand Up @@ -76,3 +82,7 @@ func BufLen(s LineSender) int {
}
panic("unexpected struct")
}

func NewLineSenderConfig(t SenderType) *LineSenderConfig {
return newLineSenderConfig(t)
}
53 changes: 45 additions & 8 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {

assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))

// Sleep past the default interval
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)

// Check that the number of messages hasn't changed
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))

// Send one additional message and ensure that all are flushed
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)
Expand All @@ -511,8 +517,6 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {

func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
ctx := context.Background()
autoFlushRows := 10
autoFlushInterval := time.Duration(autoFlushRows-1) * time.Millisecond

srv, err := newTestHttpServer(readAndDiscard)
assert.NoError(t, err)
Expand All @@ -524,21 +528,54 @@ func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
ctx,
qdb.WithHttp(),
qdb.WithAddress(srv.Addr()),
qdb.WithAutoFlushRows(autoFlushRows),
qdb.WithAutoFlushInterval(autoFlushInterval),
qdb.WithAutoFlushRows(qdb.DefaultAutoFlushRows),
qdb.WithAutoFlushInterval(qdb.DefaultAutoFlushInterval),
qdb.WithAutoFlushDisabled(),
)
assert.NoError(t, err)
defer sender.Close(ctx)

// Send autoFlushRows + 1 messages and ensure all are buffered
for i := 0; i < autoFlushRows+1; i++ {
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)
time.Sleep(time.Millisecond)
}

assert.Equal(t, autoFlushRows+1, qdb.MsgCount(sender))
// Sleep past the default interval
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)

assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
}

func TestNoFlushWhenAutoFlushRowsAndIntervalAre0(t *testing.T) {
ctx := context.Background()

srv, err := newTestHttpServer(readAndDiscard)
assert.NoError(t, err)
defer srv.Close()

// opts are processed sequentially, so AutoFlushDisabled will
// override AutoFlushRows
sender, err := qdb.NewLineSender(
ctx,
qdb.WithHttp(),
qdb.WithAddress(srv.Addr()),
qdb.WithAutoFlushRows(0),
qdb.WithAutoFlushInterval(0),
)
assert.NoError(t, err)
defer sender.Close(ctx)

// Send autoFlushRows + 1 messages and ensure all are buffered
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)
}

// Sleep past the default interval
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)

assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
}

func TestSenderDoubleClose(t *testing.T) {
Expand Down Expand Up @@ -625,7 +662,7 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {

err = sender.Close(ctx)
assert.NoError(t, err)
assert.Empty(t, qdb.Messages(sender))
assert.NotEmpty(t, qdb.Messages(sender))
}

func TestSuccessAfterRetries(t *testing.T) {
Expand Down
90 changes: 49 additions & 41 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,13 +481,61 @@ func LineSenderFromConf(ctx context.Context, conf string) (LineSender, error) {
// sender corresponds to a single client connection. LineSender should
// not be called concurrently by multiple goroutines.
func NewLineSender(ctx context.Context, opts ...LineSenderOption) (LineSender, error) {
conf := &lineSenderConfig{}
var conf *lineSenderConfig

// Iterate over all options to determine the sender type
// This is used to set defaults based on the type of sender (http vs tcp)
// Worst case performance is 2N for the number of LineSenderOptions
tmp := newLineSenderConfig(noSenderType)
for _, opt := range opts {
opt(tmp)
switch tmp.senderType {
case httpSenderType:
conf = newLineSenderConfig(httpSenderType)
case tcpSenderType:
conf = newLineSenderConfig(tcpSenderType)
}

if conf != nil {
break
}
}

if tmp.senderType == noSenderType {
return nil, errors.New("sender type is not specified: use WithHttp or WithTcp")
}

for _, opt := range opts {
opt(conf)
}
return newLineSender(ctx, conf)
}

func newLineSenderConfig(t senderType) *lineSenderConfig {
switch t {
case tcpSenderType:
return &lineSenderConfig{
senderType: t,
address: defaultTcpAddress,
initBufSize: defaultInitBufferSize,
fileNameLimit: defaultFileNameLimit,
}
default:
return &lineSenderConfig{
senderType: t,
address: defaultHttpAddress,
requestTimeout: defaultRequestTimeout,
retryTimeout: defaultRetryTimeout,
minThroughput: defaultMinThroughput,
autoFlushRows: defaultAutoFlushRows,
autoFlushInterval: defaultAutoFlushInterval,
initBufSize: defaultInitBufferSize,
maxBufSize: defaultMaxBufferSize,
fileNameLimit: defaultFileNameLimit,
}
}
}

func newLineSender(ctx context.Context, conf *lineSenderConfig) (LineSender, error) {
switch conf.senderType {
case tcpSenderType:
Expand Down Expand Up @@ -538,17 +586,6 @@ func sanitizeTcpConf(conf *lineSenderConfig) error {
return errors.New("tcpKeyId is empty and tcpKey is not. both (or none) must be provided")
}

// Set defaults
if conf.address == "" {
conf.address = defaultTcpAddress
}
if conf.initBufSize == 0 {
conf.initBufSize = defaultInitBufferSize
}
if conf.fileNameLimit == 0 {
conf.fileNameLimit = defaultFileNameLimit
}

return nil
}

Expand All @@ -563,35 +600,6 @@ func sanitizeHttpConf(conf *lineSenderConfig) error {
return errors.New("both basic and token authentication cannot be used")
}

// Set defaults
if conf.address == "" {
conf.address = defaultHttpAddress
}
if conf.requestTimeout == 0 {
conf.requestTimeout = defaultRequestTimeout
}
if conf.retryTimeout == 0 {
conf.retryTimeout = defaultRetryTimeout
}
if conf.minThroughput == 0 {
conf.minThroughput = defaultMinThroughput
}
if conf.autoFlushRows == 0 {
conf.autoFlushRows = defaultAutoFlushRows
}
if conf.autoFlushInterval == 0 {
conf.autoFlushInterval = defaultAutoFlushInterval
}
if conf.initBufSize == 0 {
conf.initBufSize = defaultInitBufferSize
}
if conf.maxBufSize == 0 {
conf.maxBufSize = defaultMaxBufferSize
}
if conf.fileNameLimit == 0 {
conf.fileNameLimit = defaultFileNameLimit
}

return nil
}

Expand Down

0 comments on commit f7f593f

Please sign in to comment.