bulk operations with local csv files
k-capehart committed Apr 24, 2024
1 parent 993951f commit 0301c33
Showing 3 changed files with 232 additions and 12 deletions.
128 changes: 116 additions & 12 deletions README.md
@@ -21,7 +21,7 @@ go get github.com/k-capehart/go-salesforce
### Structs
```go
type Salesforce struct {
	auth *Auth
	auth *auth
}

type Creds struct {
@@ -486,7 +486,17 @@ if err != nil {
Create Bulk API Jobs to query, insert, update, upsert, and delete large collections of records
- [Review Salesforce REST API resources for Bulk v2](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/bulk_api_2_0.htm)
- Work with large lists of records by passing either a slice of records or the path to a csv file
- Jobs can run asynchronously and optionally allow to wait for results or not
- Jobs run asynchronously; optionally wait for them to finish so that any errors are reported (a sketch of checking results later follows the structs below)

### Structs
```go
type BulkJobResults struct {
	Id                  string
	State               string
	NumberRecordsFailed int
	ErrorMessage        string
}
```
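
For example, a job can be started without waiting and checked later with `GetJobResults`. This is a minimal sketch, not one of the examples below; it assumes the `data/avengers.csv` file shown under InsertBulkFile and that `time` and `fmt` are imported:
```go
// Start a file-based insert without waiting for results (hypothetical csv path).
jobIds, err := sf.InsertBulkFile("Contact", "data/avengers.csv", 1000, false)
if err != nil {
	panic(err)
}
time.Sleep(time.Second) // give the jobs a moment to process
for _, id := range jobIds {
	results, err := sf.GetJobResults(id) // returns an instance of BulkJobResults
	if err != nil {
		panic(err)
	}
	if results.NumberRecordsFailed > 0 {
		fmt.Println(results.Id, results.State, results.ErrorMessage)
	}
}
```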

### InsertBulk
`func (sf *Salesforce) InsertBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error)`
@@ -517,6 +527,29 @@ if err != nil {
}
```

### InsertBulkFile
`func (sf *Salesforce) InsertBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error)`

Inserts a collection of Salesforce records from a csv file using Bulk API v2, returning a list of Job IDs
- `sObjectName`: API name of Salesforce object
- `filePath`: path to a csv file containing Salesforce data
- `batchSize`: `1 <= batchSize <= 10000`
- `waitForResults`: whether to wait for jobs to finish and return any errors encountered during the operation

`data/avengers.csv`
```
FirstName,LastName
Tony,Stark
Steve,Rogers
Bruce,Banner
```
```go
_, err := sf.InsertBulkFile("Contact", "data/avengers.csv", 1000, true)
if err != nil {
	panic(err)
}
```

### UpdateBulk
`func (sf *Salesforce) UpdateBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error)`

@@ -550,6 +583,33 @@ if err != nil {
}
```

### UpdateBulkFile
`func (sf *Salesforce) UpdateBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error)`

Updates a collection of Salesforce records from a csv file using Bulk API v2, returning a list of Job IDs
- `sObjectName`: API name of Salesforce object
- `filePath`: path to a csv file containing Salesforce data
  - An Id column is required in the csv data
- `batchSize`: `1 <= batchSize <= 10000`
- `waitForResults`: whether to wait for jobs to finish and return any errors encountered during the operation

`data/update_avengers.csv`
```
Id,FirstName,LastName
003Dn00000pEwRuIAK,Rocket,Raccoon
003Dn00000pEwQxIAK,Drax,The Destroyer
003Dn00000pEwQyIAK,Peter,Quill
003Dn00000pEwQzIAK,I Am,Groot
003Dn00000pEwR0IAK,Gamora,Zen Whoberi Ben Titan
003Dn00000pEwR1IAK,Mantis,Mantis
```
```go
_, err := sf.UpdateBulkFile("Contact", "data/update_avengers.csv", 1000, true)
if err != nil {
	panic(err)
}
```

### UpsertBulk
`func (sf *Salesforce) UpsertBulk(sObjectName string, externalIdFieldName string, records any, batchSize int, waitForResults bool) ([]string, error)`

@@ -584,6 +644,32 @@ if err != nil {
}
```

### UpsertBulkFile
`func (sf *Salesforce) UpsertBulkFile(sObjectName string, externalIdFieldName string, filePath string, batchSize int, waitForResults bool) ([]string, error)`

Updates (or inserts) a collection of Salesforce records from a csv file using Bulk API v2, returning a list of Job IDs
- `sObjectName`: API name of Salesforce object
- `externalIdFieldName`: field API name for an external Id that exists on the given object
- `filePath`: path to a csv file containing Salesforce data
  - A value for the external Id is required within the csv data
- `batchSize`: `1 <= batchSize <= 10000`
- `waitForResults`: whether to wait for jobs to finish and return any errors encountered during the operation

`data/upsert_avengers.csv`
```
ContactExternalId__c,FirstName,LastName
Avng7,Matt,Murdock
Avng8,Luke,Cage
Avng9,Jessica,Jones
Avng10,Danny,Rand
```
```go
_, err := sf.UpsertBulkFile("Contact", "ContactExternalId__c", "data/upsert_avengers.csv", 1000, true)
if err != nil {
	panic(err)
}
```

### DeleteBulk
`func (sf *Salesforce) DeleteBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error)`

@@ -614,22 +700,40 @@ if err != nil {
}
```

### DeleteBulkFile
`func (sf *Salesforce) DeleteBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error)`

Deletes a collection of Salesforce records from a csv file using Bulk API v2, returning a list of Job IDs
- `sObjectName`: API name of Salesforce object
- `filePath`: path to a csv file containing Salesforce data
  - The csv data should contain only Ids
- `batchSize`: `1 <= batchSize <= 10000`
- `waitForResults`: whether to wait for jobs to finish and return any errors encountered during the operation

`data/delete_avengers.csv`
```
Id
003Dn00000pEwRuIAK
003Dn00000pEwQxIAK
003Dn00000pEwQyIAK
003Dn00000pEwQzIAK
003Dn00000pEwR0IAK
003Dn00000pEwR1IAK
```
```go
_, err := sf.DeleteBulkFile("Contact", "data/delete_avengers.csv", 1000, true)
if err != nil {
	panic(err)
}
```

### GetJobResults
`func (sf *Salesforce) GetJobResults(bulkJobId string) (BulkJobResults, error)`

Returns an instance of BulkJobResults given a Job Id
- `bulkJobId`: the Id for a bulk API job
- Useful for checking the results of a job at a later time

```go
// function returns a list of type BulkJobResults, as defined here
type BulkJobResults struct {
	Id                  string
	State               string
	NumberRecordsFailed int
	ErrorMessage        string
}
```
```go
type Contact struct {
	LastName string
@@ -647,7 +751,7 @@ if err != nil {
}
time.Sleep(time.Second)
for _, id := range jobIds {
	results, err := sf.GetJobResults(id)
	results, err := sf.GetJobResults(id) // returns an instance of BulkJobResults
	if err != nil {
		panic(err)
	}
60 changes: 60 additions & 0 deletions bulk.go
@@ -9,6 +9,7 @@ import (
	"fmt"
	"io"
	"net/http"
	"os"
	"strconv"
	"time"

@@ -210,6 +211,33 @@ func mapsToCSV(maps []map[string]any) (string, error) {
	return buf.String(), nil
}

func readCSVFile(filePath string) ([]map[string]string, error) {
	file, fileErr := os.Open(filePath)
	if fileErr != nil {
		return nil, fileErr
	}
	defer file.Close()

	reader := csv.NewReader(file)
	records, readErr := reader.ReadAll()
	if readErr != nil {
		return nil, readErr
	}

	keys := records[0]

	var recordMap []map[string]string
	for _, row := range records[1:] {
		record := make(map[string]string)
		for i, col := range row {
			record[keys[i]] = col
		}
		recordMap = append(recordMap, record)
	}

	return recordMap, nil
}

func doBulkJob(auth auth, sObjectName string, fieldName string, operation string, records any, batchSize int, waitForResults bool) ([]string, error) {
	recordMap, err := convertToSliceOfMaps(records)
	if err != nil {
@@ -283,14 +311,46 @@ func doInsertBulk(auth auth, sObjectName string, records any, batchSize int, wai
	return doBulkJob(auth, sObjectName, "", insertOperation, records, batchSize, waitForResults)
}

func doInsertBulkFile(auth auth, sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	data, err := readCSVFile(filePath)
	if err != nil {
		return []string{}, err
	}
	return doBulkJob(auth, sObjectName, "", insertOperation, data, batchSize, waitForResults)
}

func doUpdateBulk(auth auth, sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
	return doBulkJob(auth, sObjectName, "", updateOperation, records, batchSize, waitForResults)
}

func doUpdateBulkFile(auth auth, sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	data, err := readCSVFile(filePath)
	if err != nil {
		return []string{}, err
	}
	return doBulkJob(auth, sObjectName, "", updateOperation, data, batchSize, waitForResults)
}

func doUpsertBulk(auth auth, sObjectName string, fieldName string, records any, batchSize int, waitForResults bool) ([]string, error) {
	return doBulkJob(auth, sObjectName, fieldName, upsertOperation, records, batchSize, waitForResults)
}

func doUpsertBulkFile(auth auth, sObjectName string, fieldName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	data, err := readCSVFile(filePath)
	if err != nil {
		return []string{}, err
	}
	return doBulkJob(auth, sObjectName, fieldName, upsertOperation, data, batchSize, waitForResults)
}

func doDeleteBulk(auth auth, sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
	return doBulkJob(auth, sObjectName, "", deleteOperation, records, batchSize, waitForResults)
}

func doDeleteBulkFile(auth auth, sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	data, err := readCSVFile(filePath)
	if err != nil {
		return []string{}, err
	}
	return doBulkJob(auth, sObjectName, "", deleteOperation, data, batchSize, waitForResults)
}
56 changes: 56 additions & 0 deletions salesforce.go
@@ -412,6 +412,20 @@ func (sf *Salesforce) InsertBulk(sObjectName string, records any, batchSize int,
	return jobIds, nil
}

func (sf *Salesforce) InsertBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	authErr := validateAuth(*sf)
	if authErr != nil {
		return []string{}, authErr
	}

	jobIds, bulkErr := doInsertBulkFile(*sf.auth, sObjectName, filePath, batchSize, waitForResults)
	if bulkErr != nil {
		return []string{}, bulkErr
	}

	return jobIds, nil
}

func (sf *Salesforce) UpdateBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
	validationErr := validateBulk(*sf, records, batchSize)
	if validationErr != nil {
@@ -426,6 +440,20 @@ func (sf *Salesforce) UpdateBulk(sObjectName string, records any, batchSize int,
	return jobIds, nil
}

func (sf *Salesforce) UpdateBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	authErr := validateAuth(*sf)
	if authErr != nil {
		return []string{}, authErr
	}

	jobIds, bulkErr := doUpdateBulkFile(*sf.auth, sObjectName, filePath, batchSize, waitForResults)
	if bulkErr != nil {
		return []string{}, bulkErr
	}

	return jobIds, nil
}

func (sf *Salesforce) UpsertBulk(sObjectName string, externalIdFieldName string, records any, batchSize int, waitForResults bool) ([]string, error) {
	validationErr := validateBulk(*sf, records, batchSize)
	if validationErr != nil {
@@ -440,6 +468,20 @@ func (sf *Salesforce) UpsertBulk(sObjectName string, externalIdFieldName string,
	return jobIds, nil
}

func (sf *Salesforce) UpsertBulkFile(sObjectName string, externalIdFieldName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	authErr := validateAuth(*sf)
	if authErr != nil {
		return []string{}, authErr
	}

	jobIds, bulkErr := doUpsertBulkFile(*sf.auth, sObjectName, externalIdFieldName, filePath, batchSize, waitForResults)
	if bulkErr != nil {
		return []string{}, bulkErr
	}

	return jobIds, nil
}

func (sf *Salesforce) DeleteBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
	validationErr := validateBulk(*sf, records, batchSize)
	if validationErr != nil {
@@ -454,6 +496,20 @@ func (sf *Salesforce) DeleteBulk(sObjectName string, records any, batchSize int,
	return jobIds, nil
}

func (sf *Salesforce) DeleteBulkFile(sObjectName string, filePath string, batchSize int, waitForResults bool) ([]string, error) {
	authErr := validateAuth(*sf)
	if authErr != nil {
		return []string{}, authErr
	}

	jobIds, bulkErr := doDeleteBulkFile(*sf.auth, sObjectName, filePath, batchSize, waitForResults)
	if bulkErr != nil {
		return []string{}, bulkErr
	}

	return jobIds, nil
}

func (sf *Salesforce) GetJobResults(bulkJobId string) (BulkJobResults, error) {
	authErr := validateAuth(*sf)
	if authErr != nil {
