diff --git a/README.md b/README.md index 16ae411..a834cf9 100644 --- a/README.md +++ b/README.md @@ -82,13 +82,13 @@ if err != nil { ### QueryStruct `func (sf *Salesforce) QueryStruct(soqlStruct any, sObject any) error` -- Review [forcedotcom/go-soql](https://github.com/forcedotcom/go-soql) Performs a SOQL query given a go-soql struct and decodes the response into the given struct - `soqlStruct`: a custom struct using `soql` tags - `sObject`: a slice of a custom struct type representing a Salesforce Object -- Eliminates need to separately maintain query string and struct -- Helps prevent SOQL injection +- Review [forcedotcom/go-soql](https://github.com/forcedotcom/go-soql) + - Eliminates need to separately maintain query string and struct + - Helps prevent SOQL injection ```go type Contact struct { @@ -498,6 +498,51 @@ type BulkJobResults struct { } ``` +### QueryBulkExport +`func (sf *Salesforce) QueryBulkExport(filePath string, query string) error` + +Performs a query and exports the data to a csv file +- `filePath`: name and path of a csv file to be created +- `query`: a SOQL query + +```go +err := sf.QueryBulkExport("data/export.csv", "SELECT Id, FirstName, LastName FROM Contact") +if err != nil { + panic(err) +} +``` + +### QueryStructBulkExport +`func (sf *Salesforce) QueryStructBulkExport(filePath string, soqlStruct any) error` + +Performs a SOQL query given a go-soql struct and decodes the response into the given struct +- `filePath`: name and path of a csv file to be created +- `soqlStruct`: a custom struct using `soql` tags +- Review [forcedotcom/go-soql](https://github.com/forcedotcom/go-soql) + - Eliminates need to separately maintain query string and struct + - Helps prevent SOQL injection + +```go +type ContactSoql struct { + Id string `soql:"selectColumn,fieldName=Id" json:"Id"` + FirstName string `soql:"selectColumn,fieldName=FirstName" json:"FirstName"` + LastName string `soql:"selectColumn,fieldName=LastName" json:"LastName"` +} + +type ContactSoqlQuery struct { + SelectClause ContactSoql `soql:"selectClause,tableName=Contact"` +} +``` +```go +soqlStruct := ContactSoqlQuery{ + SelectClause: ContactSoql{}, +} +err := sf.QueryStructBulkExport("data/export2.csv", soqlStruct) +if err != nil { + panic(err) +} +``` + ### InsertBulk `func (sf *Salesforce) InsertBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error)` diff --git a/bulk.go b/bulk.go index 912b30a..e595af9 100644 --- a/bulk.go +++ b/bulk.go @@ -22,6 +22,11 @@ type bulkJobCreationRequest struct { ExternalIdFieldName string `json:"externalIdFieldName"` } +type bulkQueryJobCreationRequest struct { + Operation string `json:"operation"` + Query string `json:"query"` +} + type bulkJob struct { Id string `json:"id"` State string `json:"state"` @@ -34,6 +39,12 @@ type BulkJobResults struct { ErrorMessage string `json:"errorMessage"` } +type bulkJobQueryResults struct { + NumberOfRecords int `json:"Sforce-Numberofrecords"` + Locator string `json:"Sforce-Locator"` + Data []map[string]string +} + const ( jobStateAborted = "Aborted" jobStateUploadComplete = "UploadComplete" @@ -44,6 +55,9 @@ const ( updateOperation = "update" upsertOperation = "upsert" deleteOperation = "delete" + queryOperation = "query" + ingestJobType = "ingest" + queryJobType = "query" ) func updateJobState(job bulkJob, state string, auth auth) error { @@ -59,27 +73,27 @@ func updateJobState(job bulkJob, state string, auth auth) error { return nil } -func createBulkJob(auth auth, body []byte) (*bulkJob, error) { - resp, err := doRequest(http.MethodPost, "/jobs/ingest", jsonType, auth, string(body)) +func createBulkJob(auth auth, jobType string, body []byte) (bulkJob, error) { + resp, err := doRequest(http.MethodPost, "/jobs/"+jobType, jsonType, auth, string(body)) if err != nil { - return nil, err + return bulkJob{}, err } if resp.StatusCode != http.StatusOK { - return nil, processSalesforceError(*resp) + return bulkJob{}, processSalesforceError(*resp) } respBody, readErr := io.ReadAll(resp.Body) if readErr != nil { - return nil, readErr + return bulkJob{}, readErr } - bulkJob := &bulkJob{} - jsonError := json.Unmarshal(respBody, bulkJob) + newJob := &bulkJob{} + jsonError := json.Unmarshal(respBody, newJob) if jsonError != nil { - return nil, jsonError + return bulkJob{}, jsonError } - return bulkJob, nil + return *newJob, nil } func uploadJobData(auth auth, data string, bulkJob bulkJob) error { @@ -100,8 +114,8 @@ func uploadJobData(auth auth, data string, bulkJob bulkJob) error { return nil } -func getJobResults(auth auth, bulkJobId string) (BulkJobResults, error) { - resp, err := doRequest(http.MethodGet, "/jobs/ingest/"+bulkJobId, jsonType, auth, "") +func getJobResults(auth auth, jobType string, bulkJobId string) (BulkJobResults, error) { + resp, err := doRequest(http.MethodGet, "/jobs/"+jobType+"/"+bulkJobId, jsonType, auth, "") if err != nil { return BulkJobResults{}, err } @@ -125,16 +139,16 @@ func getJobResults(auth auth, bulkJobId string) (BulkJobResults, error) { func waitForJobResult(auth auth, bulkJobId string, c chan error) { err := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute, false, func(context.Context) (bool, error) { - bulkJob, reqErr := getJobResults(auth, bulkJobId) + bulkJob, reqErr := getJobResults(auth, ingestJobType, bulkJobId) if reqErr != nil { return true, reqErr } - return processJobResults(auth, bulkJob) + return isBulkJobDone(auth, bulkJob) }) c <- err } -func processJobResults(auth auth, bulkJob BulkJobResults) (bool, error) { +func isBulkJobDone(auth auth, bulkJob BulkJobResults) (bool, error) { if bulkJob.State == jobStateJobComplete || bulkJob.State == jobStateFailed { if bulkJob.ErrorMessage != "" { return true, errors.New(bulkJob.ErrorMessage) @@ -154,6 +168,67 @@ func processJobResults(auth auth, bulkJob BulkJobResults) (bool, error) { return false, nil } +func getQueryJobResults(auth auth, bulkJobId string, locator string) (bulkJobQueryResults, error) { + uri := "/jobs/query/" + bulkJobId + "/results" + if locator != "" { + uri = uri + "/?locator=" + locator + } + resp, err := doRequest(http.MethodGet, uri, jsonType, auth, "") + if err != nil { + return bulkJobQueryResults{}, err + } + if resp.StatusCode != http.StatusOK { + return bulkJobQueryResults{}, processSalesforceError(*resp) + } + + reader := csv.NewReader(resp.Body) + recordMap, readErr := csvToMap(*reader) + if readErr != nil { + return bulkJobQueryResults{}, readErr + } + numberOfRecords, _ := strconv.Atoi(resp.Header["Sforce-Numberofrecords"][0]) + locator = "" + if resp.Header["Sforce-Locator"][0] != "null" { + locator = resp.Header["Sforce-Locator"][0] + } + + queryResults := bulkJobQueryResults{ + NumberOfRecords: numberOfRecords, + Locator: locator, + Data: recordMap, + } + + return queryResults, nil +} + +func waitForQueryResults(auth auth, bulkJobId string) ([]map[string]string, error) { + err := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute, false, func(context.Context) (bool, error) { + bulkJob, reqErr := getJobResults(auth, queryJobType, bulkJobId) + if reqErr != nil { + return true, reqErr + } + return isBulkJobDone(auth, bulkJob) + }) + if err != nil { + return nil, err + } + + queryResults, resultsErr := getQueryJobResults(auth, bulkJobId, "") + if resultsErr != nil { + return nil, resultsErr + } + records := queryResults.Data + for queryResults.Locator != "" { + queryResults, resultsErr = getQueryJobResults(auth, bulkJobId, queryResults.Locator) + if resultsErr != nil { + return nil, resultsErr + } + records = append(records, queryResults.Data...) + } + + return records, nil +} + func getFailedRecords(auth auth, bulkJobId string) (string, error) { resp, err := doRequest(http.MethodGet, "/jobs/ingest/"+bulkJobId+"/failedResults", jsonType, auth, "") if err != nil { @@ -211,14 +286,31 @@ func mapsToCSV(maps []map[string]any) (string, error) { return buf.String(), nil } -func readCSVFile(filePath string) ([]map[string]string, error) { - file, fileErr := os.Open(filePath) - if fileErr != nil { - return nil, fileErr +func mapsToCSVSlices(maps []map[string]string) ([][]string, error) { + var data [][]string + var headers []string + + if len(maps) > 0 { + headers = make([]string, 0, len(maps[0])) + for header := range maps[0] { + headers = append(headers, header) + } + data = append(data, headers) } - defer file.Close() - reader := csv.NewReader(file) + for _, m := range maps { + row := make([]string, 0, len(headers)) + for _, header := range headers { + val := m[header] + row = append(row, val) + } + data = append(data, row) + } + + return data, nil +} + +func csvToMap(reader csv.Reader) ([]map[string]string, error) { records, readErr := reader.ReadAll() if readErr != nil { return nil, readErr @@ -238,6 +330,36 @@ func readCSVFile(filePath string) ([]map[string]string, error) { return recordMap, nil } +func readCSVFile(filePath string) ([]map[string]string, error) { + file, fileErr := os.Open(filePath) + if fileErr != nil { + return nil, fileErr + } + defer file.Close() + + reader := csv.NewReader(file) + recordMap, readErr := csvToMap(*reader) + if readErr != nil { + return nil, readErr + } + + return recordMap, nil +} + +func writeCSVFile(filePath string, data [][]string) error { + file, fileErr := os.Create(filePath) + if fileErr != nil { + return fileErr + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + writer.WriteAll(data) + + return nil +} + func doBulkJob(auth auth, sObjectName string, fieldName string, operation string, records any, batchSize int, waitForResults bool) ([]string, error) { recordMap, err := convertToSliceOfMaps(records) if err != nil { @@ -266,7 +388,7 @@ func doBulkJob(auth auth, sObjectName string, fieldName string, operation string break } - job, jobCreationErr := createBulkJob(auth, body) + job, jobCreationErr := createBulkJob(auth, ingestJobType, body) if jobCreationErr != nil { jobErrors = errors.Join(jobErrors, jobCreationErr) break @@ -284,7 +406,7 @@ func doBulkJob(auth auth, sObjectName string, fieldName string, operation string break } - uploadErr := uploadJobData(auth, data, *job) + uploadErr := uploadJobData(auth, data, job) if uploadErr != nil { jobErrors = uploadErr break @@ -307,6 +429,41 @@ func doBulkJob(auth auth, sObjectName string, fieldName string, operation string return jobIds, jobErrors } +func doQueryBulk(auth auth, filePath string, query string) error { + queryJobReq := bulkQueryJobCreationRequest{ + Operation: queryOperation, + Query: query, + } + body, jsonErr := json.Marshal(queryJobReq) + if jsonErr != nil { + return jsonErr + } + + job, jobCreationErr := createBulkJob(auth, queryJobType, body) + if jobCreationErr != nil { + return jobCreationErr + } + if job.Id == "" { + newErr := errors.New("error creating bulk query job") + return newErr + } + + records, jobErr := waitForQueryResults(auth, job.Id) + if jobErr != nil { + return jobErr + } + data, convertErr := mapsToCSVSlices(records) + if convertErr != nil { + return convertErr + } + writeErr := writeCSVFile(filePath, data) + if writeErr != nil { + return writeErr + } + + return nil +} + func doInsertBulk(auth auth, sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) { return doBulkJob(auth, sObjectName, "", insertOperation, records, batchSize, waitForResults) } diff --git a/query.go b/query.go index b45e40a..f8ac319 100644 --- a/query.go +++ b/query.go @@ -7,7 +7,6 @@ import ( "net/url" "strings" - "github.com/forcedotcom/go-soql" "github.com/mitchellh/mapstructure" ) @@ -60,14 +59,3 @@ func performQuery(auth auth, query string, sObject any) error { return nil } - -func marshalQueryStruct(auth auth, soqlStruct any, sObject any) error { - soqlQuery, err := soql.Marshal(soqlStruct) - if err != nil { - return err - } - - performQuery(auth, soqlQuery, sObject) - - return nil -} diff --git a/salesforce.go b/salesforce.go index 292b246..08334f5 100644 --- a/salesforce.go +++ b/salesforce.go @@ -8,6 +8,8 @@ import ( "reflect" "strconv" "strings" + + "github.com/forcedotcom/go-soql" ) type Salesforce struct { @@ -52,7 +54,7 @@ func doRequest(method string, uri string, content string, auth auth, body string req.Header.Set("User-Agent", "go-salesforce") req.Header.Set("Content-Type", content) - req.Header.Set("Accept", jsonType) + req.Header.Set("Accept", content) req.Header.Set("Authorization", "Bearer "+auth.AccessToken) return http.DefaultClient.Do(req) @@ -217,12 +219,16 @@ func (sf *Salesforce) Query(query string, sObject any) error { } func (sf *Salesforce) QueryStruct(soqlStruct any, sObject any) error { - authErr := validateAuth(*sf) - if authErr != nil { - return authErr + validationErr := validateSingles(*sf, soqlStruct) + if validationErr != nil { + return validationErr } - queryErr := marshalQueryStruct(*sf.auth, soqlStruct, sObject) + soqlQuery, err := soql.Marshal(soqlStruct) + if err != nil { + return err + } + queryErr := performQuery(*sf.auth, soqlQuery, sObject) if queryErr != nil { return queryErr } @@ -233,7 +239,7 @@ func (sf *Salesforce) QueryStruct(soqlStruct any, sObject any) error { func (sf *Salesforce) InsertOne(sObjectName string, record any) error { validationErr := validateSingles(*sf, record) if validationErr != nil { - return nil + return validationErr } dmlErr := doInsertOne(*sf.auth, sObjectName, record) @@ -247,7 +253,7 @@ func (sf *Salesforce) InsertOne(sObjectName string, record any) error { func (sf *Salesforce) UpdateOne(sObjectName string, record any) error { validationErr := validateSingles(*sf, record) if validationErr != nil { - return nil + return validationErr } dmlErr := doUpdateOne(*sf.auth, sObjectName, record) @@ -261,7 +267,7 @@ func (sf *Salesforce) UpdateOne(sObjectName string, record any) error { func (sf *Salesforce) UpsertOne(sObjectName string, externalIdFieldName string, record any) error { validationErr := validateSingles(*sf, record) if validationErr != nil { - return nil + return validationErr } dmlErr := doUpsertOne(*sf.auth, sObjectName, externalIdFieldName, record) @@ -275,7 +281,7 @@ func (sf *Salesforce) UpsertOne(sObjectName string, externalIdFieldName string, func (sf *Salesforce) DeleteOne(sObjectName string, record any) error { validationErr := validateSingles(*sf, record) if validationErr != nil { - return nil + return validationErr } dmlErr := doDeleteOne(*sf.auth, sObjectName, record) @@ -398,6 +404,37 @@ func (sf *Salesforce) DeleteComposite(sObjectName string, records any, batchSize return nil } +func (sf *Salesforce) QueryBulkExport(filePath string, query string) error { + authErr := validateAuth(*sf) + if authErr != nil { + return authErr + } + queryErr := doQueryBulk(*sf.auth, filePath, query) + if queryErr != nil { + return queryErr + } + + return nil +} + +func (sf *Salesforce) QueryStructBulkExport(filePath string, soqlStruct any) error { + validationErr := validateSingles(*sf, soqlStruct) + if validationErr != nil { + return validationErr + } + + soqlQuery, err := soql.Marshal(soqlStruct) + if err != nil { + return err + } + queryErr := doQueryBulk(*sf.auth, filePath, soqlQuery) + if queryErr != nil { + return queryErr + } + + return nil +} + func (sf *Salesforce) InsertBulk(sObjectName string, records any, batchSize int, waitForResults bool) ([]string, error) { validationErr := validateBulk(*sf, records, batchSize) if validationErr != nil { @@ -516,7 +553,7 @@ func (sf *Salesforce) GetJobResults(bulkJobId string) (BulkJobResults, error) { return BulkJobResults{}, authErr } - job, err := getJobResults(*sf.auth, bulkJobId) + job, err := getJobResults(*sf.auth, ingestJobType, bulkJobId) if err != nil { return BulkJobResults{}, err }