-
Notifications
You must be signed in to change notification settings - Fork 143
/
pluginbase.py
449 lines (376 loc) · 15.9 KB
/
pluginbase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# -*- coding: utf-8 -*-
"""
pluginbase
~~~~~~~~~~
Pluginbase is a module for Python that provides a system for building
plugin based applications.
:copyright: (c) Copyright 2014 by Armin Ronacher.
:license: BSD, see LICENSE for more details.
"""
import os
import sys
import uuid
import errno
import pkgutil
import hashlib
import threading
from types import ModuleType
from weakref import ref as weakref
PY2 = sys.version_info[0] == 2
if PY2:
text_type = unicode
string_types = (unicode, str)
from cStringIO import StringIO as NativeBytesIO
else:
text_type = str
string_types = (str,)
from io import BytesIO as NativeBytesIO
__version__ = '1.0.1'
_local = threading.local()
_internalspace = ModuleType(__name__ + '._internalspace')
_internalspace.__path__ = []
sys.modules[_internalspace.__name__] = _internalspace
def get_plugin_source(module=None, stacklevel=None):
"""Returns the :class:`PluginSource` for the current module or the given
module. The module can be provided by name (in which case an import
will be attempted) or as a module object.
If no plugin source can be discovered, the return value from this method
is `None`.
This function can be very useful if additional data has been attached
to the plugin source. For instance this could allow plugins to get
access to a back reference to the application that created them.
:param module: optionally the module to locate the plugin source of.
:param stacklevel: defines how many levels up the module should search
for before it discovers the plugin frame. The
default is 0. This can be useful for writing wrappers
around this function.
"""
if module is None:
frm = sys._getframe((stacklevel or 0) + 1)
name = frm.f_globals['__name__']
glob = frm.f_globals
elif isinstance(module, string_types):
frm = sys._getframe(1)
name = module
glob = __import__(module, frm.f_globals,
frm.f_locals, ['__dict__']).__dict__
else:
name = module.__name__
glob = module.__dict__
return _discover_space(name, glob)
def get_searchpath(path, depth=float('inf'), followlinks=False):
"""This utility function returns a list directories suitable for use as the
*searchpath* argument to :class:`PluginSource`. This will recursively add
directories up to the specified depth.
:param str path: The directory on the file system to start the search path
at. It will be included in the result.
:param int depth: The number of directories to recurse into while building
the search path. By default the function will iterate into
all child directories.
:param bool followlinks: Whether or not to recurse into directories which
are symbolic links.
:return: A list of directories, including *path* and child directories.
:rtype: list
"""
# os.walk implements a depth-first approach which results in unnecessarily
# slow execution when *path* is a large tree and *depth* is a small number
paths = [path]
for dir_entry in os.listdir(path):
sub_path = os.path.join(path, dir_entry)
if not os.path.isdir(sub_path):
continue
if not followlinks and os.path.islink(sub_path):
continue
if depth:
paths.extend(get_searchpath(sub_path, depth - 1, followlinks))
return paths
def _discover_space(name, globals):
try:
return _local.space_stack[-1]
except (AttributeError, IndexError):
pass
if '__pluginbase_state__' in globals:
return globals['__pluginbase_state__'].source
mod_name = globals.get('__name__')
if mod_name is not None and \
mod_name.startswith(_internalspace.__name__ + '.'):
end = mod_name.find('.', len(_internalspace.__name__) + 1)
space = sys.modules.get(mod_name[:end])
if space is not None:
return space.__pluginbase_state__.source
def _shutdown_module(mod):
members = list(mod.__dict__.items())
for key, value in members:
if key[:1] != '_':
setattr(mod, key, None)
for key, value in members:
setattr(mod, key, None)
def _to_bytes(s):
if isinstance(s, text_type):
return s.encode('utf-8')
return s
class _IntentionallyEmptyModule(ModuleType):
def __getattr__(self, name):
try:
return ModuleType.__getattr__(self, name)
except AttributeError:
if name[:2] == '__':
raise
raise RuntimeError(
'Attempted to import from a plugin base module (%s) without '
'having a plugin source activated. To solve this error '
'you have to move the import into a "with" block of the '
'associated plugin source.' % self.__name__)
class _PluginSourceModule(ModuleType):
def __init__(self, source):
modname = '%s.%s' % (_internalspace.__name__, source.spaceid)
ModuleType.__init__(self, modname)
self.__pluginbase_state__ = PluginBaseState(source)
@property
def __path__(self):
try:
ps = self.__pluginbase_state__.source
except AttributeError:
return []
return ps.searchpath + ps.base.searchpath
def _setup_base_package(module_name):
try:
mod = __import__(module_name, None, None, ['__name__'])
except ImportError:
mod = None
if '.' in module_name:
parent_mod = __import__(module_name.rsplit('.', 1)[0],
None, None, ['__name__'])
else:
parent_mod = None
if mod is None:
mod = _IntentionallyEmptyModule(module_name)
if parent_mod is not None:
setattr(parent_mod, module_name.rsplit('.', 1)[-1], mod)
sys.modules[module_name] = mod
class PluginBase(object):
"""The plugin base acts as a control object around a dummy Python
package that acts as a container for plugins. Usually each
application creates exactly one base object for all plugins.
:param package: the name of the package that acts as the plugin base.
Usually this module does not exist. Unless you know
what you are doing you should not create this module
on the file system.
:param searchpath: optionally a shared search path for modules that
will be used by all plugin sources registered.
"""
def __init__(self, package, searchpath=None):
#: the name of the dummy package.
self.package = package
if searchpath is None:
searchpath = []
#: the default search path shared by all plugins as list.
self.searchpath = searchpath
_setup_base_package(package)
def make_plugin_source(self, *args, **kwargs):
"""Creates a plugin source for this plugin base and returns it.
All parameters are forwarded to :class:`PluginSource`.
"""
return PluginSource(self, *args, **kwargs)
class PluginSource(object):
"""The plugin source is what ultimately decides where plugins are
loaded from. Plugin bases can have multiple plugin sources which act
as isolation layer. While this is not a security system it generally
is not possible for plugins from different sources to accidentally
cross talk.
Once a plugin source has been created it can be used in a ``with``
statement to change the behavior of the ``import`` statement in the
block to define which source to load the plugins from::
plugin_source = plugin_base.make_plugin_source(
searchpath=['./path/to/plugins', './path/to/more/plugins'])
with plugin_source:
from myapplication.plugins import my_plugin
:param base: the base this plugin source belongs to.
:param identifier: optionally a stable identifier. If it's not defined
a random identifier is picked. It's useful to set this
to a stable value to have consistent tracebacks
between restarts and to support pickle.
:param searchpath: a list of paths where plugins are looked for.
:param persist: optionally this can be set to `True` and the plugins
will not be cleaned up when the plugin source gets
garbage collected.
"""
# Set these here to false by default so that a completely failing
# constructor does not fuck up the destructor.
persist = False
mod = None
def __init__(self, base, identifier=None, searchpath=None,
persist=False):
#: indicates if this plugin source persists or not.
self.persist = persist
if identifier is None:
identifier = str(uuid.uuid4())
#: the identifier for this source.
self.identifier = identifier
#: A reference to the plugin base that created this source.
self.base = base
#: a list of paths where plugins are searched in.
self.searchpath = searchpath
#: The internal module name of the plugin source as it appears
#: in the :mod:`pluginsource._internalspace`.
self.spaceid = '_sp' + hashlib.md5(
_to_bytes(self.base.package) + b'|' +
_to_bytes(identifier),
).hexdigest()
#: a reference to the module on the internal
#: :mod:`pluginsource._internalspace`.
self.mod = _PluginSourceModule(self)
if hasattr(_internalspace, self.spaceid):
raise RuntimeError('This plugin source already exists.')
sys.modules[self.mod.__name__] = self.mod
setattr(_internalspace, self.spaceid, self.mod)
def __del__(self):
if not self.persist:
self.cleanup()
def list_plugins(self):
"""Returns a sorted list of all plugins that are available in this
plugin source. This can be useful to automatically discover plugins
that are available and is usually used together with
:meth:`load_plugin`.
"""
rv = []
for _, modname, ispkg in pkgutil.iter_modules(self.mod.__path__):
rv.append(modname)
return sorted(rv)
def load_plugin(self, name):
"""This automatically loads a plugin by the given name from the
current source and returns the module. This is a convenient
alternative to the import statement and saves you from invoking
``__import__`` or a similar function yourself.
:param name: the name of the plugin to load.
"""
if '.' in name:
raise ImportError('Plugin names cannot contain dots.')
with self:
return __import__(self.base.package + '.' + name,
globals(), {}, ['__name__'])
def open_resource(self, plugin, filename):
"""This function locates a resource inside the plugin and returns
a byte stream to the contents of it. If the resource cannot be
loaded an :exc:`IOError` will be raised. Only plugins that are
real Python packages can contain resources. Plain old Python
modules do not allow this for obvious reasons.
.. versionadded:: 0.3
:param plugin: the name of the plugin to open the resource of.
:param filename: the name of the file within the plugin to open.
"""
mod = self.load_plugin(plugin)
fn = getattr(mod, '__file__', None)
if fn is not None:
if fn.endswith(('.pyc', '.pyo')):
fn = fn[:-1]
if os.path.isfile(fn):
return open(os.path.join(os.path.dirname(fn), filename), 'rb')
buf = pkgutil.get_data(self.mod.__name__ + '.' + plugin, filename)
if buf is None:
raise IOError(errno.ENOENT, 'Could not find resource')
return NativeBytesIO(buf)
def cleanup(self):
"""Cleans up all loaded plugins manually. This is necessary to
call only if :attr:`persist` is enabled. Otherwise this happens
automatically when the source gets garbage collected.
"""
self.__cleanup()
def __cleanup(self, _sys=sys, _shutdown_module=_shutdown_module):
# The default parameters are necessary because this can be fired
# from the destructor and so late when the interpreter shuts down
# that these functions and modules might be gone.
if self.mod is None or self.mod.__name__ is None:
return
modname = self.mod.__name__
self.mod.__pluginbase_state__ = None
self.mod = None
try:
delattr(_internalspace, self.spaceid)
except AttributeError:
pass
prefix = modname + '.'
# avoid the bug described in issue #6
if modname in _sys.modules:
del _sys.modules[modname]
for key, value in list(_sys.modules.items()):
if not key.startswith(prefix):
continue
mod = _sys.modules.pop(key, None)
if mod is None:
continue
_shutdown_module(mod)
def __assert_not_cleaned_up(self):
if self.mod is None:
raise RuntimeError('The plugin source was already cleaned up.')
def __enter__(self):
self.__assert_not_cleaned_up()
_local.__dict__.setdefault('space_stack', []).append(self)
return self
def __exit__(self, exc_type, exc_value, tb):
try:
_local.space_stack.pop()
except (AttributeError, IndexError):
pass
def _rewrite_module_path(self, modname):
self.__assert_not_cleaned_up()
if modname == self.base.package:
return self.mod.__name__
elif modname.startswith(self.base.package + '.'):
pieces = modname.split('.')
return self.mod.__name__ + '.' + '.'.join(
pieces[self.base.package.count('.') + 1:])
class PluginBaseState(object):
__slots__ = ('_source',)
def __init__(self, source):
if source.persist:
self._source = lambda: source
else:
self._source = weakref(source)
@property
def source(self):
rv = self._source()
if rv is None:
raise AttributeError('Plugin source went away')
return rv
class _ImportHook(ModuleType):
def __init__(self, name, system_import):
ModuleType.__init__(self, name)
self._system_import = system_import
self.enabled = True
def enable(self):
"""Enables the import hook which drives the plugin base system.
This is the default.
"""
self.enabled = True
def disable(self):
"""Disables the import hook and restores the default import system
behavior. This effectively breaks pluginbase but can be useful
for testing purposes.
"""
self.enabled = False
def plugin_import(self, name, globals=None, locals=None,
fromlist=None, level=None):
if level is None:
# set the level to the default value specific to this python version
level = -1 if PY2 else 0
import_name = name
if self.enabled:
ref_globals = globals
if ref_globals is None:
ref_globals = sys._getframe(1).f_globals
space = _discover_space(name, ref_globals)
if space is not None:
actual_name = space._rewrite_module_path(name)
if actual_name is not None:
import_name = actual_name
return self._system_import(import_name, globals, locals,
fromlist, level)
try:
import __builtin__ as builtins
except ImportError:
import builtins
import_hook = _ImportHook(__name__ + '.import_hook', builtins.__import__)
builtins.__import__ = import_hook.plugin_import
sys.modules[import_hook.__name__] = import_hook
del builtins