Source code for glue.core.data_factories.helpers

""" Factory methods to build Data objects from files

Implementation notes:

Each factory method conforms to the following structure, which
helps the GUI Frontend easily load data:

1) The first argument is a file name to open

2) The return value is a Data object

3) The function should be decorated with data_factory and the decorator should
be given a label parameter that describes (in human language) what kinds of
files it understands, as well as a callable identifier parameter that returns
whether it can handle a requested filename and keyword set

Putting this together, the simplest data factory code looks like this::

    from glue.config import data_factory
    @data_factory(label="Foo file", identifier=has_extension('foo FOO'))
    def dummy_factory(file_name):
        return glue.core.Data()
"""

from __future__ import absolute_import, division, print_function

import os
import warnings

from glue.core.contracts import contract
from glue.core.data import Component, Data
from glue.config import auto_refresh, data_factory
from glue.backends import get_timer
from glue.utils import as_list


# Names exported by a star-import of this module.
__all__ = [
    'FileWatcher',
    'LoadLog',
    'auto_data',
    'data_label',
    'find_factory',
    'has_extension',
    'load_data',
    '_extension',
]


def _extension(path):
    # extract the extension type from a path
    #  test.fits -> fits
    #  test.gz -> fits.gz (special case)
    #  a.b.c.fits -> fits
    _, path = os.path.split(path)
    if '.' not in path:
        return ''
    stems = path.split('.')[1:]

    # special case: test.fits.gz -> fits.gz
    if len(stems) > 1 and any(x == stems[-1]
                              for x in ['gz', 'gzip', 'bz', 'bz2']):
        return '.'.join(stems[-2:])
    return stems[-1]


def has_extension(exts):
    """
    A simple default filetype identifier function

    It returns a function that tests whether its input filename
    contains a particular extension

    Parameters
    ----------
    exts : str
      A space-delimited string listing the extensions
      (e.g., 'txt', or 'txt csv fits')

    Returns
    -------
    A function suitable as a factory identifier function. It takes a
    filename (plus arbitrary keyword arguments, which are ignored) and
    returns True when the filename's extension, as computed by
    ``_extension``, is exactly one of the listed extensions. The
    comparison is case-sensitive.
    """
    # Parse the extension list once, instead of on every call of the
    # returned identifier.
    extensions = frozenset(exts.split())

    def tester(x, **kwargs):
        return _extension(x) in extensions

    return tester
class LoadLog(object):

    """
    This class attaches some metadata to data created
    from load_data, so that the data can be re-constructed
    when loading saved state.

    It also watches the path for changes, and auto-reloads the data

    This is an internal class only meant to be used with load_data
    """

    def __init__(self, path, factory, kwargs):
        """
        :param path: Path of the file the data were loaded from
        :param factory: The factory function that was used to load ``path``
        :param kwargs: dict of extra keywords that were passed to ``factory``
        """
        self.path = os.path.abspath(path)
        self.factory = factory
        self.kwargs = kwargs
        self.components = []
        self.data = []

        if auto_refresh():
            # Watch the same absolute path that reload() re-reads, so the
            # watcher keeps pointing at the right file even if ``path`` was
            # relative and the working directory changes later.
            self.watcher = FileWatcher(self.path, self.reload)
        else:
            self.watcher = None

    def _log_component(self, component):
        self.components.append(component)

    def _log_data(self, data):
        self.data.append(data)

    def log(self, obj):
        """
        Record a Component or Data object, and attach this log to it
        as ``obj._load_log``
        """
        if isinstance(obj, Component):
            self._log_component(obj)
        elif isinstance(obj, Data):
            self._log_data(obj)
        obj._load_log = self

    def id(self, component):
        """Return the position of ``component`` among the logged components"""
        return self.components.index(component)

    def component(self, index):
        """Return the logged component stored at position ``index``"""
        return self.components[index]

    def reload(self):
        """
        Re-read files, and update data

        If the file cannot be read, a warning is issued, the watcher (if
        any) is stopped, and nothing is updated. If a reloaded dataset has
        a different shape than the logged one, a warning is issued and the
        refresh stops at that dataset.
        """
        try:
            d = load_data(self.path, factory=self.factory, **self.kwargs)
        except (OSError, IOError) as exc:
            warnings.warn("Could not reload %s.\n%s" % (self.path, exc))
            if self.watcher is not None:
                self.watcher.stop()
            return

        # load_data attached a fresh LoadLog to the objects it created
        log = as_list(d)[0]._load_log

        for dold, dnew in zip(self.data, as_list(d)):
            if dold.shape != dnew.shape:
                warnings.warn("Cannot refresh data -- data shape changed")
                return

            # Pair each logged component of the old dataset with the data of
            # the component logged at the same position in the new log.
            # The exact type check deliberately skips Component subclasses.
            mapping = dict((c, log.component(self.id(c)).data)
                           for c in dold._components.values()
                           if c in self.components and
                           type(c) == Component)
            dold.coords = dnew.coords
            dold.update_components(mapping)

    def __gluestate__(self, context):
        # kwargs are stored as a one-element list holding the list of
        # (key, value) pairs; __setgluestate__ unpacks it with dict(*...)
        return dict(path=self.path,
                    factory=context.do(self.factory),
                    kwargs=[list(self.kwargs.items())])

    @classmethod
    def __setgluestate__(cls, rec, context):
        # Re-load the file with the saved factory and keywords, and return
        # the log that load_data attached to the first resulting data object
        fac = context.object(rec['factory'])
        kwargs = dict(*rec['kwargs'])
        d = load_data(rec['path'], factory=fac, **kwargs)
        return as_list(d)[0]._load_log
class FileWatcher(object):

    """
    Poll a path's modification time, and run a callback whenever it changes
    """

    def __init__(self, path, callback, poll_interval=1000):
        """
        :param path: The path to watch, str
        :param callback: A function to call when the path changes
        :param poll_interval: Time to wait between checks, in ms
        """
        self.path = path
        self.callback = callback
        self.poll_interval = poll_interval

        # get_timer() supplies the timer class; the timer is built with the
        # polling interval and the method to invoke
        timer_factory = get_timer()
        self.watcher = timer_factory(poll_interval, self.check_for_changes)

        try:
            self.stat_cache = os.stat(path).st_mtime
            self.start()
        except OSError:
            # file probably gone, no use watching
            self.stat_cache = None

    def stop(self):
        """Stop the underlying timer"""
        self.watcher.stop()

    def start(self):
        """Start the underlying timer"""
        self.watcher.start()

    def check_for_changes(self):
        """
        Compare the path's current modification time with the cached one,
        and run the callback if they differ. Warns and does nothing else
        if the path cannot be accessed.
        """
        try:
            mtime = os.stat(self.path).st_mtime
        except OSError:
            warnings.warn("Cannot access %s" % self.path)
            return

        if mtime == self.stat_cache:
            return

        self.stat_cache = mtime
        self.callback()
@contract(path='string', factory='callable|None',
          returns='inst($Data)|list(inst($Data))')
def load_data(path, factory=None, **kwargs):
    """Use a factory to load a file and assign a label.

    This is the preferred interface for loading data into Glue,
    as it logs metadata about how data objects relate to files
    on disk.

    :param path: Path to a file
    :param factory: factory function to use. Defaults to :func:`auto_data`

    Extra keywords are passed through to factory functions

    :returns: A single Data object if the factory produced exactly one
              dataset, otherwise a list of Data objects
    """
    from glue.qglue import parse_data

    def as_data_objects(ds, lbl):
        # pack other container types like astropy tables
        # into glue data objects
        for d in ds:
            if isinstance(d, Data):
                yield d
                continue
            for item in parse_data(d, lbl):
                yield item

    factory = factory or auto_data
    lbl = data_label(path)

    d = as_list(factory(path, **kwargs))
    d = list(as_data_objects(d, lbl))
    log = LoadLog(path, factory, kwargs)
    for item in d:
        # Compare by value: an identity test against a string literal
        # depends on interning and is not a reliable emptiness check.
        if item.label == '':
            item.label = lbl
        log.log(item)  # attaches log metadata to item
        for cid in item.primary_components:
            log.log(item.get_component(cid))

    if len(d) == 1:
        # unpack single-length lists for user convenience
        return d[0]

    return d
def data_label(path):
    """Build a data label from a file path.

    The directory part and the last file extension are dropped, so
    ``/some/dir/image.fits`` becomes ``image``.
    """
    filename = os.path.basename(path)
    return os.path.splitext(filename)[0]
@contract(extension='string', factory='callable')
def set_default_factory(extension, factory):  # pragma: no cover
    """Deprecated: only emits a warning, nothing is registered."""
    warnings.warn("set_default_factory is deprecated and no longer has any effect")


@contract(extension='string', returns='callable|None')
def get_default_factory(extension):  # pragma: no cover
    """Deprecated: emits a warning and always returns None."""
    warnings.warn("get_default_factory is deprecated and will always return None")
    return None


@contract(filename='string')
def find_factory(filename, **kwargs):
    """Pick the registered factory function to use for ``filename``.

    Every registered identifier is called with the filename and the extra
    keywords. Returns the function of the first match among the entries
    that share the highest matching priority, or None if nothing matches.
    A warning is issued when more than one entry matches.
    """
    from glue.config import data_factory

    # There is no 'default' factory that gets tried first: all identifiers
    # are consulted and the choice is made by priority, so that specialized
    # loaders can win over general ones (e.g. a FITS file holding a
    # dendrogram should load as a dendrogram, not as plain FITS).

    top_priority = None
    matches = []

    # The early exit below relies on the registry yielding its entries in
    # decreasing priority order (the ordering is implemented in
    # DataFactoryRegistry.__iter__).
    for entry in data_factory:

        # Stop as soon as we move past the priority level of a match: all
        # entries sharing that level have been examined by then.
        if top_priority is not None and entry.priority < top_priority:
            break

        # auto_data is the caller of this lookup, never a candidate
        if entry.function is auto_data:
            continue

        try:
            matched = entry.identifier(filename, **kwargs)
        except ImportError:  # dependencies missing
            continue

        if matched:
            matches.append(entry)
            top_priority = entry.priority

    if not matches:
        return None

    if len(matches) > 1:
        labels = ["'{0}'".format(x.label) for x in matches]
        warnings.warn("Multiple data factories matched the input: {0}. Choosing {1}.".format(', '.join(labels), labels[0]))

    return matches[0].function
# The identifier accepts (and ignores) keyword arguments, like every other
# identifier: identifiers are invoked as identifier(filename, **kwargs).
@data_factory(label='Auto', identifier=lambda x, **kwargs: True)
@contract(filename='string')
def auto_data(filename, *args, **kwargs):
    """Attempt to automatically construct a data object

    The factory is selected with :func:`find_factory`, and is then called
    with the filename and all extra positional and keyword arguments.

    :param filename: Path of the file to load
    :raises KeyError: if no registered factory recognizes the file
    """
    fac = find_factory(filename, **kwargs)
    if fac is None:
        raise KeyError("Don't know how to open file: %s" % filename)
    return fac(filename, *args, **kwargs)