diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 897585e..f3b7572 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: run: go build -v ./... - name: Go test - run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./... + run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./asset/... ./helper/... ./strategy/... ./trend/... - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v3 diff --git a/asset/README.md b/asset/README.md index d3ee0d1..bccdff3 100644 --- a/asset/README.md +++ b/asset/README.md @@ -24,6 +24,7 @@ The information provided on this project is strictly for informational purposes ## Index +- [func Sync\(source, target Repository, defaultStartDate time.Time, workers int\) error](<#Sync>) - [type FileSystemRepository](<#FileSystemRepository>) - [func NewFileSystemRepository\(base string\) \*FileSystemRepository](<#NewFileSystemRepository>) - [func \(r \*FileSystemRepository\) Append\(name string, snapshots \<\-chan \*Snapshot\) error](<#FileSystemRepository.Append>) @@ -31,6 +32,13 @@ The information provided on this project is strictly for informational purposes - [func \(r \*FileSystemRepository\) Get\(name string\) \(\<\-chan \*Snapshot, error\)](<#FileSystemRepository.Get>) - [func \(r \*FileSystemRepository\) GetSince\(name string, date time.Time\) \(\<\-chan \*Snapshot, error\)](<#FileSystemRepository.GetSince>) - [func \(r \*FileSystemRepository\) LastDate\(name string\) \(time.Time, error\)](<#FileSystemRepository.LastDate>) +- [type InMemoryRepository](<#InMemoryRepository>) + - [func NewInMemoryRepository\(\) \*InMemoryRepository](<#NewInMemoryRepository>) + - [func \(r \*InMemoryRepository\) Append\(name string, snapshots \<\-chan \*Snapshot\) error](<#InMemoryRepository.Append>) + - [func \(r \*InMemoryRepository\) Assets\(\) \(\[\]string, error\)](<#InMemoryRepository.Assets>) + - [func \(r \*InMemoryRepository\) Get\(name string\) \(\<\-chan \*Snapshot, error\)](<#InMemoryRepository.Get>) + - [func \(r \*InMemoryRepository\) GetSince\(name string, date time.Time\) \(\<\-chan \*Snapshot, error\)](<#InMemoryRepository.GetSince>) + - [func \(r \*InMemoryRepository\) LastDate\(name string\) \(time.Time, error\)](<#InMemoryRepository.LastDate>) - [type Repository](<#Repository>) - [type Snapshot](<#Snapshot>) - [type TiingoEndOfDay](<#TiingoEndOfDay>) @@ -45,6 +53,15 @@ The information provided on this project is strictly for informational purposes - [func \(r \*TiingoRepository\) LastDate\(name string\) \(time.Time, error\)](<#TiingoRepository.LastDate>) + +## func [Sync]() + +```go +func Sync(source, target Repository, defaultStartDate time.Time, workers int) error +``` + +Sync synchronizes assets between the source and target repositories using multi\-worker concurrency. + ## type [FileSystemRepository]() @@ -111,6 +128,72 @@ func (r *FileSystemRepository) LastDate(name string) (time.Time, error) LastDate returns the date of the last snapshot for the asset with the given name. + +## type [InMemoryRepository]() + +InMemoryRepository stores and retrieves asset snapshots using an in memory storage. + +```go +type InMemoryRepository struct { + Repository + // contains filtered or unexported fields +} +``` + + +### func [NewInMemoryRepository]() + +```go +func NewInMemoryRepository() *InMemoryRepository +``` + +NewInMemoryRepository initializes an in memory repository. + + +### func \(\*InMemoryRepository\) [Append]() + +```go +func (r *InMemoryRepository) Append(name string, snapshots <-chan *Snapshot) error +``` + +Append adds the given snapshows to the asset with the given name. + + +### func \(\*InMemoryRepository\) [Assets]() + +```go +func (r *InMemoryRepository) Assets() ([]string, error) +``` + +Assets returns the names of all assets in the repository. + + +### func \(\*InMemoryRepository\) [Get]() + +```go +func (r *InMemoryRepository) Get(name string) (<-chan *Snapshot, error) +``` + +Get attempts to return a channel of snapshots for the asset with the given name. + + +### func \(\*InMemoryRepository\) [GetSince]() + +```go +func (r *InMemoryRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) +``` + +GetSince attempts to return a channel of snapshots for the asset with the given name since the given date. + + +### func \(\*InMemoryRepository\) [LastDate]() + +```go +func (r *InMemoryRepository) LastDate(name string) (time.Time, error) +``` + +LastDate returns the date of the last snapshot for the asset with the given name. + ## type [Repository]() @@ -167,7 +250,7 @@ type Snapshot struct { // Volume represents the total trading activity for // the asset during the snapshot period. - Volume float64 + Volume int64 } ``` diff --git a/asset/file_system_repository.go b/asset/file_system_repository.go index a7fd650..4d72fc4 100644 --- a/asset/file_system_repository.go +++ b/asset/file_system_repository.go @@ -60,7 +60,7 @@ func (r *FileSystemRepository) Get(name string) (<-chan *Snapshot, error) { // GetSince attempts to return a channel of snapshots for the asset with the given name since the given date. func (r *FileSystemRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) { - snapshots, err := helper.ReadFromCsvFile[Snapshot](r.getCsvFileName(name), true) + snapshots, err := r.Get(name) if err != nil { return nil, err } diff --git a/asset/in_memory_repository.go b/asset/in_memory_repository.go new file mode 100644 index 0000000..1a8d497 --- /dev/null +++ b/asset/in_memory_repository.go @@ -0,0 +1,92 @@ +// Copyright (c) 2023 Onur Cinar. All Rights Reserved. +// The source code is provided under MIT License. +// https://github.com/cinar/indicator + +package asset + +import ( + "errors" + "time" + + "github.com/cinar/indicator/helper" +) + +// InMemoryRepository stores and retrieves asset snapshots using +// an in memory storage. +type InMemoryRepository struct { + Repository + + // storage is the in memory storage for assets. + storage map[string][]*Snapshot +} + +// NewInMemoryRepository initializes an in memory repository. +func NewInMemoryRepository() *InMemoryRepository { + return &InMemoryRepository{ + storage: make(map[string][]*Snapshot), + } +} + +// Assets returns the names of all assets in the repository. +func (r *InMemoryRepository) Assets() ([]string, error) { + assets := make([]string, 0, len(r.storage)) + for name := range r.storage { + assets = append(assets, name) + } + + return assets, nil +} + +// Get attempts to return a channel of snapshots for the asset with the given name. +func (r *InMemoryRepository) Get(name string) (<-chan *Snapshot, error) { + snapshots, ok := r.storage[name] + if !ok { + return nil, errors.New("not found") + } + + return helper.SliceToChan(snapshots), nil +} + +// GetSince attempts to return a channel of snapshots for the asset with the given name since the given date. +func (r *InMemoryRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) { + snapshots, err := r.Get(name) + if err != nil { + return nil, err + } + + snapshots = helper.Filter(snapshots, func(s *Snapshot) bool { + return s.Date.Equal(date) || s.Date.After(date) + }) + + return snapshots, nil +} + +// LastDate returns the date of the last snapshot for the asset with the given name. +func (r *InMemoryRepository) LastDate(name string) (time.Time, error) { + var last time.Time + + snapshots, err := r.Get(name) + if err != nil { + return last, err + } + + snapshot, ok := <-helper.Last(snapshots, 1) + if !ok { + return last, errors.New("empty asset") + } + + return snapshot.Date, nil +} + +// Append adds the given snapshows to the asset with the given name. +func (r *InMemoryRepository) Append(name string, snapshots <-chan *Snapshot) error { + combined := r.storage[name] + + for snapshot := range snapshots { + combined = append(combined, snapshot) + } + + r.storage[name] = combined + + return nil +} diff --git a/asset/in_memory_repository_test.go b/asset/in_memory_repository_test.go new file mode 100644 index 0000000..edc18ac --- /dev/null +++ b/asset/in_memory_repository_test.go @@ -0,0 +1,161 @@ +// Copyright (c) 2023 Onur Cinar. All Rights Reserved. +// The source code is provided under MIT License. +// https://github.com/cinar/indicator + +package asset_test + +import ( + "testing" + "time" + + "github.com/cinar/indicator/asset" + "github.com/cinar/indicator/helper" +) + +func TestInMemoryRepositoryAssets(t *testing.T) { + repository := asset.NewInMemoryRepository() + + assets, err := repository.Assets() + if err != nil { + t.Fatal(err) + } + + if len(assets) != 0 { + t.Fatal("not empty") + } + + name := "A" + + snapshots := []*asset.Snapshot{ + {Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)}, + {Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)}, + } + + err = repository.Append(name, helper.SliceToChan(snapshots)) + if err != nil { + t.Fatal(err) + } + + assets, err = repository.Assets() + if err != nil { + t.Fatal(err) + } + + if len(assets) != 1 { + t.Fatalf("more assets found %v", assets) + } + + if assets[0] != name { + t.Fatalf("actual %v expected %v", assets[0], name) + } + +} + +func TestInMemoryRepositoryGet(t *testing.T) { + repository := asset.NewInMemoryRepository() + + name := "A" + + _, err := repository.Get(name) + if err == nil { + t.Fatal("expected error") + } + + snapshots := []*asset.Snapshot{ + {Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)}, + {Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)}, + } + + err = repository.Append(name, helper.SliceToChan(snapshots)) + if err != nil { + t.Fatal(err) + } + + actual, err := repository.Get(name) + if err != nil { + t.Fatal(err) + } + + expected := helper.SliceToChan(snapshots) + + err = helper.CheckEquals(actual, expected) + if err != nil { + t.Fatal(err) + } +} + +func TestInMemoryRepositoryGetSince(t *testing.T) { + repository := asset.NewInMemoryRepository() + + name := "A" + date := time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC) + + _, err := repository.GetSince(name, date) + if err == nil { + t.Fatal("expected error") + } + + snapshots := []*asset.Snapshot{ + {Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)}, + {Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)}, + } + + err = repository.Append(name, helper.SliceToChan(snapshots)) + if err != nil { + t.Fatal(err) + } + + actual, err := repository.GetSince(name, date) + if err != nil { + t.Fatal(err) + } + + expected := helper.SliceToChan(snapshots[1:]) + + err = helper.CheckEquals(actual, expected) + if err != nil { + t.Fatal(err) + } +} + +func TestInMemoryRepositoryLastDate(t *testing.T) { + repository := asset.NewInMemoryRepository() + + name := "A" + + _, err := repository.LastDate(name) + if err == nil { + t.Fatal("expected error") + } + + err = repository.Append(name, helper.SliceToChan([]*asset.Snapshot{})) + if err != nil { + t.Fatal(err) + } + + _, err = repository.LastDate(name) + if err == nil { + t.Fatal("expected error") + } + + snapshots := []*asset.Snapshot{ + {Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)}, + {Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)}, + } + + err = repository.Append(name, helper.SliceToChan(snapshots)) + if err != nil { + t.Fatal(err) + } + + actual, err := repository.LastDate(name) + if err != nil { + t.Fatal(err) + } + + expected := snapshots[1].Date + + if !expected.Equal(actual) { + t.Fatalf("actual %v expected %v", actual, expected) + } +} diff --git a/asset/snapshot.go b/asset/snapshot.go index 4f90925..9db4143 100644 --- a/asset/snapshot.go +++ b/asset/snapshot.go @@ -32,5 +32,5 @@ type Snapshot struct { // Volume represents the total trading activity for // the asset during the snapshot period. - Volume float64 + Volume int64 } diff --git a/asset/sync.go b/asset/sync.go new file mode 100644 index 0000000..7c6242d --- /dev/null +++ b/asset/sync.go @@ -0,0 +1,68 @@ +// Copyright (c) 2023 Onur Cinar. All Rights Reserved. +// The source code is provided under MIT License. +// https://github.com/cinar/indicator + +package asset + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/cinar/indicator/helper" +) + +// Sync synchronizes assets between the source and target repositories using +// multi-worker concurrency. +func Sync(source, target Repository, defaultStartDate time.Time, workers int) error { + assets, err := target.Assets() + if err != nil { + return err + } + + log.Printf("Will sync %d assets.", len(assets)) + jobs := helper.SliceToChan(assets) + + hasErrors := false + wg := &sync.WaitGroup{} + + for i := 0; i < workers; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + for name := range jobs { + lastDate, err := target.LastDate(name) + if err != nil { + lastDate = defaultStartDate + } + + log.Printf("Syncing %s starting %s", name, lastDate.Format("2006-01-02")) + + snapshots, err := source.GetSince(name, lastDate) + if err != nil { + log.Printf("GetSince failed for %s with %v", name, err) + hasErrors = true + continue + } + + err = target.Append(name, snapshots) + if err != nil { + log.Printf("Append failed for %s with %v", name, err) + hasErrors = true + continue + } + } + }() + } + + wg.Wait() + + if hasErrors { + return errors.New("has errors") + } + + return nil +} diff --git a/asset/sync_test.go b/asset/sync_test.go new file mode 100644 index 0000000..8f3c14e --- /dev/null +++ b/asset/sync_test.go @@ -0,0 +1,146 @@ +// Copyright (c) 2023 Onur Cinar. All Rights Reserved. +// The source code is provided under MIT License. +// https://github.com/cinar/indicator + +package asset_test + +import ( + "errors" + "testing" + "time" + + "github.com/cinar/indicator/asset" + "github.com/cinar/indicator/helper" +) + +type MockRepository struct { + asset.Repository + + AssetsFunc func() ([]string, error) + GetFunc func(string) (<-chan *asset.Snapshot, error) + GetSinceFunc func(string, time.Time) (<-chan *asset.Snapshot, error) + LastDateFunc func(string) (time.Time, error) + AppendFunc func(string, <-chan *asset.Snapshot) error +} + +func (r *MockRepository) Assets() ([]string, error) { + return r.AssetsFunc() +} + +func (r *MockRepository) Get(name string) (<-chan *asset.Snapshot, error) { + return r.GetFunc(name) +} + +func (r *MockRepository) GetSince(name string, date time.Time) (<-chan *asset.Snapshot, error) { + return r.GetSinceFunc(name, date) +} + +func (r *MockRepository) LastDate(name string) (time.Time, error) { + return r.LastDateFunc(name) +} + +func (r *MockRepository) Append(name string, snapshots <-chan *asset.Snapshot) error { + return r.AppendFunc(name, snapshots) +} + +func TestSync(t *testing.T) { + name := "A" + snapshots := []*asset.Snapshot{ + {Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)}, + {Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)}, + } + + source := asset.NewInMemoryRepository() + target := asset.NewInMemoryRepository() + + err := target.Append(name, helper.SliceToChan([]*asset.Snapshot{})) + if err != nil { + t.Fatal(err) + } + + err = source.Append(name, helper.SliceToChan(snapshots)) + if err != nil { + t.Fatal(err) + } + + err = asset.Sync(source, target, snapshots[0].Date, 1) + if err != nil { + t.Fatal(err) + } + + actual, err := target.Get(name) + if err != nil { + t.Fatal(err) + } + + expected := helper.SliceToChan(snapshots) + + err = helper.CheckEquals(actual, expected) + if err != nil { + t.Fatal(err) + } +} + +func TestSyncMissingOnSource(t *testing.T) { + name := "A" + + source := asset.NewInMemoryRepository() + target := asset.NewInMemoryRepository() + + err := target.Append(name, helper.SliceToChan([]*asset.Snapshot{})) + if err != nil { + t.Fatal(err) + } + + defaultStartDate := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + + err = asset.Sync(source, target, defaultStartDate, 1) + if err == nil { + t.Fatal("expected error") + } +} + +func TestSyncFailingTargetAssets(t *testing.T) { + source := asset.NewInMemoryRepository() + target := &MockRepository{ + AssetsFunc: func() ([]string, error) { + return nil, errors.New("assert error") + }, + } + + defaultStartDate := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + + err := asset.Sync(source, target, defaultStartDate, 1) + if err == nil { + t.Fatal("expected error") + } +} + +func TestSyncFailingTargetAppend(t *testing.T) { + source := &MockRepository{ + GetSinceFunc: func(s string, t time.Time) (<-chan *asset.Snapshot, error) { + return helper.SliceToChan([]*asset.Snapshot{}), nil + }, + } + + target := &MockRepository{ + AssetsFunc: func() ([]string, error) { + return []string{"A"}, nil + }, + + LastDateFunc: func(s string) (time.Time, error) { + return time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), nil + }, + + AppendFunc: func(s string, c <-chan *asset.Snapshot) error { + return errors.New("append error") + }, + } + + defaultStartDate := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + + err := asset.Sync(source, target, defaultStartDate, 1) + if err == nil { + t.Fatal("expected error") + } +} diff --git a/asset/tiingo_repository.go b/asset/tiingo_repository.go index aff9120..bf44552 100644 --- a/asset/tiingo_repository.go +++ b/asset/tiingo_repository.go @@ -87,7 +87,7 @@ func (e *TiingoEndOfDay) ToSnapshot() *Snapshot { High: e.AdjHigh, Low: e.AdjLow, Close: e.AdjClose, - Volume: float64(e.AdjVolume), + Volume: e.AdjVolume, } } diff --git a/cmd/indicator-sync/main.go b/cmd/indicator-sync/main.go new file mode 100644 index 0000000..d6c91be --- /dev/null +++ b/cmd/indicator-sync/main.go @@ -0,0 +1,41 @@ +// Copyright (c) 2023 Onur Cinar. All Rights Reserved. +// The source code is provided under MIT License. +// https://github.com/cinar/indicator + +// main is the indicator sync command line program. +package main + +import ( + "flag" + "log" + "time" + + "github.com/cinar/indicator/asset" +) + +func main() { + var tiingoKey string + var targetBase string + var minusDays int + var workers int + + flag.StringVar(&tiingoKey, "key", "", "tiingo service api key") + flag.StringVar(&targetBase, "target", ".", "target repository base directory") + flag.IntVar(&minusDays, "days", 0, "lookback period in days for the new assets") + flag.IntVar(&workers, "workers", 1, "number of concurrent workers") + flag.Parse() + + if tiingoKey == "" { + log.Fatal("Tiingo API key required") + } + + defaultStartDate := time.Now().AddDate(0, 0, -minusDays) + + source := asset.NewTiingoRepository(tiingoKey) + target := asset.NewFileSystemRepository(targetBase) + + err := asset.Sync(source, target, defaultStartDate, workers) + if err != nil { + log.Fatal(err) + } +} diff --git a/pre-commit.sh b/pre-commit.sh index 324f4e3..fb0f563 100755 --- a/pre-commit.sh +++ b/pre-commit.sh @@ -7,7 +7,7 @@ go fmt ./... go fix ./... go vet ./... -go test -cover ./... +go test -cover ./asset/... ./helper/... ./strategy/... ./trend/... gosec ./... revive -config=revive.toml ./...