Skip to content

Commit

Permalink
Merge pull request #31 from k-capehart/virtual-file-system
Browse files Browse the repository at this point in the history
add file system abstraction to enable more testing
  • Loading branch information
k-capehart authored May 9, 2024
2 parents 30f11e1 + a208a82 commit b03426b
Show file tree
Hide file tree
Showing 7 changed files with 711 additions and 29 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ Anyone is welcome to contribute.

- Open an issue or discussion post to track the effort
- Fork this repository, then clone it
- Replace dependency by pointing to your locally cloned `go-salesforce` in your module's `go.mod`
- Place this in your own module's `go.mod` to enable testing local changes
- `replace github.com/k-capehart/go-salesforce => /path_to_local_fork/`
- Run tests
- `go test -cover`
Expand Down
14 changes: 8 additions & 6 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"fmt"
"io"
"net/http"
"os"
"strconv"
"time"

"github.com/spf13/afero"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand Down Expand Up @@ -59,6 +59,8 @@ const (
queryJobType = "query"
)

var appFs = afero.NewOsFs() // afero.Fs type is a wrapper around os functions, allowing us to mock it in tests

func updateJobState(job bulkJob, state string, auth authentication) error {
job.State = state
body, _ := json.Marshal(job)
Expand Down Expand Up @@ -311,7 +313,7 @@ func mapsToCSVSlices(maps []map[string]string) ([][]string, error) {
}

func readCSVFile(filePath string) ([][]string, error) {
file, fileErr := os.Open(filePath)
file, fileErr := appFs.Open(filePath)
if fileErr != nil {
return nil, fileErr
}
Expand All @@ -327,7 +329,7 @@ func readCSVFile(filePath string) ([][]string, error) {
}

func writeCSVFile(filePath string, data [][]string) error {
file, fileErr := os.Create(filePath)
file, fileErr := appFs.Create(filePath)
if fileErr != nil {
return fileErr
}
Expand Down Expand Up @@ -408,7 +410,7 @@ func doBulkJob(auth authentication, sObjectName string, fieldName string, operat
if waitForResults {
c := make(chan error, len(jobIds))
for _, id := range jobIds {
go waitForJobResultsAsync(auth, id, ingestJobType, time.Second, c)
go waitForJobResultsAsync(auth, id, ingestJobType, (time.Second / 2), c)
}
jobErrors = <-c
}
Expand Down Expand Up @@ -467,7 +469,7 @@ func doBulkJobWithFile(auth authentication, sObjectName string, fieldName string
if waitForResults {
c := make(chan error, len(jobIds))
for _, id := range jobIds {
go waitForJobResultsAsync(auth, id, ingestJobType, time.Second, c)
go waitForJobResultsAsync(auth, id, ingestJobType, (time.Second / 2), c)
}
jobErrors = <-c
}
Expand All @@ -494,7 +496,7 @@ func doQueryBulk(auth authentication, filePath string, query string) error {
return newErr
}

pollErr := waitForJobResults(auth, job.Id, queryJobType, time.Second)
pollErr := waitForJobResults(auth, job.Id, queryJobType, (time.Second / 2))
if pollErr != nil {
return pollErr
}
Expand Down
156 changes: 156 additions & 0 deletions bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"testing"
"time"

"github.com/spf13/afero"
)

func Test_createBulkJob(t *testing.T) {
Expand Down Expand Up @@ -706,3 +708,157 @@ func Test_collectQueryResults(t *testing.T) {
})
}
}

// Test_uploadJobData verifies that uploadJobData succeeds when the server
// accepts the batch upload (201 on the /batches endpoint) and returns an
// error when the server rejects it (400 on the /batches endpoint).
func Test_uploadJobData(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The data upload goes to a URI ending in "/batches"; any other
		// request (e.g. the follow-up job state update) gets a plain 200.
		// strings.HasSuffix avoids the panic a fixed-width slice of
		// RequestURI would cause on URIs shorter than 8 bytes.
		if strings.HasSuffix(r.RequestURI, "/batches") {
			w.WriteHeader(http.StatusCreated)
		} else {
			w.WriteHeader(http.StatusOK)
		}
	}))
	sfAuth := authentication{
		InstanceUrl: server.URL,
		AccessToken: "accesstokenvalue",
	}
	defer server.Close()

	badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.HasSuffix(r.RequestURI, "/batches") {
			w.WriteHeader(http.StatusBadRequest)
		} else {
			w.WriteHeader(http.StatusOK)
		}
	}))
	badAuth := authentication{
		InstanceUrl: badServer.URL,
		AccessToken: "accesstokenvalue",
	}
	defer badServer.Close()

	type args struct {
		auth    authentication
		data    string
		bulkJob bulkJob
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			// Name reflects the function under test (was a copy-paste
			// of "update job state" from another test).
			name: "upload job data success",
			args: args{
				auth:    sfAuth,
				data:    "data",
				bulkJob: bulkJob{},
			},
			wantErr: false,
		},
		{
			name: "upload job data fail",
			args: args{
				auth:    badAuth,
				data:    "data",
				bulkJob: bulkJob{},
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if err := uploadJobData(tt.args.auth, tt.args.data, tt.args.bulkJob); (err != nil) != tt.wantErr {
				t.Errorf("uploadJobData() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}

// Test_readCSVFile verifies readCSVFile against an in-memory afero file
// system: a file that exists is parsed into rows, and a missing file
// yields an error.
func Test_readCSVFile(t *testing.T) {
	appFs = afero.NewMemMapFs() // replace appFs with mocked file system
	// Include the underlying error in the failure message so a setup
	// problem is diagnosable (previously the error was discarded).
	if err := appFs.MkdirAll("data", 0755); err != nil {
		t.Fatalf("error creating directory in virtual file system: %v", err)
	}
	if err := afero.WriteFile(appFs, "data/data.csv", []byte("123"), 0644); err != nil {
		t.Fatalf("error creating file in virtual file system: %v", err)
	}

	type args struct {
		filePath string
	}
	tests := []struct {
		name    string
		args    args
		want    [][]string
		wantErr bool
	}{
		{
			name: "read file successfully",
			args: args{
				filePath: "data/data.csv",
			},
			want:    [][]string{{"123"}},
			wantErr: false,
		},
		{
			name: "read file failure",
			args: args{
				filePath: "data/does_not_exist.csv",
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := readCSVFile(tt.args.filePath)
			if (err != nil) != tt.wantErr {
				t.Errorf("readCSVFile() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// When an error was expected there is no result to compare;
			// previously this only passed because nil == nil.
			if tt.wantErr {
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("readCSVFile() = %v, want %v", got, tt.want)
			}
		})
	}
}

// Test_writeCSVFile verifies writeCSVFile against an in-memory afero file
// system by writing rows and reading them back via readCSVFile.
func Test_writeCSVFile(t *testing.T) {
	appFs = afero.NewMemMapFs() // replace appFs with mocked file system

	type args struct {
		filePath string
		data     [][]string
	}
	tests := []struct {
		name    string
		args    args
		want    [][]string
		wantErr bool
	}{
		{
			name: "write file successfully",
			args: args{
				filePath: "data/export.csv",
				data:     [][]string{{"123"}},
			},
			want:    [][]string{{"123"}},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if err := writeCSVFile(tt.args.filePath, tt.args.data); (err != nil) != tt.wantErr {
				t.Errorf("writeCSVFile() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// Read back through the table's path (was hard-coded to
			// "data/export.csv", which would break added cases).
			got, err := readCSVFile(tt.args.filePath)
			if err != nil {
				// t.Error, not t.Errorf: err.Error() is not a format
				// string (go vet printf check).
				t.Error(err.Error())
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("writeCSVFile() = %v, want %v", got, tt.want)
			}
		})
	}
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ require github.com/mitchellh/mapstructure v1.5.0

require github.com/forcedotcom/go-soql v0.0.0-20220705175410-00f698360bee

require k8s.io/apimachinery v0.29.4
require (
github.com/spf13/afero v1.11.0
k8s.io/apimachinery v0.29.4
)

require (
github.com/go-logr/logr v1.3.0 // indirect
golang.org/x/text v0.14.0 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20191204025024-5ee1b9f4859a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down
42 changes: 22 additions & 20 deletions salesforce.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,16 @@ func validateCollections(sf Salesforce, records any, batchSize int) error {
return nil
}

func validateBulk(sf Salesforce, records any, batchSize int) error {
func validateBulk(sf Salesforce, records any, batchSize int, isFile bool) error {
authErr := validateAuth(sf)
if authErr != nil {
return authErr
}
typErr := validateOfTypeSlice(records)
if typErr != nil {
return typErr
if !isFile {
typErr := validateOfTypeSlice(records)
if typErr != nil {
return typErr
}
}
batchSizeErr := validateBatchSizeWithinRange(batchSize, bulkBatchSizeMax)
if batchSizeErr != nil {
Expand Down Expand Up @@ -464,7 +466,7 @@ func (sf *Salesforce) QueryStructBulkExport(soqlStruct any, filePath string) err
}

func (sf *Salesforce) InsertBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
validationErr := validateBulk(*sf, records, batchSize)
validationErr := validateBulk(*sf, records, batchSize, false)
if validationErr != nil {
return []string{}, validationErr
}
Expand All @@ -478,9 +480,9 @@ func (sf *Salesforce) InsertBulk(sObjectName string, records any, batchSize int,
}

func (sf *Salesforce) InsertBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
authErr := validateAuth(*sf)
if authErr != nil {
return []string{}, authErr
validationErr := validateBulk(*sf, nil, batchSize, true)
if validationErr != nil {
return []string{}, validationErr
}

jobIds, bulkErr := doBulkJobWithFile(*sf.auth, sObjectName, "", insertOperation, filePath, batchSize, waitForResults)
Expand All @@ -492,7 +494,7 @@ func (sf *Salesforce) InsertBulkFile(sObjectName string, filePath string, batchS
}

func (sf *Salesforce) UpdateBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
validationErr := validateBulk(*sf, records, batchSize)
validationErr := validateBulk(*sf, records, batchSize, false)
if validationErr != nil {
return []string{}, validationErr
}
Expand All @@ -506,9 +508,9 @@ func (sf *Salesforce) UpdateBulk(sObjectName string, records any, batchSize int,
}

func (sf *Salesforce) UpdateBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
authErr := validateAuth(*sf)
if authErr != nil {
return []string{}, authErr
validationErr := validateBulk(*sf, nil, batchSize, true)
if validationErr != nil {
return []string{}, validationErr
}

jobIds, bulkErr := doBulkJobWithFile(*sf.auth, sObjectName, "", updateOperation, filePath, batchSize, waitForResults)
Expand All @@ -520,7 +522,7 @@ func (sf *Salesforce) UpdateBulkFile(sObjectName string, filePath string, batchS
}

func (sf *Salesforce) UpsertBulk(sObjectName string, externalIdFieldName string, records any, batchSize int, waitForResults bool) ([]string, error) {
validationErr := validateBulk(*sf, records, batchSize)
validationErr := validateBulk(*sf, records, batchSize, false)
if validationErr != nil {
return []string{}, validationErr
}
Expand All @@ -534,9 +536,9 @@ func (sf *Salesforce) UpsertBulk(sObjectName string, externalIdFieldName string,
}

func (sf *Salesforce) UpsertBulkFile(sObjectName string, externalIdFieldName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
authErr := validateAuth(*sf)
if authErr != nil {
return []string{}, authErr
validationErr := validateBulk(*sf, nil, batchSize, true)
if validationErr != nil {
return []string{}, validationErr
}

jobIds, bulkErr := doBulkJobWithFile(*sf.auth, sObjectName, externalIdFieldName, upsertOperation, filePath, batchSize, waitForResults)
Expand All @@ -548,7 +550,7 @@ func (sf *Salesforce) UpsertBulkFile(sObjectName string, externalIdFieldName str
}

func (sf *Salesforce) DeleteBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
validationErr := validateBulk(*sf, records, batchSize)
validationErr := validateBulk(*sf, records, batchSize, false)
if validationErr != nil {
return []string{}, validationErr
}
Expand All @@ -562,9 +564,9 @@ func (sf *Salesforce) DeleteBulk(sObjectName string, records any, batchSize int,
}

func (sf *Salesforce) DeleteBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
authErr := validateAuth(*sf)
if authErr != nil {
return []string{}, authErr
validationErr := validateBulk(*sf, nil, batchSize, true)
if validationErr != nil {
return []string{}, validationErr
}

jobIds, bulkErr := doBulkJobWithFile(*sf.auth, sObjectName, "", deleteOperation, filePath, batchSize, waitForResults)
Expand Down
Loading

0 comments on commit b03426b

Please sign in to comment.