Skip to content

Commit

Permalink
Massive pipeline overhaul to allow for more control and visibility in…
Browse files Browse the repository at this point in the history
…to what is happenign in Orborus
  • Loading branch information
frikky committed Oct 31, 2024
1 parent cc92660 commit 8ba7936
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 149 deletions.
94 changes: 34 additions & 60 deletions detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,6 @@ import (
"gopkg.in/yaml.v2"
)

func HandleDetectionHealthUpdate(resp http.ResponseWriter, request *http.Request) {
if request.Method != "POST" {
request.Method = "POST"
}

type HealthUpdate struct {
OrgId string `json:"org_id"`
Status string `json:"status"`
Environment string `json:"environment"`
Authorization string `json:"authorization"`
}

var healthUpdate HealthUpdate
err := json.NewDecoder(request.Body).Decode(&healthUpdate)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to decode JSON: %v", err)
return
}

//log.Printf("[DEBUG] Tenzir health update: %#v", healthUpdate)

//ctx := context.Background()
/*
status := healthUpdate.Status
result, err := GetDisabledRules(ctx, user.ActiveOrg.Id)
if (err != nil && err.Error() == "rules doesn't exist") || err == nil {
result.IsTenzirActive = status
result.LastActive = time.Now().Unix()
err = StoreDisabledRules(ctx, *result)
if err != nil {
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}
resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
return
}
*/

resp.WriteHeader(500)
resp.Write([]byte(`{"success": false, "reason": "Not implemented"}`))
return
}

func HandleGetDetectionRules(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
if cors {
Expand Down Expand Up @@ -647,6 +599,13 @@ func HandleDetectionAutoConnect(resp http.ResponseWriter, request *http.Request)
if detectionType == "siem" {

ctx := GetContext(request)
workflow, err = ConfigureDetectionWorkflow(ctx, user.ActiveOrg.Id, "TENZIR-SIGMA")
if err != nil {
log.Printf("\n\n\n[ERROR] Failed to create Sigma handling workflow: %s\n\n\n", err)
}

log.Printf("[DEBUG] Sending orborus request to start Sigma handling workflow")

execType := "START_TENZIR"
err = SetDetectionOrborusRequest(ctx, user.ActiveOrg.Id, execType, "", "SIGMA", "SHUFFLE_DISCOVER")
if err != nil {
Expand All @@ -667,6 +626,8 @@ func HandleDetectionAutoConnect(resp http.ResponseWriter, request *http.Request)
resp.Write([]byte(`{"success": false}`))
return
}


} else if detectionType == "email" {

// FIXME:
Expand Down Expand Up @@ -712,37 +673,49 @@ func SetDetectionOrborusRequest(ctx context.Context, orgId, execType, fileName,
return err
}

lakeNodes := 0
selectedEnvironments := []Environment{}
for _, env := range environments {
if env.Archived {
continue
}

if env.Name == "cloud" || env.Name == "Cloud" {
if env.Type == "cloud" {
continue
}

if env.Name != environmentName && environmentName != "SHUFFLE_DISCOVER" {
continue
}

cacheKey := fmt.Sprintf("queueconfig-%s-%s", env.Name, env.OrgId)
cache, err := GetCache(ctx, cacheKey)
if err == nil {
newEnv := OrborusStats{}
err = json.Unmarshal(cache.([]uint8), &newEnv)
if err == nil {
// No point in adding a job if the lake is already running
if env.DataLake.Enabled && execType == "START_TENZIR" {
lakeNodes += 1
continue
}
}
}

selectedEnvironments = append(selectedEnvironments, env)
}

log.Printf("[DEBUG] Found %d potentially valid environment(s)", len(selectedEnvironments))

/*
if len(selectedEnvironments) == 0 || environmentName == "SHUFFLE_DISCOVER" {
// FIXME: Get based on the Organisation. This is only tested onprem so far, so there's a lot to do to make this stable ROFL
log.Printf("[DEBUG] Automatically discovering the right environment from '%s'", environmentName)
}
*/

if len(selectedEnvironments) == 0 {
log.Printf("[ERROR] No valid environments found")
return fmt.Errorf("No valid environments found")
if lakeNodes > 0 {
//log.Printf("[ERROR] No environments needing a lake. Found lake nodes: %d", lakeNodes)
return nil
} else {
return fmt.Errorf("No valid environments found")
}
}

log.Printf("[DEBUG] Found %d potentially valid environment(s)", len(selectedEnvironments))

deployedToActiveEnv := false
for _, env := range selectedEnvironments {
execRequest := ExecutionRequest{
Expand Down Expand Up @@ -861,6 +834,7 @@ func ConfigureDetectionWorkflow(ctx context.Context, orgId, workflowType string)
cloudWorkflowId := ""
usecaseNames := []string{}
if workflowType == "TENZIR-SIGMA" {
log.Printf("[INFO] Creating SIEM handling workflow for org %s", orgId)
} else if workflowType == "EMAIL-DETECTION" {
// How do we check what email tool they use?
//log.Printf("[INFO] Creating email handling workflow for org %s", orgId)
Expand Down
18 changes: 10 additions & 8 deletions pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"log"
"net/http"
"strings"

uuid "github.com/satori/go.uuid"
"github.com/google/uuid"
)

// Pipeline is a sequence of stages that are executed in order.
Expand Down Expand Up @@ -154,19 +154,20 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
}
}

log.Printf("[INFO] Pipeline type: %s", formattedType)
//log.Printf("[INFO] Pipeline type: %s", formattedType)

// 2. Send to environment queue
execRequest := ExecutionRequest{
Type: formattedType,
ExecutionId: uuid.NewV4().String(),
ExecutionSource: pipeline.TriggerId,
ExecutionId: uuid.New().String(),
ExecutionSource: pipeline.Name,
ExecutionArgument: pipeline.Command,
Priority: 11,
}

pipelineData := Pipeline{}
//log.Printf("EXECREQUEST: Type: %s, Source: %s, Argument: %s", execRequest.Type, execRequest.ExecutionSource, execRequest.ExecutionArgument)

pipelineData := Pipeline{}
if startCommand == "DELETE" {

err := deletePipeline(ctx, *pipelineInfo)
Expand All @@ -186,7 +187,8 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
resp.Write([]byte(`{"success": false}`))
return
}
log.Printf("[INFO] Stopped the pipeline %s sucessfully", pipelineInfo.TriggerId)

log.Printf("[INFO] Stopped the pipeline successfully", pipelineInfo.ID)
} else {

pipelineData.Name = pipeline.Name
Expand Down Expand Up @@ -220,7 +222,7 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
}

resp.WriteHeader(200)
resp.Write([]byte(`{"success": true, "reason": "Pipeline will be created"}`))
resp.Write([]byte(`{"success": true, "reason": "Pipeline handled successfully."}`))
}

func deletePipeline(ctx context.Context, pipeline Pipeline) error {
Expand Down
Loading

0 comments on commit 8ba7936

Please sign in to comment.