"""
Database models for the TestSuite databases themselves.
These are a bit magical because the models themselves are driven by the test
suite metadata, so we only create the classes at runtime.
"""
import datetime
import itertools
import json
import os
import aniso8601
import flask
import sqlalchemy
import sqlalchemy.ext.declarative  # for declarative_base() below
from sqlalchemy import Float, String, Integer, Column, ForeignKey, Binary, DateTime
from sqlalchemy.orm import relation
from sqlalchemy.orm.exc import ObjectDeletedError
import lnt
import lnt.testing.profile.profile as profile
from lnt.server.ui.util import convert_revision
from lnt.util import logger
from . import testsuite
def _dict_update_abort_on_duplicates(base_dict, to_merge):
    """This behaves like base_dict.update(to_merge), but asserts that none
    of the keys in to_merge is already present in base_dict."""
for key, value in to_merge.items():
assert base_dict.get(key, None) is None
base_dict[key] = value
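# For illustration (hypothetical values): merging {'b': 2} into {'a': 1}
# yields {'a': 1, 'b': 2}, while merging {'a': 3} into the result would
# trip the assertion, since 'a' is already present.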
class MachineInfoChanged(ValueError):
pass
class TestSuiteDB(object):
"""
    Wrapper object for an individual test suite's database tables.
    This wrapper is somewhat special in that it handles specializing the
    metatable instances for the given test suite.
    Clients are expected to access the test suite database tables only
    through the model classes constructed by this wrapper object.
"""
def __init__(self, v4db, name, test_suite):
self.v4db = v4db
self.name = name
self.test_suite = test_suite
# Save caches of the various fields.
self.machine_fields = list(self.test_suite.machine_fields)
self.order_fields = list(self.test_suite.order_fields)
self.run_fields = list(self.test_suite.run_fields)
self.sample_fields = list(sorted(self.test_suite.sample_fields,
key=lambda s: s.schema_index))
self.machine_to_latest_order_cache = {}
sample_field_indexes = dict()
for i, field in enumerate(self.sample_fields):
sample_field_indexes[field.name] = i
self.sample_field_indexes = sample_field_indexes
self.base = sqlalchemy.ext.declarative.declarative_base()
# Create parameterized model classes for this test suite.
class ParameterizedMixin(object):
# Class variable to allow finding the associated test suite from
# model instances.
testsuite = self
# Class variable (expected to be defined by subclasses) to allow
# easy access to the field list for parameterized model classes.
fields = None
def get_field(self, field):
return getattr(self, field.name)
def set_field(self, field, value):
return setattr(self, field.name, value)
def get_fields(self):
result = dict()
for field in self.fields:
value = self.get_field(field)
if value is None:
continue
result[field.name] = value
return result
def set_fields_pop(self, data_dict):
for field in self.fields:
value = data_dict.pop(field.name, None)
self.set_field(field, value)
db_key_name = self.test_suite.db_key_name
class Machine(self.base, ParameterizedMixin):
__tablename__ = db_key_name + '_Machine'
__table_args__ = {'mysql_collate': 'utf8_bin'}
DEFAULT_BASELINE_REVISION = v4db.baseline_revision
fields = self.machine_fields
id = Column("ID", Integer, primary_key=True)
name = Column("Name", String(256), index=True)
# The parameters blob is used to store any additional information
# reported by the run but not promoted into the machine record.
# Such data is stored as a JSON encoded blob.
parameters_data = Column("Parameters", Binary)
# Dynamically create fields for all of the test suite defined
# machine fields.
class_dict = locals()
for item in fields:
iname = item.name
if iname in class_dict:
raise ValueError("test suite defines reserved key %r" % (
iname))
item.column = testsuite.make_machine_column(iname)
class_dict[iname] = item.column
def __init__(self, name_value):
self.id = None
self.name = name_value
def __repr__(self):
return '%s_%s%r' % (db_key_name, self.__class__.__name__,
(self.id, self.name))
@property
def parameters(self):
"""dictionary access to the BLOB encoded parameters data"""
return dict(json.loads(self.parameters_data))
@parameters.setter
def parameters(self, data):
self.parameters_data = json.dumps(sorted(data.items())).encode("utf-8")
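                # For example (hypothetical data), assigning {'os': 'linux'}
                # stores the JSON text '[["os", "linux"]]'; the getter above
                # rebuilds the dict from that sorted pair list.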
def get_baseline_run(self, session):
ts = Machine.testsuite
user_baseline = ts.get_users_baseline(session)
if user_baseline:
return self.get_closest_previously_reported_run(
session, user_baseline.order)
else:
mach_base = Machine.DEFAULT_BASELINE_REVISION
# If we have an int, convert it to a proper string.
if isinstance(mach_base, int):
mach_base = '% 7d' % mach_base
return self.get_closest_previously_reported_run(
session, ts.Order(llvm_project_revision=mach_base))
def get_closest_previously_reported_run(self, session,
order_to_find):
"""
Find the closest previous run to the requested order, for which
this machine also reported.
"""
# FIXME: Scalability! Pretty fast in practice, but still.
ts = Machine.testsuite
# Search for best order.
best_order = None
for order in session.query(ts.Order).\
join(ts.Run).\
filter(ts.Run.machine_id == self.id).distinct():
if order >= order_to_find and \
(best_order is None or order < best_order):
best_order = order
# Find the most recent run on this machine that used
# that order.
closest_run = None
if best_order:
closest_run = session.query(ts.Run)\
.filter(ts.Run.machine_id == self.id)\
.filter(ts.Run.order_id == best_order.id)\
.order_by(ts.Run.start_time.desc()).first()
return closest_run
def set_from_dict(self, data):
data_name = data.pop('name', None)
# This function is not meant for renaming. Abort on mismatch.
if data_name is not None and data_name != self.name:
raise ValueError("Mismatching machine name")
data.pop('id', None)
self.set_fields_pop(data)
self.parameters = data
def __json__(self):
result = dict()
result['name'] = self.name
result['id'] = self.id
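                # Merge in the declared machine fields and the parameters
                # blob; a duplicate key (e.g. a parameter that shadows a
                # declared field) would trip the assertion in the helper.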
_dict_update_abort_on_duplicates(result, self.get_fields())
_dict_update_abort_on_duplicates(result, self.parameters)
return result
class Order(self.base, ParameterizedMixin):
__tablename__ = db_key_name + '_Order'
            # We guarantee that our fields are stored in the order in which
            # they are supposed to be lexicographically compared; the rich
            # comparison methods rely on this.
fields = sorted(self.order_fields,
key=lambda of: of.ordinal)
id = Column("ID", Integer, primary_key=True)
# Define two common columns which are used to store the previous
# and next links for the total ordering amongst run orders.
next_order_id = Column("NextOrder", Integer, ForeignKey(id))
previous_order_id = Column("PreviousOrder", Integer,
ForeignKey(id))
# This will implicitly create the previous_order relation.
backref = sqlalchemy.orm.backref('previous_order', uselist=False,
remote_side=id)
join = 'Order.previous_order_id==Order.id'
next_order = relation("Order", backref=backref, primaryjoin=join,
uselist=False)
order_name_cache = {}
# Dynamically create fields for all of the test suite defined order
# fields.
class_dict = locals()
for item in self.order_fields:
if item.name in class_dict:
raise ValueError("test suite defines reserved key %r" % (
item.name,))
class_dict[item.name] = item.column = Column(
item.name, String(256))
def __init__(self, previous_order_id=None, next_order_id=None,
**kwargs):
self.previous_order_id = previous_order_id
self.next_order_id = next_order_id
# Initialize fields (defaulting to None, for now).
for item in self.fields:
self.set_field(item, kwargs.get(item.name))
def __repr__(self):
fields = dict((item.name, self.get_field(item))
for item in self.fields)
return '%s_%s(%r, %r, **%r)' % (
db_key_name, self.__class__.__name__,
self.previous_order_id, self.next_order_id, fields)
def as_ordered_string(self):
"""Return a readable value of the order object by printing the
fields in lexicographic order."""
# If there is only a single field, return it.
if len(self.fields) == 1:
return self.get_field(self.fields[0])
                # Otherwise, print as a tuple of strings.
return '(%s)' % (
', '.join(self.get_field(field)
for field in self.fields),)
@property
def name(self):
return self.as_ordered_string()
def _get_comparison_discriminant(self, b):
"""Return a representative pair of converted revision from self
and b. Order of the element on this pair is the same as the
order of self relative to b.
"""
# SA occasionally uses comparison to check model instances
# versus some sentinels, so we ensure we support comparison
# against non-instances.
if self.__class__ is not b.__class__:
return (0, 1)
# Pair converted revision from self and b.
converted_revisions = map(
lambda item: (
convert_revision(
self.get_field(item), cache=Order.order_name_cache
),
convert_revision(
b.get_field(item), cache=Order.order_name_cache
),
),
self.fields,
)
# Return the first unequal pair, or (0, 0) otherwise.
return next(
itertools.dropwhile(lambda x: x[0] == x[1], converted_revisions),
(0, 0),
)
def __hash__(self):
converted_fields = map(
lambda item: convert_revision(
self.get_field(item), cache=Order.order_name_cache
),
self.fields,
)
return hash(tuple(converted_fields))
def __eq__(self, b):
discriminant = self._get_comparison_discriminant(b)
return discriminant[0] == discriminant[1]
def __ne__(self, b):
discriminant = self._get_comparison_discriminant(b)
return discriminant[0] != discriminant[1]
def __lt__(self, b):
discriminant = self._get_comparison_discriminant(b)
return discriminant[0] < discriminant[1]
def __le__(self, b):
discriminant = self._get_comparison_discriminant(b)
return discriminant[0] <= discriminant[1]
def __gt__(self, b):
discriminant = self._get_comparison_discriminant(b)
return discriminant[0] > discriminant[1]
def __ge__(self, b):
discriminant = self._get_comparison_discriminant(b)
return discriminant[0] >= discriminant[1]
def __json__(self, include_id=True):
result = {}
if include_id:
result['id'] = self.id
_dict_update_abort_on_duplicates(result, self.get_fields())
return result
class Run(self.base, ParameterizedMixin):
__tablename__ = db_key_name + '_Run'
fields = self.run_fields
id = Column("ID", Integer, primary_key=True)
machine_id = Column("MachineID", Integer, ForeignKey(Machine.id),
index=True)
order_id = Column("OrderID", Integer, ForeignKey(Order.id),
index=True)
imported_from = Column("ImportedFrom", String(512))
start_time = Column("StartTime", DateTime)
end_time = Column("EndTime", DateTime)
simple_run_id = Column("SimpleRunID", Integer)
            # The parameters blob is used to store any additional information
            # reported by the run but not promoted into the run record.
            # Such data is stored as a JSON encoded blob.
parameters_data = Column("Parameters", Binary, index=False,
unique=False)
machine = relation(Machine)
order = relation(Order)
# Dynamically create fields for all of the test suite defined run
# fields.
#
# FIXME: We are probably going to want to index on some of these,
# but need a bit for that in the test suite definition.
class_dict = locals()
for item in fields:
iname = item.name
if iname in class_dict:
raise ValueError("test suite defines reserved key %r" %
(iname,))
item.column = testsuite.make_run_column(iname)
class_dict[iname] = item.column
def __init__(self, new_id, machine, order, start_time, end_time):
self.id = new_id
self.machine = machine
self.order = order
self.start_time = start_time
self.end_time = end_time
self.imported_from = None
def __repr__(self):
return '%s_%s%r' % (db_key_name, self.__class__.__name__,
(self.id, self.machine, self.order,
self.start_time, self.end_time))
@property
def parameters(self):
"""dictionary access to the BLOB encoded parameters data"""
return dict(json.loads(self.parameters_data))
@parameters.setter
def parameters(self, data):
self.parameters_data = json.dumps(sorted(data.items())).encode("utf-8")
def __json__(self, flatten_order=True):
result = {
'id': self.id,
'start_time': self.start_time,
'end_time': self.end_time,
}
# Leave out: machine_id, simple_run_id, imported_from
if flatten_order:
_dict_update_abort_on_duplicates(
result, self.order.__json__(include_id=False))
result['order_by'] = \
','.join([f.name for f in self.order.fields])
result['order_id'] = self.order_id
else:
result['order_id'] = self.order_id
_dict_update_abort_on_duplicates(result, self.get_fields())
_dict_update_abort_on_duplicates(result, self.parameters)
return result
Machine.runs = relation(Run, back_populates='machine',
cascade="all, delete-orphan")
Order.runs = relation(Run, back_populates='order',
cascade="all, delete-orphan")
class Test(self.base, ParameterizedMixin):
__tablename__ = db_key_name + '_Test'
            # utf8_bin for case-sensitive comparison.
__table_args__ = {'mysql_collate': 'utf8_bin'}
id = Column("ID", Integer, primary_key=True)
name = Column("Name", String(256), unique=True, index=True)
def __init__(self, name):
self.id = None
self.name = name
def __repr__(self):
return '%s_%s%r' % (db_key_name, self.__class__.__name__,
(self.id, self.name))
def __json__(self, include_id=True):
result = {'name': self.name}
if include_id:
result['id'] = self.id
return result
class Profile(self.base):
__tablename__ = db_key_name + '_Profile'
id = Column("ID", Integer, primary_key=True)
created_time = Column("CreatedTime", DateTime)
accessed_time = Column("AccessedTime", DateTime)
filename = Column("Filename", String(256))
counters = Column("Counters", String(512))
def __init__(self, encoded, config, testid):
self.created_time = datetime.datetime.now()
self.accessed_time = datetime.datetime.now()
if config is not None:
profileDir = config.config.profileDir
prefix = 't-%s-s-' % os.path.basename(testid)
self.filename = \
profile.Profile.saveFromRendered(encoded,
profileDir=profileDir,
prefix=prefix)
p = profile.Profile.fromRendered(encoded)
s = ','.join('%s=%s' % (k, v)
for k, v in p.getTopLevelCounters().items())
self.counters = s[:512]
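                # The counters string has the form 'key1=val1,key2=val2';
                # for instance (hypothetical counters)
                # 'cycles=1000,branches=50'.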
def getTopLevelCounters(self):
d = dict()
                for i in self.counters.split(','):
                    k, v = i.split('=')
                    d[k] = v
return d
def load(self, profileDir):
return profile.Profile.fromFile(os.path.join(profileDir,
self.filename))
class Sample(self.base, ParameterizedMixin):
__tablename__ = db_key_name + '_Sample'
fields = list(
sorted(self.sample_fields,
key=lambda x: self.sample_field_indexes[x.name])
)
id = Column("ID", Integer, primary_key=True)
            # We do not need a separate index on run_id; it is covered by
            # the compound (RunID, TestID) index we create below.
            run_id = Column("RunID", Integer, ForeignKey(Run.id))
test_id = Column("TestID", Integer, ForeignKey(Test.id),
index=True)
profile_id = Column("ProfileID", Integer, ForeignKey(Profile.id))
run = relation(Run)
test = relation(Test)
profile = relation(Profile)
@staticmethod
def get_primary_fields():
"""
get_primary_fields() -> [SampleField*]
Get the primary sample fields (those which are not associated
with some other sample field).
"""
status_fields = set(s.status_field
for s in self.Sample.fields
if s.status_field is not None)
for field in self.Sample.fields:
if field not in status_fields:
yield field
@staticmethod
def get_metric_fields():
"""
get_metric_fields() -> [SampleField*]
Get the sample fields which represent some kind of metric, i.e.
those which have a value that can be interpreted as better or
worse than other potential values for this field.
"""
for field in Sample.fields:
if field.type.name in ['Real', 'Integer']:
yield field
@staticmethod
def get_hash_of_binary_field():
"""
get_hash_of_binary_field() -> SampleField
Get the sample field which represents a hash of the binary
being tested. This field will compare equal iff two binaries
are considered to be identical, e.g. two different compilers
producing identical code output.
Returns None if such a field isn't available.
"""
for field in self.Sample.fields:
if field.name == 'hash':
return field
return None
# Dynamically create fields for all of the test suite defined
# sample fields.
#
# FIXME: We might want to index some of these, but for a different
            # reason than above. It is possibly worth turning the compound
# index below into a covering index. We should evaluate this once
# the new UI is up.
class_dict = locals()
for item in self.sample_fields:
iname = item.name
if iname in class_dict:
raise ValueError("test suite defines reserved key %r" %
(iname,))
item.column = testsuite.make_sample_column(iname,
item.type.name)
class_dict[iname] = item.column
def __init__(self, run, test, **kwargs):
self.id = None
self.run = run
self.test = test
# Initialize sample fields (defaulting to 0, for now).
for item in self.fields:
self.set_field(item, kwargs.get(item.name, None))
def __repr__(self):
fields = dict((item.name, self.get_field(item))
for item in self.fields)
return '%s_%s(%r, %r, %r, **%r)' % (
db_key_name, self.__class__.__name__,
self.id, self.run, self.test, fields)
def __json__(self, flatten_test=False, include_id=True):
result = {}
if include_id:
result['id'] = self.id
# Leave out: run_id
# TODO: What about profile/profile_id?
if flatten_test:
_dict_update_abort_on_duplicates(
result, self.test.__json__(include_id=False))
else:
result['test_id'] = self.test_id
_dict_update_abort_on_duplicates(result, self.get_fields())
return result
Run.samples = relation(Sample, back_populates='run',
cascade="all, delete-orphan")
class FieldChange(self.base, ParameterizedMixin):
"""FieldChange represents a change in between the values
of the same field belonging to two samples from consecutive runs.
"""
__tablename__ = db_key_name + '_FieldChangeV2'
id = Column("ID", Integer, primary_key=True)
old_value = Column("OldValue", Float)
new_value = Column("NewValue", Float)
start_order_id = Column("StartOrderID", Integer,
ForeignKey(Order.id), index=True)
end_order_id = Column("EndOrderID", Integer, ForeignKey(Order.id))
test_id = Column("TestID", Integer, ForeignKey(Test.id))
machine_id = Column("MachineID", Integer, ForeignKey(Machine.id))
field_id = Column("FieldID", Integer,
ForeignKey(testsuite.SampleField.id))
# Could be from many runs, but most recent one is interesting.
run_id = Column("RunID", Integer, ForeignKey(Run.id))
start_order = relation(Order, primaryjoin='FieldChange.'
'start_order_id==Order.id')
end_order = relation(Order, primaryjoin='FieldChange.'
'end_order_id==Order.id')
test = relation(Test)
machine = relation(Machine)
field = relation(testsuite.SampleField)
run = relation(Run)
def __init__(self, start_order, end_order, machine,
test, field_id):
self.start_order = start_order
self.end_order = end_order
self.machine = machine
self.test = test
self.field_id = field_id
def __repr__(self):
return '%s_%s%r' % (db_key_name, self.__class__.__name__,
(self.start_order, self.end_order,
self.test, self.machine, self.field))
def __json__(self):
return {
'id': self.id,
'old_value': self.old_value,
'new_value': self.new_value,
'start_order_id': self.start_order_id,
'end_order_id': self.end_order_id,
'test_id': self.test_id,
'machine_id': self.machine_id,
'field_id': self.field_id,
'run_id': self.run_id,
}
Machine.fieldchanges = relation(FieldChange, back_populates='machine',
cascade="all, delete-orphan")
Run.fieldchanges = relation(FieldChange, back_populates='run',
cascade="all, delete-orphan")
class Regression(self.base, ParameterizedMixin):
"""Regressions hold data about a set of RegressionIndices."""
__tablename__ = db_key_name + '_Regression'
id = Column("ID", Integer, primary_key=True)
title = Column("Title", String(256), unique=False, index=False)
bug = Column("BugLink", String(256), unique=False, index=False)
state = Column("State", Integer)
def __init__(self, title, bug, state):
self.title = title
self.bug = bug
self.state = state
def __repr__(self):
"""String representation of the Regression for debugging.
Sometimes we try to print deleted regressions: in this case
don't die, and return a deleted """
try:
return '{}_{}:"{}"'.format(db_key_name,
self.__class__.__name__,
self.title)
except ObjectDeletedError:
return '{}_{}:"{}"'.format(db_key_name,
self.__class__.__name__,
"<Deleted>")
def __json__(self):
return {
'id': self.id,
'title': self.title,
'bug': self.bug,
'state': self.state,
}
class RegressionIndicator(self.base, ParameterizedMixin):
"""Relates a regression to a fieldchange."""
__tablename__ = db_key_name + '_RegressionIndicator'
id = Column("ID", Integer, primary_key=True)
regression_id = Column("RegressionID", Integer,
ForeignKey(Regression.id), index=True)
field_change_id = Column("FieldChangeID", Integer,
ForeignKey(FieldChange.id))
regression = relation(Regression)
field_change = relation(FieldChange)
def __init__(self, regression, field_change):
self.regression = regression
self.field_change = field_change
def __repr__(self):
return '%s_%s%r' % (db_key_name, self.__class__.__name__,
(self.id, self.regression,
self.field_change))
def __json__(self):
return {
'RegressionIndicatorID': self.id,
'Regression': self.regression,
'FieldChange': self.field_change
}
FieldChange.regression_indicators = \
relation(RegressionIndicator, back_populates='field_change',
cascade="all, delete-orphan")
class ChangeIgnore(self.base, ParameterizedMixin):
"""Changes to ignore in the web interface."""
__tablename__ = db_key_name + '_ChangeIgnore'
id = Column("ID", Integer, primary_key=True)
field_change_id = Column("ChangeIgnoreID", Integer,
ForeignKey(FieldChange.id))
field_change = relation(FieldChange)
def __init__(self, field_change):
self.field_change = field_change
def __repr__(self):
return '%s_%s%r' % (db_key_name, self.__class__.__name__,
(self.id, self.field_change))
class Baseline(self.base, ParameterizedMixin):
"""Baselines to compare runs to."""
__tablename__ = db_key_name + '_Baseline'
__table_args__ = {'mysql_collate': 'utf8_bin'}
id = Column("ID", Integer, primary_key=True)
name = Column("Name", String(32), unique=True)
comment = Column("Comment", String(256))
order_id = Column("OrderID", Integer, ForeignKey(Order.id),
index=True)
order = relation(Order)
def __str__(self):
return "Baseline({})".format(self.name)
self.Machine = Machine
self.Run = Run
self.Test = Test
self.Profile = Profile
self.Sample = Sample
self.Order = Order
self.FieldChange = FieldChange
self.Regression = Regression
self.RegressionIndicator = RegressionIndicator
self.ChangeIgnore = ChangeIgnore
self.Baseline = Baseline
# Create the compound index we cannot declare inline.
sqlalchemy.schema.Index("ix_%s_Sample_RunID_TestID" % db_key_name,
Sample.run_id, Sample.test_id)
def create_tables(self, engine):
self.base.metadata.create_all(engine)
def get_baselines(self, session):
return session.query(self.Baseline).all()
def get_users_baseline(self, session):
try:
baseline_key = lnt.server.ui.util.baseline_key(self.name)
session_baseline = flask.session.get(baseline_key)
except RuntimeError:
# Sometimes this is called from outside the app context.
# In that case, don't get the user's session baseline.
return None
if session_baseline:
return session.query(self.Baseline).get(session_baseline)
return None
def _getIncompatibleFields(self, existing_machine, new_machine):
incompatible_fields = set()
for field in self.machine_fields:
existing_value = existing_machine.get_field(field)
new_value = new_machine.get_field(field)
if new_value is None or existing_value == new_value:
continue
if existing_value is not None:
incompatible_fields.add(field.name)
existing_parameters = existing_machine.parameters
for key, new_value in new_machine.parameters.items():
existing_value = existing_parameters.get(key, None)
if new_value is None or existing_value == new_value:
continue
if existing_value is not None:
incompatible_fields.add(key)
return incompatible_fields
def _updateMachine(self, existing_machine, new_machine):
for field in self.machine_fields:
new_value = new_machine.get_field(field)
if new_value is None:
continue
existing_machine.set_field(field, new_value)
parameters = existing_machine.parameters
for key, new_value in new_machine.parameters.items():
if new_value is None and parameters.get(key, None) is not None:
continue
parameters[key] = new_value
existing_machine.parameters = parameters
def _getOrCreateMachine(self, session, machine_data, select_machine):
"""
        _getOrCreateMachine(session, machine_data, select_machine) -> Machine
        Look up or create (and insert) a Machine record from the given
        machine data (as recorded by the test interchange format).
select_machine strategies:
'match': Abort if the existing machine doesn't match the new machine
data.
'update': Update the existing machine in cases where the new machine
data doesn't match the existing data.
'split': On parameter mismatch create a new machine with a `$NN` suffix
added, or choose an existing compatible machine with such a
suffix.
"""
assert select_machine == 'match' or select_machine == 'update' \
or select_machine == 'split'
# Convert the machine data into a machine record.
machine_parameters = machine_data.copy()
name = machine_parameters.pop('name')
machine = self.Machine(name)
machine_parameters.pop('id', None)
for item in self.machine_fields:
value = machine_parameters.pop(item.name, None)
machine.set_field(item, value)
machine.parameters = machine_parameters
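        # For example (hypothetical data), machine_data of
        # {'name': 'mach-1', 'hardware': 'x86_64', 'rev': '5'} would fill a
        # declared 'hardware' field, if the suite defines one, and leave
        # 'rev' in the parameters blob.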
# Look for an existing machine.
existing_machines = session.query(self.Machine) \
.filter(self.Machine.name == name) \
.order_by(self.Machine.id.desc()) \
.all()
# No existing machine? Add one.
if len(existing_machines) == 0:
session.add(machine)
return machine
# Search for a compatible machine.
existing_machine = None
        incompatible_fields_0 = set()
for m in existing_machines:
incompatible_fields = self._getIncompatibleFields(m, machine)
if len(incompatible_fields) == 0:
existing_machine = m
break
if len(incompatible_fields_0) == 0:
incompatible_fields_0 = incompatible_fields
# All existing machines are incompatible?
if existing_machine is None:
if select_machine == 'split':
# Add a new machine.
session.add(machine)
return machine
if select_machine == 'match':
raise MachineInfoChanged("'%s' on machine '%s' changed." %
(', '.join(incompatible_fields_0),
name))
else:
assert select_machine == 'update'
# Just pick the first and update it below.
existing_machine = existing_machines[0]
self._updateMachine(existing_machine, machine)
return existing_machine
def _getOrCreateOrder(self, session, run_parameters):
"""
        _getOrCreateOrder(session, run_parameters) -> Order
        Look up or create (and insert) an Order record based on the given run
        parameters (as recorded by the test interchange format).
        The run parameters that define the order will be removed from the
        provided run_parameters argument.
"""
query = session.query(self.Order)
order = self.Order()
# First, extract all of the specified order fields.
for item in self.order_fields:
value = run_parameters.pop(item.name, None)
if value is None:
# We require that all of the order fields be present.
raise ValueError("Supplied run is missing parameter: %r" %
(item.name))
query = query.filter(item.column == value)
order.set_field(item, value)
# Execute the query to see if we already have this order.
existing = query.first()
if existing is not None:
return existing
# If not, then we need to insert this order into the total ordering
# linked list.
# Add the new order and commit, to assign an ID.
session.add(order)
session.commit()
# Load all the orders and sort them to form the total ordering.
orders = sorted(session.query(self.Order))
# Find the order we just added.
index = orders.index(order)
# Insert this order into the linked list which forms the total
# ordering.
if index > 0:
previous_order = orders[index - 1]
previous_order.next_order_id = order.id
order.previous_order_id = previous_order.id
if index + 1 < len(orders):
next_order = orders[index + 1]
next_order.previous_order_id = order.id
order.next_order_id = next_order.id
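        # Sketch of the rewiring above (hypothetical orders): given existing
        # orders r1 -> r3 and a new order r2 sorted between them, r1's next
        # and r3's previous pointers are both redirected to r2, and r2 links
        # back to each of them.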
return order
def _getOrCreateRun(self, session, run_data, machine, merge):
"""
        _getOrCreateRun(session, run_data, machine, merge) -> Run
Add a new Run record from the given data (as recorded by the test
interchange format).
merge comes into play when there is already a run with the same order
fields:
- 'reject': Reject submission (raise ValueError).
- 'replace': Remove the existing submission(s), then add the new one.
- 'append': Add new submission.
"""
        # Extract the run parameters that define the order.
run_parameters = run_data.copy()
# Ignore incoming ids; we will create our own
run_parameters.pop('id', None)
# Added by REST API, we will replace as well.
run_parameters.pop('order_by', None)
run_parameters.pop('order_id', None)
run_parameters.pop('machine_id', None)
run_parameters.pop('imported_from', None)
run_parameters.pop('simple_run_id', None)
# Find the order record.
order = self._getOrCreateOrder(session, run_parameters)
new_id = None
if merge != 'append':
existing_runs = session.query(self.Run) \
.filter(self.Run.machine_id == machine.id) \
.filter(self.Run.order_id == order.id) \
.all()
if len(existing_runs) > 0:
if merge == 'reject':
raise ValueError("Duplicate submission for '%s'" %
order.name)
elif merge == 'replace':
for previous_run in existing_runs:
logger.info("Duplicate submission for order %r: "
"deleting previous run %r" %
(order, previous_run))
# Keep the latest ID so the URL is still valid on replace
new_id = previous_run.id
session.delete(previous_run)
else:
raise ValueError('Invalid Run mergeStrategy %r' % merge)
        # We'd like ISO 8601 timestamps, but will also accept the old format.
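        # For example (hypothetical timestamps), both '2020-05-01T10:30:00'
        # (ISO 8601) and '2020-05-01 10:30:00' (the old format) are accepted.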
try:
start_time = aniso8601.parse_datetime(run_data['start_time'])
except ValueError:
start_time = datetime.datetime.strptime(run_data['start_time'],
"%Y-%m-%d %H:%M:%S")
run_parameters.pop('start_time')
try:
end_time = aniso8601.parse_datetime(run_data['end_time'])
except ValueError:
end_time = datetime.datetime.strptime(run_data['end_time'],
"%Y-%m-%d %H:%M:%S")
run_parameters.pop('end_time')
run = self.Run(new_id, machine, order, start_time, end_time)
# First, extract all of the specified run fields.
for item in self.run_fields:
value = run_parameters.pop(item.name, None)
run.set_field(item, value)
# Any remaining parameters are saved as a JSON encoded array.
run.parameters = run_parameters
session.add(run)
return run
def _importSampleValues(self, session, tests_data, run, config):
# Load a map of all the tests, which we will extend when we find tests
# that need to be added.
# Downcast to str, so we match on MySQL.
test_cache = dict((test.name, test)
for test in session.query(self.Test))
field_dict = dict([(f.name, f) for f in self.sample_fields])
all_samples_to_add = []
        def is_profile_only(td):
            return len(td) == 2 and 'profile' in td
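        # Each test_data entry is a dict such as (hypothetical)
        # {'name': 'nts.foo', 'execution_time': [0.1, 0.2]}; a profile-only
        # entry has exactly two keys: 'profile' plus, in practice, 'name'.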
for test_data in tests_data:
if is_profile_only(test_data):
                # For now, ignore profile data that has no other metrics.
continue
name = test_data['name']
test = test_cache.get(name)
if test is None:
test = self.Test(test_data['name'])
test_cache[name] = test
session.add(test)
samples = []
for key, values in test_data.items():
if key == 'name' or key == "id" or key.endswith("_id"):
continue
field = field_dict.get(key)
if field is None and key != 'profile':
raise ValueError("test %s: Metric '%s' unknown in suite " %
(name, key))
if not isinstance(values, list):
values = [values]
while len(samples) < len(values):
sample = self.Sample(run, test)
samples.append(sample)
all_samples_to_add.append(sample)
for sample, value in zip(samples, values):
if key == 'profile':
sample.profile = self.Profile(value, config, name)
else:
sample.set_field(field, value)
for test_data in tests_data:
if not is_profile_only(test_data):
continue
name = test_data['name']
test = test_cache.get(name)
            tests = [test_cache[test_name] for test_name in test_cache
                     if test_name.startswith(name + '.test:')]
if test is not None:
tests.append(test)
value = test_data['profile']
new_profile = self.Profile(value, config, name)
count = 0
for test in tests:
sample_exist = False
for sample in all_samples_to_add:
if sample.test == test:
if sample.profile is None:
sample.profile = new_profile
count += 1
sample_exist = True
else:
                            logger.warning('Test %s already has profile '
                                           'data; profile %s was ignored.',
                                           test.name, name)
                if not sample_exist:
                    logger.warning('Test %s is invalid: it has a profile but '
                                   'no samples. Consider removing it.',
                                   test.name)
if count == 0:
logger.warning('Cannot find test(s) for the profile %s', name)
else:
logger.info('The profile %s was added to %d test(s).', name, count)
session.add_all(all_samples_to_add)
def importDataFromDict(self, session, data, config, select_machine,
merge_run):
"""
importDataFromDict(session, data, config, select_machine, merge_run)
-> Run (or throws ValueError exception)
Import a new run from the provided test interchange data, and return
the constructed Run record. May throw ValueError exceptions in cases
like mismatching machine data or duplicate run submission with
merge_run == 'reject'.
"""
machine = self._getOrCreateMachine(session, data['machine'],
select_machine)
run = self._getOrCreateRun(session, data['run'], machine, merge_run)
self._importSampleValues(session, data['tests'], run, config)
return run
# Simple query support (mostly used by templates)
def machines(self, session, name=None):
q = session.query(self.Machine)
if name:
q = q.filter_by(name=name)
return q
def getMachine(self, session, id):
return session.query(self.Machine).filter_by(id=id).one()
def getRun(self, session, id):
return session.query(self.Run).filter_by(id=id).one()
def get_adjacent_runs_on_machine(self, session, run, N, direction=-1):
"""
        get_adjacent_runs_on_machine(session, run, N, direction=-1) -> [Run*]
Return the N runs which have been submitted to the same machine and are
adjacent to the given run.
The actual number of runs returned may be greater than N in situations
where multiple reports were received for the same order.
The runs will be reported starting with the runs closest to the given
run's order.
        direction must be -1 or 1 and specifies whether the preceding or the
        following runs should be returned.
"""
assert N >= 0, "invalid count"
assert direction in (-1, 1), "invalid direction"
if N == 0:
return []
# The obvious algorithm here is to step through the run orders in the
# appropriate direction and yield any runs on the same machine which
# were reported at that order.
#
# However, this has one large problem. In some cases, the gap between
# orders reported on that machine may be quite high. This will be
# particularly true when a machine has stopped reporting for a while,
# for example, as there may be large gap between the largest reported
# order and the last order the machine reported at.
#
# In such cases, we could end up executing a large number of individual
# SA object materializations in traversing the order list, which is
# very bad.
#
        # We currently solve this by instead finding all the orders reported
        # on this machine, ordering those programmatically, and then
        # iterating over that. This performs worse (O(N) instead of O(1))
        # than the obvious algorithm in the common case, but it is more
        # uniform and significantly better in the worst case, and I prefer
        # that response times be uniform. In practice, this appears to
        # perform fine even for quite large (~1GB, ~20k runs) databases.
# Find all the orders on this machine, then sort them.
#
# FIXME: Scalability! However, pretty fast in practice, see elaborate
# explanation above.
all_machine_orders = sorted(
session.query(self.Order)
.join(self.Run)
.filter(self.Run.machine == run.machine)
.distinct()
.all()
)
# Find the index of the current run.
index = all_machine_orders.index(run.order)
# Gather the next N orders.
if direction == -1:
orders_to_return = all_machine_orders[max(0, index - N):index]
else:
            orders_to_return = all_machine_orders[index + 1:index + N + 1]
# Get all the runs for those orders on this machine in a single query.
        ids_to_fetch = [o.id for o in orders_to_return]
if not ids_to_fetch:
return []
runs = session.query(self.Run).\
filter(self.Run.machine == run.machine).\
filter(self.Run.order_id.in_(ids_to_fetch)).all()
# Sort the result by order, accounting for direction to satisfy our
# requirement of returning the runs in adjacency order.
#
        # Even though we already know the right order, this is faster than
        # issuing separate queries.
runs.sort(key=lambda r: r.order, reverse=(direction == -1))
return runs
def get_previous_runs_on_machine(self, session, run, N):
return self.get_adjacent_runs_on_machine(session, run, N, direction=-1)
def get_next_runs_on_machine(self, session, run, N):
return self.get_adjacent_runs_on_machine(session, run, N, direction=1)
def __repr__(self):
return "TestSuiteDB('%s')" % self.name
def getNumMachines(self, session):
return session.query(sqlalchemy.func.count(self.Machine.id)).scalar()
def getNumRuns(self, session):
return session.query(sqlalchemy.func.count(self.Run.id)).scalar()
def getNumSamples(self, session):
return session.query(sqlalchemy.func.count(self.Sample.id)).scalar()
def getNumTests(self, session):
return session.query(sqlalchemy.func.count(self.Test.id)).scalar()
def get_field_index(self, sample_field):
return self.sample_field_indexes[sample_field.name]