"""
.. inheritance-diagram:: pyopus.parallel.cooperative
:parts: 1
**Cooperative multitasking OS with task outsourcing (PyOPUS subsystem name: COS)**
This module is based on the :mod:`greenlet` module. Concurrent tasks can be created
in a UNIX-like fashion (with the :meth:`Spawn` method). The return value of a task
or multiple tasks is collected with the :meth:`Join` method. Joins can be blocking
or nonblocking.
The cooperative multitasking OS takes care of outsourcing the tasks to computing
nodes if it is permitted to do this and there are computing nodes available.
Outsourcing uses the virtual machine specified by calling the :meth:`setVM` method.
If no virtual machine is specified outsourcing is not possible.
COS makes it possible to implement multilevel parallelism and asynchronous
algorithms in a simple manner. Parallel algorithms can be run on a single CPU
by taking advantage of the :mod:`greenlet` module for providing the microthread
functionality. Every local task is a microthread that runs concurrently with
other microthreads. The microthreads are cooperatively scheduled by COS.
Of course such a run is slower than a real parallel run involving multiple
tasks across multiple processors.
"""
from greenlet import greenlet
from pyopus.parallel.base import MsgTaskResult, MsgTaskExit, translateToAbstractPath, translateToActualPath
from ..misc import identify
from ..misc.debug import DbgMsg, DbgMsgOut
from .. import PyOpusError
# Public names exported by this module: the OS instance (cOS), the task
# wrapper, the system call classes, the scheduler, and the OS class.
__all__ = [ 'cOS', 'Task', 'SysCall', 'Spawn', 'Yield', 'Join', 'Scheduler', 'OpusOS' ]
# System calls
# Base class
class SysCall(object):
    """
    Abstract base class of all COS system calls.

    A task issues a system call by switching to the scheduler greenlet
    with an instance of a subclass. The scheduler then invokes
    :meth:`handle` on that instance.
    """
    def handle(self, sched, task):
        """
        Handle the system call issued by *task* on scheduler *sched*.

        Subclasses must override this method. The base implementation
        always raises :class:`PyOpusError`.
        """
        message="COS: This is an abstract class."
        raise PyOpusError(message)
# Spawn a child task
class Spawn(SysCall):
    """
    System call for spawning a child task.

    * *func* - function defining the task
    * *args* - positional arguments to the task's function
      (``None`` means no arguments)
    * *kwargs* - keyword arguments to the task's function
      (``None`` means no keyword arguments)
    * *remote* - ``True`` if a remote spawn is requested. Spawns a local
      task if the scheduler has no VM object or *remote* is ``False``.
    * *targetHostID* - specifies the :class:`HostID` of the host where
      the remote task should be started. ``None`` corresponds to any host.
    * *block* - ``True`` if a remote spawn should block until a slot is
      free. Has no effect for local spawns. A nonblocking remote spawn
      that is not enqueued returns tid=-1 on failure.
    * *enqueue* - ``True`` if a remote spawn should be queued if no slot
      is available. Setting *block* to ``True`` implies *enqueue* is also
      ``True`` regardless of the value passed as *enqueue*.
    * *extendedReturnValue* - when set to ``True`` the return value is a
      tuple of the form ``(tid, taskID, hostID)``. For local tasks
      ``taskID`` and ``hostID`` are always ``None``. For remote tasks
      they contain the :class:`TaskID` and :class:`HostID` objects
      corresponding to the remote task. If a task is enqueued for
      remote execution with a nonblocking spawn, ``tid`` and ``taskID``
      are ``None`` while ``hostID`` is equal to *targetHostID*.

    Returns one of the following:

    * the tid of the spawned task,
    * ``-1`` if a nonblocking remote spawn failed and was not enqueued,
    * ``None`` if a nonblocking remote spawn was enqueued. In that case the
      task does not exist yet. It is created later, as a child of the
      spawner, when a remote slot becomes free.
    """
    def __init__(self, func, args=None, kwargs=None, remote=False, targetHostID=None, block=True, enqueue=True, extendedReturnValue=False):
        self.func=func
        self.remote=remote
        self.targetHostID=targetHostID
        self.block=block
        # A blocking spawn has to wait in the queue, so block implies enqueue
        self.enqueue=bool(enqueue or block)
        # Fresh containers per call instead of shared mutable defaults
        self.args=args if args is not None else []
        self.kwargs=kwargs if kwargs is not None else {}
        self.extendedReturnValue=extendedReturnValue
        # True while a nonblocking spawn waits in the scheduler's slot queue.
        # The spawner has already received its return value and moved on.
        # The deferred attempt therefore must not touch the spawner again.
        self._detached=False

    def handle(self, sched, task):
        """
        Spawn the task on behalf of the spawner *task* and, unless it must
        stay suspended (blocking spawn waiting for a slot), pass it the
        return value and schedule it.
        """
        # Is this the deferred attempt of an already answered nonblocking spawn?
        alreadyAnswered=self._detached
        self._detached=False

        # Check if we can hope to spawn remotely, if not now then at least
        # sometime in the future (when a running remote task frees a slot)
        if (
            self.remote and                                     # Remote spawn requested
            sched.vm is not None and                            # Have a VM
            sched.vm.freeSlots()+len(sched.remoteTasks)>0       # Have at least one slot
        ):
            newTask=sched.new(parent=task, func=self.func, remote=True, targetHostID=self.targetHostID, args=self.args, kwargs=self.kwargs)
            if newTask is not None:
                # Success
                result=(newTask.tid, newTask.remoteTaskID, newTask.remoteHostID)
            elif self.enqueue:
                # No free slot, wait in the queue. The scheduler handles this
                # system call again when a slot is freed.
                sched.enqueueSpawn(self, task, self.targetHostID)
                if self.block:
                    # Spawner stays suspended until the deferred attempt answers it
                    return
                # Nonblocking: answer now, there is no task (and no tid) yet
                self._detached=True
                result=(None, None, self.targetHostID)
            else:
                # No free slot and no queueing requested
                result=(-1, None, None)
        else:
            # Local spawn
            newTask=sched.new(parent=task, func=self.func, remote=False, args=self.args, kwargs=self.kwargs)
            result=(newTask.tid, None, None)

        if alreadyAnswered:
            # The spawner got its answer when the spawn was enqueued
            return

        task.sendval=result if self.extendedReturnValue else result[0]
        # Schedule spawner task
        sched.schedule(task)
# Yield control to the scheduler
class Yield(SysCall):
    """
    System call that hands control back to the scheduler.

    The calling task is put back into the scheduler's ready queue. This
    allows the other scheduled microthreads to run before it continues.
    """
    def handle(self, sched, task):
        # Re-queue the caller. Its sendval is left untouched.
        caller=task
        sched.schedule(caller)
# Join a child task
class Join(SysCall):
    """
    A system call for joining a child task.

    * *tidList* - list of task IDs (tids) of child tasks to wait on.
      An empty list or ``None`` (default) waits on any child task.
    * *block* - ``True`` if the call should block until a task can
      be joined

    If several candidate children are finished, the oldest one (the one
    with the lowest tid) is joined.

    Returns a dictionary with the tid of the joined task for key holding
    the task's return value. The dictionary has a single entry.
    Returns an empty dictionary if there are no children (when waiting
    on any child) or if a nonblocking join finds no finished child.

    An exception is raised inside the calling task if *tidList* contains
    tid 0 (the scheduler), a tid that does not exist, or a tid of a task
    that is not a child of the caller.
    """
    def __init__(self, tidList=None, block=True):
        # Own copy, so the caller's list is not aliased by task.waitingOn
        self.tidList=list(tidList) if tidList is not None else []
        self.block=block

    @staticmethod
    def _throw(sched, task, message):
        """
        Raise an exception carrying *message* inside the (local) *task*.

        If the task catches the exception and issues another system call,
        its greenlet switches back here with that call. The call is handled
        instead of being dropped (which would leave the task suspended
        forever).
        NOTE(review): if the task catches the exception and then runs to
        completion, the scheduler is not informed here.
        """
        retval=task.greenlet.throw(Exception, message)
        if retval is not None and isinstance(retval, SysCall):
            retval.handle(sched, task)

    def handle(self, sched, task):
        # Prepare an empty return value for the syscall
        task.sendval={}
        # No finished child found yet
        finishedChild=None
        if not self.tidList:
            # Waiting on any child. Without children return an empty dictionary.
            if task.nchildren+len(task.finishedChildren)==0:
                sched.schedule(task)
                return
            if task.finishedChildren:
                # Join the oldest finished child
                finishedChild=task.finishedChildren[min(task.finishedChildren)]
        else:
            # Verify that the listed tasks exist and are children of the caller
            for tid in self.tidList:
                # Waiting on tid 0 (scheduler) is not allowed
                if tid==0:
                    self._throw(sched, task, "COS: Waiting on task 0 (scheduler) is not allowed.")
                    return
                if tid in sched.tasks:
                    # Child is running
                    child=sched.tasks[tid]
                elif tid in task.finishedChildren:
                    # Child is finished, remember the oldest one
                    child=task.finishedChildren[tid]
                    if finishedChild is None or finishedChild.tid>child.tid:
                        finishedChild=child
                else:
                    self._throw(sched, task, "COS: Child %d does not exist." % tid)
                    return
                if child.ptid!=task.tid:
                    self._throw(sched, task, "COS: Task %d is not a child of %d." % (tid, task.tid))
                    return

        if finishedChild is not None:
            # Remove the joined child and return its result
            del task.finishedChildren[finishedChild.tid]
            task.sendval[finishedChild.tid]=finishedChild.retval
            sched.schedule(task)
        elif not self.block:
            # Nonblocking call, return an empty dictionary
            sched.schedule(task)
        else:
            # Blocking call. The task is woken by the scheduler when a
            # child it waits on finishes.
            task.waitingOn=self.tidList
            task.status=Task.Swaiting
# Task wrapper
class Task(object):
    """
    Task wrapper object. Wraps one microthread (local task) or one
    remote task.

    Arguments:

    * *parent* - parent :class:`Task` object. Should be ``None`` for the
      main task.
    * *greenlet* - greenlet of a local task. ``None`` for a remote task.
    * *remoteTaskID* - :class:`TaskID` object of a remote task
    * *remoteHostID* - :class:`HostID` object of a remote task
    * *name* - task name. If ``None`` a name is generated from *func*,
      *greenlet*, or *remoteTaskID* (first one that is not ``None``).
    * *func* - function defining the task
    * *args* - positional arguments passed to the greenlet at startup
    * *kwargs* - keyword arguments passed to the greenlet at startup

    If *args* and *kwargs* are both ``None`` the greenlet is assumed to be
    already running (this is how the main task is wrapped).

    Members:

    * ``tid`` - task id
    * ``ptid`` - parent task id (``None`` for the main task)
    * ``remoteTaskID`` - :class:`TaskID` object of a remote task
    * ``remoteHostID`` - :class:`HostID` object of a remote task
    * ``nchildren`` - number of children that are not finished yet
    * ``finishedChildren`` - dictionary of finished children waiting to be
      joined. The key is the tid of a child.
    * ``sendval`` - value passed to the task at the next switch
    * ``retval`` - value returned by the finished task
    * ``waitingOn`` - list of child task ids the task is waiting on
    * ``status`` - 0 (created), 1 (running), 2 (waiting in a blocking
      join), 3 (finished)
    * ``scheduled`` - number of times a local task is enqueued for execution.
      Always 0 for remote tasks.
    """
    Screated=0
    Srunning=1
    Swaiting=2
    Sfinished=3

    # 0 is the scheduler, 1 is the main task (created at scheduler startup)
    taskid=1

    def __init__(self, parent, greenlet=None, remoteTaskID=None, remoteHostID=None, name=None, func=None, args=[], kwargs={}):
        # args and kwargs are only stored, never modified, so the shared
        # default objects are safe. None is reserved as a marker (see below).
        if greenlet is not None:
            # Local task
            self.remoteTaskID=None
            self.remoteHostID=None
            self.greenlet=greenlet
            # args=kwargs=None marks a greenlet that is already running
            if args is None and kwargs is None:
                self.status=Task.Srunning
            else:
                self.status=Task.Screated
        else:
            # Remote task, it is already running when it is wrapped
            self.greenlet=None
            self.remoteTaskID=remoteTaskID
            self.remoteHostID=remoteHostID
            self.status=Task.Srunning

        self.func=func
        self.args=args
        self.kwargs=kwargs
        self.ptid=parent.tid if parent is not None else None
        self.scheduled=0

        if name is not None:
            self.name=name
        elif func is not None:
            self.name=str(func)
        elif greenlet is not None:
            self.name=str(greenlet)
        else:
            self.name=str(remoteTaskID)

        # Allocate a unique task id
        self.tid=Task.taskid
        Task.taskid+=1

        if parent is not None:
            parent.nchildren+=1

        self.nchildren=0
        self.finishedChildren={}
        self.sendval=None
        self.retval=None
        self.waitingOn=None

    def __repr__(self):
        return "Task(tid=%d, name=%r, status=%d)" % (self.tid, self.name, self.status)

    def switchToLocal(self):
        """
        Switch control to the local task (microthread) represented by this
        object. The first switch starts the greenlet with ``args`` and
        ``kwargs``; later switches send ``sendval`` to it.

        Returns the value the greenlet switched back with (a system call
        object or, if the task finished, its return value). Marks the task
        as finished if its greenlet is dead after the switch.
        """
        if self.greenlet is None:
            raise PyOpusError("COS: Cannot switch to remote task tid=%d." % self.tid)

        if self.status==Task.Screated:
            # Task startup, send arguments
            self.status=Task.Srunning
            retval=self.greenlet.switch(*self.args, **self.kwargs)
        else:
            # Send the value to task
            retval=self.greenlet.switch(self.sendval)

        # Check if it is finished
        if self.greenlet.dead:
            self.status=Task.Sfinished

        # Reset value that will be sent at next switch
        self.sendval=None

        return retval

    def localThrow(self, exception, value):
        """
        Throws an *exception* with value *value* in the local task represented
        by this object.
        """
        if self.greenlet is None:
            raise PyOpusError("COS: Cannot switch to a remote task.")
        self.greenlet.throw(exception, value)
# Scheduler
class Scheduler(object):
    """
    Cooperative multitasking scheduler based on greenlets.

    * *vm* - virtual machine abstraction for spawning remote tasks.
      ``None`` disables remote spawning.
    * *debug* - debug message level. 0 turns debugging off.

    The main loop of the scheduler is entered by calling
    the scheduler object.
    """
    def __init__(self, vm=None, debug=0):
        # Task ID of scheduler
        self.tid=0
        # The scheduler is initially the active task
        self.activeTask=self.tid
        # Queue of local tasks ready for execution (FIFO)
        self.ready=[]
        # All tasks that are not finished yet, tid for key
        self.tasks={}
        # Remote tasks that are not finished yet, remote task ID for key
        self.remoteTasks={}
        # A queue for spawn system calls waiting on a free remote slot
        # on any host
        self.waitingOnSlot=[]
        # Queues for spawn system calls waiting on a free remote slot
        # on a given host, HostID for key
        self.waitingOnSlotOnHost={}
        # Set VM
        self.setVM(vm)
        # Debug level
        self.debug=debug

    def setDebug(self, debug=0):
        """
        Set the debug message level to *debug*.
        Setting this value to 0 turns off debugging.
        """
        self.debug=debug

    def countTasks(self):
        """
        Returns the number of tasks that are not finished yet, including
        the main task. The scheduler (tid=0) is not registered as a task
        and is not counted.
        """
        return len(self.tasks)

    def countRemoteTasks(self):
        """
        Returns the number of running remote tasks.
        """
        return len(self.remoteTasks)

    def countLocalTasks(self):
        """
        Returns the number of local tasks that are not finished yet,
        including the main task. The scheduler is not counted.
        """
        return len(self.tasks)-len(self.remoteTasks)

    def setVM(self, vm):
        """
        Sets the VM abstraction object used for spawning remote tasks.
        Allowed only when no remote tasks are running.

        The VM is ignored (remote spawning is disabled) if this process is
        itself a remote task (the VM reports a valid parent task ID) or if
        the VM has fewer than 2 slots.
        """
        if len(self.remoteTasks)>0:
            raise PyOpusError("COS: Remote tasks running. Cannot replace VM.")

        if vm is not None and vm.parentTaskID().valid():
            # Valid parent task, this is a slave
            # Do not allow spawning remote tasks
            self.vm=None
        elif vm is not None and vm.slots()<2:
            # Need at least 2 slots for remote spawning (1 for master and 1 for worker)
            self.vm=None
        else:
            self.vm=vm

    def new(self, parent, func, args=None, kwargs=None, remote=False, targetHostID=None):
        """
        Create a new task.

        Arguments:

        * *parent* - parent task object
        * *func* - function defining the task
        * *args* - positional arguments to the task's function
          (``None`` means no arguments)
        * *kwargs* - keyword arguments to the task's function
          (``None`` means no keyword arguments)
        * *remote* - ``True`` for a remote task
        * *targetHostID* - :class:`HostID` object corresponding to the
          host where a remote task should be started. ``None``
          corresponds to any host

        A local task is created if *remote* is ``False`` or there is no VM.
        Local tasks are scheduled immediately; remote tasks never enter
        the local ready queue.

        Returns a :class:`Task` object or ``None`` on failure.
        """
        # Fresh containers per call. Passing None on to Task would mark
        # the greenlet as already running.
        if args is None:
            args=[]
        if kwargs is None:
            kwargs={}

        if not remote or self.vm is None:
            # Local task, create a greenlet
            task=Task(parent, greenlet=greenlet(func), func=func, args=args, kwargs=kwargs)
            self.tasks[task.tid]=task
            self.schedule(task)
            return task

        # Spawn a remote task
        if self.vm.freeSlots()<=0:
            return None
        taskIDs=self.vm.spawnFunction(func,
            args=args, kwargs=kwargs,
            count=1,
            targetList=[targetHostID] if targetHostID is not None else None,
            sendBack=True
        )
        if len(taskIDs)==0:
            return None
        remoteTaskID=taskIDs[0]
        remoteHostID=self.vm.hostID(remoteTaskID)
        task=Task(parent, remoteTaskID=remoteTaskID, remoteHostID=remoteHostID, func=func, args=args, kwargs=kwargs)
        self.tasks[task.tid]=task
        self.remoteTasks[remoteTaskID]=task
        return task

    def schedule(self, task):
        """
        Schedules a local *task* for execution. A task that is already in
        the ready queue is not enqueued twice.
        """
        if task.greenlet is None:
            raise PyOpusError("Trying to schedule remote task tid=%d." % task.tid)
        if task.status not in (Task.Srunning, Task.Screated):
            raise PyOpusError("Trying to schedule task tid=%d that is not running." % task.tid)
        # Avoid scheduling a task twice
        if task.scheduled>0:
            return
        self.ready.append(task)
        task.scheduled+=1
        if self.debug>0:
            DbgMsgOut("COS", "Task tid=%d scheduled." % (task.tid))

    def enqueueSpawn(self, spawnSyscall, spawnerTask, targetHostID=None):
        """
        Enqueues a spawn system call waiting on a free slot.
        If *targetHostID* is specified a spawn system call is enqueued
        for handling when a slot is available on host given by
        *targetHostID* which is an object of class :class:`HostID`.
        """
        if targetHostID is None:
            self.waitingOnSlot.append((spawnSyscall, spawnerTask))
        else:
            self.waitingOnSlotOnHost.setdefault(targetHostID, []).append((spawnSyscall, spawnerTask))

    def _finishTask(self, task):
        """
        Remove the finished *task* and deliver its result to the parent.
        The parent is woken if it waits on the task; otherwise the task is
        stored among the parent's finished children.
        """
        # Remove task from the list of running tasks
        del self.tasks[task.tid]
        parent=self.tasks.get(task.ptid)
        if parent is None:
            # The parent finished before its child, nobody can join the result
            if self.debug>0:
                DbgMsgOut("COS", "Parent task tid=%s of task tid=%d is gone, result discarded." % (task.ptid, task.tid))
            return
        # Update children count of parent task
        parent.nchildren-=1
        # Is the parent waiting on the task (Join system call)?
        if (
            parent.status==Task.Swaiting and
            (len(parent.waitingOn)==0 or task.tid in parent.waitingOn)
        ):
            if self.debug>0:
                DbgMsgOut("COS", "Parent task tid=%d of task tid=%d waiting on results (Join)." % (parent.tid, task.tid))
            # Set return value for parent and schedule it
            parent.sendval={ task.tid: task.retval }
            parent.status=Task.Srunning
            self.schedule(parent)
        else:
            # Parent is not waiting on the task, add to parent's finished children dictionary
            parent.finishedChildren[task.tid]=task

    def _handleFreeSlot(self):
        """
        A remote slot was freed. Retry the oldest spawn waiting on a host
        that has a free slot, or else the oldest spawn waiting on any host.
        """
        spawnSyscall=None
        spawnerTask=None
        # Spawn calls waiting on a specific host
        if self.waitingOnSlotOnHost:
            freeHosts=self.vm.hostsWithFreeSlots()
            candidateHosts=freeHosts.intersection(set(self.waitingOnSlotOnHost.keys()))
            if len(candidateHosts)>0:
                candidateHost=candidateHosts.pop()
                queue=self.waitingOnSlotOnHost[candidateHost]
                # FIFO, like the queue for spawns waiting on any host
                spawnSyscall, spawnerTask=queue.pop(0)
                if not queue:
                    del self.waitingOnSlotOnHost[candidateHost]
        # Spawn calls waiting on any host
        if spawnSyscall is None and self.waitingOnSlot:
            spawnSyscall, spawnerTask=self.waitingOnSlot.pop(0)
        # Handle it if the spawner task is still alive
        if spawnSyscall is not None and spawnerTask.tid in self.tasks:
            if self.debug>0:
                DbgMsgOut("COS", "Task tid=%d is waiting on a free slot. Handling Spawn." % (spawnerTask.tid))
            spawnSyscall.handle(self, spawnerTask)

    def _handleMessage(self, srcID, msg):
        """
        Handle one message *msg* received from remote task *srcID*.
        """
        if type(msg) is MsgTaskResult and srcID in self.remoteTasks:
            # Result message, store the return value
            remoteTask=self.remoteTasks[srcID]
            remoteTask.retval=msg.returnValue
            if self.debug>0:
                DbgMsgOut("COS", "Result message received from %s on %s, tid=%d." % (str(srcID), str(self.vm.hostID(srcID)), remoteTask.tid))
        elif type(msg) is MsgTaskExit:
            # Task exit message
            if srcID in self.remoteTasks:
                remoteTask=self.remoteTasks.pop(srcID)
                remoteTask.status=Task.Sfinished
                if self.debug>0:
                    DbgMsgOut("COS", "Task exit message received from %s on %s, tid=%d, task finished." % (str(srcID), str(self.vm.hostID(srcID)), remoteTask.tid))
                self._finishTask(remoteTask)
            # The exit freed a slot
            self._handleFreeSlot()

    def _receiveMessages(self):
        """
        Handle messages from remote tasks until no more are pending.
        Blocks while waiting for a message only if no local task is ready.
        """
        while True:
            if self.ready:
                if self.debug>1:
                    DbgMsgOut("COS", "Nonblocking receive in scheduler.")
                recv=self.vm.receiveMessage(0)
            else:
                # Block because there are no local tasks scheduled
                if self.debug>1:
                    DbgMsgOut("COS", "Blocking receive in scheduler.")
                recv=self.vm.receiveMessage(-1)

            if recv is None:
                continue
            if len(recv)==2:
                srcID, msg=recv
                self._handleMessage(srcID, msg)
            elif len(recv)==0:
                # Empty tuple received (timeout in nonblocking receive)
                if self.debug>1:
                    DbgMsgOut("COS", "No more pending messages.")
                return

    def _runLocalTask(self):
        """
        Switch to the next ready local task and process what it returns
        (its final return value or a system call).
        """
        localTask=self.ready.pop(0)
        localTask.scheduled-=1
        if localTask.status not in (Task.Srunning, Task.Screated):
            raise PyOpusError(DbgMsg("COS", "A local task that is not running was scheduled."))

        # Switch to local task
        identify.tid=localTask.tid
        self.activeTask=localTask.tid
        retval=localTask.switchToLocal()
        self.activeTask=self.tid
        identify.tid=self.tid

        if localTask.status==Task.Sfinished:
            # Task is finished, store return value
            localTask.retval=retval
            if self.debug>0:
                DbgMsgOut("COS", "Local task tid=%d finished." % (localTask.tid))
            self._finishTask(localTask)
        elif retval is not None and isinstance(retval, SysCall):
            # Local task performed a system call. The handler is
            # responsible for rescheduling the task.
            if self.debug>0:
                DbgMsgOut("COS", "Handling %s system call from task tid=%d" % (str(type(retval)), localTask.tid))
            retval.handle(self, localTask)

    def __call__(self):
        """
        Run the scheduler main loop.

        Must run in a separate greenlet. The parent of that greenlet is
        wrapped as the main task (tid=1). Returns when no tasks are left.
        """
        # Enqueue main task, receives tid=1
        mainTaskGreenlet=greenlet.getcurrent().parent
        if not mainTaskGreenlet:
            raise PyOpusError("COS: Scheduler must run in a separate greenlet.")
        mainTask=Task(parent=None, greenlet=mainTaskGreenlet, name="_main", args=None, kwargs=None)
        self.tasks[mainTask.tid]=mainTask
        self.schedule(mainTask)

        # Main loop, exit when there are no tasks left
        while self.tasks:
            # Handle messages from remote tasks
            if self.vm is not None:
                self._receiveMessages()
            # Handle one local task
            if self.ready:
                self._runLocalTask()
# Cooperative OS wrapper
class OpusOS(object):
    """
    Cooperative multitasking OS class.

    The user should import the only instance of this class represented
    by the :data:`cOS` variable.

    *vm* - virtual machine abstraction for spawning remote tasks.
    If *vm* is ``None`` remote spawning is disabled.
    """
    scheduler=None
    debug=0

    def __init__(self, vm=None):
        self._createScheduler(vm)

    def setVM(self, vm=None):
        """
        Sets the virtual machine object.
        Allowed only when there are no remote tasks running.

        This is not a system call and does not
        yield execution to the scheduler.
        """
        OpusOS.scheduler.setVM(vm)

    @staticmethod
    def setDebug(debug=0):
        """
        Set the debug message level to *debug*.
        Setting this value to 0 turns off debugging.
        """
        OpusOS.debug=debug
        OpusOS.scheduler.setDebug(debug)

    def _createScheduler(self, vm):
        # Only one scheduler (and thus one OpusOS object) per process
        if OpusOS.scheduler is not None:
            raise PyOpusError("COS: There can be only one OpusOS object.")

        # Create scheduler
        OpusOS.scheduler=Scheduler(vm)

        # Create scheduler greenlet
        OpusOS.schedulerGreenlet=greenlet(OpusOS.scheduler)

        # Switch to scheduler to start it. It wraps the current greenlet
        # as the main task and switches back to it.
        OpusOS.schedulerGreenlet.switch()

    # For pickling at remote spawn
    def __getstate__(self):
        # Pack the state and the vm object
        state=self.__dict__.copy()
        return state, OpusOS.scheduler.vm

    # For unpickling at remote spawn
    def __setstate__(self, stateIn):
        # Unpack the state and the vm object
        state, vm = stateIn

        # Set state
        self.__dict__.update(state)

        # Create scheduler if this is the first object of this type in this process
        if OpusOS.scheduler is None:
            self._createScheduler(vm)

    # Functions
    @staticmethod
    def freeSlots():
        """
        Returns the number of free slots in a vm.
        If there is no vm, returns -1.

        A slot is used as soon as a task is spawned.
        A slot is freed as soon as a task is finished.
        The task does not have to be joined to free a slot.

        This is not a system call and does not
        yield execution to the scheduler.
        """
        if OpusOS.scheduler.vm is not None:
            return OpusOS.scheduler.vm.freeSlots()
        return -1

    @staticmethod
    def finishedTasks():
        """
        Returns the number of child tasks of the active task that are
        finished but not joined.
        If there is no vm, returns -1.

        This is not a system call and does not
        yield execution to the scheduler.
        """
        if OpusOS.scheduler.vm is not None:
            tid=OpusOS.scheduler.activeTask
            return len(OpusOS.scheduler.tasks[tid].finishedChildren)
        return -1

    @staticmethod
    def slots():
        """
        Returns the number of slots in a vm.
        If there is no vm, returns -1.

        This is not a system call and does not
        yield execution to the scheduler.
        """
        if OpusOS.scheduler.vm is not None:
            return OpusOS.scheduler.vm.slots()
        return -1

    @staticmethod
    def getTid():
        """
        Returns the task id of the running microthread.

        This is not a system call and does not
        yield execution to the scheduler.
        """
        return OpusOS.scheduler.activeTask

    @staticmethod
    def toAbstractPath(p):
        """
        Translates path *p* to abstract path that can be
        sent to a remote process and decoded there. Takes
        into account parallel mirrored storage.

        An abstract path comprises a parallel mirrored storage
        entry index and a relative path suffix.

        See :func:`pyopus.parallel.base.translateToAbstractPath`
        for more information.
        """
        return translateToAbstractPath(p)

    @staticmethod
    def toActualPath(abstractPath):
        """
        Translates *abstractPath* comprising a parallel mirrored
        storage entry index and a relative path suffix to actual
        local path.

        If entry index is ``None`` no translation takes place and
        the relative path suffix is returned.

        See :func:`pyopus.parallel.base.translateToActualPath`
        for more information.
        """
        index, relPath = abstractPath
        if index is None:
            return relPath
        return translateToActualPath(index, relPath)

    @staticmethod
    def spawnerCall(function, args=(), kwargs=None, asynchr=False):
        """
        Calls *function* with *args* and *kwargs* in the spawner
        task. If no VM object is set or this is the spawner task
        the *function* is called locally.

        If *asynchr* is ``False`` waits for the remote call to
        finish and returns its return value.
        """
        if kwargs is None:
            kwargs={}
        vm=OpusOS.scheduler.vm
        if vm is None:
            # No vm, call locally
            return function(*args, **kwargs)
        return vm.spawnerCall(function, args, kwargs, asynchr)

    # System calls. These functions yield execution to the scheduler.
    @staticmethod
    def Yield():
        """
        Invokes the *Yield* system call.

        See :class:`Yield`.
        """
        return OpusOS.schedulerGreenlet.switch(Yield())

    @staticmethod
    def Spawn(*args, **kwargs):
        """
        Invokes the *Spawn* system call.

        See :class:`Spawn`.
        """
        return OpusOS.schedulerGreenlet.switch(Spawn(*args, **kwargs))

    @staticmethod
    def Join(*args, **kwargs):
        """
        Invokes the *Join* system call.

        See :class:`Join`.
        """
        return OpusOS.schedulerGreenlet.switch(Join(*args, **kwargs))

    # Asynchronous dispatch
    @staticmethod
    def dispatch(jobList, collector=None, remote=True, buildResultList=None, collectHostIDs=False):
        """
        Dispatches multiple jobs and collects the results.
        If *remote* is ``True`` the jobs are dispatched
        asynchronously across the available computing nodes.

        A job is a tuple of the form
        ``(callable, args, kwargs, extra)``.
        A job is evaluated by invoking the callable with given
        ``args`` and ``kwargs``. If ``kwargs`` is omitted only
        positional arguments are passed. If ``args`` is also
        omitted the ``callable`` is invoked without arguments.
        The return value of the ``callable`` is the job result.
        Extra data (``extra``) can be stored in the optional
        entries after ``kwargs``. This data is not passed to
        the callable.

        *jobList* is an iterable (e.g. a list) that holds the
        jobs. It can also be an iterator or a generator that
        produces jobs. A job may not be ``None``.

        *collector* is an optional unprimed coroutine. When a
        job is finished its result is sent to the *collector*
        in the form of a tuple ``(index, job, result)`` where
        ``index`` is the index of the ``job``. Collector's
        task is to collect the results in a user-defined data
        structure.

        If *collectHostIDs* is ``True`` the collector receives
        tuples of the form ``(index, job, result, hostID)`` where
        ``hostID`` is an object of the class :class:`HostID` that
        corresponds to the host where the job was run. For jobs
        that were not spawned as remote jobs ``hostID`` is ``None``.

        By catching the ``GeneratorExit`` exception in the
        *collector* a postprocessing step can be performed on
        the collected results.

        Only the tasks spawned by this call are joined. Other children
        of the calling task are left alone.

        Returns the list of results (in the order of generated
        jobs). When a *collector* is specified the results are
        not collected in a list unless *buildResultList* is set
        to ``True``. In that case ``None`` is returned.
        """
        buildList=collector is None or buildResultList is True

        # Prime the collector
        if collector is not None:
            next(collector)

        resList=[] if buildList else None

        # iter() makes plain sequences (e.g. lists) work, too
        jobIter=iter(jobList)
        exhausted=False

        # Spawned jobs that are not joined yet, tid -> (index, job, hostID)
        jobs={}
        ii=0
        while True:
            # Spawn next job
            if not exhausted:
                try:
                    job=next(jobIter)
                except StopIteration:
                    exhausted=True
                else:
                    f=job[0]
                    args=job[1] if len(job)>1 else []
                    kwargs=job[2] if len(job)>2 else {}
                    tid, taskID, hostID = OpusOS.Spawn(f, args=args, kwargs=kwargs, remote=remote, block=True, extendedReturnValue=True)
                    jobs[tid]=(ii, job, hostID)
                    if OpusOS.debug>0:
                        DbgMsgOut("COS", "Spawned task tid=%d, running tasks: %s" % (tid, str(jobs.keys())))
                    ii+=1

            if not jobs:
                # Nothing to join. Done if all jobs were dispatched.
                if exhausted:
                    break
                continue

            # Join a job, block if one of the following holds
            # - no more jobs to spawn
            # - there are less than 2 slots in the VM
            # - there are no free slots (with joined tasks) in the VM
            block=exhausted or (OpusOS.slots()<2) or (OpusOS.freeSlots()-OpusOS.finishedTasks()<=0)
            jr=OpusOS.Join(tidList=list(jobs), block=block)
            for tid, retval in jr.items():
                # Extract job
                jj, jjob, hostID = jobs.pop(tid)
                if OpusOS.debug>0:
                    DbgMsgOut("COS", "Joined task tid=%d, running tasks: %s" % (tid, str(jobs.keys())))

                # Send result to the collector
                if collector is not None:
                    if collectHostIDs:
                        collector.send((jj, jjob, retval, hostID))
                    else:
                        collector.send((jj, jjob, retval))

                # Collect results in a list
                if buildList:
                    # Make space
                    if len(resList)<=jj:
                        resList.extend([None]*(jj-len(resList)+1))
                    resList[jj]=retval

        # Shut down the collector
        if collector is not None:
            collector.close()

        return resList

    @staticmethod
    def dispatchSingle(function, args=None, kwargs=None, remote=True):
        """
        Dispatches a single task defined by *function*, *args*, and
        *kwargs*. If *remote* is ``True`` the task is dispatched to
        a remote computing node.

        This function is used for moving a task to a remote processor
        and waits for the results. It is not very useful in the sense
        that it does not introduce any parallelism.

        Returns the return value of the function.
        """
        if args is None:
            args=[]
        if kwargs is None:
            kwargs={}
        tid=OpusOS.Spawn(function, args, kwargs, remote=remote, block=True)
        # Join exactly this task, not some other finished child
        return OpusOS.Join(tidList=[tid])[tid]

    @staticmethod
    def finalize():
        """
        Performs cleanup. Calls the :meth:`finalize` method of the vm.
        """
        if OpusOS.scheduler.vm is not None:
            OpusOS.scheduler.vm.finalize()
# OS object
# The single OS object. Creating it creates the scheduler and switches to
# the scheduler greenlet to start it (see OpusOS._createScheduler). A second
# OpusOS object in the same process raises PyOpusError.
cOS=OpusOS()
"Cooperative multitasking OS instance for accessing its functionality."