-
Notifications
You must be signed in to change notification settings - Fork 0
/
agency_rpcs.py
68 lines (54 loc) · 3.49 KB
/
agency_rpcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from collections import defaultdict
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession
from autobahn.wamp.types import SubscribeOptions, PublishOptions
# A map of lists for different object types. Each object type gets its own list
# of active objects. When a new object is activated, it will be added to the
# list of its type. When that object is deactivated, it will be removed from
# that list.
#
# The result is a list for each object type that represents the currently-
# active set of objects for that type.
ACTIVE_OBJECTS = defaultdict(dict)
# A Crossbar component for storing lists of active objects for an agency (realm).
# This component also handles the "Agency Topic" RPCs as defined in the wiki
# article linked below.
#
# See https://github.com/propershark/shark/wiki/WAMP-Events-and-RPCs#agency-topics
# for more information on these RPCs.
class AgencyRPCSession(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Agency RPC Session joined: {}".format(details))
ACTIONS = defaultdict(lambda: lambda _, __, ___: "nothing")
# Append the given object to the list of ACTIVE_OBJECTS under the given type.
ACTIONS['activate'] = lambda typ, topic, obj: ACTIVE_OBJECTS[typ].update({ topic: obj })
# Update the given object in the list of ACTIVE_OBJECTS under the given type.
ACTIONS['update'] = lambda typ, topic, obj: ACTIVE_OBJECTS[typ].update({ topic: obj })
# Remove the given object from the list of ACTIVE_OBJECTS under the given type.
ACTIONS['deactivate'] = lambda typ, topic, obj: ACTIVE_OBJECTS[typ].pop(topic, None)
# Subscribe to all vehicle topics and store their activations/deactivations.
def vehicle_event(*args, **kwargs): ACTIONS[kwargs['event']]('vehicle', kwargs['details'].topic, args[0])
# Subscribe to all station topics and store their activations/deactivations.
def station_event(*args, **kwargs): ACTIONS[kwargs['event']]('station', kwargs['details'].topic, args[0])
# Subscribe to all route topics and store their activations/deactivations.
def route_event(*args, **kwargs): ACTIONS[kwargs['event']]('route', kwargs['details'].topic, args[0])
# Subscribe to the appropriate channels based on type
yield self.subscribe(vehicle_event, 'vehicles.', options=SubscribeOptions(match=u'prefix', details_arg='details'))
print('Now tracking activations/deactivations on vehicle channels.')
yield self.subscribe(station_event, 'stations.', options=SubscribeOptions(match=u'prefix', details_arg='details'))
print('Now tracking activations/deactivations on station channels.')
yield self.subscribe(route_event, 'routes.', options=SubscribeOptions(match=u'prefix', details_arg='details'))
print('Now tracking activations/deactivations on route channels.')
# Register the `agency.vehicles` rpc.
def agency_vehicles(): return ACTIVE_OBJECTS['vehicle']
# Register the `agency.vehicles` rpc.
def agency_stations(): return ACTIVE_OBJECTS['station']
# Register the `agency.vehicles` rpc.
def agency_routes(): return ACTIVE_OBJECTS['route']
reg = yield self.register(agency_vehicles, 'agency.vehicles')
print('Active Vehicle list now available at `agency.vehicles`.')
reg = yield self.register(agency_stations, 'agency.stations')
print('Active Station list now available at `agency.stations`.')
reg = yield self.register(agency_routes, 'agency.routes')
print('Active Route list now available at `agency.routes`.')