#!/usr/bin/python
# X-Git-Url: http://xvm.mit.edu/gitweb/invirt/packages/invirt-web.git/blobdiff_plain/92bbb6ddedd2d02abb7e1289f0a50827429593d5..1b3bce36430d275a1b9148fb63ba601b43c4c1db:/templates/validation.py?ds=sidebyside
"""Quota limits and input-validation helpers for virtual machines."""

import getafsgroups
import re
import string
from sipb_xen_database import Machine, NIC
from webcommon import InvalidInput, g

# Memory limits.  The validation messages in this module report them in MB.
MAX_MEMORY_TOTAL = 512
MAX_MEMORY_SINGLE = 256
MIN_MEMORY_SINGLE = 16
# Disk limits.  The validation messages in this module report them in GB.
MAX_DISK_TOTAL = 50
MAX_DISK_SINGLE = 50
MIN_DISK_SINGLE = 0.1
# Limits on the number of machines per owner (total / simultaneously on).
MAX_VMS_TOTAL = 10
MAX_VMS_ACTIVE = 4


def getMachinesByOwner(user, machine=None):
    """Return the machines that share an owner with a machine or a user.

    If machine is given (truthy), return every machine whose owner is
    machine.owner.  Otherwise return every machine whose owner is
    user.username.
    """
    if machine:
        owner = machine.owner
    else:
        owner = user.username
    return Machine.select_by(owner=owner)


def maxMemory(user, machine=None, on=True):
    """Return the maximum memory for a machine or a user.

    If machine is None, return the memory available for a new
    machine.  Else, return the maximum that machine can have.

    on is whether the machine should be turned on.  If false, the max
    memory for the machine to change to, if it is left off, is
    returned; in that case only the per-machine cap applies.
    """
    if not on:
        return MAX_MEMORY_SINGLE
    # Only machines with a truthy g.uptimes entry count against the total,
    # and the machine being edited is excluded from its own budget.
    mem_usage = sum(other.memory
                    for other in getMachinesByOwner(user, machine)
                    if g.uptimes[other] and other != machine)
    return min(MAX_MEMORY_SINGLE, MAX_MEMORY_TOTAL - mem_usage)


def maxDisk(user, machine=None):
    """Return the maximum disk size for a machine or a user.

    The sizes of all disks on the owner's other machines are summed and
    divided by 1024 before being subtracted from MAX_DISK_TOTAL, so the
    stored disk sizes are presumably MB and the result GB (TODO confirm
    against the database schema).  The result is capped at
    MAX_DISK_SINGLE.
    """
    disk_usage = sum(disk.size
                     for other in getMachinesByOwner(user, machine)
                     if other != machine
                     for disk in other.disks)
    return min(MAX_DISK_SINGLE, MAX_DISK_TOTAL - disk_usage / 1024.)

def cantAddVm(user):
    """Return the reason user may not create a VM, or False if they may.

    The reason is a human-readable string.  A user is refused when they
    already have MAX_VMS_TOTAL machines, or MAX_VMS_ACTIVE machines with
    a truthy entry in g.uptimes.
    """
    machines = getMachinesByOwner(user)
    active_machines = [x for x in machines if g.uptimes[x]]
    if len(machines) >= MAX_VMS_TOTAL:
        return 'You have too many VMs to create a new one.'
    if len(active_machines) >= MAX_VMS_ACTIVE:
        return ('You already have the maximum number of VMs turned on.  '
                'To create more, turn one off.')
    return False


def validAddVm(user):
    """Return True if user may create a VM, else raise InvalidInput."""
    reason = cantAddVm(user)
    if reason:
        raise InvalidInput('create', True, reason)
    return True


def haveAccess(user, machine):
    """Return whether a user has administrative access to a machine."""
    # NOTE(review): 'moo' is a hard-coded username that bypasses all checks.
    if user.username == 'moo':
        return True
    if user.username in (machine.administrator, machine.owner):
        return True
    if getafsgroups.checkAfsGroup(user.username, machine.administrator,
                                  'athena.mit.edu'):  # XXX Cell?
        return True
    # owns() performs the locker-owner lookup; it is called once here
    # instead of being preceded by an identical direct lookup.
    if owns(user, machine):
        return True
    return False


def owns(user, machine):
    """Return whether a user owns a machine."""
    if user.username == 'moo':
        return True
    return getafsgroups.checkLockerOwner(user.username, machine.owner)


def validMachineName(name):
    """Check that name is valid for a machine name.

    A valid name is non-empty, at most 22 characters long, does not start
    with '-' or '_', and contains only ASCII letters, digits, '-' and '_'.
    """
    if not name:
        return False
    if name[0] in '-_' or len(name) > 22:
        return False
    charset = string.ascii_letters + string.digits + '-_'
    return all(ch in charset for ch in name)


def validMemory(user, memory, machine=None, on=True):
    """Parse and validate limits for memory for a given user and machine.

    on is whether the memory must be valid after the machine is
    switched on.

    Returns the memory as an int.  Raises InvalidInput if memory cannot be
    parsed as an integer, is below MIN_MEMORY_SINGLE, or is above
    maxMemory(user, machine, on).
    """
    too_small = "Minimum %s MB" % MIN_MEMORY_SINGLE
    try:
        memory = int(memory)
    except (TypeError, ValueError):
        raise InvalidInput('memory', memory, too_small)
    if memory < MIN_MEMORY_SINGLE:
        raise InvalidInput('memory', memory, too_small)
    # Report the same limit that was enforced: `on` must be passed here too,
    # otherwise the message shows the powered-on limit for an off machine.
    limit = maxMemory(user, machine, on)
    if memory > limit:
        raise InvalidInput('memory', memory, 'Maximum %s MB' % limit)
    return memory


def validDisk(user, disk, machine=None):
    """Parse and validate limits for disk for a given user and machine.

    disk is a size in GB (anything float() accepts).  Returns the size
    converted to an int by multiplying by 1024 and truncating.  Raises
    InvalidInput if disk cannot be parsed, exceeds maxDisk(user, machine),
    or is below MIN_DISK_SINGLE.
    """
    too_small = "Minimum %s GB" % MIN_DISK_SINGLE
    try:
        size_gb = float(disk)
    except (TypeError, ValueError):
        raise InvalidInput('disk', disk, too_small)
    limit = maxDisk(user, machine)
    if size_gb > limit:
        raise InvalidInput('disk', size_gb, "Maximum %s GB" % limit)
    # Compare before truncating so that exactly MIN_DISK_SINGLE is accepted
    # (int(0.1 * 1024) == 102, which is below 0.1 * 1024).  Written as
    # `not >=` so that NaN, which fails every comparison, is rejected too.
    if not size_gb >= MIN_DISK_SINGLE:
        raise InvalidInput('disk', size_gb, too_small)
    return int(size_gb * 1024)


def testMachineId(user, machine_id, exists=True):
    """Parse, validate and check authorization for a given user and machine.

    If exists is False, don't check that it exists.

    Returns the Machine for machine_id (None if it does not exist and
    exists is False).  Raises InvalidInput if machine_id is missing, is
    not an integer, does not exist while exists is true, or refers to a
    machine the user has no access to.
    """
    if machine_id is None:
        raise InvalidInput('machine_id', machine_id,
                           "Must specify a machine ID.")
    try:
        machine_id = int(machine_id)
    except (TypeError, ValueError):
        raise InvalidInput('machine_id', machine_id, "Must be an integer.")
    machine = Machine.get(machine_id)
    if exists and machine is None:
        raise InvalidInput('machine_id', machine_id, "Does not exist.")
    if machine is not None and not haveAccess(user, machine):
        raise InvalidInput('machine_id', machine_id,
                           "You do not have access to this machine.")
    return machine


def testAdmin(user, admin, machine):
    """Return the administrator value to store, or None if unchanged.

    None is returned when admin is None or equals machine.administrator.
    If the user is not in the AFS group named admin but is in
    'system:' + admin, the prefixed name is returned; in every other case
    admin is returned as given.  This function never rejects a value.
    """
    if admin in (None, machine.administrator):
        return None
    if admin == user.username:
        return admin
    if getafsgroups.checkAfsGroup(user.username, admin, 'athena.mit.edu'):
        return admin
    if getafsgroups.checkAfsGroup(user.username, 'system:' + admin,
                                  'athena.mit.edu'):
        return 'system:' + admin
    return admin


def testOwner(user, owner, machine):
    """Return the new owner, or None if unchanged.

    None is returned when owner is None or equals machine.owner.
    Otherwise checkLockerOwner is called with verbose=True: a falsy
    result accepts the owner, and a truthy result is used as the
    InvalidInput message (presumably an error description -- confirm
    against getafsgroups).
    """
    if owner in (None, machine.owner):
        return None
    value = getafsgroups.checkLockerOwner(user.username, owner, verbose=True)
    if not value:
        return owner
    raise InvalidInput('owner', owner, value)


def testContact(user, contact, machine=None):
    """Return the new contact address, or None if unchanged.

    None is returned when contact is None or, if a machine is given,
    equals machine.contact.  Raises InvalidInput if contact does not look
    like an email address.
    """
    if contact is None:
        return None
    # machine defaults to None, so only dereference it when present.
    if machine is not None and contact == machine.contact:
        return None
    # Raw string keeps `\.` a regex escape; `\Z` (unlike `$`) does not
    # accept a trailing newline.
    if not re.match(r"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,4}\Z",
                    contact, re.I):
        raise InvalidInput('contact', contact, "Not a valid email.")
    return contact


def testDisk(user, disksize, machine=None):
    """Return disksize unchanged; no validation is performed here."""
    return disksize


def testName(user, name, machine=None):
    """Return the new machine name, or None if unchanged.

    None is returned when name is None or, if a machine is given, equals
    machine.name.  Raises InvalidInput if another machine already has
    that name.
    """
    if name is None:
        return None
    # machine defaults to None, so only dereference it when present.
    if machine is not None and name == machine.name:
        return None
    if not Machine.select_by(name=name):
        return name
    raise InvalidInput('name', name, "Name is already taken.")


def testHostname(user, hostname, machine):
    """Return hostname if machine may use it, else raise InvalidInput.

    A hostname already assigned to one of machine.nics is returned as is.
    Otherwise it must not belong to any existing NIC and must consist of
    1 to 22 letters, digits and dashes.
    """
    for nic in machine.nics:
        if hostname == nic.hostname:
            return hostname
    # check if doesn't already exist
    if NIC.select_by(hostname=hostname):
        raise InvalidInput('hostname', hostname,
                           "Already exists")
    # `\Z` (unlike `$`) does not accept a trailing newline.
    if not re.match(r"^[A-Z0-9-]{1,22}\Z", hostname, re.I):
        raise InvalidInput('hostname', hostname, "Not a valid hostname; "
                           "must only use number, letters, and dashes.")
    return hostname