Skip to content

Commit

Permalink
new minor version: Added SendEvents function to Statsd interface; Usi…
Browse files Browse the repository at this point in the history
…ng interface in buffered client constructor; Fixed tests
  • Loading branch information
quipo committed Aug 1, 2017
1 parent 6f34e0a commit 20e051c
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 28 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This client library was inspired by the one embedded in the [Bit.ly NSQ](https:/
* Increment - Count occurrences per second/minute of a specific event
* Decrement - Count occurrences per second/minute of a specific event
* Timing - To track a duration event
* PrecisionTiming - To track a duration event
* Gauge - Gauges are a constant data type. They are not subject to averaging, and they don’t change unless you change them. That is, once you set a gauge value, it will be a flat line on the graph until you change it again
* Absolute - Absolute-valued metric (not averaged/aggregated)
* Total - Continously increasing value, e.g. read operations since boot
Expand Down Expand Up @@ -63,6 +64,18 @@ func main() {
The string "%HOST%" in the metric name will automatically be replaced with the hostname of the server the event is sent from.


## Changelog

* `v.1.1.0`:

* Added `SendEvents` function to `Statsd` interface;
* Using interface in buffered client constructor;
* Added/Fixed tests

* `v.1.0.0`: First stable release
* `v.0.0.9`: Added memoization to reduce memory allocations
* `v.0.0.8`: Pre-release

## Author

Lorenzo Alberton
Expand Down
12 changes: 10 additions & 2 deletions bufferedclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type closeRequest struct {
// flushing aggregates to StatsD, useful if the frequency of events is extremely high
// and sampling is not desirable
type StatsdBuffer struct {
statsd *StatsdClient
statsd Statsd
flushInterval time.Duration
eventChannel chan event.Event
events map[string]event.Event
Expand All @@ -27,7 +27,7 @@ type StatsdBuffer struct {
}

// NewStatsdBuffer Factory
func NewStatsdBuffer(interval time.Duration, client *StatsdClient) *StatsdBuffer {
func NewStatsdBuffer(interval time.Duration, client Statsd) *StatsdBuffer {
sb := &StatsdBuffer{
flushInterval: interval,
statsd: client,
Expand Down Expand Up @@ -124,6 +124,14 @@ func (sb *StatsdBuffer) Total(stat string, value int64) error {
return nil
}

// SendEvents - Sends stats from all the event objects.
func (sb *StatsdBuffer) SendEvents(events map[string]event.Event) error {
for _, e := range events {
sb.eventChannel <- e
}
return nil
}

// avoid too many allocations by memoizing the "type|key" pair for an event
// @see https://gobyexample.com/closures
func initMemoisedKeyMap() func(typ string, key string) string {
Expand Down
12 changes: 10 additions & 2 deletions bufferedclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ func TestBufferedTotal(t *testing.T) {
ln, udpAddr := newLocalListenerUDP(t)
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)

prefix := "myproject."

client := NewStatsdClient(udpAddr.String(), prefix)
Expand All @@ -39,7 +42,8 @@ func TestBufferedTotal(t *testing.T) {
hostname, err := os.Hostname()
expected["zz."+hostname] = 1

go doListenUDP(t, ln, ch, 1)
go doListenUDP(t, ln, ch, len(s))
time.Sleep(50 * time.Millisecond)

err = buffered.CreateSocket()
if nil != err {
Expand All @@ -61,7 +65,9 @@ func TestBufferedTotal(t *testing.T) {
batch := <-ch
for _, x := range strings.Split(batch, "\n") {
x = strings.TrimSpace(x)
//fmt.Println(x)
if "" == x {
continue
}
if !strings.HasPrefix(x, prefix) {
t.Errorf("Metric without expected prefix: expected '%s', actual '%s'", prefix, x)
return
Expand All @@ -83,4 +89,6 @@ func TestBufferedTotal(t *testing.T) {
if !reflect.DeepEqual(expected, actual) {
t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual)
}

time.Sleep(2 * time.Second)
}
104 changes: 81 additions & 23 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -46,22 +47,34 @@ func (mock MockNetConn) SetWriteDeadline(t time.Time) error {
return nil
}

func newLocalListenerUDP(t *testing.T) (*net.UDPConn, *net.UDPAddr) {
addr := fmt.Sprintf(":%d", getFreePort())
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
t.Fatal(err)
// TODO: use this function instead mocking net.Conn
// usage: client, server := GetTestConnection("tcp", t)
// usage: client, server := GetTestConnection("udp", t)
func GetTestConnection(connType string, t *testing.T) (client, server net.Conn) {
ln, err := net.Listen(connType, "127.0.0.1")
if nil != err {
t.Error("TCP errpr:", err)
}
ln, err := net.ListenUDP("udp", udpAddr)
if err != nil {
t.Fatal(err)
go func() {
defer ln.Close()
server, err = ln.Accept()
if nil != err {
t.Error("TCP Accept errpr:", err)
}
}()

client, err = net.Dial(connType, ln.Addr().String())
if nil != err {
t.Error("TCP Dial error:", err)
}
return ln, udpAddr
return client, server
}

func TestTotal(t *testing.T) {
ln, udpAddr := newLocalListenerUDP(t)
defer ln.Close()
t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)

prefix := "myproject."

Expand Down Expand Up @@ -90,7 +103,7 @@ func TestTotal(t *testing.T) {

err = client.CreateSocket()
if nil != err {
t.Fatal(err)
t.Fatal("Create socket error:", err)
}
defer client.Close()

Expand All @@ -103,9 +116,13 @@ func TestTotal(t *testing.T) {
re := regexp.MustCompile(`^(.*)\:(\d+)\|(\w).*$`)

for i := len(s); i > 0; i-- {
x := <-ch
x, open := <-ch
if !open {
t.Logf("CLOSED CHANNEL")
break
}
x = strings.TrimSpace(x)
//fmt.Println(x)
//t.Logf(x)
if !strings.HasPrefix(x, prefix) {
t.Errorf("Metric without expected prefix: expected '%s', actual '%s'", prefix, x)
break
Expand All @@ -116,7 +133,7 @@ func TestTotal(t *testing.T) {
}
v, err := strconv.ParseInt(vv[2], 10, 64)
if err != nil {
t.Error(err)
t.Error("Cannot parse int:", err)
}
actual[vv[1][len(prefix):]] = v
}
Expand All @@ -126,28 +143,52 @@ func TestTotal(t *testing.T) {
}
}

func newLocalListenerUDP(t *testing.T) (*net.UDPConn, *net.UDPAddr) {
addr := fmt.Sprintf(":%d", getFreePort())
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
t.Fatal("UDP error:", err)
}
ln, err := net.ListenUDP("udp", udpAddr)
if err != nil {
t.Fatal("UDP Listen error:", err)
}
t.Logf("Started new local UDP listener @ %s\n", udpAddr)
return ln, udpAddr
}

func doListenUDP(t *testing.T, conn *net.UDPConn, ch chan string, n int) {
var wg sync.WaitGroup
wg.Add(n)

for n > 0 {
// Handle the connection in a new goroutine.
// The loop then returns to accepting, so that
// multiple connections may be served concurrently.
go func(c *net.UDPConn, ch chan string) {
go func(c *net.UDPConn, ch chan string, wg *sync.WaitGroup) {
t.Logf("Reading from UDP socket @ %s\n", conn.LocalAddr().String())
buffer := make([]byte, 1024)
size, err := c.Read(buffer)
// size, address, err := sock.ReadFrom(buffer) <- This starts printing empty and nil values below immediatly
if err != nil {
fmt.Println(string(buffer), size, err)
t.Fatal(err)
t.Logf("Error reading from UDP socket. Buffer: %s, Size: %d, Error: %s\n", string(buffer), size, err)
//t.Fatal(err)
}
t.Logf("Read buffer: \n------------------\n%s\n------------------\n* Size: %d\n", string(buffer), size)
ch <- string(buffer)
}(conn, ch)
wg.Done()
}(conn, ch, &wg)
n--
}
wg.Wait()
t.Logf("Finished listening on UDP socket @ %s\n", conn.LocalAddr().String())
}

func doListenTCP(t *testing.T, conn net.Listener, ch chan string, n int) {
client, err := conn.Accept()
for {
for n > 0 { // read n non-empty lines from TCP socket
t.Logf("doListenTCP iteration")
client, err := conn.Accept()

if err != nil {
t.Fatal(err)
}
Expand All @@ -160,12 +201,14 @@ func doListenTCP(t *testing.T, conn net.Listener, ch chan string, n int) {
}
t.Fatal(err)
}

t.Logf("Read from TCP socket:\n----------\n%s\n----------\n", string(buf))
for _, s := range bytes.Split(buf[:c], []byte{'\n'}) {
if len(s) > 0 {
n--
ch <- string(s)
}
}

}
}

Expand All @@ -182,6 +225,9 @@ func TestTCP(t *testing.T) {
addr, ln := newLocalListenerTCP(t)
defer ln.Close()

t.Log("Starting new TCP listener at", addr)
time.Sleep(50 * time.Millisecond)

prefix := "myproject."
client := NewStatsdClient(addr, prefix)

Expand All @@ -204,8 +250,7 @@ func TestTCP(t *testing.T) {
hostname, err := os.Hostname()
expected["zz."+hostname] = 1

go doListenTCP(t, ln, ch, len(s))

t.Logf("Sending stats to TCP Socket")
err = client.CreateTCPSocket()
if nil != err {
t.Fatal(err)
Expand All @@ -215,14 +260,27 @@ func TestTCP(t *testing.T) {
for k, v := range s {
client.Total(k, v)
}
time.Sleep(60 * time.Millisecond)

go doListenTCP(t, ln, ch, len(s))
time.Sleep(50 * time.Millisecond)

actual := make(map[string]int64)

re := regexp.MustCompile(`^(.*)\:(\d+)\|(\w).*$`)

for i := len(s); i > 0; i-- {
x := <-ch
//t.Logf("ITERATION %d\n", i)
x, open := <-ch
if !open {
//t.Logf("CLOSED _____")
break
}
x = strings.TrimSpace(x)
if "" == x {
//t.Logf("EMPTY STRING *****")
break
}
//fmt.Println(x)
if !strings.HasPrefix(x, prefix) {
t.Errorf("Metric without expected prefix: expected '%s', actual '%s'", prefix, x)
Expand Down
8 changes: 7 additions & 1 deletion interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package statsd

import "time"
import (
"time"

"github.com/quipo/statsd/event"
)

// Statsd is an interface to a StatsD client (buffered/unbuffered)
type Statsd interface {
Expand All @@ -19,4 +23,6 @@ type Statsd interface {
FGauge(stat string, value float64) error
FGaugeDelta(stat string, value float64) error
FAbsolute(stat string, value float64) error

SendEvents(events map[string]event.Event) error
}
7 changes: 7 additions & 0 deletions noopclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package statsd

import (
"time"

"github.com/quipo/statsd/event"
)

// NoopClient implements a "no-op" statsd in case there is no statsd server
Expand Down Expand Up @@ -78,3 +80,8 @@ func (s NoopClient) FGaugeDelta(stat string, value float64) error {
func (s NoopClient) FAbsolute(stat string, value float64) error {
return nil
}

// SendEvents does nothing
func (s NoopClient) SendEvents(events map[string]event.Event) error {
return nil
}

0 comments on commit 20e051c

Please sign in to comment.