import sys
import os
import shutil
import itertools
from functools import cmp_to_key
import re
import pickle
import numpy as np
import pandas as pd
if sys.version_info[0] == 2:
import ConfigParser as configparser
else:
import configparser
from mb.util.general import tostderr
#####################################
#
# GLOBAL CONSTANTS
#
#####################################
def _load_ini(path, create_if_missing=False):
    """Parse the INI file at *path* with case-sensitive option names.

    When *create_if_missing* is true and *path* does not exist, the file is
    first created containing only an empty ``[settings]`` section.
    """
    if create_if_missing and not os.path.exists(path):
        with open(path, 'w') as f:
            f.write('[settings]\n\n')
    parser = configparser.ConfigParser()
    parser.optionxform = str  # keep option names exactly as written
    parser.read(path)
    return parser


# Third-level parent directory of this file.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
STATIC_RESOURCES_DIR = os.path.join(ROOT_DIR, 'static_resources')

_CONFIG_DIR = os.path.join(ROOT_DIR, 'config')
DEFAULT_PATH = os.path.join(_CONFIG_DIR, '.defaults.ini')
CONFIG_PATH = os.path.join(_CONFIG_DIR, '.config.ini')
HISTORY_PATH = os.path.join(_CONFIG_DIR, '.hist.ini')

# Defaults are read as-is; a missing ``[settings]`` section raises KeyError.
DEFAULT = _load_ini(DEFAULT_PATH)
DEFAULT_SETTINGS = DEFAULT['settings']

# User configuration and build history are created on first use.
USER = _load_ini(CONFIG_PATH, create_if_missing=True)
USER_SETTINGS = USER['settings']

HISTORY = _load_ini(HISTORY_PATH, create_if_missing=True)
HISTORY_SETTINGS = HISTORY['settings']

# User value wins; the defaults file must define ``c_flags``.
CFLAGS = USER_SETTINGS.get('c_flags', DEFAULT_SETTINGS['c_flags'])
# Delimiters ordered from shallowest (index 0) to deepest (last index).
DELIM = [
    '.',
    '-',
    '_',
    '+'
]

# Matches any character that is not a letter, digit or underscore.
# Raw string: '\W' in a plain literal is an invalid escape sequence.
NON_VAR_CHARS = re.compile(r'\W')

# Punctuation tokens, including bracket placeholder tokens such as -LRB-.
PUNC = [
    ",",
    ".",
    "``",
    "`",
    "--",
    "''",
    "'",
    "...",
    "?",
    "!",
    ":",
    ";",
    "(",
    ")",
    "-RRB-",
    "-LRB-",
    "-LCB-",
    "-RCB-"
]

# Default column separator and missing-value marker for tabular data.
DEFAULT_SEP = ' '
DEFAULT_NA = 'NaN'
#####################################
#
# UTILITY METHODS
#
#####################################
def normalize_class_name(name):
    """Turn *name* into an identifier-like string with an upper-case first character.

    Every character matched by ``NON_VAR_CHARS`` is removed; an empty result
    is returned unchanged.
    """
    cleaned = NON_VAR_CHARS.sub('', name)
    if not cleaned:
        return cleaned
    return cleaned[0].upper() + cleaned[1:]
def create_classes_from_dir(directory, parent_name=''):
    """Create ``StaticResource`` subclasses for the entries of *directory*.

    Subdirectories are processed recursively, with the normalized names of
    the enclosing directories prepended to the generated class names.
    ``__init__.py`` entries are skipped.

    :param directory: ``str``; directory to scan
    :param parent_name: ``str``; prefix for generated class names
    :return: ``list`` of dynamically created classes
    """
    out = []
    for static_file in os.listdir(directory):
        if not static_file == '__init__.py':
            parent_name = normalize_class_name(parent_name)
            path = os.path.join(directory, static_file)
            # "name.ext" becomes "name_ext"; inner delimiters are dropped.
            static_file_parts = static_file.split(DELIM[0])
            if len(static_file_parts) > 1:
                descr = ''.join(static_file_parts[:-1]) + '_' + static_file_parts[-1]
            else:
                descr = static_file_parts[0]
            class_name = normalize_class_name(descr)
            if os.path.isdir(path):
                descr_long = descr + ' directory'
                parent_name_cur = parent_name + class_name
                out += create_classes_from_dir(path, parent_name=parent_name_cur)
            else:
                descr_long = descr + ' static file'
            # NOTE(review): nesting reconstructed from whitespace-mangled source.
            # Placed after the if/else (so directories get a class too) because
            # ``descr_long`` is assigned in both branches -- confirm.
            class_name = parent_name + class_name
            attr_dict = {
                'SUFFIX': os.path.basename(path),
                'DEFAULT_LOCATION': path,
                'DESCR_SHORT': descr,
                'DESCR_LONG': descr_long
            }
            out.append(type(class_name, (StaticResource,), attr_dict))
    return out
def _read_data_auto(path, sep):
    """Try to read *path* as a table, then as text lines, then as a pickle."""
    try:
        return pd.read_csv(path, sep=sep)
    except (pd.errors.ParserError, UnicodeDecodeError):
        # Binary content makes read_csv fail while decoding, not while parsing.
        pass
    try:
        with open(path, 'r') as f:
            return f.readlines()
    except UnicodeDecodeError:
        pass
    try:
        # NOTE: unpickling executes arbitrary code; only use on trusted files.
        with open(path, 'rb') as f:
            return pickle.load(f)
    except EOFError:
        return None


def read_data(path, read_mode='r', sep=DEFAULT_SEP):
    """Load the contents of *path* according to *read_mode*.

    :param path: ``str``; location to read
    :param read_mode: one of ``'auto'``, ``'pandas'``, ``'r'``, ``'rb'`` or
        ``None`` (do not read)
    :param sep: ``str``; column separator used for tabular reads
    :return: ``None`` if *path* does not exist or *read_mode* is ``None``;
        the string ``'Directory target'`` if *path* is a directory; otherwise
        a DataFrame (``'pandas'``), a list of lines (``'r'``), an unpickled
        object (``'rb'``), or whichever of those succeeds first (``'auto'``)
    :raises ValueError: if *path* is a file and *read_mode* is unrecognized
    """
    if not os.path.exists(path):
        return None
    if not os.path.isfile(path):
        return 'Directory target'

    if read_mode == 'auto':
        return _read_data_auto(path, sep)
    if read_mode == 'pandas':
        return pd.read_csv(path, sep=sep)
    if read_mode == 'r':
        with open(path, read_mode) as f:
            return f.readlines()
    if read_mode == 'rb':
        # NOTE: unpickling executes arbitrary code; only use on trusted files.
        with open(path, read_mode) as f:
            return pickle.load(f)
    if read_mode is not None:
        raise ValueError('Unrecognized read mode: %s.' % read_mode)
    return None
def dump_data(data, buffer=None, write_mode='w', sep=' ', na_rep=DEFAULT_NA):
    """Write *data* to *buffer* according to *write_mode*.

    :param data: object to write; a DataFrame for ``'pandas'``, an iterable of
        strings for text modes, any picklable object for binary modes
    :param buffer: ``str`` path, open file-like object, or ``None``
    :param write_mode: ``'pandas'``, an ``open()`` mode string (a trailing
        ``'b'`` selects pickling), or ``None`` to do nothing
    :param sep: ``str``; column separator for ``'pandas'`` output
    :param na_rep: ``str``; missing-value marker for ``'pandas'`` output
    :return: ``None``
    """
    if write_mode is None:
        return
    if write_mode == 'pandas':
        data.to_csv(buffer, sep=sep, index=False, na_rep=na_rep)
        return

    is_text = write_mode[-1] != 'b'
    if buffer is None:
        return

    # Only close handles opened here; caller-supplied handles stay open.
    close_after = isinstance(buffer, str)
    if close_after:
        buffer = open(buffer, write_mode)
    try:
        if is_text:
            for line in data:
                buffer.write(line)
        else:
            pickle.dump(data, buffer)
    finally:
        # Release the handle even if writing or pickling fails.
        if close_after:
            buffer.close()
def get_timestamp(path):
    """Return the most recent modification time found at *path*.

    For a directory, the result is the maximum over the directory itself and
    everything below it. A path that does not exist yields ``-inf``.
    """
    if not os.path.exists(path):
        return -np.inf
    newest = os.path.getmtime(path)
    if os.path.isdir(path):
        for entry in os.listdir(path):
            child = os.path.join(path, entry)
            newest = max(newest, get_timestamp(child))
    return newest
def increment_delimiters(s):
    """
    Increase the depth of all delimiters in **s**.

    :param s: ``str``; the input string
    :return: ``str``; **s** with deeper delimiters (shifted towards the end of DELIM)
    :raises AssertionError: if **s** already contains the deepest delimiter
    """
    deepest = len(DELIM) - 1
    out = []
    for c in s:
        if c in DELIM:
            i = DELIM.index(c)
            # Raised explicitly so the check also holds under ``python -O``.
            if i >= deepest:
                raise AssertionError(
                    'Cannot increment delimiters for "%s" because it already contains the deepest delimiter "%s". Your target may involve too much nesting.' % (s, DELIM[-1])
                )
            c = DELIM[i + 1]
        out.append(c)
    return ''.join(out)
def decrement_delimiters(s):
    """
    Decrease the depth of all delimiters in **s**.

    :param s: ``str``; the input string
    :return: ``str``; **s** with shallower delimiters (shifted towards the beginning of DELIM)
    :raises AssertionError: if **s** already contains the shallowest delimiter
    """
    out = []
    for c in s:
        if c in DELIM:
            i = DELIM.index(c)
            # Index 0 cannot be made shallower; DELIM[i - 1] would otherwise
            # silently wrap around to the deepest delimiter.
            if i <= 0:
                raise AssertionError(
                    'Cannot decrement delimiters for "%s" because it already contains the shallowest delimiter "%s". Your target may involve too much nesting.' % (s, DELIM[0])
                )
            c = DELIM[i - 1]
        out.append(c)
    return ''.join(out)
def prereq_comparator(x, y):
    """Three-way comparison used to rank candidate prerequisites.

    Strings sort after non-strings. Among non-strings, candidates without a
    shared suffix sort first, then candidates without a shared prefix, then
    candidates with the shorter ``stem``.

    :return: ``-1``, ``0`` or ``1``
    """
    x_is_str = isinstance(x, str)
    y_is_str = isinstance(y, str)
    if x_is_str or y_is_str:
        return int(x_is_str) - int(y_is_str)

    if x.has_shared_suffix and not y.has_shared_suffix:
        return 1
    if y.has_shared_suffix and not x.has_shared_suffix:
        return -1

    if x.has_shared_prefix and not y.has_shared_prefix:
        return 1
    if y.has_shared_prefix and not x.has_shared_prefix:
        return -1

    x_len = len(x.stem)
    y_len = len(y.stem)
    return int(x_len > y_len) - int(x_len < y_len)
def other_prereq_type_err_msg(i, j):
    """Build the error message for an out-of-range "other prereq" index *i* given *j* prereqs."""
    template = 'Index %d must be < the number of other prereqs (%d)'
    return template % (i, j)
def generate_doc(cls, indent=0, indent_size=4):
    """Assemble reStructuredText documentation for the type *cls*.

    The text contains the long description, an optional URL, the external
    resources among the static prerequisites, the full prerequisite list and,
    for non-abstract types, the path syntax.

    :param cls: class exposing the MBType classmethod interface
    :param indent: ``int``; number of spaces prepended to indented lines
    :param indent_size: unused
    :return: ``str``
    """
    pad = ' ' * indent
    pieces = [pad + line + '\n' for line in cls.descr_long().split('\n')]
    pieces.append('\n')

    if hasattr(cls, 'URL') and cls.url() is not None:
        pieces.append('**URL**: `%s <%s>`_\n\n' % (cls.url(), cls.url()))

    external_resources = [
        t for t in cls.static_prereq_types()
        if not isinstance(t, str) and issubclass(t, ExternalResource)
    ]
    if external_resources:
        pieces.append(pad + '**External resources**:\n\n')
        for res in external_resources:
            label = res if isinstance(res, str) else res.infer_paths()[0]
            pieces.append(pad + '- ``%s``\n\n' % label)

    prereqs = cls.stem_prereq_types() + cls.static_prereq_types() + cls.other_prereq_paths(None)
    if prereqs:
        pieces.append('**Prerequisites**:\n\n')
        for position, prereq in enumerate(prereqs):
            if isinstance(prereq, str):
                label = prereq
            elif hasattr(prereq, 'infer_paths'):
                label = prereq.infer_paths()[0]
            elif prereq.is_abstract():
                label = prereq.__name__
            else:
                label = prereq.syntax_str()
            item = '- ``%s``' % label
            # Only the first prerequisite can be flagged as repeatable.
            if position == 0 and cls.repeatable_prereq():
                item += ' (repeatable)'
            pieces.append(item + '\n\n')

    if not cls.is_abstract():
        pieces.append(pad + '**Syntax**:\n\n')
        pieces.append(pad + '``%s``' % cls.syntax_str())
        pieces.append('\n\n')

    return ''.join(pieces)
#####################################
#
# METATYPES
#
#####################################
class typemb(type):
    """Metaclass that replaces each new class's ``__doc__`` with ``generate_doc`` output."""

    def __new__(meta, name, bases, dct):
        new_cls = super().__new__(meta, name, bases, dct)
        new_cls.__doc__ = generate_doc(new_cls)
        return new_cls
#####################################
#
# ABSTRACT MB TYPES
#
#####################################
class MBType(object, metaclass=typemb):
    # Base class for ModelBlocks target types. A type is described by class
    # attributes (suffix, prerequisite types, arguments, ...) exposed through
    # classmethods; instances are nodes of a dependency graph.
    # ``__doc__`` is overwritten by the ``typemb`` metaclass.
    #
    # NOTE(review): statement nesting in this class was reconstructed from
    # whitespace-mangled source; confirm against the upstream file.
    __metaclass__ = typemb  # Python 2 spelling; the keyword above is what takes effect

    SUFFIX = ''
    MANIP = ''
    STEM_PREREQ_TYPES = []
    STATIC_PREREQ_TYPES = []
    ARG_TYPES = []
    CONFIG_KEYS = []
    FILE_TYPE = 'text'  # one of ['text', 'table', 'python', 'auto', None]; see read_mode()/write_mode()
    SEP = DEFAULT_SEP  # Used managing separator conventions in tabular data
    PRECIOUS = False
    REPEATABLE_PREREQ = False
    ALLOW_SHARED_PREFIX = True
    ALLOW_SHARED_SUFFIX = True
    DESCR_SHORT = 'data'
    DESCR_LONG = (
        "Abstract base class for ModelBlocks types."
    )

    def __init__(self, path):
        """Create a node for the target at *path* (normalized; suffix optional)."""
        path = os.path.normpath(path)
        if path.endswith(self.suffix()):
            if self.suffix() != '':
                path = path[:-len(self.suffix())]
        self.directory = os.path.dirname(path)
        self.basename = os.path.basename(path)
        # NOTE(review): strip_suffix() removes len(manip + suffix) characters
        # unconditionally, although the suffix was already removed above.
        self.stem = self.strip_suffix(self.basename)
        self.path = os.path.join(self.directory, self.basename + self.suffix())
        self.stem_prereqs_all_paths_src = []
        self.stem_prereqs_src = []
        self.static_prereqs_all_paths_src = []
        self.static_prereqs_src = []
        self.other_prereqs_all_paths_src = []
        self.other_prereqs_src = []
        self.dependents = set()
        self.has_shared_prefix_src = False
        self.has_shared_suffix_src = False
        self.data = None
        self.dump = self.precious() or os.path.exists(self.path)
        self.graph = None
        self.intermediate = not self.dump
        self.fn_dry_run = None
        self.fn = None
        self.finished = False
        self.process_scheduler = None

    @property
    def timestamp(self):
        """Modification time of ``self.path`` (``-inf`` if it does not exist)."""
        return get_timestamp(self.path)

    @property
    def max_timestamp(self):
        """Newest timestamp among this node and its prerequisites.

        ``inf`` if any config value differs from the recorded history, or if
        neither this node nor any prerequisite has a timestamp.
        """
        max_timestamp = self.timestamp
        for k, old, new in self.config_values():
            if old != new:
                max_timestamp = np.inf
                break
        if max_timestamp < np.inf:
            for s in self.stem_prereqs_src + self.static_prereqs_src + self.other_prereqs_src:
                max_timestamp = max(max_timestamp, s.max_timestamp)
        if max_timestamp == self.timestamp == -np.inf:
            max_timestamp = np.inf
        # if max_timestamp > self.timestamp:
        #     max_timestamp = np.inf
        return max_timestamp

    @property
    def graph_key(self):
        """Tuple identifying this node: (type, path, shared-prefix flag, shared-suffix flag)."""
        return type(self), self.path, self.has_shared_prefix, self.has_shared_suffix

    @property
    def has_shared_prefix(self):
        return self.has_shared_prefix_src

    @property
    def has_shared_suffix(self):
        return self.has_shared_suffix_src

    @property
    def args(self):
        """Parsed path arguments without the bookkeeping keys, or ``None`` if the path does not parse."""
        args = self.parse_path(self.path, has_shared_prefix=self.has_shared_prefix, has_shared_suffix=self.has_shared_suffix)
        if args is not None:
            if 'basename' in args:
                del args['basename']
            if 'prereqs' in args:
                del args['prereqs']
            if 'static_prereqs' in args:
                del args['static_prereqs']
        return args

    @property
    def concurrent(self):
        """Whether a process scheduler is attached to this node."""
        return self.process_scheduler is not None

    @classmethod
    def suffix(cls):
        return cls.SUFFIX

    @classmethod
    def manip(cls):
        return cls.MANIP

    @classmethod
    def stem_prereq_types(cls):
        return cls.STEM_PREREQ_TYPES

    @classmethod
    def augment_prereq(cls, i, path):
        """String inserted between a prerequisite's basename and its suffix; empty by default."""
        return ''

    @classmethod
    def has_multiple_stem_prereqs(cls):
        """True if there are several stem prerequisite types, or a single repeatable one."""
        return len(cls.stem_prereq_types()) > 1 or (len(cls.stem_prereq_types()) == 1 and cls.repeatable_prereq())

    @classmethod
    def static_prereq_types(cls):
        return cls.STATIC_PREREQ_TYPES

    @classmethod
    def other_prereq_paths(cls, path):
        """Paths of additional prerequisites for *path*; none by default."""
        return []

    @classmethod
    def other_prereq_type(cls, i, path):
        """Type of the *i*-th additional prerequisite; ``MBType`` by default."""
        return MBType

    @classmethod
    def other_prereq_types(cls, paths=None):
        """Return the type of each path in *paths* (``None`` is treated as empty)."""
        if paths is None:
            paths = []
        return [cls.other_prereq_type(i, path) for i, path in enumerate(paths)]

    @classmethod
    def repeatable_prereq(cls):
        return cls.REPEATABLE_PREREQ

    @classmethod
    def allow_shared_prefix(cls):
        return cls.ALLOW_SHARED_PREFIX

    @classmethod
    def allow_shared_suffix(cls):
        return cls.ALLOW_SHARED_SUFFIX

    @classmethod
    def arg_types(cls):
        return cls.ARG_TYPES

    @classmethod
    def config_keys(cls):
        """Return ``CONFIG_KEYS`` normalized to a list of ``(key, default)`` pairs."""
        out = []
        for x in cls.CONFIG_KEYS:
            if isinstance(x, str):
                # A bare key: unpacking a string would split it into
                # characters (or raise ValueError), so handle it explicitly.
                key, default = x, None
            else:
                try:
                    key, default = x
                except TypeError:
                    key, default = x, None
            out.append((key, default))
        return out

    @classmethod
    def config_values(cls):
        """Return ``(key, previous, current)`` for every config key.

        *previous* comes from the history settings; *current* from the user
        settings, falling back to the default settings and then to the key's
        own default. Values of keys ending in ``_path`` are path-normalized.
        """
        out = []
        for key, default in cls.config_keys():
            val_prev = HISTORY_SETTINGS.get(key, None)
            if val_prev is not None and key.endswith('_path'):
                val_prev = os.path.normpath(val_prev)
            val_cur = USER_SETTINGS.get(
                key,
                DEFAULT_SETTINGS.get(
                    key,
                    default
                )
            )
            if val_cur is not None and key.endswith('_path'):
                val_cur = os.path.normpath(val_cur)
            out.append((key, val_prev, val_cur))
        return out

    @classmethod
    def read_mode(cls):
        """Map ``file_type()`` to the mode passed to ``read_data``.

        :raises ValueError: for an unrecognized file type
        """
        file_type = cls.file_type()
        if file_type == 'text':
            return 'r'
        elif file_type == 'table':
            return 'pandas'
        elif file_type == 'python':
            return 'rb'
        elif file_type == 'auto':
            return 'auto'
        elif file_type is None:
            return None
        else:
            raise ValueError("Unrecognized file type %s. Must be one of ['text', 'table', 'python', 'auto', None]." % file_type)

    @classmethod
    def write_mode(cls):
        """Map ``file_type()`` to the mode passed to ``dump_data``.

        :raises ValueError: for an unrecognized file type
        """
        file_type = cls.file_type()
        if file_type == 'text':
            return 'w'
        elif file_type == 'table':
            return 'pandas'
        elif file_type == 'python':
            return 'wb'
        elif file_type == 'auto':
            return 'auto'
        elif file_type is None:
            return None
        else:
            raise ValueError("Unrecognized file type %s. Must be one of ['text', 'table', 'python', 'auto', None]." % file_type)

    @classmethod
    def file_type(cls):
        return cls.FILE_TYPE

    @classmethod
    def sep(cls):
        return cls.SEP

    @classmethod
    def precious(cls):
        return cls.PRECIOUS

    @classmethod
    def is_text(cls):
        return cls.FILE_TYPE == 'text'

    @classmethod
    def descr_short(cls):
        return cls.DESCR_SHORT

    @classmethod
    def descr_long(cls):
        return cls.DESCR_LONG

    @classmethod
    def assemble(cls, match):
        """Concatenate the strings in *match* and append the suffix."""
        return ''.join(match) + cls.SUFFIX

    @classmethod
    def inheritors(cls):
        """Return the set of all direct and indirect subclasses."""
        out = set()
        for c in cls.__subclasses__():
            out.add(c)
            out |= c.inheritors()
        return out

    @classmethod
    def is_abstract(cls):
        return cls.__name__ == 'MBType'

    @classmethod
    def match(cls, path):
        """Return whether *path* could name a target of this type.

        Abstract types never match. Otherwise *path* must end in
        ``manip() + suffix()``; types without stem prerequisites require the
        basename to be exactly that suffix, and types with multiple stem
        prerequisites require a compatible number of delimited parts.
        """
        out = not cls.is_abstract()
        if out:
            suffix = cls.manip() + cls.suffix()
            out = path.endswith(suffix)
            if out:
                prereq_types = cls.stem_prereq_types()
                if len(prereq_types) == 0:
                    out = len(os.path.basename(path)) == len(suffix)
                elif len(prereq_types) > 1 or cls.repeatable_prereq():
                    # strip_suffix() handles an empty suffix, for which
                    # path[:-0] would wrongly yield ''.
                    basenames = cls.strip_suffix(path).split(DELIM[0])
                    prereq_types = prereq_types[:]
                    if cls.repeatable_prereq():
                        while len(prereq_types) < len(basenames):
                            prereq_types.insert(0, prereq_types[0])
                    # Up to two extra parts are tolerated.
                    out = 0 <= len(basenames) - len(prereq_types) <= 2
        return out

    @classmethod
    def strip_suffix(cls, path):
        """Remove ``len(manip() + suffix())`` trailing characters from *path* (no check that they match)."""
        suffix = cls.manip() + cls.suffix()
        if suffix != '':
            name_new = path[:-len(suffix)]
        else:
            name_new = path
        return name_new

    @classmethod
    def parse_args(cls, path):
        """Split *path* into its basename and the values of ``arg_types()``.

        Arguments live in the last ``DELIM[0]``-separated field of the
        basename, separated by ``DELIM[1]``: positional arguments first, then
        keyword arguments in any order. Missing keyword arguments take their
        defaults.

        :return: ``dict`` with key ``'basename'`` plus one key per argument
        :raises AssertionError: on too few positional arguments or an
            unrecognized keyword argument
        """
        basename = cls.strip_suffix(path)
        out = {'basename': basename}
        if len(cls.arg_types()) > 0:
            directory = os.path.dirname(basename)
            basename_split = os.path.basename(basename).split(DELIM[0])
            basename = DELIM[0].join(basename_split[:-1])
            out = {'basename': os.path.join(directory, basename)}
            argstr = basename_split[-1]
            argstr = argstr.split(DELIM[1])
            args = [a for a in cls.arg_types() if a.positional]
            kwargs = [a for a in cls.arg_types() if not a.positional]
            assert len(args) <= len(argstr), 'Expected %d positional arguments, saw %d.' % (len(args), len(argstr))
            for arg in args:
                s = argstr.pop(0)
                out[arg.key] = arg.read(s)
            for s in argstr:
                out_cur = None
                for i in range(len(kwargs)):
                    kwarg = kwargs[i]
                    r = kwarg.read(s)
                    if r is not None:
                        out_cur = {kwarg.key: r}
                        # Each keyword argument may be consumed once.
                        kwargs.pop(i)
                        break
                assert out_cur is not None, 'Unrecognized keyword argument %s' % s
                out.update(out_cur)
            for kwarg in kwargs:
                out[kwarg.key] = kwarg.default
        return out

    @classmethod
    def parse_path(cls, path, has_shared_prefix=False, has_shared_suffix=False):
        """Parse *path* into arguments and stem prerequisite paths.

        :param has_shared_prefix: treat the first delimited part as a prefix
            shared by all stem prerequisites
        :param has_shared_suffix: treat the last delimited part as a suffix
            shared by all stem prerequisites
        :return: ``dict`` (``parse_args`` output plus ``'prereqs'``,
            ``'has_shared_prefix'``, ``'has_shared_suffix'``), or ``None`` if
            *path* does not match or has too few parts
        """
        out = None
        path = os.path.normpath(path)
        if cls.match(path):
            out = cls.parse_args(path)
            out['prereqs'] = []
            prereqs = []
            if cls.has_multiple_stem_prereqs():
                basename = out['basename']
                directory = os.path.dirname(basename)
                basename = os.path.basename(basename)
                basenames = basename.split(DELIM[0])
                if has_shared_prefix:
                    shared_prefix = decrement_delimiters(basenames[0])
                    basenames = basenames[1:]
                else:
                    shared_prefix = ''
                if len(basenames) < len(cls.stem_prereq_types()):
                    out = None
                else:
                    if has_shared_suffix:
                        shared_suffix = decrement_delimiters(basenames[-1])
                        basenames = basenames[:-1]
                    else:
                        shared_suffix = ''
                    if len(basenames) < len(cls.stem_prereq_types()):
                        out = None
                    else:
                        basenames = [os.path.join(directory, DELIM[0].join(
                            [y for y in (shared_prefix, decrement_delimiters(x), shared_suffix) if y != '']
                        )) for x in basenames]
                        prereq_types = cls.stem_prereq_types()[:]
                        if cls.repeatable_prereq():
                            while len(prereq_types) < len(basenames):
                                prereq_types.insert(0, prereq_types[0])
                        for b, p in zip(basenames, prereq_types):
                            prereq_path = b
                            # NOTE(review): index 0 is passed for every prerequisite -- confirm.
                            prereq_path += cls.augment_prereq(0, path)
                            prereq_path += p.suffix()
                            prereqs.append(prereq_path)
                        out['prereqs'] = prereqs
            elif len(cls.stem_prereq_types()) == 1:
                prereq_path = out['basename']
                prereq_path += cls.augment_prereq(0, path)
                prereq_path += cls.stem_prereq_types()[0].suffix()
                prereqs.append(prereq_path)
                out['prereqs'] = prereqs
            if out is not None:
                out['has_shared_prefix'] = has_shared_prefix
                out['has_shared_suffix'] = has_shared_suffix
        return out

    @classmethod
    def syntax_str(cls):
        """Return a template describing the path syntax of targets of this type."""
        if hasattr(cls, 'infer_paths'):
            out = cls.infer_paths()[0]
        else:
            out = []
            if cls.has_multiple_stem_prereqs() and cls.allow_shared_prefix():
                out.append('(<SHARED_PRE>)')
            for i, x in enumerate(cls.stem_prereq_types()):
                name = x.__name__
                if i == 0 and cls.repeatable_prereq():
                    s = '<%s>(.<%s>)*' % (name, name)
                else:
                    s = '<%s>' % name
                out.append(s)
            if cls.has_multiple_stem_prereqs() and cls.allow_shared_suffix():
                out.append('(<SHARED_POST>)')
            out = '(<DIR>/)' + DELIM[0].join(out)
            if len(cls.arg_types()) > 0:
                out += DELIM[0]
                arg_str = ['<%s>' % a.syntax_str() for a in cls.arg_types()]
                out += DELIM[1].join(arg_str)
            out += cls.manip()
            out += cls.suffix()
        return out

    def stem_prereqs_all_paths(self):
        return self.stem_prereqs_all_paths_src

    def stem_prereqs(self):
        return self.stem_prereqs_src

    def other_prereqs_all_paths(self):
        return self.other_prereqs_all_paths_src

    def other_prereqs(self):
        return self.other_prereqs_src

    def static_prereqs(self):
        return self.static_prereqs_src

    def set_data(self, data=None):
        """Store *data* on the node, reading it from ``self.path`` when omitted.

        Does nothing when ``read_mode()`` is ``None``.
        """
        read_mode = self.read_mode()
        if read_mode is not None:
            if data is None:
                data = read_data(self.path, read_mode=read_mode, sep=self.sep())
            self.data = data

    def body(self):
        """Return the recipe for this target; subclasses must override.

        ``get()`` treats a ``str`` as a shell command, a callable as a
        function of the prerequisites and arguments, and anything else as
        already-available data.
        """
        print(self)
        raise NotImplementedError

    def body_args(self):
        """Return the prerequisites followed by the parsed argument values, as a tuple."""
        out = self.stem_prereqs() + self.static_prereqs() + self.other_prereqs()
        args = self.args
        for a in self.arg_types():
            out.append(args[a.key])
        out = tuple(out)
        return out

    def get_garbage(self, force=False):
        """Collect paths of dumped, non-precious intermediate targets to be (re)built."""
        garbage = set()
        build = force or (self.max_timestamp > self.timestamp)
        if build:
            for s in self.stem_prereqs() + self.other_prereqs():
                garbage |= s.get_garbage(force=force)
            if self.intermediate and self.dump and not self.precious():
                garbage.add(self.path)
        return garbage

    def get_stale_nodes(self, force=False):
        """Collect the graph keys of this node and its prerequisites that need rebuilding."""
        stale_nodes = set()
        if force or (self.max_timestamp > self.timestamp):
            stale_nodes.add(self.graph_key)
            for p in self.stem_prereqs() + self.static_prereqs() + self.other_prereqs():
                stale_nodes |= p.get_stale_nodes(force=force)
        return stale_nodes

    def get(self, dry_run=False, stale_nodes=None, report_up_to_date=False):
        """Return this target's data, building it first if it is stale.

        :param dry_run: only report what would be done; returns ``None``
        :param stale_nodes: set of graph keys to rebuild (default: empty)
        :param report_up_to_date: report when nothing needs to be done
        :return: the data, or whatever the scheduler's remote call returns
            when a process scheduler is attached
        """
        if stale_nodes is None:
            stale_nodes = set()
        if self.graph_key in stale_nodes and not self.finished:
            body = self.body()
            args = []
            for x in self.body_args():
                if issubclass(type(x), MBType):
                    args.append(x.get(dry_run=dry_run, stale_nodes=stale_nodes))
                else:
                    args.append(x)
            if not dry_run:
                self.set_data()
            # body() is evaluated again after set_data(); for subclasses whose
            # body() returns self.data the two calls can differ.
            if isinstance(self.body(), str):
                descr = body
                mode = self.read_mode()
                if dry_run:
                    def fn(body, descr, dump, path, mode, *args):
                        tostderr(descr + '\n')
                        return None
                else:
                    def fn(body, descr, dump, path, mode, *args):
                        tostderr(descr + '\n')
                        returncode = os.system(body)
                        assert returncode == 0, 'Shell execution failed with return code %s' % returncode
                        data = read_data(path, read_mode=mode, sep=self.sep())
                        return data
            elif hasattr(body, '__call__'):
                if self.dump:
                    descr = 'Computing and dumping %s' % self.path
                else:
                    descr = 'Computing and storing %s' % self.path
                mode = self.write_mode()
                if dry_run:
                    def fn(body, descr, dump, path, mode, *args):
                        tostderr(descr + '\n')
                        return None
                else:
                    def fn(body, descr, dump, path, mode, *args):
                        tostderr(descr + '\n')
                        data = body(*args)
                        if dump:
                            dump_data(data, buffer=path, write_mode=mode)
                        return data
            else:
                descr = None
                mode = None
                body = self.data
                def fn(body, descr, dump, path, mode, *args):
                    return body
            if self.concurrent:
                fn = self.process_scheduler.remote(fn).remote
            data = fn(
                body,
                descr,
                self.dump,
                self.path,
                mode,
                *args
            )
            self.set_data(data)
            self.finished = True
        else:
            if dry_run:
                def fn(path, data, report_up_to_date=False):
                    if report_up_to_date:
                        tostderr('%s is up to date.\n' % path)
                    return None
            else:
                def fn(path, data, report_up_to_date=False):
                    if report_up_to_date:
                        tostderr('%s is up to date.\n' % path)
                    return data
            if self.concurrent:
                fn = self.process_scheduler.remote(fn).remote
            if self.data is None and not dry_run:
                self.set_data()
            data = fn(self.path, self.data, report_up_to_date=report_up_to_date)
        return data

    def update_history(self):
        """Record the current config values of this node and its prerequisites in the history settings."""
        for x in self.static_prereqs_src + self.stem_prereqs() + self.other_prereqs():
            x.update_history()
        for k, _, v in self.config_values():
            HISTORY_SETTINGS[k] = v

    def set_stem_prereqs(self, prereqs):
        """Store candidate stem prerequisites and select one per slot.

        :param prereqs: iterable of candidate collections (one per
            prerequisite slot), or ``None``
        """
        self.stem_prereqs_all_paths_src = prereqs
        # assert self.stem_prereqs_all_paths_src is not None, 'No recipe to make %s' % self.path
        prereqs = []
        if self.stem_prereqs_all_paths_src is not None:
            for p in self.stem_prereqs_all_paths_src:
                # Update dependents
                for _p in p:
                    if issubclass(_p.__class__, MBType):
                        _p.dependents.add(self)
                # Select winning candidate
                candidates = sorted(list(p), key=cmp_to_key(prereq_comparator))
                if len(candidates) > 0:
                    prereqs.append(candidates[0])
                else:
                    prereqs.append(None)
        self.stem_prereqs_src = prereqs

    def set_static_prereqs(self, prereqs):
        """Store candidate static prerequisites and select one per slot.

        :raises AssertionError: if a slot has more than one candidate
        """
        self.static_prereqs_all_paths_src = prereqs
        prereqs = []
        if self.static_prereqs_all_paths_src is not None:
            for p in self.static_prereqs_all_paths_src:
                assert len(p) <= 1, 'Static prereqs cannot be ambiguous, but multiple options were found for %s: %s' % (self, p)
                # Update dependents
                for _p in p:
                    if issubclass(_p.__class__, MBType):
                        _p.dependents.add(self)
                candidates = sorted(list(p), key=cmp_to_key(prereq_comparator))
                prereqs.append(candidates[0])
        self.static_prereqs_src = prereqs

    def set_other_prereqs(self, prereqs):
        """Store candidate additional prerequisites and select one per slot."""
        self.other_prereqs_all_paths_src = prereqs
        # assert self.other_prereqs_all_paths_src is not None, 'No recipe to make %s' % self.path
        prereqs = []
        if self.other_prereqs_all_paths_src is not None:
            for p in self.other_prereqs_all_paths_src:
                # Update dependents
                for _p in p:
                    if issubclass(_p.__class__, MBType):
                        _p.dependents.add(self)
                # Select winning candidate
                candidates = sorted(list(p), key=cmp_to_key(prereq_comparator))
                prereqs.append(candidates[0])
        self.other_prereqs_src = prereqs

    def set_dump(self):
        """Force dumping to disk for shell-command targets and all their prerequisites."""
        dump = False
        if isinstance(self.body(), str):  # is a shell command and needs to be dumped
            dump = True
            # NOTE(review): assumed to only switch dumping on, so that a flag
            # set by the caller beforehand survives -- confirm.
            self.dump = dump
        if dump:
            for p in self.stem_prereqs_all_paths_src:
                for q in p:
                    q.dump = True
            for q in self.static_prereqs_src:
                q.dump = True
            for p in self.other_prereqs_all_paths_src:
                for q in p:
                    q.dump = True

    def exists(self):
        # NOTE(review): reads ``self.data.data``; possibly meant ``self.data`` -- confirm.
        return self.data.data is not None

    def directories_to_make(self, force=False):
        """Collect missing directories needed by dumped targets that will be (re)built."""
        out = set()
        build = force or (self.max_timestamp > self.timestamp)
        if build:
            if self.dump and len(self.directory) > 0 and not os.path.exists(self.directory):
                out.add(self.directory)
            for p in self.stem_prereqs() + self.static_prereqs() + self.other_prereqs():
                out |= p.directories_to_make(force=force)
        return out
class StaticResource(MBType):
    # A file or directory that already exists and is never rebuilt.
    # ``__doc__`` is produced by the metaclass, hence comments rather than a docstring.
    FILE_TYPE = 'auto'
    DEFAULT_LOCATION = ''
    DESCR_LONG = (
        "Abstract base class for dependency to a static resource.\n"
    )

    def __init__(self, path):
        super().__init__(path)

    @property
    def max_timestamp(self):
        """Always ``-inf``, so comparisons against it never mark a node as stale."""
        # return self.timestamp
        return -np.inf

    @classmethod
    def default_location(cls):
        return cls.DEFAULT_LOCATION

    @classmethod
    def infer_paths(cls):
        """Return the normalized default location, repeated three times."""
        location = os.path.normpath(cls.default_location())
        return location, location, location

    @classmethod
    def match(cls, path):
        """A path matches if it exists on disk."""
        normalized = os.path.normpath(path)
        return os.path.exists(normalized)

    def body(self):
        """The recipe is the data currently stored on the node."""
        return self.data
class ExternalResource(StaticResource):
    # A resource living outside the repository whose location is configured
    # under the key ``<ClassName>_path``, optionally relative to a parent
    # resource. ``__doc__`` is produced by the metaclass.
    URL = None
    PARENT_RESOURCE = None
    DESCR_LONG = (
        "Abstract base class for dependency to external resource.\n"
    )

    def __init__(self):
        super(ExternalResource, self).__init__(self.default_location())
        # Replace the path attributes derived by the base initializer with
        # the configured location.
        self.path, self.rel_path_cur, self.rel_path_prev = self.infer_paths()
        self.basename = os.path.basename(self.path)
        self.stem = self.strip_suffix(self.basename)
        self.directory = os.path.dirname(self.path)

    @property
    def timestamp(self):
        """Newest modification time of this resource and its parent (``-inf`` for missing paths)."""
        paths = [self.path]
        if self.parent_resource() is not None:
            paths.append(self.parent_resource().infer_paths()[0])
        return max(
            os.path.getmtime(path) if os.path.exists(path) else -np.inf
            for path in paths
        )

    @classmethod
    def default_location(cls):
        """Normalized ``<ClassName>_path`` from the default settings, else ``DEFAULT_LOCATION``."""
        return os.path.normpath(DEFAULT_SETTINGS.get(cls.__name__ + '_path', cls.DEFAULT_LOCATION))

    @classmethod
    def url(cls):
        return cls.URL

    @classmethod
    def parent_resource(cls):
        return cls.PARENT_RESOURCE

    @classmethod
    def config_keys(cls):
        """Return ``(key, default)`` pairs, the first being ``<ClassName>_path``."""
        out = [(cls.__name__ + '_path', cls.default_location())]
        for x in cls.CONFIG_KEYS:
            if isinstance(x, str):
                # A bare key: unpacking a string would split it into
                # characters (or raise ValueError), so handle it explicitly.
                key, default = x, None
            else:
                try:
                    key, default = x
                except TypeError:
                    key, default = x, None
            out.append((key, default))
        return out

    @classmethod
    def infer_paths(cls):
        """Return ``(path, current_relative_path, previous_relative_path)``.

        *path* is the currently configured location, joined onto the parent
        resource's path when there is a parent.
        """
        path = ''
        _, rel_path_prev, rel_path_cur = cls.config_values()[0]
        if cls.parent_resource() is not None:
            path = os.path.join(path, cls.parent_resource().infer_paths()[0])
        path = os.path.join(path, rel_path_cur)
        return path, rel_path_cur, rel_path_prev

    @classmethod
    def ancestor_exists(cls):
        """Return whether the topmost resource in the parent chain exists on disk."""
        c = cls
        p = cls.parent_resource()
        while p is not None:
            c = p
            p = p.parent_resource()
        return os.path.exists(c.infer_paths()[0])

    @classmethod
    def is_abstract(cls):
        return cls.__name__ == 'ExternalResource'

    @classmethod
    def match(cls, path):
        """A path matches if it resolves to this resource's configured location."""
        out = os.path.abspath(path) == os.path.abspath(cls.infer_paths()[0])
        return out
class Repo(ExternalResource):
    # External resource that may be obtainable with ``git clone``.
    # ``__doc__`` is produced by the metaclass.
    URL = ''
    GIT_URL = ''
    DESCR_SHORT = 'the Natural Stories Corpus'
    DESCR_LONG = (
        'A corpus of naturalistic stories meant to contain varied,\n'
        'low-frequency syntactic constructions. There are a variety of annotations\n'
        'and psycholinguistic measures available for the stories.'
    )

    @property
    def max_timestamp(self):
        """Same as ``timestamp``."""
        max_timestamp = self.timestamp
        return max_timestamp

    @classmethod
    def url(cls):
        return cls.URL

    @classmethod
    def git_url(cls):
        return cls.GIT_URL

    def body(self):
        """Return a ``git clone`` command, or a callable that reports the resource as unobtainable.

        The callable writes a warning to stderr and raises
        ``NotImplementedError``.
        """
        if self.git_url():
            return 'git clone %s %s' % (self.git_url(), self.path)

        def out(*args):
            config_key = type(self).__name__ + '_path'
            # The template has two placeholders: resource name and path.
            warn_str = (
                '%s does not exist at the default path (%s),\n'
                'but it is not publicly available and cannot be downloaded automatically. '
                'You must first acquire it from the source.'
            ) % (
                type(self).__name__,
                USER_SETTINGS.get(
                    config_key,
                    DEFAULT_SETTINGS[config_key]
                )
            )
            tostderr(warn_str)
            raise NotImplementedError

        return out
#####################################
#
# GENERAL MB TYPES
#
#####################################
class ParamFile(MBType):
    # Parameter file copied from ``static_resources/prm`` under the repository
    # root. ``__doc__`` is produced by the metaclass.
    SUFFIX = 'prm.ini'
    PRECIOUS = True
    DESCR_SHORT = 'param file'
    DESCR_LONG = "File containing configuration parameters for building targets."

    @classmethod
    def match(cls, path):
        """Match paths with the right suffix unless their parent directory is ``static_resources/prm``."""
        if not path.endswith(cls.suffix()):
            return False
        parent_dir = os.path.dirname(path)
        is_source_file = (
            os.path.basename(parent_dir) == 'prm' and
            os.path.basename(os.path.dirname(parent_dir)) == 'static_resources'
        )
        return not is_source_file

    @classmethod
    def parse_path(cls, path, has_shared_prefix=False, has_shared_suffix=False):
        """Parse *path*; the result's ``'src'`` key is the file to copy from."""
        path = os.path.normpath(path)
        if not cls.match(path):
            return None
        parsed = cls.parse_args(path)
        parsed['prereqs'] = []
        src_name = os.path.basename(parsed['basename']) + cls.suffix()
        parsed['src'] = os.path.join(ROOT_DIR, 'static_resources', 'prm', src_name)
        return parsed

    @classmethod
    def other_prereq_paths(cls, path):
        """Return the source file for *path*, or a syntax template when *path* is ``None``."""
        if path is None:
            return ['(DIR/)<NAME>%s<TYPE>prm%sini' % (DELIM[0], DELIM[0])]
        return [cls.parse_path(path)['src']]

    def body(self):
        """Shell command copying the source parameter file to ``self.path``."""
        source = self.other_prereqs()[0].path
        return 'cp %s %s' % (source, self.path)
class MBFailure(MBType):
    # Placeholder node for a target that matched type ``cls`` but could not
    # be built. ``__doc__`` is produced by the metaclass.
    DESCR_SHORT = 'failed target'
    DESCR_LONG = (
        "Class to represent build failures.\n"
    )

    def __init__(self, path, cls):
        """
        :param path: ``str``; path of the failed target
        :param cls: the type that the target was attempted as
        """
        super(MBFailure, self).__init__(path)
        self.cls = cls

    def cls_suffix(self):
        return self.cls.suffix()

    def cls_manip(self):
        return self.cls.manip()

    def cls_stem_prereq_types(self):
        return self.cls.stem_prereq_types()

    def cls_static_prereq_types(self):
        return self.cls.static_prereq_types()

    def cls_other_prereq_paths(self):
        # The attempted type exposes ``other_prereq_paths``; there is no
        # ``cls_``-prefixed classmethod on it.
        return self.cls.other_prereq_paths(self.path)
#####################################
#
# OTHER TYPES
#
#####################################
class Arg(object):
    """
    Description of one argument to a data transform (positional or keyword).

    Arguments parameterize every aspect of the transform other than its input
    data, which is supplied by prerequisites in the dependency graph.
    Positional arguments are treated as obligatory, keyword arguments as
    optional.

    :param key: ``str``; name of the argument
    :param dtype: callable converting the string value (e.g. a ``type``)
    :param positional: ``bool``; whether the argument is positional
    :param default: default value for the argument
    :param descr: ``str`` or ``None``; description; falls back to **key**
    """

    def __init__(self, key, dtype=str, positional=False, default=None, descr=None):
        self.key = key
        self.dtype = dtype
        self.positional = positional
        self.default = default
        self.descr = key if descr is None else descr

    def read(self, s):
        """Convert the argument string **s** with ``dtype``.

        A keyword argument must start with its key, which is removed before
        conversion; otherwise ``None`` is returned.
        """
        if self.positional:
            return self.dtype(s)
        if s.startswith(self.key):
            return self.dtype(s[len(self.key):])
        return None

    def syntax_str(self):
        """Placeholder for syntax templates: upper-cased key, or key followed by ``VAL``."""
        return self.key.upper() if self.positional else self.key + 'VAL'
class SuccessSet(set):
    """Set of candidate targets; ``add`` rejects ``MBFailure`` instances."""

    def add(self, other):
        """Add **other**, asserting that it is not an ``MBFailure``."""
        assert not isinstance(other, MBFailure), 'Cannot add a failed target to SuccessSet object'
        set.add(self, other)
class FailureSet(set):
    """Set of failed targets; ``add`` accepts only ``MBFailure`` instances."""

    def add(self, other):
        """Add **other**, asserting that it is an ``MBFailure``."""
        assert isinstance(other, MBFailure), 'FailureSet object can only contain members of type MBFailure'
        set.add(self, other)
class Graph(object):
def __init__(self, target_paths=None, process_scheduler=None):
    """Create a dependency graph and build it for *target_paths*.

    :param target_paths: whitespace-separated ``str``, iterable of paths, or ``None``
    :param process_scheduler: optional scheduler; when set, the graph is concurrent
    """
    self.target_paths = None
    self.targets_all_paths = None
    self.targets = None
    self.failed_targets_all_paths = None
    # Both failure attributes are read by build(); initialise each one.
    self.failed_target_paths = None
    self.process_scheduler = process_scheduler
    self.nodes = {}
    self.build(target_paths)
def __iter__(self):
return self.nodes.__iter__()
def __setitem__(self, key, value):
self.nodes[key] = value
def __getitem__(self, key):
return self.nodes.get(key, None)
@property
def concurrent(self):
return self.process_scheduler is not None
def build(self, targets):
    """Add *targets* to the graph's target paths, rebuild the graph and report failures.

    :param targets: whitespace-separated ``str``, iterable of paths, or ``None``
    """
    # NOTE(review): nesting reconstructed from whitespace-mangled source;
    # build_graph() is assumed to run even when *targets* is None -- confirm.
    if self.target_paths is None:
        self.target_paths = []
    if targets is not None:
        if isinstance(targets, str):
            targets = targets.split()
        else:
            targets = list(targets)
        targets = [os.path.normpath(t) for t in targets]
        targets += self.target_paths
        # De-duplicate and keep a stable, sorted order.
        targets = sorted(set(targets))
        self.target_paths = targets
    self.build_graph()
    if len(self.failed_targets_all_paths) > 0:
        report = 'The following targets failed and will be skipped:\n'
        max_num_len = len(str(len(self.failed_target_paths)))
        for i, t in enumerate(self.failed_target_paths):
            # Pad by the width of the number actually printed (i + 1).
            num_pad = max_num_len - len(str(i + 1)) + 1
            report += '  ' + '%d. ' % (i + 1) + ' ' * num_pad + '%s\n' % t
        report += '\nAttempted dependency paths:\n'
        report += '(numbers index dependencies, dashes delimit alternatives)\n'
        report += self.report_failure(self.failed_targets_all_paths, indent=2) + '\n'
        tostderr(report)
def build_graph(self):
    """Resolve every target path into candidate nodes and record the outcome.

    Sets ``targets_all_paths``, ``targets`` (the best-ranked candidate per
    successful target), ``failed_targets_all_paths`` and
    ``failed_target_paths``.

    NOTE(review): nesting reconstructed from whitespace-mangled source, in
    particular the extent of the ``if p is None`` block -- confirm.
    """
    # Compute all dependency paths to target set
    targets_all_paths = []
    for target_path in self.target_paths:
        successes = SuccessSet()
        failures = FailureSet()
        inheritors = MBType.inheritors() - {MBFailure, StaticResource}
        for c in inheritors:
            if not c.is_abstract():
                if c.has_multiple_stem_prereqs():
                    # Try every allowed combination of shared prefix/suffix.
                    parse_settings = []
                    if c.allow_shared_prefix():
                        parse_settings.append((False, True))
                    else:
                        parse_settings.append((False,))
                    if c.allow_shared_suffix():
                        parse_settings.append((False, True))
                    else:
                        parse_settings.append((False,))
                    parse_settings = list(itertools.product(*parse_settings))
                else:
                    parse_settings = [(False, False)]
                for has_shared_prefix_cur, has_shared_suffix_cur in parse_settings:
                    subgraph = self.build_subgraph(
                        c,
                        target_path,
                        has_shared_prefix=has_shared_prefix_cur,
                        has_shared_suffix=has_shared_suffix_cur
                    )
                    stem = subgraph['stem_prereqs_all_paths']
                    stat = subgraph['static_prereqs_all_paths']
                    oth = subgraph['other_prereqs_all_paths']
                    success_ratio_cur = subgraph['success_ratio']
                    match_cur = subgraph['match']
                    if success_ratio_cur == 1:
                        # Reuse an existing node for this key if there is one.
                        p = self[(c, target_path, has_shared_prefix_cur, has_shared_suffix_cur)]
                        if p is None:
                            if issubclass(c, ExternalResource):
                                p = c()
                            else:
                                p = c(target_path)
                            p.set_stem_prereqs(stem)
                            p.set_static_prereqs(stat)
                            p.set_other_prereqs(oth)
                            p.has_shared_prefix_src = has_shared_prefix_cur
                            p.has_shared_suffix_src = has_shared_suffix_cur
                            self.add_node(p)
                        p.dump = True
                        p.set_dump()
                        p.intermediate = False
                        successes.add(p)
                    elif match_cur:
                        p = MBFailure(target_path, c)
                        p.set_stem_prereqs(stem)
                        p.set_static_prereqs(stat)
                        p.set_other_prereqs(oth)
                        p.has_shared_prefix_src = has_shared_prefix_cur
                        p.has_shared_suffix_src = has_shared_suffix_cur
                        failures.add(p)
        if len(successes) > 0:
            to_append = successes
        else:
            # No recipe succeeded: fall back to an existing file, then to the
            # recorded failures, then to the bare path.
            if os.path.exists(target_path):
                to_append = SuccessSet({StaticResource(target_path)})
            elif len(failures) > 0:
                to_append = failures
            else:
                to_append = FailureSet([target_path])
        targets_all_paths.append(to_append)
    # Filter out failed dependency paths, report problems if none succeed
    exists = [True if isinstance(x, SuccessSet) else False for x in targets_all_paths]
    targets_all_paths_tmp = targets_all_paths
    targets_all_paths = []
    failed_targets_all_paths = []
    failed_target_paths = []
    for i, x in enumerate(exists):
        if x:
            targets_all_paths.append(targets_all_paths_tmp[i])
        else:
            failed_targets_all_paths.append(targets_all_paths_tmp[i])
            failed_target_paths.append(self.target_paths[i])
    self.targets_all_paths = targets_all_paths
    self.targets = []
    for t in self.targets_all_paths:
        # Best-ranked candidate according to prereq_comparator.
        candidates = sorted(list(t), key=cmp_to_key(prereq_comparator))
        self.targets.append(candidates[0])
    self.failed_targets_all_paths = failed_targets_all_paths
    self.failed_target_paths = failed_target_paths
def build_subgraph(self, cls, path, has_shared_prefix=False, has_shared_suffix=False, downstream_paths=None):
# Recursively work out whether `path` can be produced as an instance of `cls`.
#
# Arguments:
#   cls: candidate resource class; must provide parse_path, match, static_prereq_types,
#        stem_prereq_types, other_prereq_paths, other_prereq_types and repeatable_prereq.
#   path: the path to resolve.
#   has_shared_prefix / has_shared_suffix: forwarded unchanged to cls.parse_path.
#   downstream_paths: set of paths already visited on the current recursion chain, used to
#        refuse cyclic dependencies. `path` is added to it in place; every recursive call
#        receives a copy.
#
# Returns a dict with the keys:
#   'stem_prereqs_all_paths', 'static_prereqs_all_paths', 'other_prereqs_all_paths':
#        lists with one SuccessSet or FailureSet per prerequisite slot, or None for all
#        three when cls.parse_path(path) returned None.
#   'success_ratio': fraction of prerequisite slots that resolved to a SuccessSet
#        (1 when there are no slots, 0 when the path could not be parsed).
#   'match': the value of cls.match(path).
if downstream_paths is None:
downstream_paths = set()
downstream_paths.add(path)
stem_prereqs_all_paths = None
static_prereqs_all_paths = None
other_prereqs_all_paths = None
parsed = cls.parse_path(path, has_shared_prefix=has_shared_prefix, has_shared_suffix=has_shared_suffix)
if parsed is not None:
# STATIC PREREQS
static_prereq_types = cls.static_prereq_types()[:]
static_prereqs_all_paths = []
for c in static_prereq_types:
successes = SuccessSet()
failures = FailureSet()
# A string entry is a path relative to STATIC_RESOURCES_DIR and is handled as a
# StaticResource; an ExternalResource subclass contributes its first inferred path.
# Anything else is a mistake in the class definition.
if isinstance(c, str):
prereq_path = os.path.join(STATIC_RESOURCES_DIR, os.path.normpath(c))
c = StaticResource
elif issubclass(c, ExternalResource):
prereq_path = c.infer_paths()[0]
else:
raise ValueError('Class %s is not a valid static prereq but has been requested as one. Fix the type definition.' % c)
# Only recurse into paths that are not already on the current chain.
if prereq_path not in downstream_paths:
subgraph = self.build_subgraph(
c,
prereq_path,
downstream_paths=downstream_paths.copy()
)
pat = subgraph['stem_prereqs_all_paths']
stat = subgraph['static_prereqs_all_paths']
oth = subgraph['other_prereqs_all_paths']
success_ratio_cur = subgraph['success_ratio']
match_cur = subgraph['match']
# Every prerequisite slot resolved: look the node up in the graph and create and
# register it when the lookup yields None.
if success_ratio_cur == 1:
p = self[(c, prereq_path, False, False)]
if p is None:
if issubclass(c, ExternalResource):
p = c()
else:
p = c(prereq_path)
p.set_stem_prereqs(pat)
p.set_static_prereqs(stat)
p.set_other_prereqs(oth)
self.add_node(p)
p.set_dump()
successes.add(p)
# The class matches the path but some prerequisite is unresolved: record an MBFailure
# carrying the partial prerequisite information.
elif match_cur:
p = MBFailure(prereq_path, c)
p.set_stem_prereqs(pat)
p.set_static_prereqs(stat)
p.set_other_prereqs(oth)
failures.add(p)
# Result for this slot, in order of preference: resolved nodes, a file that already
# exists on disk (wrapped as a StaticResource), recorded failures, or the bare path name
# (marked as cyclic when the path is already on the current chain).
if len(successes) > 0:
to_append = successes
else:
if os.path.exists(prereq_path):
to_append = SuccessSet({StaticResource(prereq_path)})
elif len(failures) > 0:
to_append = failures
else:
if prereq_path not in downstream_paths:
name = prereq_path
else:
name = prereq_path + ' (cyclic dependency)'
to_append = FailureSet([name])
static_prereqs_all_paths.append(to_append)
has_buildable_prereqs = (len(cls.stem_prereq_types()) > 0) or (len(cls.other_prereq_paths(path)) > 0)
if has_buildable_prereqs:
# PATTERN PREREQS
# Work on a copy ([:]) because the list is padded in place just below.
stem_prereq_types = cls.stem_prereq_types()[:]
# For a repeatable prerequisite, repeat the first type at the front of the list until
# there are as many types as parsed prerequisite paths.
if cls.repeatable_prereq():
while len(stem_prereq_types) < len(parsed['prereqs']):
stem_prereq_types.insert(0, stem_prereq_types[0])
stem_prereqs_all_paths = []
for (P, prereq_path) in zip(stem_prereq_types, parsed['prereqs']):
successes = SuccessSet()
failures = FailureSet()
# Candidate classes: everything P.inheritors() returns, plus P itself when it is not
# abstract, minus the MBFailure and StaticResource placeholder classes.
inheritors = P.inheritors()
if not P.is_abstract():
inheritors |= {P}
inheritors -= {MBFailure, StaticResource}
if prereq_path not in downstream_paths:
for c in inheritors:
if not c.is_abstract():
# Enumerate the (has_shared_prefix, has_shared_suffix) combinations to try. Classes
# with multiple stem prereqs get True added for each flag they allow; all other
# classes are tried with (False, False) only.
if c.has_multiple_stem_prereqs():
parse_settings = []
if c.allow_shared_prefix():
parse_settings.append((False, True))
else:
parse_settings.append((False,))
if c.allow_shared_suffix():
parse_settings.append((False, True))
else:
parse_settings.append((False,))
parse_settings = list(itertools.product(*parse_settings))
else:
parse_settings = [(False, False)]
for has_shared_prefix_cur, has_shared_suffix_cur in parse_settings:
subgraph = self.build_subgraph(
c,
prereq_path,
has_shared_prefix=has_shared_prefix_cur,
has_shared_suffix=has_shared_suffix_cur,
downstream_paths=downstream_paths.copy()
)
pat = subgraph['stem_prereqs_all_paths']
stat = subgraph['static_prereqs_all_paths']
oth = subgraph['other_prereqs_all_paths']
success_ratio_cur = subgraph['success_ratio']
match_cur = subgraph['match']
# Same success / failure bookkeeping as for static prereqs, except that the
# prefix/suffix flags are part of the graph key and are stored on the node.
if success_ratio_cur == 1:
p = self[(c, prereq_path, has_shared_prefix_cur, has_shared_suffix_cur)]
if p is None:
if issubclass(c, ExternalResource):
p = c()
else:
p = c(prereq_path)
p.set_stem_prereqs(pat)
p.set_static_prereqs(stat)
p.set_other_prereqs(oth)
p.has_shared_prefix_src = has_shared_prefix_cur
p.has_shared_suffix_src = has_shared_suffix_cur
self.add_node(p)
p.set_dump()
successes.add(p)
elif match_cur:
p = MBFailure(prereq_path, c)
p.set_stem_prereqs(pat)
p.set_static_prereqs(stat)
p.set_other_prereqs(oth)
p.has_shared_prefix_src = has_shared_prefix_cur
p.has_shared_suffix_src = has_shared_suffix_cur
failures.add(p)
if len(successes) > 0:
to_append = successes
else:
if os.path.exists(prereq_path):
to_append = SuccessSet({StaticResource(prereq_path)})
elif len(failures) > 0:
to_append = failures
else:
if prereq_path not in downstream_paths:
name = prereq_path
else:
name = prereq_path + ' (cyclic dependency)'
to_append = FailureSet([name])
stem_prereqs_all_paths.append(to_append)
# OTHER PREREQS
# Same procedure as for the stem prereqs above, but the paths come from
# cls.other_prereq_paths(path), the types from cls.other_prereq_types(...), and each
# path is normalized before use.
other_prereq_paths = cls.other_prereq_paths(path)
other_prereq_types = cls.other_prereq_types(other_prereq_paths)
other_prereqs_all_paths = []
for (P, prereq_path) in zip(other_prereq_types, other_prereq_paths):
prereq_path = os.path.normpath(prereq_path)
successes = SuccessSet()
failures = FailureSet()
inheritors = P.inheritors()
if not P.is_abstract():
inheritors |= {P}
inheritors -= {MBFailure, StaticResource}
if prereq_path not in downstream_paths:
for c in inheritors:
if not c.is_abstract():
if c.has_multiple_stem_prereqs():
parse_settings = []
if c.allow_shared_prefix():
parse_settings.append((False, True))
else:
parse_settings.append((False,))
if c.allow_shared_suffix():
parse_settings.append((False, True))
else:
parse_settings.append((False,))
parse_settings = list(itertools.product(*parse_settings))
else:
parse_settings = [(False, False)]
for has_shared_prefix_cur, has_shared_suffix_cur in parse_settings:
subgraph = self.build_subgraph(
c,
prereq_path,
has_shared_prefix=has_shared_prefix_cur,
has_shared_suffix=has_shared_suffix_cur,
downstream_paths=downstream_paths.copy()
)
pat = subgraph['stem_prereqs_all_paths']
stat = subgraph['static_prereqs_all_paths']
oth = subgraph['other_prereqs_all_paths']
success_ratio_cur = subgraph['success_ratio']
match_cur = subgraph['match']
if success_ratio_cur == 1:
p = self[(c, prereq_path, has_shared_prefix_cur, has_shared_suffix_cur)]
if p is None:
if issubclass(c, ExternalResource):
p = c()
else:
p = c(prereq_path)
p.set_stem_prereqs(pat)
p.set_static_prereqs(stat)
p.set_other_prereqs(oth)
p.has_shared_prefix_src = has_shared_prefix_cur
p.has_shared_suffix_src = has_shared_suffix_cur
self.add_node(p)
p.set_dump()
successes.add(p)
elif match_cur:
p = MBFailure(prereq_path, c)
p.set_stem_prereqs(pat)
p.set_static_prereqs(stat)
p.set_other_prereqs(oth)
p.has_shared_prefix_src = has_shared_prefix_cur
p.has_shared_suffix_src = has_shared_suffix_cur
failures.add(p)
if len(successes) > 0:
to_append = successes
else:
if os.path.exists(prereq_path):
to_append = SuccessSet({StaticResource(prereq_path)})
elif len(failures) > 0:
to_append = failures
else:
if prereq_path not in downstream_paths:
name = prereq_path
else:
name = prereq_path + ' (cyclic dependency)'
to_append = FailureSet([name])
other_prereqs_all_paths.append(to_append)
else:
# No stem or other prereqs are declared for this class and path.
stem_prereqs_all_paths = []
other_prereqs_all_paths = []
# Count how many prerequisite slots (static + stem + other) resolved to a SuccessSet.
success_ratio_num = 0
success_ratio_denom = 0
for x in static_prereqs_all_paths + stem_prereqs_all_paths + other_prereqs_all_paths:
if isinstance(x, SuccessSet):
success_ratio_num += 1
success_ratio_denom += 1
# NOTE(review): if this module runs under Python 2 without true division, the quotient
# below truncates to 0 or 1; the `== 1` comparisons made in this method give the same
# answer either way.
if success_ratio_denom > 0:
success_ratio = success_ratio_num / success_ratio_denom
else:
success_ratio = 1
else:
# cls.parse_path returned None for this path.
success_ratio = 0
match = cls.match(path)
return {
'stem_prereqs_all_paths': stem_prereqs_all_paths,
'static_prereqs_all_paths': static_prereqs_all_paths,
'other_prereqs_all_paths': other_prereqs_all_paths,
'success_ratio': success_ratio,
'match': match
}
def add_node(self, node):
    """Register ``node`` in this graph under its ``graph_key``.

    The key must not already be present in ``self.nodes``; re-inserting an
    existing key trips an assertion. After insertion the node is given a
    back-reference to this graph and shares the graph's process scheduler.
    """
    key = node.graph_key
    assert key not in self.nodes, 'Attempted to re-insert an existing key: %s' % str(key)
    self[key] = node
    # Wire the node back to its owning graph.
    node.graph = self
    node.process_scheduler = self.process_scheduler
def report_failure(self, targets, indent=0):
    """Render a numbered PASS/FAIL report for a list of prerequisite slots.

    Args:
        targets: sequence whose items are ``FailureSet`` objects,
            ``SuccessSet`` objects or plain strings (an unresolved path).
        indent: number of spaces to put in front of every line of this level.

    Returns:
        The report as a single string. Each failed alternative that is not a
        bare string is followed by the report of its own stem and other
        prerequisites, rendered recursively at a deeper indent.
    """
    out = []
    max_num_len = len(str(len(targets)))
    for i, x in enumerate(targets):
        # Pad relative to the number that is actually printed (i + 1). Padding
        # relative to i mis-aligned the labels of items 10, 100, ...
        num_pad = max_num_len - len(str(i + 1)) + 1
        label = ' ' * (indent) + '%d.' % (i + 1) + ' ' * num_pad
        no_match = ' ' * (indent + max_num_len + 10) + 'Path does not match any existing constructor\n'
        if isinstance(x, FailureSet):
            for j, y in enumerate(x):
                if isinstance(y, str):
                    out.append(label + '- FAIL: ' + y + '\n')
                    out.append(no_match)
                else:
                    type_name = [y.cls.__name__]
                    if y.has_shared_prefix:
                        type_name.append('prefix')
                    if y.has_shared_suffix:
                        type_name.append('suffix')
                    type_name = ', '.join(type_name)
                    if j == 0:
                        out.append(label + '- FAIL: ' + y.path + ' (%s)\n' % type_name)
                    else:
                        # Further alternatives for the same slot are not renumbered.
                        out.append(' ' * (indent) + ' ' * (max_num_len + 2) + '- FAIL: ' + y.path + ' (%s)\n' % type_name)
                    # Explain why this alternative failed by reporting its own prereqs.
                    out.append(self.report_failure(y.stem_prereqs_all_paths() + y.other_prereqs_all_paths(), indent=indent + num_pad + 4))
        elif isinstance(x, SuccessSet):
            if len(x) > 0:
                for j, y in enumerate(x):
                    type_name = [y.__class__.__name__]
                    if y.has_shared_prefix:
                        type_name.append('prefix')
                    if y.has_shared_suffix:
                        type_name.append('suffix')
                    type_name = ', '.join(type_name)
                    if j == 0:
                        out.append(label + '- PASS: ' + y.path + ' (%s)\n' % type_name)
                    else:
                        out.append(' ' * (indent) + '-' + ' ' * (max_num_len + 2) + '- PASS: ' + y.path + ' (%s)\n' % type_name)
        elif isinstance(x, str):
            out.append(label + '- FAIL: ' + x + '\n')
            out.append(no_match)
    return ''.join(out)
def compute_paths(self, node=None):
    """Enumerate dependency paths below ``node`` (or below the graph targets).

    Args:
        node: the node whose prerequisites are expanded. When ``None``, the
            graph's own ``targets_all_paths`` are expanded instead.

    Returns:
        A set of ``(node, deps)`` tuples. ``deps`` is ``None`` when a
        prerequisite slot has no alternatives, otherwise a tuple of nested
        ``(node, deps)`` tuples.
    """
    # The original test was inverted: it read `node.prereqs_all` exactly when
    # node was None and ignored the node whenever one was given.
    if node is None:
        prereqs_all_paths = self.targets_all_paths
    else:
        # NOTE(review): assumes `prereqs_all` is an iterable attribute on nodes;
        # confirm against the node class.
        prereqs_all_paths = node.prereqs_all

    prereq_sets = set()
    for alt in prereqs_all_paths:
        if len(alt) == 0:
            return {(node, None)}
        deps = [self.compute_paths(dep) for dep in alt]
        if len(deps) > 1:
            deps = set(itertools.product(*deps))
        else:
            deps = {(x,) for x in deps[0]}
        prereq_sets |= {(node, x) for x in deps}
    return prereq_sets
def pretty_print_path(self, path, indent=0):
    """Render one ``(node, deps)`` path tuple as an indented outline.

    Each node takes one line: ``indent + 1`` dashes, a space, the node's
    class name, ``': '`` and the node's ``path``. When ``deps`` is not
    ``None``, every entry in it is rendered recursively one level deeper.
    """
    head = path[0]
    pieces = ['-' * (indent + 1) + ' ' + type(head).__name__ + ': ' + head.path + '\n']
    children = path[1]
    if children is not None:
        pieces.extend(self.pretty_print_path(child, indent=indent + 1) for child in children)
    return ''.join(pieces)
def pretty_print_paths(self, paths=None, node=None):
    """Render several dependency paths as a numbered listing.

    At least one of ``paths`` and ``node`` must be given. When ``paths`` is
    ``None`` it is computed with ``self.compute_paths(node)``. Every path is
    printed as ``'<n>)'`` on its own line, followed by its outline and a
    blank line.
    """
    assert paths is not None or node is not None, 'pretty_print_paths requires at least 1 non-null argument (paths or node)'
    if paths is None:
        paths = self.compute_paths(node)
    chunks = []
    for number, p in enumerate(paths, 1):
        chunks.append(str(number) + ')\n')
        chunks.append(self.pretty_print_path(p))
        chunks.append('\n')
    return ''.join(chunks)
def get_stale_nodes(self, force=False, downstream=False):
    """Collect the graph keys of all stale nodes reachable from the targets.

    Args:
        force: passed through to each target's ``get_stale_nodes``.
        downstream: when true, the result is extended with whatever
            ``self.get_downstream_stale_nodes`` returns for each stale node.

    Returns:
        A set of graph keys.
    """
    stale = set()
    for target in self.targets:
        stale |= target.get_stale_nodes(force=force)
    if not downstream:
        return stale

    reached = set()
    for key in stale:
        # Keys already collected through another node need no second traversal.
        if key in reached:
            continue
        reached |= self.get_downstream_stale_nodes(self[key])
    stale |= reached
    return stale
def get_downstream_stale_nodes(self, node, stale_nodes=None):
# Gather graph keys starting from `node` and following `node.dependents` recursively.
#
# Arguments:
#   node: starting node; its `graph_key` and `dependents` attributes are read.
#   stale_nodes: set that accumulates the keys. A fresh set is created when None is
#        passed; a set that is passed in is updated in place and also returned.
#
# Returns the accumulated set of graph keys.
if stale_nodes is None:
stale_nodes = set()
key = node.graph_key
# Only keys that are present in this graph are recorded.
if key in self:
stale_nodes.add(key)
# The same accumulator is handed to every recursive call, so the union below merges the
# set with itself and exists only to keep the return value in use.
for d in node.dependents:
stale_nodes |= self.get_downstream_stale_nodes(d, stale_nodes=stale_nodes)
return stale_nodes
def topological_sort_stale_node(self, stale_nodes):
    """Order stale graph keys so that prerequisites precede their dependents.

    The keys are peeled off in layers. A layer holds every key none of whose
    stem, static or other prerequisites is still pending, and each layer is
    sorted by the key's second element before being appended to the result.

    Args:
        stale_nodes: mutable collection of graph keys. It is consumed: every
            key is removed from it as it is emitted.

    Returns:
        A list of graph keys in build order.

    Raises:
        ValueError: if the remaining keys all depend on one another, so no
            further layer can be formed.
    """
    out = []
    while len(stale_nodes) > 0:
        # Decide the whole layer against the same snapshot of pending keys.
        # Removing keys while scanning let a dependent join the layer of its
        # prerequisite, and the per-layer sort could then put it first.
        layer = []
        for n in stale_nodes:
            node = self[n]
            prereqs = node.stem_prereqs() + node.static_prereqs() + node.other_prereqs()
            if not any(p.graph_key in stale_nodes for p in prereqs):
                layer.append(n)
        if not layer:
            # Without this guard a cycle would spin forever.
            raise ValueError('Cyclic dependency among stale nodes: %s' % str(list(stale_nodes)))
        for n in layer:
            stale_nodes.remove(n)
        out += sorted(layer, key=lambda x: x[1])
    return out
def get_build_plan(self, directories_to_make, stale_nodes, garbage, indent=0):
    """Describe the pending work as a human-readable plan.

    Args:
        directories_to_make: directories that will be created.
        stale_nodes: graph keys to be computed; a copy is ordered with
            ``self.topological_sort_stale_node`` and the second element of
            each key is printed.
        garbage: paths that will be removed afterwards; printed sorted.
        indent: number of spaces in front of the plan header.

    Returns:
        The plan as a string. Sections with nothing to show are omitted, and
        ``'Nothing to do here!'`` is printed when every section is empty.
    """
    heading_pad = ' ' * (indent + 2)
    entry_pad = ' ' * (indent + 4)
    sections = []

    if len(directories_to_make) > 0:
        lines = [heading_pad + 'Create directories:\n']
        lines.extend(entry_pad + '%s\n' % d for d in directories_to_make)
        lines.append('\n')
        sections.append(''.join(lines))

    if len(stale_nodes) > 0:
        lines = [heading_pad + 'Compute targets:\n']
        # Sort a copy: the sorter consumes the collection it is given.
        ordered = self.topological_sort_stale_node(stale_nodes.copy())
        lines.extend(entry_pad + '%s\n' % key[1] for key in ordered)
        lines.append('\n')
        sections.append(''.join(lines))

    if len(garbage) > 0:
        lines = [heading_pad + 'Clean up targets:\n']
        lines.extend(entry_pad + '%s\n' % g for g in sorted(list(garbage)))
        lines.append('\n')
        sections.append(''.join(lines))

    plan = ' ' * indent + 'Build plan:\n'
    if sections:
        plan += ''.join(sections)
    else:
        plan += heading_pad + 'Nothing to do here!\n'
        plan += '\n'
    return plan
def get_garbage(self, force=False):
    """Return the union of the garbage sets reported by every target.

    ``force`` is passed through unchanged to each target's ``get_garbage``.
    """
    collected = set()
    for target in self.targets:
        target_garbage = target.get_garbage(force=force)
        collected |= target_garbage
    return collected
def update_history(self):
    """Let every target update the history, then write it to disk.

    Each target's ``update_history`` is called first; afterwards the
    module-level ``HISTORY`` object is written to ``HISTORY_PATH``,
    overwriting the previous file.
    """
    for tgt in self.targets:
        tgt.update_history()
    with open(HISTORY_PATH, 'w') as history_file:
        HISTORY.write(history_file)
def get(self, dry_run=False, force=False, downstream=True, interactive=False):
    """Build every target of the graph, then clean up intermediate files.

    Args:
        dry_run: when true, report what would be done but create or delete
            nothing on disk and leave the history untouched.
        force: passed through to the stale-node, directory and garbage
            computations.
        downstream: passed through to ``get_stale_nodes``.
        interactive: when true, print the build plan to stderr and ask for
            confirmation before doing anything.

    Returns:
        The list of per-target results (run through
        ``self.process_scheduler.get`` when ``self.concurrent`` is true), or
        ``None`` when the user declined or the build raised.
    """
    # Compute set of stale nodes
    stale_nodes = self.get_stale_nodes(force=force, downstream=downstream)

    # Compute set of directories to make
    directories_to_make = set()
    for t in self.targets:
        directories_to_make |= t.directories_to_make(force=force)
    directories_to_make = sorted(directories_to_make)

    # Compute list of garbage to collect
    garbage = sorted(self.get_garbage(force=force))

    if interactive:
        tostderr(self.get_build_plan(
            directories_to_make,
            stale_nodes,
            garbage
        ))
        # Ask until the answer is empty (the default, meaning yes), 'y' or 'n'.
        # NOTE(review): if this module is still run under Python 2, `input`
        # evaluates what is typed; confirm whether `raw_input` is needed there.
        answer = None
        while answer not in ('', 'y', 'n'):
            answer = input('Continue? [y]/n > ').strip().lower()
        cont = answer != 'n'
        tostderr('\n')
    else:
        cont = True

    if not cont:
        return None

    # Run targets
    try:
        if len(directories_to_make) > 0:
            tostderr('Making directories:\n')
            for d in directories_to_make:
                tostderr(' %s\n' % d)
                # Skip directories that already exist (for example created as
                # the parent of an earlier entry) so makedirs cannot abort
                # the whole build.
                if not dry_run and not os.path.isdir(d):
                    os.makedirs(d)
        out = [x.get(dry_run=dry_run, stale_nodes=stale_nodes, report_up_to_date=True) for x in self.targets]

        # Update history (links to external resources)
        if not dry_run:
            self.update_history()

        if self.concurrent:
            out = self.process_scheduler.get(out)
    except BaseException as e:
        # Deliberately broad: interrupts are also reported here and the
        # clean-up below still runs. The error is not re-raised.
        out = None
        tostderr('\n\nModelBlocks runtime error:\n')
        tostderr(str(e) + '\n\n\n')
        for x in self.targets:
            x.intermediate = True
            x.PRECIOUS = False

    # Clean up intermediate targets
    if len(garbage) > 0:
        tostderr('Garbage collecting intermediate files:\n')
        for p in garbage:
            if os.path.isdir(p):
                p_str = ' %s' % p + ' (entire directory)\n'
                rm = shutil.rmtree
            else:
                p_str = ' %s' % p + '\n'
                rm = os.remove
            tostderr(p_str)
            if not dry_run and os.path.exists(p):
                rm(p)

            # Remove any side effects. These used to be guarded by a second
            # `os.path.exists(p)` check made after `p` had just been deleted,
            # so they were announced but never actually removed.
            for suffix in ('summary', 'log'):
                side_effect = p + DELIM[0] + suffix
                if os.path.exists(side_effect):
                    tostderr(' ' + side_effect + '\n')
                    if not dry_run:
                        os.remove(side_effect)
        tostderr('\n')

    # Directories from the plan that exist and are empty are removed again.
    garbage_directories = sorted(
        d for d in directories_to_make
        if os.path.exists(d) and not len(os.listdir(d))
    )
    if len(garbage_directories) > 0:
        tostderr('Garbage collecting intermediate directories:\n')
        for d in garbage_directories:
            tostderr(' %s\n' % d)
            if not dry_run and os.path.exists(d):
                os.rmdir(d)

    return out