Skip to content

Commit

Permalink
Merge pull request #64 from atlanhq/DVX-774
Browse files Browse the repository at this point in the history
FT-774: feat : Added support for end-to-end bulk update using batch processing
  • Loading branch information
0xquark authored Nov 26, 2024
2 parents bffe145 + 16edd83 commit a9d1440
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 26 deletions.
23 changes: 23 additions & 0 deletions atlan/assets/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,26 @@ func generateCacheKey(baseURL, apiKey string) string {
_, _ = h.Write([]byte(fmt.Sprintf("%s/%s", baseURL, apiKey)))
return fmt.Sprintf("%d", h.Sum32())
}

// Helpers used by the end-to-end bulk update flow.

// TrimToRequired builds a reduced copy of asset that carries only its
// identifying fields (TypeName, QualifiedName, Name and Guid) and then runs
// Updater on that copy.
//
// It returns an error when asset.TypeName or asset.QualifiedName is nil, or
// when Updater fails; in the latter case the underlying error is wrapped so
// callers can still inspect it with errors.Is / errors.As.
func TrimToRequired(asset model.SearchAssets) (*model.SearchAssets, error) {
	// Both identifiers are mandatory; TypeName is also dereferenced below when
	// formatting the error message, so it must be non-nil.
	if asset.TypeName == nil || asset.QualifiedName == nil {
		return nil, fmt.Errorf("asset must have TypeName and QualifiedName")
	}

	// Copy across only the identifying fields; everything else stays zero.
	trimmed := new(model.SearchAssets)
	trimmed.TypeName = asset.TypeName
	trimmed.QualifiedName = asset.QualifiedName
	trimmed.Name = asset.Name
	trimmed.Guid = asset.Guid

	// Run the generic Updater on the trimmed copy, not on the caller's asset.
	if err := trimmed.Updater(); err != nil {
		return nil, fmt.Errorf("failed to trim to required fields for asset type: %s, error: %w", *asset.TypeName, err)
	}

	return trimmed, nil
}
163 changes: 163 additions & 0 deletions atlan/assets/batch_processing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package assets

import (
"errors"
"github.com/atlanhq/atlan-go/atlan"
"github.com/atlanhq/atlan-go/atlan/model"
)

// FailedBatch records a single batch that could not be saved: the assets that
// were part of the batch together with the error that caused the failure.
type FailedBatch struct {
	FailedAssets  []AtlanObject // The assets that were in the batch when it failed.
	FailureReason error         // The error reported for the failed batch.
}

// NewFailedBatch builds a FailedBatch from the given assets and failure
// reason. A nil failureReason is rejected with an error, since a failed batch
// without a cause carries no useful information. failedAssets is stored as
// passed (it is not copied).
func NewFailedBatch(failedAssets []AtlanObject, failureReason error) (*FailedBatch, error) {
	if failureReason == nil {
		return nil, errors.New("failure reason cannot be nil")
	}

	fb := new(FailedBatch)
	fb.FailedAssets = failedAssets
	fb.FailureReason = failureReason
	return fb, nil
}

// GetFailedAssets returns the assets that were part of the failed batch.
// The stored slice is returned directly, not a copy.
func (fb *FailedBatch) GetFailedAssets() []AtlanObject {
	return fb.FailedAssets
}

// GetFailureReason returns the error recorded as the cause of the batch failure.
func (fb *FailedBatch) GetFailureReason() error {
	return fb.FailureReason
}

// Batch accumulates assets and saves them in groups of at most maxSize,
// keeping track of which assets were created or updated and, optionally,
// which batches failed.
type Batch struct {
	// client is stored by NewBatch.
	// NOTE(review): the methods visible in this file call the package-level
	// Save and do not read this field — confirm whether it is still needed.
	client *AtlanClient
	// maxSize is the number of queued assets at which Add triggers a Flush.
	maxSize int
	// replaceAtlanTags and customMetadataHandling are stored by NewBatch.
	// NOTE(review): they are not consulted by the methods visible in this
	// file — presumably intended to be forwarded to the save call; confirm.
	replaceAtlanTags       bool
	customMetadataHandling atlan.CustomMetadataHandling
	// captureFailures selects what Flush does when Save fails: record the
	// failure in failures (true) or return the error to the caller (false).
	captureFailures bool
	// batch holds the assets queued since the last successful Flush.
	batch []AtlanObject
	// failures holds one entry per failed batch when captureFailures is set.
	failures []FailedBatch
	// created and updated collect the CREATE and UPDATE entries of every
	// save response processed so far.
	created []*model.MutatedAssets
	updated []*model.MutatedAssets
}

// NewBatch returns a Batch configured with the given client, batch size and
// handling options. All internal collections start out empty but non-nil.
//
// maxSize is the number of queued assets at which Add flushes the batch.
// captureFailures controls whether a failed save is recorded on the Batch
// (true) or returned as an error from Flush (false).
func NewBatch(client *AtlanClient, maxSize int, replaceAtlanTags bool, customMetadataHandling atlan.CustomMetadataHandling, captureFailures bool) *Batch {
	b := new(Batch)

	// Configuration supplied by the caller.
	b.client = client
	b.maxSize = maxSize
	b.replaceAtlanTags = replaceAtlanTags
	b.customMetadataHandling = customMetadataHandling
	b.captureFailures = captureFailures

	// Working state: empty, non-nil slices.
	b.batch = make([]AtlanObject, 0)
	b.failures = make([]FailedBatch, 0)
	b.created = make([]*model.MutatedAssets, 0)
	b.updated = make([]*model.MutatedAssets, 0)

	return b
}

// Failures returns the batches that failed to save. Entries are only recorded
// when the Batch was created with captureFailures enabled. The internal slice
// is returned directly, not a copy.
func (b *Batch) Failures() []FailedBatch {
	return b.failures
}

// Created returns the assets reported as created by the save responses
// processed so far. The internal slice is returned directly, not a copy.
func (b *Batch) Created() []*model.MutatedAssets {
	return b.created
}

// Updated returns the assets reported as updated by the save responses
// processed so far. The internal slice is returned directly, not a copy.
func (b *Batch) Updated() []*model.MutatedAssets {
	return b.updated
}

/* Alternative accessors, kept for reference in case callers need the assets dereferenced into values rather than pointers:
// Created returns a list of Assets that were created (dereferenced).
func (b *Batch) Created() []model.MutatedAssets {
// Dereference the pointers to get the actual values.
dereferencedCreated := make([]model.MutatedAssets, len(b.created))
for i, asset := range b.created {
dereferencedCreated[i] = *asset // Dereference the pointer to get the actual value
}
return dereferencedCreated
}
// Updated returns a list of Assets that were updated (dereferenced).
func (b *Batch) Updated() []model.MutatedAssets {
// Dereference the pointers to get the actual values.
dereferencedUpdated := make([]model.MutatedAssets, len(b.updated))
for i, asset := range b.updated {
dereferencedUpdated[i] = *asset // Dereference the pointer to get the actual value
}
return dereferencedUpdated
}
*/

// Add queues asset in the current batch. Once the number of queued assets
// reaches maxSize the batch is flushed, and any error returned by Flush is
// passed back to the caller; the flush response itself is discarded.
func (b *Batch) Add(asset AtlanObject) error {
	b.batch = append(b.batch, asset)

	// Still below the threshold: nothing to send yet.
	if len(b.batch) < b.maxSize {
		return nil
	}

	_, flushErr := b.Flush()
	return flushErr
}

// process flushes the batch when it has reached its size limit and returns
// the result of Flush; otherwise it does nothing and returns (nil, nil).
//
// The limit check uses >= (matching Add) rather than ==. A failed Flush with
// captureFailures disabled leaves the batch uncleared, so a later append can
// push the length past maxSize; an equality check would then never trigger
// another flush.
func (b *Batch) process() (*model.AssetMutationResponse, error) {
	if len(b.batch) >= b.maxSize {
		return b.Flush()
	}
	return nil, nil
}

// Flush saves every queued asset in one Save call and then empties the batch.
//
// An empty batch is a no-op and yields (nil, nil). When Save fails:
//   - with captureFailures enabled, the queued assets and the error are
//     recorded as a FailedBatch, the batch is emptied, and a nil error is
//     returned together with whatever response Save produced;
//   - with captureFailures disabled, the error is returned immediately and
//     the queued assets are left in place.
//
// Any non-nil response is folded into the created/updated trackers.
func (b *Batch) Flush() (*model.AssetMutationResponse, error) {
	if len(b.batch) == 0 {
		return nil, nil // No assets to process
	}

	response, err := Save(b.batch...)
	if err != nil {
		if !b.captureFailures {
			return nil, err
		}
		// Remember what failed and why; the caller sees no error.
		failed := FailedBatch{FailedAssets: b.batch, FailureReason: err}
		b.failures = append(b.failures, failed)
	}

	if response != nil {
		b.trackResponse(response)
	}

	// Start a fresh slice rather than truncating, so a FailedBatch recorded
	// above keeps its own view of the assets.
	b.batch = []AtlanObject{}
	return response, nil
}

// trackResponse records the assets listed under CREATE and UPDATE in the
// response's MutatedEntities into the created and updated trackers. A
// response without MutatedEntities is ignored. response must be non-nil.
func (b *Batch) trackResponse(response *model.AssetMutationResponse) {
	mutated := response.MutatedEntities
	if mutated == nil {
		return
	}

	for _, entity := range mutated.CREATE {
		b.track(&b.created, entity)
	}
	for _, entity := range mutated.UPDATE {
		b.track(&b.updated, entity)
	}
}

// track appends asset to the slice that tracker points to. The slice is
// passed by pointer so the append is visible to the caller.
func (b *Batch) track(tracker *[]*model.MutatedAssets, asset *model.MutatedAssets) {
	*tracker = append(*tracker, asset)
}
3 changes: 3 additions & 0 deletions atlan/assets/glossary_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func (g *AtlasGlossary) MarshalJSON() ([]byte, error) {
if g.AnnouncementMessage != nil && *g.AnnouncementMessage != "" {
customJSON["attributes"].(map[string]interface{})["announcementMessage"] = *g.AnnouncementMessage
}
if g.CertificateStatus != nil {
customJSON["attributes"].(map[string]interface{})["certificateStatus"] = *g.CertificateStatus
}

// Marshal the custom JSON
return json.MarshalIndent(customJSON, "", " ")
Expand Down
6 changes: 5 additions & 1 deletion atlan/assets/table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (t *Table) UnmarshalJSON(data []byte) error {
return nil
}

// Updater is used to modify a glossary asset in memory.
// Updater is used to modify a Table asset in memory.
func (t *Table) Updater(name string, qualifiedName string) error {
if name == "" || qualifiedName == "" {
return errors.New("name, qualified_name are required fields")
Expand Down Expand Up @@ -189,6 +189,10 @@ func (t *Table) MarshalJSON() ([]byte, error) {
customJSON["attributes"].(map[string]interface{})["connectionQualifiedName"] = *t.ConnectionQualifiedName
}

if t.CertificateStatus != nil {
customJSON["attributes"].(map[string]interface{})["certificateStatus"] = *t.CertificateStatus
}

// Marshal the custom JSON
return json.MarshalIndent(customJSON, "", " ")
}
Expand Down
82 changes: 82 additions & 0 deletions atlan/enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -2821,3 +2821,85 @@ func (u *UTMTags) UnmarshalJSON(data []byte) error {
}
return nil
}

// CustomMetadataHandling is a string-backed enumeration naming a strategy for
// handling custom metadata. It is encoded to and decoded from JSON as a plain
// string holding Name.
type CustomMetadataHandling struct {
	Name string
}

// Predefined CustomMetadataHandling values.
var (
	IGNORE    = CustomMetadataHandling{Name: "ignore"}
	OVERWRITE = CustomMetadataHandling{Name: "overwrite"}
	MERGE     = CustomMetadataHandling{Name: "merge"}
)

// String returns the name of the handling strategy.
func (c CustomMetadataHandling) String() string {
	return c.Name
}

// MarshalJSON encodes the value as a JSON string containing its Name.
func (c CustomMetadataHandling) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.Name)
}

// UnmarshalJSON decodes a JSON string into c. Recognised names map to the
// predefined values; any other string is kept as-is in Name. Input that is
// not a JSON string yields an error and leaves c untouched.
func (c *CustomMetadataHandling) UnmarshalJSON(data []byte) error {
	var name string
	err := json.Unmarshal(data, &name)
	if err != nil {
		return err
	}

	// Fall back to a value carrying the raw name unless it is a known one.
	parsed := CustomMetadataHandling{Name: name}
	switch name {
	case "ignore":
		parsed = IGNORE
	case "overwrite":
		parsed = OVERWRITE
	case "merge":
		parsed = MERGE
	}

	*c = parsed
	return nil
}

// CertificateStatus is a string-backed enumeration naming the certification
// state of an asset. It is encoded to and decoded from JSON as a plain string
// holding Name.
type CertificateStatus struct {
	Name string
}

// Predefined CertificateStatus values.
var (
	CertificateStatusDeprecated = CertificateStatus{Name: "DEPRECATED"}
	CertificateStatusDraft      = CertificateStatus{Name: "DRAFT"}
	CertificateStatusVerified   = CertificateStatus{Name: "VERIFIED"}
)

// String returns the name of the certificate status.
func (c CertificateStatus) String() string {
	return c.Name
}

// MarshalJSON encodes the status as a JSON string containing its Name.
func (c CertificateStatus) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.Name)
}

// UnmarshalJSON decodes a JSON string into c. Recognised names map to the
// predefined values; any other string is kept as-is in Name. Input that is
// not a JSON string yields an error and leaves c untouched.
func (c *CertificateStatus) UnmarshalJSON(data []byte) error {
	var raw string
	err := json.Unmarshal(data, &raw)
	if err != nil {
		return err
	}

	// Fall back to a value carrying the raw name unless it is a known one.
	status := CertificateStatus{Name: raw}
	switch raw {
	case "DEPRECATED":
		status = CertificateStatusDeprecated
	case "DRAFT":
		status = CertificateStatusDraft
	case "VERIFIED":
		status = CertificateStatusVerified
	}

	*c = status
	return nil
}
3 changes: 2 additions & 1 deletion atlan/model/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"reflect"
)

// Add Mutated assets for Response in Creation, Updation and Deletion
// Unmarshal on assets changed the unmarshalling for the whole sdk asset structure

// Add Mutated assets for Response in Creation, Updation and Deletion
type MutatedAssets struct {
TypeName string `json:"typeName"`
Attributes structs.Asset `json:"attributes"`
Expand Down
Loading

0 comments on commit a9d1440

Please sign in to comment.