Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support request timeouts #7

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 116 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
package supervisord

import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"time"

"github.com/kolo/xmlrpc"
)

var DefaultRequestTimeoutSec = 30

type (
Client struct {
*xmlrpc.Client
rpcUrl string
cl *http.Client
debug bool
requestTimeoutSec int
}
)

Expand All @@ -33,6 +43,15 @@ func init() {

type options struct {
username, password string

debug bool
timeoutSec int
}

func (me *options) setDefaults() {
if me.timeoutSec == 0 {
me.timeoutSec = DefaultRequestTimeoutSec
}
}

// ClientOption is used to customize the client.
Expand All @@ -46,15 +65,35 @@ func WithAuthentication(username, password string) ClientOption {
}
}

func WithRequestTimeout(sec int) ClientOption {
return func(o *options) {
o.timeoutSec = sec
}
}
func WithDebug(debug bool) ClientOption {
return func(o *options) {
o.debug = debug
}
}

func (c *Client) logf(s string, args ...interface{}) {
if c.debug == false {
return
}
fmt.Printf(s+"\n", args...)
}

func (c *Client) stringCall(method string, args ...interface{}) (string, error) {
var str string
c.logf("stringcall method:%s args:%q", method, args)
err := c.Call(method, args, &str)

return str, err
}

func (c *Client) boolCall(method string, args ...interface{}) error {
var result bool
c.logf("boolcall method:%s args:%q", method, args)
err := c.Call(method, args, &result)
if err != nil {
return err
Expand All @@ -67,6 +106,58 @@ func (c *Client) boolCall(method string, args ...interface{}) error {
return nil
}

func (c *Client) Call(method string, args interface{}, reply interface{}) error {

// encode request
largs := args.([]interface{})
buf, err := xmlrpc.EncodeMethodCall(method, largs...)
if err != nil {
return err
}
c.logf("xmlrpc call method:%s timeout:%d args:%q\n", method, c.requestTimeoutSec, buf)

reqTimeout := time.Duration(c.requestTimeoutSec) * time.Second

ctx := context.Background()
ctx2, cancel := context.WithTimeout(ctx, reqTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx2, "POST", c.rpcUrl, bytes.NewBuffer(buf))
if err != nil {
return err
}
req.Header.Set("Content-Type", "text/xml")

c.logf("xmlrpc do request method:%s url:%s", method, c.rpcUrl)
resp, err := c.cl.Do(req)
if err != nil {
return err
}

defer resp.Body.Close()

buf2, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

// decode
c.logf("xmlrpc decode response %q", buf2)
xres := xmlrpc.Response(buf2)

if xres.Err() != nil {
c.logf("ERROR: %s", xres.Err())
return xres.Err()
}

err = xres.Unmarshal(reply)
if err != nil {
return err
}

return nil
}

// Get a new client suitable for communicating with a supervisord.
// url must contain a real url to a supervisord RPC-service.
//
Expand All @@ -77,8 +168,8 @@ func NewClient(url string, opts ...ClientOption) (*Client, error) {
o(opt)
}

var tr http.RoundTripper = http.DefaultTransport

var tr http.RoundTripper
tr = &http.Transport{}
if opt.username != "" && opt.password != "" {
tr = &basicAuthTransport{
username: opt.username,
Expand All @@ -87,12 +178,19 @@ func NewClient(url string, opts ...ClientOption) (*Client, error) {
}
}

rpc, err := xmlrpc.NewClient(url, tr)
if err != nil {
return nil, err
cl := &http.Client{}
cl.Transport = tr

opt.setDefaults()

me := &Client{
cl: cl,
rpcUrl: url,
requestTimeoutSec: opt.timeoutSec,
debug: opt.debug,
}

return &Client{rpc}, nil
return me, nil
}

// NewUnixSocketClient returns a new client which connects to supervisord
Expand Down Expand Up @@ -121,15 +219,21 @@ func NewUnixSocketClient(path string, opts ...ClientOption) (*Client, error) {
rt: tr,
}
}
opt.setDefaults()

// we pass a valid url, as this is later url.Parse()'ed
// also we need to somehow specify "/RPC2"
rpc, err := xmlrpc.NewClient("http://127.0.0.1/RPC2", tr)
if err != nil {
return nil, err
cl := &http.Client{}
cl.Transport = tr

rpcUrl := "http://127.0.0.1:9001/RPC2"
me := &Client{
cl: cl,
rpcUrl: rpcUrl,
requestTimeoutSec: opt.timeoutSec,
debug: opt.debug,
}

return &Client{rpc}, nil
return me, nil

}

Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/sigmonsays/go-supervisord

go 1.18

require github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b

require github.com/divan/gorilla-xmlrpc v0.0.0-20190926132722-f0686da74fda // indirect
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
github.com/divan/gorilla-xmlrpc v0.0.0-20190926132722-f0686da74fda h1:q6BJCx6rxRJv/sLreclgzu4dK4dPF8x48afqcXtRtLQ=
github.com/divan/gorilla-xmlrpc v0.0.0-20190926132722-f0686da74fda/go.mod h1:3Cp6mWQcmK3erqkPrriKEkSpok0LO1uB2M5GxGzifhc=
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b h1:iNjcivnc6lhbvJA3LD622NPrUponluJrBWPIwGG/3Bg=
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=