Skip to content

Commit

Permalink
Merge pull request #439 from DrDroidLab/integration
Browse files Browse the repository at this point in the history
Context Propagation V2
  • Loading branch information
dimittal authored Aug 12, 2024
2 parents 0c5948b + 580895d commit 034daf0
Show file tree
Hide file tree
Showing 42 changed files with 547 additions and 187 deletions.
25 changes: 13 additions & 12 deletions executor/playbook_source_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@
from utils.proto_utils import proto_to_dict, dict_to_proto


def apply_result_transformer(result_dict, lambda_function: Lambda.Function) -> Dict:
lambda_function_processor = LambdaFunctionProcessor(lambda_function.definition.value,
lambda_function.requirements)
transformer_result = lambda_function_processor.execute(result_dict)
if not isinstance(transformer_result, Dict):
raise ValueError("Result transformer should return a dictionary")
transformer_result = {f"${k}" if not k.startswith("$") else k: v for k, v in transformer_result.items()}
return transformer_result


def resolve_global_variables(form_fields: [FormField], global_variable_set: Struct,
source_type_task_def: Dict) -> (Dict, Dict):
all_string_fields = [ff.key_name.value for ff in form_fields if ff.data_type == LiteralType.STRING]
Expand Down Expand Up @@ -97,16 +107,6 @@ def get_active_connectors(self, account_id, connector_id: int = None) -> [Connec
connector_protos.append(dbc.unmasked_proto)
return connector_protos

def apply_result_transformer(self, result: PlaybookTaskResult, lambda_function: Lambda.Function) -> Dict:
lambda_function_processor = LambdaFunctionProcessor(lambda_function.definition.value,
lambda_function.requirements)
result_dict = proto_to_dict(result)
transformer_result = lambda_function_processor.execute(result_dict)
if not isinstance(transformer_result, Dict):
raise ValueError("Result transformer should return a dictionary")
transformer_result = {f"${k}" if not k.startswith("$") else k: v for k, v in transformer_result.items()}
return transformer_result

def execute_task(self, account_id, time_range: TimeRange, global_variable_set: Struct,
task: PlaybookTask) -> PlaybookTaskResult:
try:
Expand Down Expand Up @@ -166,8 +166,9 @@ def execute_task(self, account_id, time_range: TimeRange, global_variable_set: S
# Apply result transformer
if task.execution_configuration.is_result_transformer_enabled.value:
lambda_function = task.execution_configuration.result_transformer_lambda_function
result_transformer_lambda_function_variable_set = self.apply_result_transformer(
playbook_task_result, lambda_function)
playbook_task_result_dict = proto_to_dict(playbook_task_result) if playbook_task_result else {}
result_transformer_lambda_function_variable_set = apply_result_transformer(playbook_task_result_dict,
lambda_function)
result_transformer_lambda_function_variable_set_proto = dict_to_proto(
result_transformer_lambda_function_variable_set,
Struct) if result_transformer_lambda_function_variable_set else Struct()
Expand Down
27 changes: 13 additions & 14 deletions executor/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,13 @@ def execute_playbook_step_impl(tr: TimeRange, account: Account, step: PlaybookSt
interpreter_type: InterpreterType = step.interpreter_type if step.interpreter_type else InterpreterType.BASIC_I
pte_logs = []
task_interpretations = []
execution_global_variable_set = Struct()
if global_variable_set and global_variable_set.items():
execution_global_variable_set.CopyFrom(global_variable_set)
for task_proto in tasks:
if not global_variable_set or not global_variable_set.items():
if not execution_global_variable_set or not execution_global_variable_set.items():
if task_proto.global_variable_set and task_proto.global_variable_set.items():
global_variable_set = task_proto.global_variable_set
else:
global_variable_set = Struct()

execution_global_variable_set = Struct()
execution_global_variable_set.CopyFrom(global_variable_set)
execution_global_variable_set = task_proto.global_variable_set

is_bulk_execution = False
bulk_execution_var_values = ['Single Execution']
Expand All @@ -94,15 +92,15 @@ def execute_playbook_step_impl(tr: TimeRange, account: Account, step: PlaybookSt
task_result = PlaybookTaskResult(error=StringValue(value="Bulk execution variable not found"))
pte_logs.append(PlaybookTaskExecutionLog(task=task_proto,
result=task_result,
execution_global_variable_set=global_variable_set))
execution_global_variable_set=execution_global_variable_set))
continue

if bulk_task_var not in global_variable_set:
if bulk_task_var not in execution_global_variable_set:
task_result = PlaybookTaskResult(
error=StringValue(value="Bulk execution variable not found in global variables"))
pte_logs.append(PlaybookTaskExecutionLog(task=task_proto,
result=task_result,
execution_global_variable_set=global_variable_set))
execution_global_variable_set=execution_global_variable_set))
continue

bulk_execution_var_values = execution_global_variable_set[bulk_task_var].split(',')
Expand All @@ -124,17 +122,18 @@ def execute_playbook_step_impl(tr: TimeRange, account: Account, step: PlaybookSt
task_interpretations.append(task_interpretation)

if task_result.result_transformer_lambda_function_variable_set and task_result.result_transformer_lambda_function_variable_set.items():
global_variable_set.update(task_result.result_transformer_lambda_function_variable_set)
execution_global_variable_set.update(
task_result.result_transformer_lambda_function_variable_set)

playbook_task_execution_log = PlaybookTaskExecutionLog(task=task_proto, result=task_result,
interpretation=task_interpretation,
execution_global_variable_set=global_variable_set)
execution_global_variable_set=execution_global_variable_set)
except Exception as exc:
logger.error(f"Error occurred while running task: {exc}")
playbook_task_execution_log = PlaybookTaskExecutionLog(task=task_proto,
result=PlaybookTaskResult(
error=StringValue(value=str(exc))),
execution_global_variable_set=global_variable_set)
execution_global_variable_set=execution_global_variable_set)
pte_logs.append(playbook_task_execution_log)
step_interpretation: InterpretationProto = step_result_interpret(interpreter_type, step, task_interpretations)

Expand Down Expand Up @@ -175,7 +174,7 @@ def execute_playbook_step_impl(tr: TimeRange, account: Account, step: PlaybookSt
step_interpretation=step_interpretation,
relation_execution_logs=relation_execution_logs)

return step_execution_log, global_variable_set
return step_execution_log, execution_global_variable_set
except Exception as exc:
logger.error(f"Error occurred while running playbook: {exc}")
raise exc
Expand Down
1 change: 1 addition & 0 deletions executor/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
path('task/run/v2', executor_views.task_run_v2), # Deprecated
path('task/run/v3', executor_views.task_run_v3),
path('bulk_task/run', executor_views.bulk_task_run),
path('task/test_result_transformer', executor_views.task_test_result_transformer),

path('step/run', executor_views.step_run), # Deprecated
path('step/run/v2', executor_views.step_run_v2), # Deprecated
Expand Down
25 changes: 24 additions & 1 deletion executor/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
from executor.crud.playbooks_crud import update_or_create_db_playbook, get_db_playbooks
from executor.deprecated_task_executor import deprecated_execute_task
from executor.playbook_source_facade import playbook_source_facade
from executor.playbook_source_manager import apply_result_transformer
from executor.tasks import execute_playbook, execute_playbook_impl, store_step_execution_logs, \
execute_playbook_step_impl
from executor.utils.old_to_new_model_transformers import transform_PlaybookTaskExecutionResult_to_PlaybookTaskResult
from executor.workflows.tasks import test_workflow_transformer
from intelligence_layer.result_interpreters.result_interpreter_facade import task_result_interpret, \
step_result_interpret
from management.crud.task_crud import get_or_create_task, check_scheduled_or_running_task_run_for_task
Expand All @@ -38,6 +40,7 @@
from protos.playbooks.playbook_commons_pb2 import PlaybookTaskResult, PlaybookExecutionStatusType
from protos.playbooks.playbook_pb2 import PlaybookTask, PlaybookTaskExecutionLog, \
PlaybookExecution, Playbook
from protos.playbooks.source_task_definitions.lambda_function_task_pb2 import Lambda
from utils.time_utils import current_epoch_timestamp, current_datetime
from protos.base_pb2 import Meta, TimeRange, Message, Page
from protos.playbooks.api_pb2 import RunPlaybookTaskRequest, RunPlaybookTaskResponse, RunPlaybookStepRequest, \
Expand All @@ -52,7 +55,8 @@
CreatePlaybookResponseV2, UpdatePlaybookRequestV2, UpdatePlaybookResponseV2, ExecutionPlaybookGetRequestV2, \
ExecutionPlaybookGetResponseV2, ExecutionPlaybookAPIGetResponseV2, PlaybookExecutionCreateRequest, \
PlaybookExecutionCreateResponse, PlaybookExecutionStepExecuteResponse, PlaybookExecutionStepExecuteRequest, \
PlaybookExecutionStatusUpdateRequest, PlaybookExecutionStatusUpdateResponse, RunBulkPlaybookTaskResponse
PlaybookExecutionStatusUpdateRequest, PlaybookExecutionStatusUpdateResponse, RunBulkPlaybookTaskResponse, \
TestResultTransformerRequest, TestResultTransformerResponse

from protos.playbooks.deprecated_playbook_pb2 import DeprecatedPlaybookTaskExecutionResult, DeprecatedPlaybook, \
DeprecatedPlaybookExecutionLog, DeprecatedPlaybookStepExecutionLog, DeprecatedPlaybookExecution
Expand Down Expand Up @@ -933,3 +937,22 @@ def playbooks_execution_status_update(request_message: PlaybookExecutionStatusUp
message=Message(title="Internal Error",
description="Failed to stop playbook execution status"))
return PlaybookExecutionStatusUpdateResponse(meta=get_meta(tr=time_range), success=BoolValue(value=True))


@web_api(TestResultTransformerRequest)
def task_test_result_transformer(request_message: TestResultTransformerRequest) -> \
Union[TestResultTransformerResponse, HttpResponse]:
lambda_function: Lambda.Function = request_message.transformer_lambda_function
payload = request_message.payload
try:
payload_dict = proto_to_dict(payload) if payload and payload.items() else {}
output = apply_result_transformer(payload_dict, lambda_function)
if not isinstance(output, dict):
return TestResultTransformerResponse(success=BoolValue(value=False),
message=Message(title="Error",
description="Transformer function did not return a valid dictioanry."))
output_struct = dict_to_proto(output, Struct)
return TestResultTransformerResponse(success=BoolValue(value=True), output=output_struct)
except Exception as e:
return TestResultTransformerResponse(success=BoolValue(value=False),
message=Message(title="Error", description=str(e)))
12 changes: 10 additions & 2 deletions executor/workflows/crud/workflow_execution_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def get_next_workflow_execution(schedule: WorkflowSchedule, scheduled_at, last_s
if not keep_alive and expiry_at and latest_scheduled_at > expiry_at:
raise WorkflowExpiredException(f"Next Scheduled At is greater than Workflow Expiry time for workflow")
elif schedule.type == WorkflowSchedule.Type.ONE_OFF:
if last_scheduled_at:
raise WorkflowExpiredException(f"One Off Schedule in workflow already executed")
latest_scheduled_at = scheduled_at + timedelta(seconds=int(settings.WORKFLOW_SCHEDULER_INTERVAL))
expiry_at = latest_scheduled_at
else:
Expand Down Expand Up @@ -129,13 +131,19 @@ def create_workflow_execution_util(account: Account, workflow: Workflow, schedul
latest_scheduled_at, expiry_at, keep_alive = get_next_workflow_execution(schedule, scheduled_at)
except Exception as e:
return False, f"Error in calculating next workflow execution: {e} for workflow: {workflow.id}"
time_range = TimeRange(time_geq=int(latest_scheduled_at.timestamp()) - 3600,

offset = 3600
workflow_configuration: WorkflowConfiguration = workflow.configuration if workflow.configuration else WorkflowConfiguration()
if workflow_configuration and workflow_configuration.evaluation_window_in_seconds.value:
offset = workflow_configuration.evaluation_window_in_seconds.value

time_range = TimeRange(time_geq=int(latest_scheduled_at.timestamp()) - offset,
time_lt=int(latest_scheduled_at.timestamp()))
if schedule.type != WorkflowSchedule.Type.ONE_OFF and not expiry_at and not keep_alive:
return False, f"Expiry time is required for non-keep alive workflows"

workflow_execution = create_workflow_execution(account, time_range, workflow.id.value, workflow_run_uuid,
scheduled_at, latest_scheduled_at, expiry_at, keep_alive,
triggered_by, execution_metadata,
proto_to_dict(workflow.configuration))
proto_to_dict(workflow_configuration))
return workflow_execution, ''
11 changes: 11 additions & 0 deletions protos/playbooks/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,17 @@ message PlaybookExecutionStatusUpdateResponse {
Message message = 3;
}

message TestResultTransformerRequest {
Lambda.Function transformer_lambda_function = 1;
google.protobuf.Struct payload = 2;
}

message TestResultTransformerResponse {
google.protobuf.BoolValue success = 1;
Message message = 2;
google.protobuf.Struct output = 3;
}

///////////////////// Playbook Builder APIs /////////////////////

message PlaybooksBuilderOptionsRequest {
Expand Down
130 changes: 67 additions & 63 deletions protos/playbooks/api_pb2.py

Large diffs are not rendered by default.

46 changes: 46 additions & 0 deletions protos/playbooks/api_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,52 @@ class PlaybookExecutionStatusUpdateResponse(google.protobuf.message.Message):

global___PlaybookExecutionStatusUpdateResponse = PlaybookExecutionStatusUpdateResponse

@typing_extensions.final
class TestResultTransformerRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

TRANSFORMER_LAMBDA_FUNCTION_FIELD_NUMBER: builtins.int
PAYLOAD_FIELD_NUMBER: builtins.int
@property
def transformer_lambda_function(self) -> protos.playbooks.source_task_definitions.lambda_function_task_pb2.Lambda.Function: ...
@property
def payload(self) -> google.protobuf.struct_pb2.Struct: ...
def __init__(
self,
*,
transformer_lambda_function: protos.playbooks.source_task_definitions.lambda_function_task_pb2.Lambda.Function | None = ...,
payload: google.protobuf.struct_pb2.Struct | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["payload", b"payload", "transformer_lambda_function", b"transformer_lambda_function"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["payload", b"payload", "transformer_lambda_function", b"transformer_lambda_function"]) -> None: ...

global___TestResultTransformerRequest = TestResultTransformerRequest

@typing_extensions.final
class TestResultTransformerResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

SUCCESS_FIELD_NUMBER: builtins.int
MESSAGE_FIELD_NUMBER: builtins.int
OUTPUT_FIELD_NUMBER: builtins.int
@property
def success(self) -> google.protobuf.wrappers_pb2.BoolValue: ...
@property
def message(self) -> protos.base_pb2.Message: ...
@property
def output(self) -> google.protobuf.struct_pb2.Struct: ...
def __init__(
self,
*,
success: google.protobuf.wrappers_pb2.BoolValue | None = ...,
message: protos.base_pb2.Message | None = ...,
output: google.protobuf.struct_pb2.Struct | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["message", b"message", "output", b"output", "success", b"success"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["message", b"message", "output", b"output", "success", b"success"]) -> None: ...

global___TestResultTransformerResponse = TestResultTransformerResponse

@typing_extensions.final
class PlaybooksBuilderOptionsRequest(google.protobuf.message.Message):
"""/////////////////// Playbook Builder APIs /////////////////////"""
Expand Down
1 change: 1 addition & 0 deletions protos/playbooks/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ message WorkflowConfiguration {
google.protobuf.BoolValue generate_summary = 1;
google.protobuf.Struct global_variable_set = 2;
Lambda.Function transformer_lambda_function = 3;
google.protobuf.UInt64Value evaluation_window_in_seconds = 4;
}


Expand Down
Loading

0 comments on commit 034daf0

Please sign in to comment.