Skip to content

Commit

Permalink
implement redeployment of missing deployments
Browse files Browse the repository at this point in the history
  • Loading branch information
IngoRoessner committed Jan 16, 2025
1 parent cbef36f commit c8c045e
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 14 deletions.
55 changes: 55 additions & 0 deletions pkg/api/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021 InfAI (CC SES)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package api

import (
"encoding/json"
"github.com/SENERGY-Platform/process-sync/pkg/configuration"
"github.com/SENERGY-Platform/process-sync/pkg/controller"
"github.com/julienschmidt/httprouter"
"log"
"net/http"
)

func init() {
endpoints = append(endpoints, SyncEndpoints)
}

func SyncEndpoints(config configuration.Config, ctrl *controller.Controller, router *httprouter.Router) {
resource := "/sync"

router.POST(resource+"/deployments/:networkId", func(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
networkId := params.ByName("networkId")
token, err, errCode := ctrl.ApiCheckAccessReturnToken(request, networkId, "a")
if err != nil {
http.Error(writer, err.Error(), errCode)
return
}
err, errCode = ctrl.ApiSyncDeployments(token, networkId)
if err != nil {
http.Error(writer, err.Error(), errCode)
return
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
err = json.NewEncoder(writer).Encode(true)
if err != nil {
log.Println("ERROR: unable to encode response", err)
}
return
})

}
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func New(config configuration.Config, ctx context.Context, db database.Database,
var IsPlaceholderProcessErr = errors.New("is placeholder process")
var IsMarkedForDeleteErr = errors.New("is market for deletion")
var HistoryMayOnlyDeletedIfFinishedOrPlaceholderErr = errors.New("history may only deleted if the process instance is finished or the element is a placeholder")
var IsMarkedAsMissingErr = errors.New("is market as missing (you may try to redeploy)")

func (this *Controller) SetErrCode(err error) int {
switch err {
Expand Down
83 changes: 77 additions & 6 deletions pkg/controller/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package controller

import (
"errors"
"github.com/SENERGY-Platform/process-deployment/lib/auth"
"github.com/SENERGY-Platform/process-deployment/lib/model/deploymentmodel"
"github.com/SENERGY-Platform/process-sync/pkg/configuration"
"github.com/SENERGY-Platform/process-sync/pkg/controller/transformer"
"github.com/SENERGY-Platform/process-sync/pkg/database"
"github.com/SENERGY-Platform/process-sync/pkg/model"
"github.com/SENERGY-Platform/process-sync/pkg/model/camundamodel"
"log"
Expand Down Expand Up @@ -50,25 +52,86 @@ func (this *Controller) UpdateDeployment(networkId string, deployment camundamod
}

func (this *Controller) DeleteDeployment(networkId string, deploymentId string) {
err := this.db.RemoveDeployment(networkId, deploymentId)
deployment, err := this.db.ReadDeployment(networkId, deploymentId)
if errors.Is(err, database.ErrNotFound) {
this.deleteDeployment(networkId, deploymentId)
return
}
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
return
}
_, err = this.db.ReadDeploymentMetadata(networkId, deploymentId)
if errors.Is(err, database.ErrNotFound) {
this.deleteDeployment(networkId, deploymentId)
return
}
err = this.db.RemoveDeploymentMetadata(networkId, deploymentId)
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
return
}
if deployment.SyncInfo.MarkedForDelete {
this.deleteDeployment(networkId, deploymentId)
} else {
deployment.SyncInfo.MarkedAsMissing = true
err = this.db.SaveDeployment(deployment)
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
return
}
}
}

func (this *Controller) deleteDeployment(networkId string, deploymentId string) {
err := this.db.RemoveDeploymentMetadata(networkId, deploymentId)
if err != nil && !errors.Is(err, database.ErrNotFound) {
log.Println("ERROR:", err)
debug.PrintStack()
}
err = this.db.RemoveDeployment(networkId, deploymentId)
if err != nil && !errors.Is(err, database.ErrNotFound) {
log.Println("ERROR:", err)
debug.PrintStack()
}
}

func (this *Controller) DeleteUnknownDeployments(networkId string, knownIds []string) {
err := this.db.RemoveUnknownDeployments(networkId, knownIds)
deployments, err := this.db.ListUnknownDeployments(networkId, knownIds)
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
return
}
handled := []string{}
handled = append(handled, knownIds...)
for _, deployment := range deployments {
handled = append(handled, deployment.Id)
if deployment.SyncInfo.MarkedForDelete {
this.deleteDeployment(networkId, deployment.Id)
continue
}
if deployment.SyncInfo.IsPlaceholder {
this.deleteDeployment(networkId, deployment.Id)
continue
}
if !deployment.SyncInfo.MarkedAsMissing {
deployment.SyncInfo.MarkedAsMissing = true
err = this.db.SaveDeployment(deployment)
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
}
}
}
err = this.db.RemoveUnknownDeployments(networkId, handled)
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
}
err = this.db.RemoveUnknownDeploymentMetadata(networkId, knownIds)
err = this.db.RemoveUnknownDeploymentMetadata(networkId, handled)
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
Expand Down Expand Up @@ -96,7 +159,7 @@ func (this *Controller) ApiDeleteDeployment(networkId string, deploymentId strin
if err != nil {
return
}
if current.IsPlaceholder {
if current.IsPlaceholder || current.MarkedAsMissing {
err = this.db.RemoveDeployment(networkId, deploymentId)
} else {
err = this.mgw.SendDeploymentDeleteCommand(networkId, deploymentId)
Expand Down Expand Up @@ -182,6 +245,10 @@ func (this *Controller) ApiStartDeployment(networkId string, deploymentId string
err = IsMarkedForDeleteErr
return
}
if current.MarkedAsMissing {
err = IsMarkedAsMissingErr
return
}

definition, err := this.db.GetDefinitionByDeploymentId(networkId, deploymentId)
if err != nil {
Expand Down Expand Up @@ -267,7 +334,11 @@ func (this *Controller) ExtendDeployments(deployments []model.Deployment) (resul
for _, deployment := range deployments {
if deployment.IsPlaceholder {
result = append(result, model.ExtendedDeployment{Deployment: deployment, Diagram: constructionSvg})
break
continue
}
if deployment.MarkedAsMissing {
result = append(result, model.ExtendedDeployment{Deployment: deployment, Diagram: constructionSvg})
continue
}
element := model.ExtendedDeployment{Deployment: deployment}
err := errors[deployment.NetworkId]
Expand Down
96 changes: 96 additions & 0 deletions pkg/controller/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2025 InfAI (CC SES)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controller

import (
"errors"
"github.com/SENERGY-Platform/process-sync/pkg/configuration"
"github.com/SENERGY-Platform/process-sync/pkg/database"
"github.com/SENERGY-Platform/process-sync/pkg/model"
"github.com/SENERGY-Platform/process-sync/pkg/model/camundamodel"
"net/http"
)

func (this *Controller) ApiSyncDeployments(token string, networkId string) (error, int) {
err := this.db.RemovePlaceholderDeployments(networkId)
if err != nil {
return err, http.StatusInternalServerError
}

var limit int64 = 1000
var offset int64 = 0
errorList := []error{}
for {
deployments, err := this.db.ListDeployments([]string{networkId}, limit, offset, "id.asc")
if err != nil {
return err, http.StatusInternalServerError
}
for _, deployment := range deployments {
if deployment.SyncInfo.MarkedForDelete {
err = this.mgw.SendDeploymentDeleteCommand(networkId, deployment.Id)
if err != nil {
errorList = append(errorList, err)
continue
}
}
if deployment.SyncInfo.MarkedAsMissing {
metadata, err := this.db.ReadDeploymentMetadata(networkId, deployment.Id)
if err != nil && !errors.Is(err, database.ErrNotFound) {
errorList = append(errorList, err)
continue
}
if err != nil {
continue
}
now := configuration.TimeNow()
err = this.db.SaveDeployment(model.Deployment{
Deployment: camundamodel.Deployment{
Id: "placeholder-" + configuration.Id(),
Name: deployment.Name,
Source: "senergy",
DeploymentTime: now,
TenantId: "senergy",
},
SyncInfo: model.SyncInfo{
NetworkId: networkId,
IsPlaceholder: true,
MarkedForDelete: false,
SyncDate: now,
},
})
if err != nil {
errorList = append(errorList, err)
continue
}
err = this.mgw.SendDeploymentCommand(networkId, metadata.DeploymentModel)
if err != nil {
errorList = append(errorList, err)
continue
}
this.deleteDeployment(networkId, deployment.Id)
}
}
if int64(len(deployments)) <= limit {
err = errors.Join(errorList...)
if err != nil {
return err, http.StatusInternalServerError
}
return nil, http.StatusOK
}
offset += limit
}
}
1 change: 1 addition & 0 deletions pkg/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Database interface {
RemoveDeployment(networkId string, deploymentId string) error
RemovePlaceholderDeployments(networkId string) error
RemoveUnknownDeployments(networkId string, knownIds []string) error
ListUnknownDeployments(networkId string, knownIds []string) (result []model.Deployment, err error)
ReadDeployment(networkId string, deploymentId string) (deployment model.Deployment, err error)
ListDeployments(networkIds []string, limit int64, offset int64, sort string) (deployment []model.Deployment, err error)
SearchDeployments(networkIds []string, search string, limit int64, offset int64, sort string) ([]model.Deployment, error)
Expand Down
23 changes: 21 additions & 2 deletions pkg/database/mongo/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package mongo

import (
"errors"
"github.com/SENERGY-Platform/process-sync/pkg/configuration"
"github.com/SENERGY-Platform/process-sync/pkg/database"
"github.com/SENERGY-Platform/process-sync/pkg/model"
Expand Down Expand Up @@ -137,6 +138,24 @@ func (this *Mongo) RemoveUnknownDeployments(networkId string, knownIds []string)
return err
}

func (this *Mongo) ListUnknownDeployments(networkId string, knownIds []string) (result []model.Deployment, err error) {
ctx, _ := this.getTimeoutContext()
iter, err := this.deploymentCollection().Find(
ctx,
bson.M{
deploymentIdKey: bson.M{"$nin": knownIds},
deploymentNetworkIdKey: networkId,
})
if err != nil {
return result, err
}
err = iter.All(ctx, &result)
if err != nil {
return result, err
}
return result, nil
}

func (this *Mongo) ReadDeployment(networkId string, deploymentId string) (deployment model.Deployment, err error) {
ctx, _ := this.getTimeoutContext()
result := this.deploymentCollection().FindOne(
Expand All @@ -146,14 +165,14 @@ func (this *Mongo) ReadDeployment(networkId string, deploymentId string) (deploy
deploymentNetworkIdKey: networkId,
})
err = result.Err()
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
return deployment, database.ErrNotFound
}
if err != nil {
return
}
err = result.Decode(&deployment)
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
return deployment, database.ErrNotFound
}
return deployment, err
Expand Down
5 changes: 3 additions & 2 deletions pkg/database/mongo/deploymentmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package mongo

import (
"errors"
"github.com/SENERGY-Platform/process-sync/pkg/configuration"
"github.com/SENERGY-Platform/process-sync/pkg/database"
"github.com/SENERGY-Platform/process-sync/pkg/model"
Expand Down Expand Up @@ -125,14 +126,14 @@ func (this *Mongo) ReadDeploymentMetadata(networkId string, deploymentId string)
metadataNetworkIdKey: networkId,
})
err = result.Err()
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
return metadata, database.ErrNotFound
}
if err != nil {
return
}
err = result.Decode(&metadata)
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
return metadata, database.ErrNotFound
}
return metadata, err
Expand Down
1 change: 1 addition & 0 deletions pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type SyncInfo struct {
NetworkId string `json:"network_id"`
IsPlaceholder bool `json:"is_placeholder"`
MarkedForDelete bool `json:"marked_for_delete"`
MarkedAsMissing bool `json:"marked_as_missing"`
SyncDate time.Time `json:"sync_date"`
}

Expand Down
Loading

0 comments on commit c8c045e

Please sign in to comment.