forked from pippinlovesyou/pippin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcombined_code.txt
4929 lines (4174 loc) · 194 KB
/
combined_code.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
##### my_digital_being/framework/activity_decorator.py #####
import functools
import logging
from typing import Callable, Any, Dict, List, Optional
from datetime import datetime
import json
logger = logging.getLogger(__name__)
def activity(
name: str,
energy_cost: float = 0.2,
cooldown: int = 0,
required_skills: Optional[List[str]] = None
):
"""Decorator for activity classes."""
def decorator(cls):
cls.activity_name = name
cls.energy_cost = energy_cost
cls.cooldown = cooldown
cls.required_skills = required_skills or []
cls.last_execution = None
# Add metadata to the class
cls.metadata = {
'name': name,
'energy_cost': energy_cost,
'cooldown': cooldown,
'required_skills': required_skills
}
# Wrap the execute method
original_execute = cls.execute
@functools.wraps(original_execute)
async def wrapped_execute(self, *args, **kwargs):
try:
# Pre-execution checks
if not self._can_execute():
logger.warning(f"Activity {name} is on cooldown")
return ActivityResult(
success=False,
error="Activity is on cooldown"
)
# Log activity start
logger.info(f"Starting activity: {name}")
start_time = datetime.now()
# Execute the activity
result = await original_execute(self, *args, **kwargs)
# Post-execution processing
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
cls.last_execution = end_time
# Log activity completion
logger.info(f"Completed activity: {name} in {duration:.2f} seconds")
return result
except Exception as e:
logger.error(f"Error in activity {name}: {e}")
return ActivityResult(
success=False,
error=str(e)
)
cls.execute = wrapped_execute
return cls
return decorator
class ActivityResult:
"""Class to store activity execution results."""
def __init__(
self,
success: bool,
data: Optional[Any] = None,
error: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
):
self.success = success
self.data = data
self.error = error
self.metadata = metadata or {}
self.timestamp = datetime.now()
def to_dict(self) -> Dict[str, Any]:
"""Convert result to dictionary format."""
data_dict = {}
if self.data:
if hasattr(self.data, 'to_dict'):
data_dict = self.data.to_dict()
elif isinstance(self.data, dict):
data_dict = self.data
else:
try:
data_dict = json.loads(json.dumps(self.data))
except:
data_dict = str(self.data)
return {
'success': self.success,
'data': data_dict,
'error': self.error,
'metadata': self.metadata,
'timestamp': self.timestamp.isoformat()
}
@classmethod
def success_result(cls, data: Optional[Any] = None, metadata: Optional[Dict[str, Any]] = None):
"""Create a successful result."""
return cls(success=True, data=data, metadata=metadata)
@classmethod
def error_result(cls, error: str, metadata: Optional[Dict[str, Any]] = None):
"""Create an error result."""
return cls(success=False, error=error, metadata=metadata)
class ActivityBase:
"""Base class for all activities."""
def __init__(self):
self.result = None
self.last_execution: Optional[datetime] = None
self.cooldown: int = 0
def _can_execute(self) -> bool:
"""Check if the activity can be executed."""
if self.last_execution is None:
return True
now = datetime.now()
time_since_last = (now - self.last_execution).total_seconds()
return time_since_last >= self.cooldown
def get_result(self) -> Dict[str, Any]:
"""Get the result of the activity execution."""
if isinstance(self.result, ActivityResult):
return self.result.to_dict()
return {
'success': bool(self.result),
'data': self.result if self.result else None,
'error': None,
'timestamp': datetime.now().isoformat()
}
async def execute(self, shared_data) -> ActivityResult:
"""Base execute method that should be overridden by activities."""
raise NotImplementedError("Activities must implement execute method")
def skill_required(skill_name: str):
"""Decorator to specify required skills for methods."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
if not hasattr(self, 'required_skills'):
self.required_skills = []
if skill_name not in self.required_skills:
self.required_skills.append(skill_name)
return func(self, *args, **kwargs)
return wrapper
return decorator
##### my_digital_being/framework/activity_loader.py #####
import importlib
import logging
import re
from pathlib import Path
from typing import Dict, Type, Any, Optional
logger = logging.getLogger(__name__)
def read_activity_code(activity_name: str) -> Optional[str]:
"""
Reads the .py file from 'activities/' by the given filename (e.g. 'activity_tweet.py').
Returns its text content or None if file not found.
"""
activity_file = Path(__file__).parent.parent / 'activities' / activity_name
if not activity_file.exists():
logger.warning(f"read_activity_code: File not found: {activity_file}")
return None
return activity_file.read_text()
def write_activity_code(activity_name: str, new_code: str) -> bool:
"""
Writes 'new_code' into the .py file in 'activities/' with the given filename.
Returns True on success, False on error.
"""
activity_file = Path(__file__).parent.parent / 'activities' / activity_name
try:
activity_file.write_text(new_code, encoding='utf-8')
return True
except Exception as e:
logger.error(f"write_activity_code: Failed to write {activity_file}: {e}")
return False
class ActivityLoader:
def __init__(self, activities_path: str = None, config: dict = None):
"""
:param activities_path: Where activity_*.py files live.
:param config: The main config object from being.configs (used to skip disabled).
"""
if activities_path is None:
activities_path = Path(__file__).parent.parent / 'activities'
self.activities_path = Path(activities_path)
# [ADDED] We'll read 'activities_config' from config
self.activities_config = {}
if config:
self.activities_config = (
config.get("activity_constraints", {}).get("activities_config", {})
)
self.loaded_activities: Dict[str, Type[Any]] = {}
logger.info(f"ActivityLoader initialized with path: {self.activities_path}")
def load_activities(self):
"""Load all activities from the activities directory."""
if not self.activities_path.exists():
logger.error(f"Activities directory not found: {self.activities_path}")
return
logger.info(f"Starting to load activities from: {self.activities_path}")
for activity_file in self.activities_path.glob("activity_*.py"):
try:
logger.info(f"Found activity file: {activity_file}")
file_text = activity_file.read_text()
# We expect a pattern like: class SomeActivity(ActivityBase):
class_match = re.search(r'class\s+(\w+)\(.*ActivityBase.*\):', file_text)
if not class_match:
logger.error(f"No recognized activity class in {activity_file}")
continue
class_name = class_match.group(1)
module_name = activity_file.stem # e.g. "activity_draw"
# Possibly skip if "enabled": false in activities_config
activity_cfg = None
if class_name in self.activities_config:
activity_cfg = self.activities_config[class_name]
elif module_name in self.activities_config:
activity_cfg = self.activities_config[module_name]
if activity_cfg and (activity_cfg.get("enabled") is False):
logger.info(f"Activity {class_name} is disabled by config, skipping load.")
continue
spec = importlib.util.spec_from_file_location(module_name, activity_file)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
activity_class = getattr(module, class_name)
self.loaded_activities[module_name] = activity_class
logger.info(f"Successfully loaded activity {module_name} -> class {class_name}")
except Exception as e:
logger.error(f"Failed to load activity {activity_file}: {str(e)}", exc_info=True)
def get_activity(self, activity_name: str) -> Optional[Type[Any]]:
"""Get an activity class by module name (e.g. 'activity_tweet')."""
return self.loaded_activities.get(activity_name)
def get_all_activities(self) -> Dict[str, Type[Any]]:
"""Get all loaded activities (module_name -> class)."""
return self.loaded_activities.copy()
def reload_activities(self):
"""Reload all activities by clearing and reloading."""
self.loaded_activities.clear()
self.load_activities()
##### my_digital_being/framework/activity_selector.py #####
import logging
import random
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class ActivitySelector:
def __init__(self, constraints: Dict[str, Any], state):
"""
:param constraints: A dictionary that typically includes:
{
"activity_cooldowns": { ... }, # No longer used
"activity_requirements": { ... },
"activities_config": { "DrawActivity": {"enabled": false}, ... }
}
:param state: The DigitalBeing's State object, used to check mood, energy, etc.
"""
self.constraints = constraints
self.state = state
# Tracks the last time each activity class was executed
self.last_activity_times: Dict[str, datetime] = {}
# The loader is not set until set_activity_loader() is called
self.activity_loader = None
def set_activity_loader(self, loader):
"""
Attach an ActivityLoader instance to this ActivitySelector.
That loader has the loaded_activities dictionary (module_name -> activity_class).
"""
self.activity_loader = loader
logger.info("Activity loader set in selector")
def select_next_activity(self):
"""
Main entry point:
1. Gather all available activities (not on cooldown, not disabled).
2. Filter them by additional requirements like energy, skill requirements, etc.
3. Use personality to pick one at random (weighted).
4. Record the time we picked it.
5. Return the activity instance or None.
"""
if not self.activity_loader:
logger.error("Activity loader not set; cannot select activity.")
return None
# Step 1: get all activities that are not on cooldown and are enabled
available_activities = self._get_available_activities()
if not available_activities:
next_available = self.get_next_available_times()
logger.info(f"No activities available at this time. Next available activities: {next_available}")
return None
# Step 2: filter out ones that fail "energy" or "activity_requirements"
suitable_activities = []
for activity in available_activities:
activity_name = activity.__class__.__name__
if (self._check_energy_requirements(activity) and
self._check_activity_requirements(activity_name)):
logger.info(f"Activity {activity_name} is suitable for execution.")
suitable_activities.append(activity)
else:
logger.info(f"Activity {activity_name} does not meet requirements.")
if not suitable_activities:
logger.info("No activities suitable for current state.")
return None
# Step 3: personality-based selection
# (If you have a "personality" dict in state, else use {}.)
personality = self.state.get_current_state().get('personality', {})
selected_activity = self._select_based_on_personality(
suitable_activities,
personality
)
if selected_activity:
chosen_name = selected_activity.__class__.__name__
logger.info(f"Selected activity: {chosen_name}")
# Step 4: record the time we picked it
self.last_activity_times[chosen_name] = datetime.now()
return selected_activity
def get_next_available_times(self) -> List[Dict[str, Any]]:
"""
Provide info on when each loaded activity class will be available again.
This is mostly for debugging/logging: "You can next run DrawActivity in 1.5 hours", etc.
We now use the activity's decorator-based cooldown.
"""
current_time = datetime.now()
next_available = []
all_activities = self.activity_loader.get_all_activities()
for activity_name, activity_class in all_activities.items():
base_name = activity_class.__name__
# Pull cooldown from the class (decorator)
cooldown = getattr(activity_class, 'cooldown', 0)
last_time = self.last_activity_times.get(base_name)
if last_time:
time_since_last = (current_time - last_time).total_seconds()
time_remaining = max(0, cooldown - time_since_last)
next_time = current_time + timedelta(seconds=time_remaining)
next_available.append({
'activity': base_name,
'available_in_seconds': time_remaining,
'next_available_at': next_time.strftime('%Y-%m-%d %H:%M:%S'),
'cooldown_period': cooldown
})
else:
# never run before => it's available now
next_available.append({
'activity': base_name,
'available_in_seconds': 0,
'next_available_at': 'Now',
'cooldown_period': cooldown
})
# sort by soonest availability
return sorted(next_available, key=lambda x: x['available_in_seconds'])
def _get_available_activities(self) -> List[Any]:
"""
Return a list of *activity instances* that:
1) Are loaded by the ActivityLoader
2) Are "enabled" in the config
3) Are not on cooldown (based on the activity's own decorator-based cooldown)
Then the caller can further filter them for energy or skill requirements.
"""
available = []
current_time = datetime.now()
all_activities = self.activity_loader.get_all_activities()
activities_config = self.constraints.get('activities_config', {})
for module_name, activity_class in all_activities.items():
base_name = activity_class.__name__
# 1) skip if disabled
if base_name in activities_config:
if activities_config[base_name].get("enabled", True) is False:
logger.info(f"Skipping disabled activity: {base_name}")
continue
# 2) check if it's on cooldown
cooldown = getattr(activity_class, 'cooldown', 0)
last_time = self.last_activity_times.get(base_name)
if last_time:
time_since_last = (current_time - last_time).total_seconds()
if time_since_last < cooldown:
logger.info(f"{base_name} still on cooldown for {cooldown - time_since_last:.1f}s more.")
continue
# If we get here, the activity is enabled & not on cooldown
try:
instance = activity_class()
logger.info(f"Created instance of {base_name} successfully.")
available.append(instance)
except Exception as e:
logger.error(f"Failed to create instance of {base_name}: {e}", exc_info=True)
return available
def _check_activity_requirements(self, activity_name: str) -> bool:
"""
Check constraints['activity_requirements'][activity_name] if you need logic
for required skills or memory usage. Currently returns True to accept all.
"""
requirements = self.constraints.get('activity_requirements', {}).get(activity_name, {})
logger.debug(f"Checking requirements for {activity_name}: {requirements}")
return True
def _check_energy_requirements(self, activity) -> bool:
"""
Check if the being has enough energy for the activity (activity.energy_cost).
"""
current_energy = self.state.get_current_state().get('energy', 1.0)
required_energy = getattr(activity, 'energy_cost', 0.2)
has_energy = current_energy >= required_energy
if not has_energy:
logger.info(f"Insufficient energy for {activity.__class__.__name__} "
f"(required={required_energy}, current={current_energy}).")
return has_energy
def _select_based_on_personality(self, activities: List[Any], personality: Dict[str, float]) -> Optional[Any]:
"""
Given a list of candidate activity instances, choose one with a weighted random approach.
"""
if not activities:
return None
weights = []
for activity in activities:
weight = 1.0
if hasattr(activity, 'creativity_factor'):
weight *= (1 + personality.get('creativity', 0.5) * activity.creativity_factor)
if hasattr(activity, 'social_factor'):
weight *= (1 + personality.get('friendliness', 0.5) * activity.social_factor)
weights.append(weight)
chosen = random.choices(activities, weights=weights, k=1)[0]
return chosen
##### my_digital_being/framework/api_key_setup.py #####
"""Tool for securely setting up API keys."""
import logging
from typing import List, Dict, Tuple
import os
from .secret_storage import secret_manager
logger = logging.getLogger(__name__)
class APIKeySetup:
"""Manages the setup and validation of API keys for skills."""
@staticmethod
async def setup_keys(skill_name: str, required_keys: List[str]) -> Dict[str, bool]:
"""
Set up API keys for a skill using the configured secret storage.
Args:
skill_name: Name of the skill requiring API keys
required_keys: List of required API key names
Returns:
Dictionary mapping key names to setup success status
"""
results = {}
try:
# For Replit environment, use ask_secrets
if 'REPL_ID' in os.environ:
from replit import ask_secrets
env_keys = [f"{skill_name.upper()}_{key.upper()}_API_KEY" for key in required_keys]
await ask_secrets(
secret_keys=env_keys,
user_message=f"""
The {skill_name} skill requires the following API keys to function:
{', '.join(required_keys)}
Please provide these keys to enable the skill's functionality.
These will be stored securely as environment variables.
"""
)
# Verify keys were set properly
for key in required_keys:
exists = await secret_manager.check_api_key_exists(skill_name, key)
results[key] = exists
if exists:
logger.info(f"Successfully set up {key} API key for {skill_name}")
else:
logger.warning(f"Failed to set up {key} API key for {skill_name}")
except Exception as e:
logger.error(f"Error setting up API keys for {skill_name}: {e}")
for key in required_keys:
results[key] = False
return results
@staticmethod
async def check_skill_keys(skill_name: str, required_keys: List[str]) -> Tuple[bool, List[str]]:
"""
Check if a skill has all required API keys configured.
Args:
skill_name: Name of the skill to test
required_keys: List of required API keys
Returns:
Tuple of (success, list of missing keys)
"""
missing_keys = []
for key in required_keys:
exists = await secret_manager.check_api_key_exists(skill_name, key)
if not exists:
missing_keys.append(key)
return len(missing_keys) == 0, missing_keys
@staticmethod
async def list_skill_requirements(skill_requirements: Dict[str, List[str]]) -> str:
"""
Get a formatted string of all skills and their API key requirements.
Args:
skill_requirements: Dictionary mapping skill names to their required keys
Returns:
Formatted string showing all skills and their required API keys
"""
if not skill_requirements:
return "No skills with API key requirements registered."
output = ["Skill API Key Requirements:"]
for skill, keys in skill_requirements.items():
success, missing = await APIKeySetup.check_skill_keys(skill, keys)
status = "✓" if success else "✗"
output.append(f"\n{status} {skill}:")
for key in keys:
exists = await secret_manager.check_api_key_exists(skill, key)
configured = "✓" if exists else "✗"
output.append(f" {configured} {key}")
return "\n".join(output)
##### my_digital_being/framework/api_management.py #####
"""
Unified API key management system with flexible storage backends.
Implements:
- get_skill_status -> returns any "required_keys"
- get_composio_integrations -> calls composio_manager.list_available_integrations()
- set_api_key -> if you want to store them
"""
import logging
from typing import Dict, Any, Optional, Set, List
from .secret_storage import secret_manager
from .composio_integration import composio_manager
logger = logging.getLogger(__name__)
class APIManager:
def __init__(self):
# Example: track required keys for each skill
self._required_keys: Dict[str, Set[str]] = {}
self._secret_manager = secret_manager
self._composio_manager = composio_manager
logger.info("Initialized API Manager with Composio integration")
@property
def composio_manager(self):
return self._composio_manager
def register_required_keys(self, skill_name: str, required_keys: List[str]) -> bool:
"""
Register that a given skill_name requires the specified list of key names (e.g. ["OPENAI"]).
"""
if not skill_name or not required_keys:
return False
self._required_keys[skill_name] = set(required_keys)
logger.info(f"Registered keys for skill {skill_name}: {required_keys}")
return True
def get_required_keys(self, skill_name: Optional[str] = None) -> Dict[str, List[str]]:
"""
Return a dict of skill -> list of required keys.
If skill_name is provided, return only that one skill's key list.
"""
if skill_name:
# Return just one skill's keys if it exists
if skill_name in self._required_keys:
return { skill_name: list(self._required_keys[skill_name]) }
else:
return { skill_name: [] }
else:
# Return all
return { skill: list(keys) for skill, keys in self._required_keys.items() }
async def check_api_key_exists(self, skill_name: str, key_name: str) -> bool:
"""
Pass-through to secret_manager to check if a key is set.
This is used in e.g. ImageGenerationSkill or other activities that do:
`await api_manager.check_api_key_exists(...).`
"""
return await self._secret_manager.check_api_key_exists(skill_name, key_name)
async def get_api_key(self, skill_name: str, key_name: str) -> Optional[str]:
"""
Return the actual API key string from secret_manager.
Called by skill_chat.py's initialize() or skill_generate_image.py, etc.
"""
return await self._secret_manager.get_api_key(skill_name, key_name)
async def get_skill_status(self) -> Dict[str, Any]:
"""
Example: For each skill, show which keys are configured or not.
"""
skills_status = {}
for skill, keys in self._required_keys.items():
skill_info = {
"display_name": skill.title(),
"required_keys": {}
}
for k in keys:
# Check if configured
exists = await self._secret_manager.check_api_key_exists(skill, k)
skill_info["required_keys"][k] = bool(exists)
skills_status[skill] = skill_info
return skills_status
async def set_api_key(self, skill_name: str, key_name: str, value: str) -> Dict[str, Any]:
"""
Store a new API key into secret_manager for a given skill & key name.
"""
success = await self._secret_manager.set_api_key(skill_name, key_name, value)
return {"success": success, "affected_skills": {}}
async def get_composio_integrations(self) -> List[Dict[str, Any]]:
"""
Ask the ComposioManager for the available integrations (connected or not).
"""
return await self._composio_manager.list_available_integrations()
async def list_actions_for_app(self, app_name: str) -> Dict[str, Any]:
"""
Calls composio_manager.list_actions_for_app(app_name).
"""
return await self._composio_manager.list_actions_for_app(app_name)
# Global
api_manager = APIManager()
##### my_digital_being/framework/composio_integration.py #####
"""
Composio integration module for managing OAuth flows and dynamic tool integration.
Implements:
- handle_oauth_callback(...) to finalize the connection
- store a connected indicator in _oauth_connections
- list_available_integrations() returns "connected": True if we have that.
- list_actions_for_app(...) returns the app's actions by calling Composio's API directly
[ADDED] We now persist these connections in ./storage/composio_oauth.json
"""
import os
import logging
import json
from pathlib import Path
from typing import Dict, Any, List
import requests # Used for the direct Composio API call
from .secret_storage import secret_manager
from composio_openai import ComposioToolSet
logger = logging.getLogger(__name__)
class ComposioManager:
def __init__(self):
self._toolset = None
self._entity_id = "MyDigitalBeing"
self._oauth_connections: Dict[str, Dict[str, Any]] = {}
self._available_apps: Dict[str, Any] = {}
# [ADDED] We store the OAuth connections in a JSON file
self.storage_file = Path("./storage/composio_oauth.json")
logger.info("Starting Composio integration initialization...")
# Load persisted OAuth connections if any
self._load_persistence()
# Initialize the Composio toolset
self._initialize_toolset()
# [ADDED] Load connections from disk
def _load_persistence(self):
if self.storage_file.exists():
try:
with self.storage_file.open("r", encoding="utf-8") as f:
self._oauth_connections = json.load(f)
logger.info(f"Loaded Composio OAuth connections from {self.storage_file}")
except Exception as e:
logger.warning(f"Error loading Composio OAuth file: {e}")
else:
logger.info("No existing Composio OAuth file found.")
# [ADDED] Save connections to disk
def _save_persistence(self):
try:
self.storage_file.parent.mkdir(exist_ok=True)
with self.storage_file.open("w", encoding="utf-8") as f:
json.dump(self._oauth_connections, f, indent=2)
logger.info("Saved Composio OAuth connections to disk.")
except Exception as e:
logger.error(f"Failed to save Composio OAuth connections: {e}")
def _initialize_toolset(self):
try:
api_key = os.environ.get("COMPOSIO_API_KEY")
if not api_key:
logger.error("No COMPOSIO_API_KEY in environment")
return
self._toolset = ComposioToolSet(
api_key=api_key,
entity_id=self._entity_id
)
logger.info("Created ComposioToolSet instance")
# Load the list of apps
tools = self._toolset.get_tools(actions=["COMPOSIO_LIST_APPS"])
result = self._toolset.execute_action(
action="COMPOSIO_LIST_APPS",
params={},
entity_id=self._entity_id
)
success_value = result.get("success") or result.get("successfull")
if success_value:
apps_data = result.get("data", {})
apps_list = apps_data.get("apps", [])
for app_info in apps_list:
key = app_info.get("key", "").upper()
if key:
self._available_apps[key] = app_info
logger.info(f"Fetched {len(self._available_apps)} apps from Composio meta-app")
else:
logger.warning("COMPOSIO_LIST_APPS action failed.")
except Exception as e:
logger.error(f"Error init Composio: {e}", exc_info=True)
self._available_apps = {}
def mark_app_connected(self, app_name: str, connection_id: str):
"""Utility to mark an app as connected in our local _oauth_connections dict."""
upper_app = app_name.upper()
self._oauth_connections[upper_app] = {
"connected": True,
"connection_id": connection_id
}
logger.info(f"mark_app_connected: Marked {upper_app} as connected with connection_id={connection_id}")
# [ADDED] Persist updated connections to disk
self._save_persistence()
async def initiate_oauth_flow(self, app_name: str, redirect_url: str) -> Dict[str, Any]:
"""Begin an OAuth connection for a given app."""
if not self._toolset:
return {"success": False, "error": "Toolset not initialized"}
try:
upper_app = app_name.upper()
app_info = self._available_apps.get(upper_app)
if not app_info:
return {"success": False, "error": f"Unknown app: {app_name}"}
connection_req = self._toolset.initiate_connection(
redirect_url=redirect_url,
entity_id=self._entity_id,
app=app_info["key"] # e.g. "twitter"
)
# Some versions of Composio call it 'connectionId', or 'connectedAccountId'
conn_id = getattr(connection_req, "connectionId", None)
if not conn_id:
conn_id = getattr(connection_req, "connectedAccountId", None)
if not conn_id:
return {
"success": False,
"error": "'ConnectionRequestModel' object has no attribute 'connectionId'"
}
return {
"success": True,
"redirect_url": connection_req.redirectUrl,
"connection_id": conn_id
}
except Exception as e:
logger.error(f"initiate_oauth_flow error for {app_name}: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def handle_oauth_callback(self, connection_id: str, code: str) -> Dict[str, Any]:
"""
Finalize the OAuth flow for a given connection_id using the code from the provider.
Then store 'connected' in _oauth_connections so our front-end can see that it's connected.
"""
if not self._toolset:
return {"success": False, "error": "Toolset not initialized"}
try:
result = self._toolset.complete_connection(connection_id=connection_id, code=code)
if result.success:
# Mark as connected
app_key = result.app.upper() if result.app else "UNKNOWN"
self.mark_app_connected(app_key, connection_id)
logger.info(f"handle_oauth_callback: Marked {app_key} as connected.")
else:
logger.warning(f"handle_oauth_callback: success=False for {result.app}")
return {
"success": result.success,
"app": result.app,
"message": "Connection successful" if result.success else "Connection failed"
}
except Exception as e:
logger.error(f"Error in handle_oauth_callback: {e}", exc_info=True)
return {"success": False, "error": str(e)}
def mark_app_connected_without_code(self, app_name: str, connected_account_id: str):
"""
If Composio doesn't require .complete_connection for some flows
but returns connectedAccountId in the callback,
we can directly mark the app as connected.
"""
self.mark_app_connected(app_name, connected_account_id)
async def list_available_integrations(self) -> List[Dict[str, Any]]:
"""
Return a list of all apps from _available_apps,
with "connected" = True if we've tracked them in _oauth_connections.
"""
results = []
for key, info in self._available_apps.items():
upper_key = key.upper()
is_connected = False
if upper_key in self._oauth_connections and self._oauth_connections[upper_key].get("connected"):
is_connected = True
results.append({
"name": upper_key, # e.g. "TWITTER"
"display_name": info.get("name", upper_key),
"connected": is_connected,
"oauth_supported": True,
})
return results
async def list_actions_for_app(self, app_name: str) -> Dict[str, Any]:
"""
Returns a structure with all possible Composio actions for the given app_name,
using a direct GET call to Composio's /api/v2/actions/list/all endpoint.
Example return structure:
{
"success": True,
"actions": [
"TWITTER_TWEET_CREATE",
"TWITTER_DM_SEND",
...
]
}
"""
upper_app = app_name.upper()
# Check if the app is recognized in our local cache
if upper_app not in self._available_apps:
return {"success": False, "error": f"App '{app_name}' not recognized in _available_apps"}
# Check if the app is connected
if not self._oauth_connections.get(upper_app, {}).get("connected"):
return {"success": False, "error": f"App '{app_name}' is not connected yet"}
api_key = os.environ.get("COMPOSIO_API_KEY")
if not api_key:
return {"success": False, "error": "No COMPOSIO_API_KEY set in environment"}
base_url = "https://backend.composio.dev/api/v2/actions/list/all"
headers = {"x-api-key": api_key}
params = {"apps": app_name.lower()} # Composio expects lowercased
try:
resp = requests.get(base_url, headers=headers, params=params, timeout=10)
if resp.status_code == 200:
data_json = resp.json()
items = data_json.get("items", [])
actions = []
for item in items:
action_key = item.get("actionKey")
if action_key:
actions.append(action_key)
else:
display_name = item.get("displayName")
if display_name:
actions.append(display_name)
return {"success": True, "actions": actions}
else:
logger.error(f"Composio API returned {resp.status_code} for app {app_name}")
return {
"success": False,
"error": f"Composio returned status {resp.status_code}"
}
except Exception as ex:
logger.error(f"Error retrieving actions for {app_name} from Composio: {ex}", exc_info=True)
return {"success": False, "error": str(ex)}
# Global single instance
composio_manager = ComposioManager()
##### my_digital_being/framework/main.py #####
import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional
import asyncio
from datetime import datetime
from .memory import Memory
from .state import State
from .activity_selector import ActivitySelector
from .activity_loader import ActivityLoader
from .shared_data import SharedData
from .activity_decorator import ActivityResult
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DigitalBeing:
def __init__(self, config_path: Optional[str] = None):
# Use the config directory relative to this file's location
if config_path is None:
config_path = str(Path(__file__).parent.parent / 'config')
self.config_path = Path(config_path)
self.configs = self._load_configs()
self.shared_data = SharedData()
self.memory = Memory()
self.state = State()
self.activity_loader = ActivityLoader()
self.activity_selector = ActivitySelector(
self.configs.get("activity_constraints", {}),
self.state