from __future__ import absolute_import, division, print_function
import logging
from inspect import getmro
from collections import defaultdict
from glue.core.exceptions import InvalidSubscriber, InvalidMessage
from glue.core.message import Message
__all__ = ['Hub', 'HubListener']
[docs]class Hub(object):
"""The hub manages communication between subscribers.
Objects :func:`subscribe` to receive specific message types. When
a message is passed to :func:`broadcast`, the hub observes the
following protocol:
* For each subscriber, it looks for a message class
subscription that is a superclass of the input message type
(if several are found, the most-subclassed one is chosen)
* If one is found, it calls the subscriptions filter(message)
class (if provided)
* If filter(message) == True, it calls handler(message)
(or notify(message) if handler wasn't provided).
"""
def __init__(self, *args):
"""
Any arguments that are passed to Hub will be registered
to the new hub object.
"""
# Dictionary of subscriptions
self._subscriptions = defaultdict(dict)
from glue.core.data import Data
from glue.core.subset import Subset
from glue.core.data_collection import DataCollection
listeners = set(filter(lambda x: isinstance(x, HubListener), args))
data = set(filter(lambda x: isinstance(x, Data), args))
subsets = set(filter(lambda x: isinstance(x, Subset), args))
dcs = set(filter(lambda x: isinstance(x, DataCollection), args))
listeners -= (data | subsets | dcs)
if set(listeners | data | subsets | dcs) != set(args):
raise TypeError("Inputs must be HubListener, data, subset, or "
"data collection objects")
for l in listeners:
l.register_to_hub(self)
for d in data:
d.register_to_hub(self)
for dc in dcs:
dc.register_to_hub(self)
for s in subsets:
s.register()
[docs] def subscribe(self, subscriber, message_class,
handler=None,
filter=lambda x: True):
"""Subscribe an object to a type of message class.
:param subscriber: The subscribing object
:type subscriber: :class:`~glue.core.hub.HubListener`
:param message_class: A :class:`~glue.core.message.Message` class
to subscribe to
:param handler:
An optional function of the form handler(message) that will
receive the message on behalf of the subscriber. If not provided,
this defaults to the HubListener's notify method
:type handler: Callable
:param filter:
An optional function of the form filter(message). Messages
are only passed to the subscriber if filter(message) == True.
The default is to always pass messages.
:type filter: Callable
Raises:
InvalidMessage: If the input class isn't a
:class:`~glue.core.message.Message` class
InvalidSubscriber: If the input subscriber isn't a
HubListener object.
"""
if not isinstance(subscriber, HubListener):
raise InvalidSubscriber("Subscriber must be a HubListener: %s" %
type(subscriber))
if not isinstance(message_class, type) or \
not issubclass(message_class, Message):
raise InvalidMessage("message class must be a subclass of "
"glue.Message: %s" % type(message_class))
logging.getLogger(__name__).info("Subscribing %s to %s",
subscriber, message_class.__name__)
if not handler:
handler = subscriber.notify
self._subscriptions[subscriber][message_class] = (filter, handler)
[docs] def is_subscribed(self, subscriber, message):
"""
Test whether the subscriber has suscribed to a given message class
:param subscriber: The subscriber to test
:param message: The message class to test
Returns:
True if the subscriber/message pair have been subscribed to the hub
"""
return subscriber in self._subscriptions and \
message in self._subscriptions[subscriber]
[docs] def get_handler(self, subscriber, message):
try:
return self._subscriptions[subscriber][message][1]
except KeyError:
return None
[docs] def unsubscribe(self, subscriber, message):
"""
Remove a (subscriber,message) pair from subscription list.
The handler originally attached to the subscription will
no longer be called when broadcasting messages of type message
"""
if subscriber not in self._subscriptions:
return
if message in self._subscriptions[subscriber]:
self._subscriptions[subscriber].pop(message)
[docs] def unsubscribe_all(self, subscriber):
"""
Unsubscribe the object from any subscriptions.
"""
if subscriber in self._subscriptions:
self._subscriptions.pop(subscriber)
def _find_handlers(self, message):
"""Yields all (subscriber, handler) pairs that should receive a message
"""
# self._subscriptions:
# subscriber => { message type => (filter, handler)}
# loop over subscribed objects
for subscriber, subscriptions in list(self._subscriptions.items()):
# subscriptions to message or its superclasses
messages = [msg for msg in subscriptions.keys() if
issubclass(type(message), msg)]
if len(messages) == 0:
continue
# narrow to the most-specific message
candidate = max(messages, key=_mro_count)
test, handler = subscriptions[candidate]
if test(message):
yield subscriber, handler
[docs] def broadcast(self, message):
"""Broadcasts a message to all subscribed objects.
:param message: The message to broadcast
:type message: :class:`~glue.core.message.Message`
"""
logging.getLogger(__name__).info("Broadcasting %s", message)
for subscriber, handler in self._find_handlers(message):
handler(message)
def __getstate__(self):
""" Return a picklable representation of the hub
Note: Only objects in glue.core are currently supported
as pickleable. Thus, any subscriptions from objects outside
glue.core will note be saved or restored
"""
result = self.__dict__.copy()
result['_subscriptions'] = self._subscriptions.copy()
for s in self._subscriptions:
try:
module = s.__module__
except AttributeError:
module = ''
if not module.startswith('glue.core'):
print('Pickle warning: Hub removing subscription to %s' % s)
result['_subscriptions'].pop(s)
return result
[docs]class HubListener(object):
"""
The base class for any object that subscribes to hub messages.
This interface defines a single method, notify, that receives
messages
"""
[docs] def register_to_hub(self, hub):
raise NotImplementedError
[docs] def unregister(self, hub):
""" Default unregistration action. Calls hub.unsubscribe_all on self"""
hub.unsubscribe_all(self)
[docs] def notify(self, message):
raise NotImplementedError("Message has no handler: %s" % message)
def _mro_count(obj):
return len(getmro(obj))