Source code for computer.manager

class DiskDependency(object):
    """A file that must be present on a server before a command can run.

    The file is identified by ``root`` and ``path``; ``size`` is the cost
    reported when the file is missing from a server.
    """

    def __init__(self, root, path, size):
        self.root = root
        self.path = path
        self.size = size

    def satisfy_cost(self, server):
        """Return 0 if ``server`` already has the file, else ``self.size``."""
        if server.has_file(self.root, self.path):
            return 0
        return self.size

    def satisfy(self, server):
        """Push the file to ``server`` unless the server already has it.

        Presence is checked directly rather than through ``satisfy_cost``:
        a missing file whose size is 0 has a cost of 0 but still has to be
        transferred.
        """
        if not server.has_file(self.root, self.path):
            server.push_disk(self.root, self.path)
class Manager(object):
    """Start commands on a pool of servers and report when they finish.

    Servers are registered with ``add_server``.  ``submit`` starts a
    process on the cheapest server that has a free CPU, or queues the
    request on ``self.waiting`` when none has.  ``process_check`` invokes
    the callbacks of processes that have stopped running.

    NOTE(review): nothing in this class takes requests back off
    ``self.waiting``, and ``self.priorities`` values are stored but not
    used when choosing a server -- confirm whether that is intended.
    """

    # Flat cost charged for using a server at all, before transfer costs
    # are added and the load penalty is applied.
    BASE_COST = 1000

    def __init__(self):
        self.priorities = {}  # { server: priority }
        self.processes = {}  # { server: set(process) }
        self.callbacks = {}  # { process: callback }
        # Q[(root, path, command, dependencies, callback)]
        self.waiting = Queue()

    def add_server(self, server, priority):
        """Register ``server`` with the given ``priority`` and no processes."""
        self.priorities[server] = priority
        # Must be a set: submit() uses .add() and process_check() removes
        # finished processes from it.
        self.processes[server] = set()

    def submit(self, root, path, command, dependencies, callback):
        """Start ``command`` on a free server, or queue it if none is free.

        Does not return the process (so we can manage it).  ``callback`` is
        only called from ``process_check``, as
        ``callback(server, process.logfile)``.
        """
        server = self.prepare_server(dependencies)
        if server is None:
            self.waiting.put((root, path, command, dependencies, callback))
            return
        process = server.start_process(root, path, command)
        self.processes[server].add(process)
        self.callbacks[process] = callback

    def process_check(self):
        """Run the callback of every process that is no longer running.

        A finished process is removed from the bookkeeping before its
        callback runs, so each callback fires exactly once.  The process
        is closed even if the callback raises.
        """
        # Iterate over snapshots: a callback may submit new work, which
        # would otherwise mutate the collections being iterated.
        for server, processes in list(self.server_check()):
            for process in list(processes):
                if process.is_running():
                    continue
                processes.discard(process)
                callback = self.callbacks.pop(process, None)
                try:
                    if callback is not None:
                        callback(server, process.logfile)
                finally:
                    process.close()

    def server_check(self):
        """Yield ``(server, processes)`` for each server with processes."""
        for server, processes in self.processes.items():
            if not processes:
                continue
            yield server, processes

    def prepare_server(self, dependencies, required=False):
        """Pick a server and satisfy every dependency on it.

        Finds a server with resources and minimal transfer requirements;
        transfers additional requirements as needed.  Returns the server,
        or None when ``select_server`` finds no candidate.
        """
        server = self.select_server(dependencies, required)
        if server is None:
            return None
        for dependency in dependencies:
            dependency.satisfy(server)
        return server

    def select_server(self, dependencies, required):
        """Return the registered server with the lowest cost, or None.

        A server is a candidate when it runs fewer processes than it has
        CPUs, or unconditionally when ``required`` is true.  Its cost is
        ``BASE_COST`` plus each dependency's ``satisfy_cost``, scaled up
        by the fraction of its CPUs that are already busy.
        """
        options = {}
        for server in self.priorities:
            processes = self.processes[server]
            if len(processes) < server.cpus or required:
                cost = self.BASE_COST
                for dependency in dependencies:
                    cost += dependency.satisfy_cost(server)
                # Penalize by the number of active processes
                cost *= 1 + float(len(processes)) / server.cpus
                options[server] = cost
        if not options:
            return None
        # min() keeps the first server seen among equal costs, the same
        # choice a stable sort followed by [0] would make.
        return min(options, key=options.__getitem__)

    def run_command(self, root, path, command, dependencies):
        """Run ``command`` on the cheapest server, even a fully busy one.

        Returns whatever ``server.run_command`` returns.  Raises
        RuntimeError when no server has been registered.
        """
        server = self.prepare_server(dependencies, required=True)
        if server is None:
            raise RuntimeError("no server available to run command")
        return server.run_command(root, path, command)