Skip to content

Commit

Permalink
Read IP metadata from GCS (#4)
Browse files Browse the repository at this point in the history
IP metadata can now be loaded from Google Cloud Storage.

* FromURL now requires a context
* Get, New, load, and Reload now require a context
* GCS codepaths are now fully tested
* Track md5 of loaded config
  • Loading branch information
pboothe authored Jan 14, 2020
1 parent e4bad46 commit 1fc8c0a
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 25 deletions.
15 changes: 8 additions & 7 deletions ipannotator/ipannotator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ipannotator

import (
"context"
"fmt"
"log"
"net"
Expand All @@ -18,7 +19,7 @@ import (
// ReloadingAnnotator is just a regular annotator with a Reload method.
//
// The context passed to Reload lets the caller cancel or bound a reload of
// the backing data (e.g. a GCS download), per the commit that threaded
// contexts through Get/New/load/Reload.
type ReloadingAnnotator interface {
	annotator.Annotator
	// Reload re-reads the backing dataset and, if newer data is available,
	// replaces the in-memory copy used for annotation.
	Reload(context.Context)
}

// ipannotator is the central struct for this module.
Expand Down Expand Up @@ -83,8 +84,8 @@ func (ipa *ipannotator) Annotate(ID *inetdiag.SockID, annotations *annotator.Ann
// Reload is intended to be regularly called in a loop. It should check whether
// the data in GCS is newer than the local data, and, if it is, then download
// and load that new data into memory and then replace it in the annotator.
func (ipa *ipannotator) Reload() {
newMM, err := ipa.load()
func (ipa *ipannotator) Reload(ctx context.Context) {
newMM, err := ipa.load(ctx)
if err != nil {
log.Println("Could not reload dataset:", err)
return
Expand All @@ -96,8 +97,8 @@ func (ipa *ipannotator) Reload() {
}

// load unconditionally loads datasets and returns them.
func (ipa *ipannotator) load() (*geolite2v2.GeoDataset, error) {
z, err := ipa.backingDataSource.Get()
func (ipa *ipannotator) load(ctx context.Context) (*geolite2v2.GeoDataset, error) {
z, err := ipa.backingDataSource.Get(ctx)
if err != nil {
return nil, err
}
Expand All @@ -107,13 +108,13 @@ func (ipa *ipannotator) load() (*geolite2v2.GeoDataset, error) {
// New makes a new Annotator that uses IP addresses to generate geolocation and
// ASNumber metadata for that IP based on the current copy of MaxMind data
// stored in GCS.
func New(geo zipfile.Provider, localIPs []net.IP) ReloadingAnnotator {
func New(ctx context.Context, geo zipfile.Provider, localIPs []net.IP) ReloadingAnnotator {
ipa := &ipannotator{
backingDataSource: geo,
localIPs: localIPs,
}
var err error
ipa.maxmind, err = ipa.load()
ipa.maxmind, err = ipa.load(ctx)
rtx.Must(err, "Could not load annotation db")
return ipa
}
20 changes: 11 additions & 9 deletions ipannotator/ipannotator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipannotator

import (
"archive/zip"
"context"
"errors"
"math"
"net"
Expand All @@ -21,15 +22,15 @@ func init() {
var err error
u, err := url.Parse("file:../testdata/GeoLite2City.zip")
rtx.Must(err, "Could not parse URL")
localZipfile, err = zipfile.FromURL(u)
localZipfile, err = zipfile.FromURL(context.Background(), u)
rtx.Must(err, "Could not create zipfile.Provider")
}

func TestIPAnnotationS2C(t *testing.T) {
localaddrs := []net.IP{
net.ParseIP("1.0.0.1"),
}
ipa := New(localZipfile, localaddrs)
ipa := New(context.Background(), localZipfile, localaddrs)

// Try to annotate a S2C connection.
conn := &inetdiag.SockID{
Expand All @@ -55,7 +56,7 @@ func TestIPAnnotationC2S(t *testing.T) {
localaddrs := []net.IP{
net.ParseIP("1.0.0.1"),
}
ipa := New(localZipfile, localaddrs)
ipa := New(context.Background(), localZipfile, localaddrs)

// Try to annotate a C2S connection.
conn := &inetdiag.SockID{
Expand All @@ -81,7 +82,7 @@ func TestIPAnnotationC2S(t *testing.T) {

func TestIPAnnotationUknownDirection(t *testing.T) {
localaddrs := []net.IP{net.ParseIP("1.0.0.1")}
ipa := New(localZipfile, localaddrs)
ipa := New(context.Background(), localZipfile, localaddrs)

// Try to annotate a connection with no local IP.
conn := &inetdiag.SockID{
Expand All @@ -101,7 +102,7 @@ func TestIPAnnotationUknownDirection(t *testing.T) {

func TestIPAnnotationUknownIP(t *testing.T) {
localaddrs := []net.IP{net.ParseIP("1.0.0.1")}
ipa := New(localZipfile, localaddrs)
ipa := New(context.Background(), localZipfile, localaddrs)

// Try to annotate a connection with no local IP.
conn := &inetdiag.SockID{
Expand All @@ -121,26 +122,27 @@ func TestIPAnnotationUknownIP(t *testing.T) {

type badProvider struct{}

func (badProvider) Get() (*zip.Reader, error) {
func (badProvider) Get(_ context.Context) (*zip.Reader, error) {
return nil, errors.New("Error for testing")
}

func TestIPAnnotationLoadErrors(t *testing.T) {
ctx := context.Background()
ipa := ipannotator{
backingDataSource: badProvider{},
localIPs: []net.IP{net.ParseIP("1.0.0.1")},
}
_, err := ipa.load()
_, err := ipa.load(ctx)
if err == nil {
t.Error("Should have had a non-nil error due to missing file")
}

// load errors should not cause Reload to crash.
ipa.Reload() // No crash == success.
ipa.Reload(ctx) // No crash == success.

// Now change the backing source, and the next Reload should load the actual data.
ipa.backingDataSource = localZipfile
ipa.Reload()
ipa.Reload(ctx)

// Annotations should now succeed...
conn := &inetdiag.SockID{
Expand Down
10 changes: 7 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func main() {
rtx.Must(err, "Could not read local addresses")
u, err := url.Parse(*maxmindurl)
rtx.Must(err, "Could not parse URL")
p, err := zipfile.FromURL(u)
p, err := zipfile.FromURL(mainCtx, u)
rtx.Must(err, "Could not get maxmind data from url")
ipa := ipannotator.New(p, localIPs)
ipa := ipannotator.New(mainCtx, p, localIPs)

// Reload the IP annotation config on a randomized schedule.
wg.Add(1)
Expand All @@ -74,7 +74,11 @@ func main() {
Max: *reloadMax,
Expected: *reloadTime,
}
memoryless.Run(mainCtx, ipa.Reload, reloadConfig)
tick, err := memoryless.NewTicker(mainCtx, reloadConfig)
rtx.Must(err, "Could not create ticker for reloading")
for range tick.C {
ipa.Reload(mainCtx)
}
wg.Done()
}()

Expand Down
7 changes: 7 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ var (
Help: "The number of times annotation returned an error",
},
)
GCSFilesLoaded = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "uuid_annotator_gcs_hash_loaded",
Help: "The hash of the loaded GCS file",
},
[]string{"md5"},
)
)
49 changes: 43 additions & 6 deletions zipfile/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package zipfile
import (
"archive/zip"
"bytes"
"context"
"errors"
"io/ioutil"
"net/url"

"cloud.google.com/go/storage"
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
"github.com/m-lab/uuid-annotator/metrics"
)

// Errors that might be returned outside the package.
Expand All @@ -19,24 +24,54 @@ type Provider interface {
// Get returns a zip.Reader pointer based on the latest copy of the data the
// provider refers to. It may be called multiple times, and caching is left
// up to the individual Provider implementation.
Get() (*zip.Reader, error)
Get(ctx context.Context) (*zip.Reader, error)
}

// gcsProvider gets zip files from Google Cloud Storage.
type gcsProvider struct {
	// bucket and filename name the GCS object holding the zip data.
	bucket, filename string
	// client is the (stiface-wrapped, hence mockable) GCS client.
	client stiface.Client
	// md5 is the MD5 of the most recently loaded object, as reported by the
	// object's GCS attributes; Get uses it to skip re-downloads.
	md5 []byte
	// cachedReader is the zip.Reader built from the last downloaded copy.
	cachedReader *zip.Reader
}

func (g *gcsProvider) Get() (*zip.Reader, error) {
return nil, errors.New("unimplemented")
func (g *gcsProvider) Get(ctx context.Context) (*zip.Reader, error) {
o := g.client.Bucket(g.bucket).Object(g.filename)
oa, err := o.Attrs(ctx)
if err != nil {
return nil, err
}
if g.cachedReader == nil || g.md5 == nil || !bytes.Equal(g.md5, oa.MD5) {
// Reload data only if the object changed or the data was never loaded in the first place.
r, err := o.NewReader(ctx)
if err != nil {
return nil, err
}
var data []byte
data, err = ioutil.ReadAll(r)
if err != nil {
return nil, err
}
zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
if err != nil {
return nil, err
}
g.cachedReader = zr
if g.md5 != nil {
metrics.GCSFilesLoaded.WithLabelValues(string(g.md5)).Set(0)
}
g.md5 = oa.MD5
metrics.GCSFilesLoaded.WithLabelValues(string(g.md5)).Set(1)
}
return g.cachedReader, nil
}

// fileProvider gets zipfiles from the local disk.
type fileProvider struct {
	// filename is the local path of the zip file to read on each Get.
	filename string
}

func (f *fileProvider) Get() (*zip.Reader, error) {
func (f *fileProvider) Get(ctx context.Context) (*zip.Reader, error) {
b, err := ioutil.ReadFile(f.filename)
if err != nil {
return nil, err
Expand All @@ -54,13 +89,15 @@ func (f *fileProvider) Get() (*zip.Reader, error) {
// should implement an https case in the below handler. M-Lab doesn't need that
// case because we cache MaxMind's data to reduce load on their servers and to
// eliminate a runtime dependency on a third party service.
func FromURL(u *url.URL) (Provider, error) {
func FromURL(ctx context.Context, u *url.URL) (Provider, error) {
switch u.Scheme {
case "gs":
client, err := storage.NewClient(ctx)
return &gcsProvider{
client: stiface.AdaptClient(client),
bucket: u.Host,
filename: u.Path,
}, nil
}, err
case "file":
return &fileProvider{
filename: u.Opaque,
Expand Down
Loading

0 comments on commit 1fc8c0a

Please sign in to comment.