Skip to content

Commit

Permalink
Sync added.
Browse files Browse the repository at this point in the history
  • Loading branch information
cinar committed Dec 22, 2023
1 parent 89e5d52 commit 39ec5d3
Show file tree
Hide file tree
Showing 11 changed files with 597 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
run: go build -v ./...

- name: Go test
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./asset/... ./helper/... ./strategy/... ./trend/...

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
Expand Down
85 changes: 84 additions & 1 deletion asset/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@ The information provided on this project is strictly for informational purposes

## Index

- [func Sync\(source, target Repository, defaultStartDate time.Time, workers int\) error](<#Sync>)
- [type FileSystemRepository](<#FileSystemRepository>)
- [func NewFileSystemRepository\(base string\) \*FileSystemRepository](<#NewFileSystemRepository>)
- [func \(r \*FileSystemRepository\) Append\(name string, snapshots \<\-chan \*Snapshot\) error](<#FileSystemRepository.Append>)
- [func \(r \*FileSystemRepository\) Assets\(\) \(\[\]string, error\)](<#FileSystemRepository.Assets>)
- [func \(r \*FileSystemRepository\) Get\(name string\) \(\<\-chan \*Snapshot, error\)](<#FileSystemRepository.Get>)
- [func \(r \*FileSystemRepository\) GetSince\(name string, date time.Time\) \(\<\-chan \*Snapshot, error\)](<#FileSystemRepository.GetSince>)
- [func \(r \*FileSystemRepository\) LastDate\(name string\) \(time.Time, error\)](<#FileSystemRepository.LastDate>)
- [type InMemoryRepository](<#InMemoryRepository>)
- [func NewInMemoryRepository\(\) \*InMemoryRepository](<#NewInMemoryRepository>)
- [func \(r \*InMemoryRepository\) Append\(name string, snapshots \<\-chan \*Snapshot\) error](<#InMemoryRepository.Append>)
- [func \(r \*InMemoryRepository\) Assets\(\) \(\[\]string, error\)](<#InMemoryRepository.Assets>)
- [func \(r \*InMemoryRepository\) Get\(name string\) \(\<\-chan \*Snapshot, error\)](<#InMemoryRepository.Get>)
- [func \(r \*InMemoryRepository\) GetSince\(name string, date time.Time\) \(\<\-chan \*Snapshot, error\)](<#InMemoryRepository.GetSince>)
- [func \(r \*InMemoryRepository\) LastDate\(name string\) \(time.Time, error\)](<#InMemoryRepository.LastDate>)
- [type Repository](<#Repository>)
- [type Snapshot](<#Snapshot>)
- [type TiingoEndOfDay](<#TiingoEndOfDay>)
Expand All @@ -45,6 +53,15 @@ The information provided on this project is strictly for informational purposes
- [func \(r \*TiingoRepository\) LastDate\(name string\) \(time.Time, error\)](<#TiingoRepository.LastDate>)


<a name="Sync"></a>
## func [Sync](<https://github.com/cinar/indicator/blob/v2/asset/sync.go#L18>)

```go
func Sync(source, target Repository, defaultStartDate time.Time, workers int) error
```

Sync synchronizes assets between the source and target repositories using multi\-worker concurrency.

<a name="FileSystemRepository"></a>
## type [FileSystemRepository](<https://github.com/cinar/indicator/blob/v2/asset/file_system_repository.go#L20-L25>)

Expand Down Expand Up @@ -111,6 +128,72 @@ func (r *FileSystemRepository) LastDate(name string) (time.Time, error)

LastDate returns the date of the last snapshot for the asset with the given name.

<a name="InMemoryRepository"></a>
## type [InMemoryRepository](<https://github.com/cinar/indicator/blob/v2/asset/in_memory_repository.go#L16-L21>)

InMemoryRepository stores and retrieves asset snapshots using an in memory storage.

```go
type InMemoryRepository struct {
Repository
// contains filtered or unexported fields
}
```

<a name="NewInMemoryRepository"></a>
### func [NewInMemoryRepository](<https://github.com/cinar/indicator/blob/v2/asset/in_memory_repository.go#L24>)

```go
func NewInMemoryRepository() *InMemoryRepository
```

NewInMemoryRepository initializes an in memory repository.

<a name="InMemoryRepository.Append"></a>
### func \(\*InMemoryRepository\) [Append](<https://github.com/cinar/indicator/blob/v2/asset/in_memory_repository.go#L82>)

```go
func (r *InMemoryRepository) Append(name string, snapshots <-chan *Snapshot) error
```

Append adds the given snapshows to the asset with the given name.

<a name="InMemoryRepository.Assets"></a>
### func \(\*InMemoryRepository\) [Assets](<https://github.com/cinar/indicator/blob/v2/asset/in_memory_repository.go#L31>)

```go
func (r *InMemoryRepository) Assets() ([]string, error)
```

Assets returns the names of all assets in the repository.

<a name="InMemoryRepository.Get"></a>
### func \(\*InMemoryRepository\) [Get](<https://github.com/cinar/indicator/blob/v2/asset/in_memory_repository.go#L41>)

```go
func (r *InMemoryRepository) Get(name string) (<-chan *Snapshot, error)
```

Get attempts to return a channel of snapshots for the asset with the given name.

<a name="InMemoryRepository.GetSince"></a>
### func \(\*InMemoryRepository\) [GetSince](<https://github.com/cinar/indicator/blob/v2/asset/in_memory_repository.go#L51>)

```go
func (r *InMemoryRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error)
```

GetSince attempts to return a channel of snapshots for the asset with the given name since the given date.

<a name="InMemoryRepository.LastDate"></a>
### func \(\*InMemoryRepository\) [LastDate](<https://github.com/cinar/indicator/blob/v2/asset/in_memory_repository.go#L65>)

```go
func (r *InMemoryRepository) LastDate(name string) (time.Time, error)
```

LastDate returns the date of the last snapshot for the asset with the given name.

<a name="Repository"></a>
## type [Repository](<https://github.com/cinar/indicator/blob/v2/asset/repository.go#L11-L30>)

Expand Down Expand Up @@ -167,7 +250,7 @@ type Snapshot struct {

// Volume represents the total trading activity for
// the asset during the snapshot period.
Volume float64
Volume int64
}
```

Expand Down
2 changes: 1 addition & 1 deletion asset/file_system_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (r *FileSystemRepository) Get(name string) (<-chan *Snapshot, error) {

// GetSince attempts to return a channel of snapshots for the asset with the given name since the given date.
func (r *FileSystemRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) {
snapshots, err := helper.ReadFromCsvFile[Snapshot](r.getCsvFileName(name), true)
snapshots, err := r.Get(name)
if err != nil {
return nil, err
}
Expand Down
92 changes: 92 additions & 0 deletions asset/in_memory_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2023 Onur Cinar. All Rights Reserved.
// The source code is provided under MIT License.
// https://github.com/cinar/indicator

package asset

import (
"errors"
"time"

"github.com/cinar/indicator/helper"
)

// InMemoryRepository stores and retrieves asset snapshots using
// an in memory storage.
type InMemoryRepository struct {
Repository

// storage is the in memory storage for assets.
storage map[string][]*Snapshot
}

// NewInMemoryRepository initializes an in memory repository.
func NewInMemoryRepository() *InMemoryRepository {
return &InMemoryRepository{
storage: make(map[string][]*Snapshot),
}
}

// Assets returns the names of all assets in the repository.
func (r *InMemoryRepository) Assets() ([]string, error) {
assets := make([]string, 0, len(r.storage))
for name := range r.storage {
assets = append(assets, name)
}

return assets, nil
}

// Get attempts to return a channel of snapshots for the asset with the given name.
func (r *InMemoryRepository) Get(name string) (<-chan *Snapshot, error) {
snapshots, ok := r.storage[name]
if !ok {
return nil, errors.New("not found")
}

return helper.SliceToChan(snapshots), nil
}

// GetSince attempts to return a channel of snapshots for the asset with the given name since the given date.
func (r *InMemoryRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) {
snapshots, err := r.Get(name)
if err != nil {
return nil, err
}

snapshots = helper.Filter(snapshots, func(s *Snapshot) bool {
return s.Date.Equal(date) || s.Date.After(date)
})

return snapshots, nil
}

// LastDate returns the date of the last snapshot for the asset with the given name.
func (r *InMemoryRepository) LastDate(name string) (time.Time, error) {
var last time.Time

snapshots, err := r.Get(name)
if err != nil {
return last, err
}

snapshot, ok := <-helper.Last(snapshots, 1)
if !ok {
return last, errors.New("empty asset")
}

return snapshot.Date, nil
}

// Append adds the given snapshows to the asset with the given name.
func (r *InMemoryRepository) Append(name string, snapshots <-chan *Snapshot) error {
combined := r.storage[name]

for snapshot := range snapshots {
combined = append(combined, snapshot)
}

r.storage[name] = combined

return nil
}
161 changes: 161 additions & 0 deletions asset/in_memory_repository_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) 2023 Onur Cinar. All Rights Reserved.
// The source code is provided under MIT License.
// https://github.com/cinar/indicator

package asset_test

import (
"testing"
"time"

"github.com/cinar/indicator/asset"
"github.com/cinar/indicator/helper"
)

func TestInMemoryRepositoryAssets(t *testing.T) {
repository := asset.NewInMemoryRepository()

assets, err := repository.Assets()
if err != nil {
t.Fatal(err)
}

if len(assets) != 0 {
t.Fatal("not empty")
}

name := "A"

snapshots := []*asset.Snapshot{
{Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)},
{Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)},
}

err = repository.Append(name, helper.SliceToChan(snapshots))
if err != nil {
t.Fatal(err)
}

assets, err = repository.Assets()
if err != nil {
t.Fatal(err)
}

if len(assets) != 1 {
t.Fatalf("more assets found %v", assets)
}

if assets[0] != name {
t.Fatalf("actual %v expected %v", assets[0], name)
}

}

func TestInMemoryRepositoryGet(t *testing.T) {
repository := asset.NewInMemoryRepository()

name := "A"

_, err := repository.Get(name)
if err == nil {
t.Fatal("expected error")
}

snapshots := []*asset.Snapshot{
{Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)},
{Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)},
}

err = repository.Append(name, helper.SliceToChan(snapshots))
if err != nil {
t.Fatal(err)
}

actual, err := repository.Get(name)
if err != nil {
t.Fatal(err)
}

expected := helper.SliceToChan(snapshots)

err = helper.CheckEquals(actual, expected)
if err != nil {
t.Fatal(err)
}
}

func TestInMemoryRepositoryGetSince(t *testing.T) {
repository := asset.NewInMemoryRepository()

name := "A"
date := time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)

_, err := repository.GetSince(name, date)
if err == nil {
t.Fatal("expected error")
}

snapshots := []*asset.Snapshot{
{Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)},
{Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)},
}

err = repository.Append(name, helper.SliceToChan(snapshots))
if err != nil {
t.Fatal(err)
}

actual, err := repository.GetSince(name, date)
if err != nil {
t.Fatal(err)
}

expected := helper.SliceToChan(snapshots[1:])

err = helper.CheckEquals(actual, expected)
if err != nil {
t.Fatal(err)
}
}

func TestInMemoryRepositoryLastDate(t *testing.T) {
repository := asset.NewInMemoryRepository()

name := "A"

_, err := repository.LastDate(name)
if err == nil {
t.Fatal("expected error")
}

err = repository.Append(name, helper.SliceToChan([]*asset.Snapshot{}))
if err != nil {
t.Fatal(err)
}

_, err = repository.LastDate(name)
if err == nil {
t.Fatal("expected error")
}

snapshots := []*asset.Snapshot{
{Date: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)},
{Date: time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)},
}

err = repository.Append(name, helper.SliceToChan(snapshots))
if err != nil {
t.Fatal(err)
}

actual, err := repository.LastDate(name)
if err != nil {
t.Fatal(err)
}

expected := snapshots[1].Date

if !expected.Equal(actual) {
t.Fatalf("actual %v expected %v", actual, expected)
}
}
2 changes: 1 addition & 1 deletion asset/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ type Snapshot struct {

// Volume represents the total trading activity for
// the asset during the snapshot period.
Volume float64
Volume int64
}
Loading

0 comments on commit 39ec5d3

Please sign in to comment.