Skip to content

Commit

Permalink
bulk query export to file
Browse files Browse the repository at this point in the history
  • Loading branch information
k-capehart committed Apr 24, 2024
1 parent 0301c33 commit 3cea006
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 47 deletions.
51 changes: 48 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ if err != nil {

### QueryStruct
`func (sf *Salesforce) QueryStruct(soqlStruct any, sObject any) error`
Performs a SOQL query given a go-soql struct and decodes the response into the given struct
- `soqlStruct`: a custom struct using `soql` tags
- `sObject`: a slice of a custom struct type representing a Salesforce Object
- Review [forcedotcom/go-soql](https://github.com/forcedotcom/go-soql)
- Eliminates need to separately maintain query string and struct
- Helps prevent SOQL injection

```go
type Contact struct {
Expand Down Expand Up @@ -498,6 +498,51 @@ type BulkJobResults struct {
}
```

### QueryBulkExport
`func (sf *Salesforce) QueryBulkExport(filePath string, query string) error`

Performs a query and exports the data to a csv file
- `filePath`: name and path of a csv file to be created
- `query`: a SOQL query

```go
err := sf.QueryBulkExport("data/export.csv", "SELECT Id, FirstName, LastName FROM Contact")
if err != nil {
panic(err)
}
```

### QueryStructBulkExport
`func (sf *Salesforce) QueryStructBulkExport(filePath string, soqlStruct any) error`

Performs a SOQL query given a go-soql struct and exports the data to a csv file
- `filePath`: name and path of a csv file to be created
- `soqlStruct`: a custom struct using `soql` tags
- Review [forcedotcom/go-soql](https://github.com/forcedotcom/go-soql)
- Eliminates need to separately maintain query string and struct
- Helps prevent SOQL injection

```go
type ContactSoql struct {
Id string `soql:"selectColumn,fieldName=Id" json:"Id"`
FirstName string `soql:"selectColumn,fieldName=FirstName" json:"FirstName"`
LastName string `soql:"selectColumn,fieldName=LastName" json:"LastName"`
}

type ContactSoqlQuery struct {
SelectClause ContactSoql `soql:"selectClause,tableName=Contact"`
}
```
```go
soqlStruct := ContactSoqlQuery{
SelectClause: ContactSoql{},
}
err := sf.QueryStructBulkExport("data/export2.csv", soqlStruct)
if err != nil {
panic(err)
}
```

### InsertBulk
`func (sf *Salesforce) InsertBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error)`

Expand Down
201 changes: 179 additions & 22 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type bulkJobCreationRequest struct {
ExternalIdFieldName string `json:"externalIdFieldName"`
}

// bulkQueryJobCreationRequest is the JSON payload sent to the Bulk API 2.0
// endpoint that creates a query job.
type bulkQueryJobCreationRequest struct {
	Operation string `json:"operation"` // always queryOperation for query jobs
	Query     string `json:"query"`     // the SOQL query to execute
}

type bulkJob struct {
Id string `json:"id"`
State string `json:"state"`
Expand All @@ -34,6 +39,12 @@ type BulkJobResults struct {
ErrorMessage string `json:"errorMessage"`
}

// bulkJobQueryResults holds one page of results from a bulk query job.
// NumberOfRecords and Locator are populated manually from the HTTP response
// headers in getQueryJobResults; the json tags mirror the header names but
// are not used for decoding.
type bulkJobQueryResults struct {
	NumberOfRecords int    `json:"Sforce-Numberofrecords"`
	Locator         string `json:"Sforce-Locator"` // pagination cursor; "" once the last page is reached
	Data            []map[string]string            // parsed CSV rows, keyed by column header
}

const (
jobStateAborted = "Aborted"
jobStateUploadComplete = "UploadComplete"
Expand All @@ -44,6 +55,9 @@ const (
updateOperation = "update"
upsertOperation = "upsert"
deleteOperation = "delete"
queryOperation = "query"
ingestJobType = "ingest"
queryJobType = "query"
)

func updateJobState(job bulkJob, state string, auth auth) error {
Expand All @@ -59,27 +73,27 @@ func updateJobState(job bulkJob, state string, auth auth) error {
return nil
}

func createBulkJob(auth auth, body []byte) (*bulkJob, error) {
resp, err := doRequest(http.MethodPost, "/jobs/ingest", jsonType, auth, string(body))
func createBulkJob(auth auth, jobType string, body []byte) (bulkJob, error) {
resp, err := doRequest(http.MethodPost, "/jobs/"+jobType, jsonType, auth, string(body))
if err != nil {
return nil, err
return bulkJob{}, err
}
if resp.StatusCode != http.StatusOK {
return nil, processSalesforceError(*resp)
return bulkJob{}, processSalesforceError(*resp)
}

respBody, readErr := io.ReadAll(resp.Body)
if readErr != nil {
return nil, readErr
return bulkJob{}, readErr
}

bulkJob := &bulkJob{}
jsonError := json.Unmarshal(respBody, bulkJob)
newJob := &bulkJob{}
jsonError := json.Unmarshal(respBody, newJob)
if jsonError != nil {
return nil, jsonError
return bulkJob{}, jsonError
}

return bulkJob, nil
return *newJob, nil
}

func uploadJobData(auth auth, data string, bulkJob bulkJob) error {
Expand All @@ -100,8 +114,8 @@ func uploadJobData(auth auth, data string, bulkJob bulkJob) error {
return nil
}

func getJobResults(auth auth, bulkJobId string) (BulkJobResults, error) {
resp, err := doRequest(http.MethodGet, "/jobs/ingest/"+bulkJobId, jsonType, auth, "")
func getJobResults(auth auth, jobType string, bulkJobId string) (BulkJobResults, error) {
resp, err := doRequest(http.MethodGet, "/jobs/"+jobType+"/"+bulkJobId, jsonType, auth, "")
if err != nil {
return BulkJobResults{}, err
}
Expand All @@ -125,16 +139,16 @@ func getJobResults(auth auth, bulkJobId string) (BulkJobResults, error) {

// waitForJobResult polls an ingest job until it reaches a terminal state or
// polling times out (one-second interval, one-minute cap), then sends the
// resulting error (nil on success) on c.
//
// NOTE(review): this span contained interleaved pre-/post-commit diff lines
// (duplicate getJobResults/processJobResults calls); this is the
// reconstructed post-commit version.
func waitForJobResult(auth auth, bulkJobId string, c chan error) {
	err := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute, false, func(context.Context) (bool, error) {
		bulkJob, reqErr := getJobResults(auth, ingestJobType, bulkJobId)
		if reqErr != nil {
			return true, reqErr
		}
		return isBulkJobDone(auth, bulkJob)
	})
	c <- err
}

func processJobResults(auth auth, bulkJob BulkJobResults) (bool, error) {
func isBulkJobDone(auth auth, bulkJob BulkJobResults) (bool, error) {
if bulkJob.State == jobStateJobComplete || bulkJob.State == jobStateFailed {
if bulkJob.ErrorMessage != "" {
return true, errors.New(bulkJob.ErrorMessage)
Expand All @@ -154,6 +168,67 @@ func processJobResults(auth auth, bulkJob BulkJobResults) (bool, error) {
return false, nil
}

// getQueryJobResults fetches a single page of results for a completed bulk
// query job and parses the returned CSV payload into header-keyed record maps.
//   - bulkJobId: the id of the query job
//   - locator: pagination cursor from a previous page; "" requests the first page
//
// The returned Locator is "" once the final page has been fetched.
func getQueryJobResults(auth auth, bulkJobId string, locator string) (bulkJobQueryResults, error) {
	uri := "/jobs/query/" + bulkJobId + "/results"
	if locator != "" {
		uri = uri + "/?locator=" + locator
	}
	resp, err := doRequest(http.MethodGet, uri, jsonType, auth, "")
	if err != nil {
		return bulkJobQueryResults{}, err
	}
	// Body was previously never closed, leaking the connection.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return bulkJobQueryResults{}, processSalesforceError(*resp)
	}

	reader := csv.NewReader(resp.Body)
	recordMap, readErr := csvToMap(*reader)
	if readErr != nil {
		return bulkJobQueryResults{}, readErr
	}

	// Header.Get is nil-safe and case-insensitive; the previous direct map
	// indexing with [0] would panic if Salesforce ever omitted a header, and
	// the Atoi error was silently discarded.
	numberOfRecords, convErr := strconv.Atoi(resp.Header.Get("Sforce-Numberofrecords"))
	if convErr != nil {
		return bulkJobQueryResults{}, convErr
	}
	// Salesforce sends the literal string "null" on the final page.
	nextLocator := resp.Header.Get("Sforce-Locator")
	if nextLocator == "null" {
		nextLocator = ""
	}

	return bulkJobQueryResults{
		NumberOfRecords: numberOfRecords,
		Locator:         nextLocator,
		Data:            recordMap,
	}, nil
}

// waitForQueryResults blocks until the bulk query job reaches a terminal
// state (polling once per second, for up to a minute), then pages through
// the job's results and returns all records as header-keyed maps.
func waitForQueryResults(auth auth, bulkJobId string) ([]map[string]string, error) {
	pollErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute, false, func(context.Context) (bool, error) {
		jobStatus, statusErr := getJobResults(auth, queryJobType, bulkJobId)
		if statusErr != nil {
			return true, statusErr
		}
		return isBulkJobDone(auth, jobStatus)
	})
	if pollErr != nil {
		return nil, pollErr
	}

	// Page through the result set; an empty locator marks the final page.
	var allRecords []map[string]string
	locator := ""
	for {
		page, pageErr := getQueryJobResults(auth, bulkJobId, locator)
		if pageErr != nil {
			return nil, pageErr
		}
		allRecords = append(allRecords, page.Data...)
		if page.Locator == "" {
			break
		}
		locator = page.Locator
	}

	return allRecords, nil
}

func getFailedRecords(auth auth, bulkJobId string) (string, error) {
resp, err := doRequest(http.MethodGet, "/jobs/ingest/"+bulkJobId+"/failedResults", jsonType, auth, "")
if err != nil {
Expand Down Expand Up @@ -211,14 +286,31 @@ func mapsToCSV(maps []map[string]any) (string, error) {
return buf.String(), nil
}

// mapsToCSVSlices converts a slice of records (column-header -> value maps)
// into rows suitable for csv.Writer.WriteAll: a header row taken from the
// first record, followed by one row per record in header order.
// Returns an empty (nil) slice when maps is empty.
//
// NOTE(review): this span contained stray pre-commit diff lines (the old
// readCSVFile header and file-handling statements) interleaved into the new
// function; this is the reconstructed post-commit version.
// NOTE(review): header order comes from Go map iteration and is therefore
// nondeterministic across runs — consider sorting headers if stable column
// order matters to consumers of the exported CSV.
func mapsToCSVSlices(maps []map[string]string) ([][]string, error) {
	var data [][]string
	var headers []string

	if len(maps) > 0 {
		headers = make([]string, 0, len(maps[0]))
		for header := range maps[0] {
			headers = append(headers, header)
		}
		data = append(data, headers)
	}

	for _, m := range maps {
		row := make([]string, 0, len(headers))
		for _, header := range headers {
			row = append(row, m[header])
		}
		data = append(data, row)
	}

	return data, nil
}

func csvToMap(reader csv.Reader) ([]map[string]string, error) {
records, readErr := reader.ReadAll()
if readErr != nil {
return nil, readErr
Expand All @@ -238,6 +330,36 @@ func readCSVFile(filePath string) ([]map[string]string, error) {
return recordMap, nil
}

// readCSVFile opens the CSV file at filePath and converts its rows into a
// slice of maps keyed by the header row.
func readCSVFile(filePath string) ([]map[string]string, error) {
	f, openErr := os.Open(filePath)
	if openErr != nil {
		return nil, openErr
	}
	defer f.Close()

	return csvToMap(*csv.NewReader(f))
}

func writeCSVFile(filePath string, data [][]string) error {
file, fileErr := os.Create(filePath)
if fileErr != nil {
return fileErr
}
defer file.Close()

writer := csv.NewWriter(file)
defer writer.Flush()
writer.WriteAll(data)

return nil
}

func doBulkJob(auth auth, sObjectName string, fieldName string, operation string, records any, batchSize int, waitForResults bool) ([]string, error) {
recordMap, err := convertToSliceOfMaps(records)
if err != nil {
Expand Down Expand Up @@ -266,7 +388,7 @@ func doBulkJob(auth auth, sObjectName string, fieldName string, operation string
break
}

job, jobCreationErr := createBulkJob(auth, body)
job, jobCreationErr := createBulkJob(auth, ingestJobType, body)
if jobCreationErr != nil {
jobErrors = errors.Join(jobErrors, jobCreationErr)
break
Expand All @@ -284,7 +406,7 @@ func doBulkJob(auth auth, sObjectName string, fieldName string, operation string
break
}

uploadErr := uploadJobData(auth, data, *job)
uploadErr := uploadJobData(auth, data, job)
if uploadErr != nil {
jobErrors = uploadErr
break
Expand All @@ -307,6 +429,41 @@ func doBulkJob(auth auth, sObjectName string, fieldName string, operation string
return jobIds, jobErrors
}

// doQueryBulk runs query as a Bulk API 2.0 query job and exports the results
// to a CSV file at filePath: it creates the job, waits for it to complete,
// collects every page of records, and writes them to disk.
func doQueryBulk(auth auth, filePath string, query string) error {
	payload, marshalErr := json.Marshal(bulkQueryJobCreationRequest{
		Operation: queryOperation,
		Query:     query,
	})
	if marshalErr != nil {
		return marshalErr
	}

	job, createErr := createBulkJob(auth, queryJobType, payload)
	if createErr != nil {
		return createErr
	}
	if job.Id == "" {
		return errors.New("error creating bulk query job")
	}

	records, waitErr := waitForQueryResults(auth, job.Id)
	if waitErr != nil {
		return waitErr
	}

	rows, convertErr := mapsToCSVSlices(records)
	if convertErr != nil {
		return convertErr
	}

	return writeCSVFile(filePath, rows)
}

func doInsertBulk(auth auth, sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) {
return doBulkJob(auth, sObjectName, "", insertOperation, records, batchSize, waitForResults)
}
Expand Down
12 changes: 0 additions & 12 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/url"
"strings"

"github.com/forcedotcom/go-soql"
"github.com/mitchellh/mapstructure"
)

Expand Down Expand Up @@ -60,14 +59,3 @@ func performQuery(auth auth, query string, sObject any) error {

return nil
}

func marshalQueryStruct(auth auth, soqlStruct any, sObject any) error {
soqlQuery, err := soql.Marshal(soqlStruct)
if err != nil {
return err
}

performQuery(auth, soqlQuery, sObject)

return nil
}
Loading

0 comments on commit 3cea006

Please sign in to comment.