forked from openwallet-foundation/acapy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpack_format.py
190 lines (154 loc) · 6.42 KB
/
pack_format.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""Standard packed message format classes."""
import json
import logging
from typing import Sequence, Tuple, Union
from ..config.base import InjectorError
from ..config.injection_context import InjectionContext
from ..protocols.routing.v1_0.messages.forward import Forward
from ..messaging.util import time_now
from ..utils.task_queue import TaskQueue
from ..wallet.base import BaseWallet
from ..wallet.error import WalletError
from .error import MessageParseError, MessageEncodeError
from .inbound.receipt import MessageReceipt
from .wire_format import BaseWireFormat
# Module-level logger, named after this module via __name__.
LOGGER = logging.getLogger(__name__)
class PackWireFormat(BaseWireFormat):
    """Standard DIDComm message parser and serializer."""

    def __init__(self):
        """Initialize the pack wire format instance."""
        super().__init__()
        # Optional queue used to run pack/unpack coroutines. While this is
        # None (or otherwise falsy) those coroutines are awaited directly.
        self.task_queue: TaskQueue = None

    @staticmethod
    def _load_json_object(message_json: Union[str, bytes]) -> dict:
        """
        Parse JSON text and require the top-level value to be an object.

        Args:
            message_json: The JSON text to parse

        Returns:
            The parsed JSON object as a dict

        Raises:
            MessageParseError: If the JSON parsing failed
            MessageParseError: If the parsed value is not a JSON object

        """
        try:
            parsed = json.loads(message_json)
        except ValueError as err:
            raise MessageParseError("Message JSON parsing failed") from err
        if not isinstance(parsed, dict):
            raise MessageParseError("Message JSON result is not an object")
        return parsed

    async def parse_message(
        self, context: InjectionContext, message_body: Union[str, bytes],
    ) -> Tuple[dict, MessageReceipt]:
        """
        Deserialize an incoming message and further populate the request context.

        Args:
            context: The injection context for settings and services
            message_body: The body of the message

        Returns:
            A tuple of the parsed message and a message receipt instance

        Raises:
            MessageParseError: If the message body is empty
            MessageParseError: If the JSON parsing failed
            MessageParseError: If the JSON result is not an object

        """
        receipt = MessageReceipt()
        receipt.in_time = time_now()
        receipt.raw_message = message_body

        if not message_body:
            raise MessageParseError("Message body is empty")

        message_dict = self._load_json_object(message_body)

        # packed messages are detected by the absence of @type
        if "@type" not in message_dict:
            unpack = self.unpack(context, message_body, receipt)
            try:
                # A conditional expression (rather than `a and b or c`) makes
                # sure the coroutine is awaited exactly once, whatever the
                # truthiness of the object returned by the queue.
                message_json = await (
                    self.task_queue.run(unpack) if self.task_queue else unpack
                )
            except MessageParseError:
                # Keep the dict parsed from the original body.
                LOGGER.debug("Message unpack failed, falling back to JSON")
            else:
                receipt.raw_message = message_json
                message_dict = self._load_json_object(message_json)

        # parse thread ID; fall back to the message's own @id when the
        # ~thread decorator is missing, is not an object, or has no thid
        thread_dec = message_dict.get("~thread")
        thread_id = thread_dec.get("thid") if isinstance(thread_dec, dict) else None
        receipt.thread_id = thread_id or message_dict.get("@id")

        # handle transport decorator; non-object values are ignored
        transport_dec = message_dict.get("~transport")
        if transport_dec and isinstance(transport_dec, dict):
            receipt.direct_response_mode = transport_dec.get("return_route")

        LOGGER.debug("Expanded message: %s", message_dict)

        return message_dict, receipt

    async def unpack(
        self,
        context: InjectionContext,
        message_body: Union[str, bytes],
        receipt: MessageReceipt,
    ):
        """
        Look up the wallet instance and perform the message unpack.

        Args:
            context: The injection context used to locate the wallet
            message_body: The packed message to unpack
            receipt: The message receipt; its sender_verkey and
                recipient_verkey attributes are set from the unpack result

        Returns:
            The first element of the wallet's unpack result (the message JSON)

        Raises:
            MessageParseError: If the wallet cannot be injected
            MessageParseError: If the wallet fails to unpack the message

        """
        try:
            wallet: BaseWallet = await context.inject(BaseWallet)
        except InjectorError as err:
            raise MessageParseError("Wallet not defined in request context") from err

        try:
            unpacked = await wallet.unpack_message(message_body)
        except WalletError as err:
            raise MessageParseError("Message unpack failed") from err

        message_json, receipt.sender_verkey, receipt.recipient_verkey = unpacked
        return message_json

    async def encode_message(
        self,
        context: InjectionContext,
        message_json: Union[str, bytes],
        recipient_keys: Sequence[str],
        routing_keys: Sequence[str],
        sender_key: str,
    ) -> Union[str, bytes]:
        """
        Encode an outgoing message for transport.

        The message is packed only when both a sender key and at least one
        recipient key are given; otherwise it is returned unchanged.

        Args:
            context: The injection context for settings and services
            message_json: The message body to serialize
            recipient_keys: A sequence of recipient verkeys
            routing_keys: A sequence of routing verkeys
            sender_key: The verification key of the sending agent

        Returns:
            The encoded message

        Raises:
            MessageEncodeError: If the message could not be encoded

        """
        if not (sender_key and recipient_keys):
            return message_json

        pack = self.pack(
            context, message_json, recipient_keys, routing_keys, sender_key
        )
        # See parse_message: the coroutine must be awaited exactly once.
        return await (self.task_queue.run(pack) if self.task_queue else pack)

    async def pack(
        self,
        context: InjectionContext,
        message_json: Union[str, bytes],
        recipient_keys: Sequence[str],
        routing_keys: Sequence[str],
        sender_key: str,
    ):
        """
        Look up the wallet instance and perform the message pack.

        When routing keys are given, the packed message is wrapped in one
        Forward message per routing key, each packed for that routing key.

        Args:
            context: The injection context used to locate the wallet
            message_json: The message body to pack
            recipient_keys: A sequence of recipient verkeys
            routing_keys: A sequence of routing verkeys
            sender_key: The verification key of the sending agent

        Returns:
            The packed message as returned by the wallet

        Raises:
            MessageEncodeError: If the sender key or recipient keys are missing
            MessageEncodeError: If no wallet instance is available
            MessageEncodeError: If the wallet fails to pack the message
            MessageEncodeError: If the wallet fails to pack a forward message

        """
        if not sender_key or not recipient_keys:
            raise MessageEncodeError("Cannot pack message without associated keys")

        wallet: BaseWallet = await context.inject(BaseWallet, required=False)
        if not wallet:
            raise MessageEncodeError("No wallet instance")

        try:
            message = await wallet.pack_message(
                message_json, recipient_keys, sender_key
            )
        except WalletError as err:
            raise MessageEncodeError("Message pack failed") from err

        if routing_keys:
            # Each forward is addressed to the first key of the previous
            # layer: the first recipient key, then each routing key in turn.
            recip_keys = recipient_keys
            for router_key in routing_keys:
                message = json.loads(message.decode("utf-8"))
                fwd_msg = Forward(to=recip_keys[0], msg=message)
                # Forwards are anon packed
                recip_keys = [router_key]
                try:
                    message = await wallet.pack_message(fwd_msg.to_json(), recip_keys)
                except WalletError as err:
                    raise MessageEncodeError("Forward message pack failed") from err

        return message