"""
.. inheritance-diagram:: pyopus.parallel.mpi
    :parts: 1

**A virtual machine based on MPI and mpi4py (PyOPUS subsystem name: MPI)**

Attempts to import :mod:`mpi4py`. If the import fails, the mpi4py wrapper
for the MPI library is not available on this computer and an error is
raised.
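
A minimal usage sketch (hedged: the worker function ``double`` is
illustrative and must be importable on the workers). The script must be
started on all ranks, e.g. with ``mpirun -n 4 python script.py``. Workers
enter their spawn loop while this module is being imported, so only the
spawner (rank 0) executes the code following the import::

    from pyopus.parallel.mpi import MPI
    from pyopus.parallel.base import MsgTaskExit, MsgTaskResult

    def double(x):
        # Runs on a worker; must be importable there.
        return 2*x

    if __name__=='__main__':
        vm=MPI(debug=0)
        # Spawn one instance of double() in a free slot.
        ids=vm.spawnFunction(double, args=(21,), count=1, sendBack=True)
        running=set(ids)
        while len(running)>0:
            received=vm.receiveMessage()
            if not received:
                break
            taskID, msg=received
            if isinstance(msg, MsgTaskResult):
                print("Result:", msg)
            elif isinstance(msg, MsgTaskExit):
                running.discard(taskID)
        MPI.finalize()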
"""
# MPI launcher module
from ..misc.dbgprint import MessagePrinter
from ..misc.debug import DbgDefaultPrint, DbgSetDefaultPrinter, DbgMsgOut, DbgMsg
from . import base
from .base import VirtualMachine, HostID, TaskID, MsgTaskExit, MsgTaskResult, getNumberOfCores
# PyOpusError is used by the import check below, so import it first
from .. import PyOpusError

# Try importing MPI support. If it fails, raise our own exception.
try:
	from mpi4py import MPI as mpi
except ImportError:
	raise PyOpusError(DbgMsg("MPI", "Failed to import mpi4py module. MPI not supported."))
if base.firstVM is not None:
raise PyOpusError(DbgMsg("MPI", "MPI does not coexist with other VM types."))
import sys, os, shutil
from time import time
from sys import exc_info
from traceback import format_exception
from ..misc.env import environ
from ..misc import identify
__all__ = [ 'MPIHostID', 'MPITaskID', 'MPI' ]
class MPIHostID(HostID):
"""
	An MPI host ID class based on the :class:`~pyopus.parallel.base.HostID`
	class.

	The MPI library does not provide host IDs, so hostnames are used instead.
	An invalid host ID has ``None`` for the hostname.

	The actual hostname can be accessed as the :attr:`name` member.

	See the :class:`~pyopus.parallel.base.HostID` class for more information.
"""
	@staticmethod
def bad():
"""
A static member function. Called with ``MPIHostID.bad()``. Returns an
invalid host ID.
"""
return MPIHostID(None)
def __init__(self, name):
self.name=name
def __eq__(self, other):
if self is None or other is None:
return self is None and other is None
return self.name==other.name
def __ne__(self, other):
if self is None or other is None:
return not (self is None and other is None)
return self.name!=other.name
def __hash__(self):
return hash(self.name)
def __str__(self):
if self.valid():
return "%s" % (self.name)
else:
return "MPI_NOHOST"
def __repr__(self):
return "MPIHostID("+repr(self.name)+")"
	def valid(self):
"""
Returns ``True`` if this :class:`MPIHostID` object is valid.
"""
if self.name is None:
return False
else:
return True
class MPITaskID(TaskID):
	"""
	An MPI task ID class based on the :class:`~pyopus.parallel.base.TaskID`
	class.

	A task ID is composed of the MPI rank and a task number that is assigned
	by the spawner. Valid task IDs have a nonnegative rank.

	The actual rank and task number can be accessed as the :attr:`rank`
	and :attr:`number` members.

	See the :class:`~pyopus.parallel.base.TaskID` class for more information.
"""
	@staticmethod
def bad():
"""
A static member function. Called with ``MPITaskID.bad()``. Returns an
invalid task ID.
"""
return MPITaskID(-1)
def __init__(self, rank, number=None):
if rank is None or rank<0:
rank=-1
self.rank=rank
self.number=number
def __eq__(self, other):
return self.rank==other.rank and self.number==other.number
def __ne__(self, other):
return self.rank!=other.rank or self.number!=other.number
	def __hash__(self):
		# Hash the (rank, number) pair; adding them fails when number is None
		return hash((self.rank, self.number))
	def __bool__(self):
		return self.rank is not None and self.rank>=0

	# Python 2 name of __bool__
	__nonzero__=__bool__
def __str__(self):
if self.valid():
return "%d:%d" % (self.rank, self.number)
else:
return "MPI_NOTASK"
def __repr__(self):
return "MPITaskID("+repr(self.rank)+", "+repr(self.number)+")"
	def valid(self):
"""
Returns ``True`` if this :class:`MPITaskID` object is valid.
"""
if self.rank<0:
return False
else:
return True
class MPI(VirtualMachine):
"""
	A virtual machine class based on the mpi4py wrapper for MPI.

	One task (rank 0) is the spawner and does all the spawning. This is the
	main task that runs the main program.

	Other tasks (rank>0) are workers. Workers run a spawn loop in which they
	receive spawn requests from the spawner. A worker can run one spawned task
	at a time. If the task fails (caught exception), the spawner is notified,
	upon which the loop expects a new spawn request. If a task finishes
	cleanly, the spawner is also notified. The point at which the spawner and
	the workers separate is in the :meth:`spawnerBarrier` method (this is also
	the place where the worker's spawn loop is entered). The spawner is the
	only task that returns from :meth:`spawnerBarrier`. Workers exit the spawn
	loop only when they receive an exit request from the spawner (i.e. only
	when the MPI application is about to terminate).

	Homogeneous clusters are assumed. LINUX32 and LINUX64 machines do not form
	a homogeneous cluster, and neither do LINUX32 and WINDOWS32 machines.
	Inhomogeneous clusters are possible if the underlying MPI library supports
	this feature.

	See the :class:`~pyopus.parallel.base.VirtualMachine` class for more
	information on the constructor.
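
	A minimal sketch of inspecting the virtual machine on the spawner (after
	the module import has split the spawner and the workers)::

	    vm=MPI(debug=0)
	    print(MPI.slots(), "slots,", MPI.freeSlots(), "free")
	    print("Hosts:", [str(h) for h in MPI.hosts()])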
"""
	# Static members - one copy in every process that uses MPI.
# Next task number
nextTaskNumber=None
# Host name
host=mpi.Get_processor_name()
# Rank
rank=None
# Task number
taskNumber=None
# For spawner
vmStatus=None
"""
	Static member that exists once in every Python process using the MPI
	library. Available only on the spawner.

	Represents the status of the MPI virtual machine.
"""
def __init__(self, debug=0, startupDir=None, translateStartupDir=True, mirrorMap=None, persistentStorage=False):
VirtualMachine.__init__(
self, debug, startupDir=startupDir,
translateStartupDir=translateStartupDir,
mirrorMap=mirrorMap,
persistentStorage=persistentStorage
)
self.setWorkerDebug(debug)
if debug>0:
DbgMsgOut("MPI", "MPI object created, rank=%d on %s." % (MPI.rank, MPI.host))
	@staticmethod
def slots():
"""
Returns the number of slots for tasks in a virtual machine.
Works only on the spawner.
Calls mpi.COMM_WORLD.Get_size().
"""
if MPI.vmStatus is None:
raise PyOpusError(DbgMsg("MPI", "Task not configured as spawner."))
return mpi.COMM_WORLD.Get_size()
	@staticmethod
def freeSlots():
"""
Returns the number of free slots for tasks in the virtual machine.
The slot bookkeeping is done by hand.
Works only on the spawner.
"""
if MPI.vmStatus is None:
raise PyOpusError(DbgMsg("MPI", "Task not configured as spawner."))
return mpi.COMM_WORLD.Get_size()-len(MPI.vmStatus['usedSlots'])
	@staticmethod
def freeSlotsSet():
"""
Returns the slot numbers of free slots.
Works only on the spawner.
"""
return MPI.vmStatus['freeSlots']
	@staticmethod
def hostsWithFreeSlots():
"""
Returns set of objects of class :class:`HostID` corresponding
to hosts with at least one free slot.
Works only on the spawner.
"""
return MPI.vmStatus['freeHosts']
	@staticmethod
def hosts():
"""
Returns a list of :class:`MPIHostID` objects corresponding to all
hosts.
Works only on the spawner.
"""
if MPI.vmStatus is None:
raise PyOpusError(DbgMsg("MPI", "Task not configured as spawner."))
lst=[]
for name in MPI.vmStatus['hosts'].keys():
lst.append(MPIHostID(name))
return lst
	@staticmethod
def taskID():
"""
Returns the :class:`MPITaskID` object corresponding to the calling task.
"""
return MPITaskID(MPI.rank, MPI.taskNumber)
	@staticmethod
	def hostID(taskID=None):
		"""
		Returns the :class:`MPIHostID` object corresponding to the task with
		ID *taskID*.

		If *taskID* is not specified, the :class:`MPIHostID` object
		corresponding to the calling task is returned. When *taskID* is
		given, this works only on the spawner (it uses :attr:`vmStatus`).
		"""
if taskID is None:
return MPIHostID(MPI.host)
else:
return MPIHostID(MPI.vmStatus['slot2host'][taskID.rank])
	@staticmethod
def parentTaskID():
"""
Returns the :class:`MPITaskID` object corresponding to the task that
spawned the caller task.
"""
if MPI.rank==0:
return MPITaskID(-1,-1)
else:
return MPITaskID(0,0)
	def setWorkerDebug(self, debug=0):
"""
Sets the worker debug message level to *debug*.
"""
comm=mpi.COMM_WORLD
worldsize=comm.Get_size()
for rank in range(1, worldsize):
comm.send(
debug,
dest=rank,
tag=MPIMSGTAG_SET_DEBUG
)
	def spawnFunction(self, function, args=(), kwargs={}, count=None, targetList=None, sendBack=True):
		"""
		Spawns *count* instances of a Python *function* on remote hosts and
		passes *args* and *kwargs* to the function. Spawning a function does
		not start a new Python interpreter; a worker runs a spawn loop in
		which spawn requests are executed.

		*targetList* specifies the hosts on which the tasks are started.

		If *sendBack* is ``True`` the status and the return value of the
		spawned function are sent back to the spawner with a
		:class:`pyopus.parallel.base.MsgTaskResult` message when the spawned
		function returns.

		Invokes the :func:`runFunctionWithArgs` function on the remote host
		for starting the spawned function.

		If spawning succeeds, updates the :attr:`vmStatus` attribute.

		Spawns only in free slots. Works only on the spawner.

		Returns a list of :class:`MPITaskID` objects representing the spawned
		tasks.

		See the :meth:`~pyopus.parallel.base.VirtualMachine.spawnFunction`
		method of the :class:`~pyopus.parallel.base.VirtualMachine` class for
		more information.
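
		A short sketch (hedged: ``work`` is an illustrative, importable
		function)::

		    # Spawn up to 4 instances of work() anywhere in the VM.
		    ids=vm.spawnFunction(work, args=(1.0,), count=4, sendBack=True)
		    # Restrict spawning to the first host in the VM.
		    ids2=vm.spawnFunction(work, args=(2.0,), count=1,
		        targetList=[MPI.hosts()[0]])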
"""
if MPI.vmStatus is None:
raise PyOpusError(DbgMsg("MPI", "Task not configured as spawner."))
# Get a list of free slots on specified hosts/all hosts
slotlist=[]
if targetList is None:
			# Copy the set so that popping slots below does not alias the
			# vmStatus bookkeeping done after every spawn
			slotlist=set(MPI.vmStatus['freeSlots'])
else:
for hostObj in targetList:
host=hostObj.name
slotlist.extend(MPI.vmStatus['hosts'][host]['freeSlots'])
# No tasks spawned for now
taskIDlist=[]
# Spawn if there is any place left
comm=mpi.COMM_WORLD
while len(slotlist)>0:
# Check number of spawned tasks
if count is not None and len(taskIDlist)>=count:
break
# Prepare task number
taskNumber=MPI.nextTaskNumber
MPI.nextTaskNumber+=1
# Prepare destination
slot=slotlist.pop()
# Spawn
descriptor=self.func2desc(function)
comm.send(
(self, descriptor, args, kwargs, sendBack, taskNumber),
dest=slot,
tag=MPIMSGTAG_SPAWN
)
# Update vmStatus
host=MPI.vmStatus['slots'][slot]['host']
MPI.vmStatus['hosts'][host]['freeSlots'].discard(slot)
MPI.vmStatus['slots'][slot]['taskNumber']=taskNumber
if len(MPI.vmStatus['hosts'][host]['freeSlots'])<=0:
MPI.vmStatus['freeHosts'].discard(MPIHostID(host))
MPI.vmStatus['usedSlots'].add(slot)
MPI.vmStatus['freeSlots'].discard(slot)
# Prepare MPITaskID object
taskIDlist.append(MPITaskID(slot, taskNumber))
return taskIDlist
	def spawnerCall(self, function, args=(), kwargs={}, asynchr=False):
"""
Calls *function* with *args* and *kwargs* in the spawner task.
If *asynchr* is ``False`` waits for the call to finish and returns
the function's return value.
If called in a worker task delegates the call to the spawner.
If called in a spawner task, calls *function* locally.
This function is useful for sending data to a central point (spawner)
where it is processed (e.g. for database writes, logging, etc.).
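
		A short sketch (hedged: ``logResult`` is an illustrative, importable
		function)::

		    # In a spawned task: run logResult() on the spawner and wait
		    # for its return value.
		    status=vm.spawnerCall(logResult, args=("point", 1.5))
		    # Fire-and-forget variant.
		    vm.spawnerCall(logResult, args=("point", 2.5), asynchr=True)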
"""
if MPI.vmStatus is not None:
# We are on spawner, run locally, return the return value
return function(*args, **kwargs)
else:
# Send a request to the spawner
# Function descriptor
descriptor=self.func2desc(function)
# Send
comm=mpi.COMM_WORLD
comm.send(
(self, descriptor, args, kwargs, asynchr),
dest=0, # spawner
tag=MPIMSGTAG_SPAWNER_CALL
)
			# Await the response
			if not asynchr:
				received=receiveMPIMessage(
					timeout=-1.0,
					msgTag=MPIMSGTAG_SPAWNER_CALL_RESULT,
					fromRank=0
				)
				# None means error. No message.
				if received is None:
					return None
				# received is a (source, tag, msg) tuple; the function's
				# return value travels in the message part
				return received[2]
	def checkForIncoming(self, fromDestination=None):
"""
Returns ``True`` if there is a message waiting to be received.
*fromDestination* is the :class:`MPITaskID` of the message source.
``None`` implies any source.
"""
fromRank=-1 if fromDestination is None else fromDestination.rank
val, tag, rank = checkForIncoming(fromRank=fromRank)
return val
	def receiveMessage(self, timeout=-1.0, fromDestination=None):
		"""
		Receives a *message* (a Python object) and returns a tuple
		(*senderTaskId*, *message*).

		The sender of the *message* can be identified through the
		*senderTaskId* object of class :class:`MPITaskID`.

		If *timeout* (seconds) is negative the function waits (blocks) until
		some message arrives. If *timeout*>0 seconds pass without receiving
		a message, an empty tuple is returned. A zero *timeout* performs a
		nonblocking receive which returns an empty tuple if no message is
		received. Note that *timeout*>0 is implemented somewhat inefficiently,
		with a polling loop.

		In case of an error or a discarded message returns ``None``.

		Transparently handles

		* task exit notification messages and
		* spawned function return value messages.

		Discards all other low-level MPI messages that were not sent with the
		:meth:`sendMessage` method.
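
		A receive loop sketch (hedged)::

		    while True:
		        received=vm.receiveMessage(timeout=10.0)
		        if received is None:
		            break              # error
		        if len(received)==0:
		            continue           # timeout expired, poll again
		        taskID, msg=received
		        print("Message from", str(taskID), ":", msg)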
"""
		# Some message tags are handled transparently:
		#   MPIMSGTAG_EXIT
		#   MPIMSGTAG_DBGMSG
		#   all unknown messages
		# On a blocking receive we keep receiving messages until we get a
		# non-transparently handled message.
		# On a receive with timeout we keep receiving messages until the
		# timeout expires, upon which () is returned.
		# On a nonblocking receive we receive messages until the first
		# non-transparent message arrives or we run out of messages
		# (we return ()).
		# Receive any message from any source.
fromRank=-1 if fromDestination is None else fromDestination.rank
# Mark time for receives with timeout
if timeout>0:
mark=time()
timeoutLeft=timeout
while True:
# Receive it
received=receiveMPIMessage(timeoutLeft, fromRank=fromRank)
# None means error. No message.
if received is None:
return None
# Empty tuple means timeout in timeout mode or no message in nonblocking mode
if len(received)==0:
# Is this a timeout receive
if timeout>0:
# Recompute remaining timeout
timeoutLeft=timeout-(time()-mark)
# No more time remaining, return () indicating timeout
if timeoutLeft<0:
return ()
else:
# Nonblocking receive
# No more messages, return ()
return ()
# Unpack tuple
(source, msgTag, msg)=received
# Handle messages
# Transparently handling/discarding messages does not return.
# For all other handled messages we return.
			if msgTag==MPIMSGTAG_EXIT:
				# Request to exit the spawn loop
				# Ignore it if we are the spawner
				if MPI.vmStatus is None:
					mpi.Finalize()
					sys.exit()
			elif msgTag==MPIMSGTAG_DBGMSG:
				# Debug message from a worker; print it and continue
				# receiving (handled transparently)
				DbgDefaultPrint(msg)
elif msgTag==MPIMSGTAG_TASK_EXCEPTION:
# Exception raised in task
for line in msg[0]:
DbgMsgOut("MPI", line)
MPI.finalize()
raise PyOpusError(DbgMsg("MPI", "Exception in remote task."))
elif msgTag==MPIMSGTAG_MESSAGE:
# Unpack task number and message
(fromTaskNumber, toTaskNumber, message)=msg
# Discard transparently if toTaskNumber does not match our task number
if toTaskNumber!=MPI.taskNumber:
pass
else:
return (MPITaskID(source, fromTaskNumber), message)
			elif msgTag==MPIMSGTAG_SPAWNER_CALL:
				# Unpack (the first entry is the sender's VM object)
				(vm, descriptor, args, kwargs, asynchr)=msg
				# Get function from descriptor
				function=VirtualMachine.desc2func(descriptor)
				# Call
				retval=function(*args, **kwargs)
				# Send back the return value; source is the sender's rank
				if not asynchr:
					sendMPIMessage(
						source,
						MPIMSGTAG_SPAWNER_CALL_RESULT,
						retval
					)
elif msgTag==MPIMSGTAG_TASK_RETVAL:
# Unpack task number, success flag, and response
(taskNumber, success, response)=msg
return (MPITaskID(source, taskNumber), MsgTaskResult(success, response))
			elif msgTag==MPIMSGTAG_TASK_EXIT:
				# The message carries the task number of the finished task
				taskNumber=msg
if self.debug:
DbgMsgOut("MPI", "Task "+str(MPITaskID(source, taskNumber))+" exit detected.")
# Update vmStatus if message matches the taskNumber in vmStatus
if MPI.vmStatus['slots'][source]['taskNumber']==taskNumber:
MPI.vmStatus['slots'][source]['taskNumber']=-1
host=MPI.vmStatus['slots'][source]['host']
MPI.vmStatus['hosts'][host]['freeSlots'].add(source)
if len(MPI.vmStatus['hosts'][host]['freeSlots'])>0:
MPI.vmStatus['freeHosts'].add(MPIHostID(host))
MPI.vmStatus['usedSlots'].discard(source)
MPI.vmStatus['freeSlots'].add(source)
taskID=MPITaskID(source, taskNumber)
return (taskID, MsgTaskExit(taskID))
else:
# Unknown message, discard transparently
pass
	def sendMessage(self, destination, message):
"""
Sends *message* (a Python object) to a task with :class:`MPITaskID`
*destination*. Returns ``True`` on success.
Sends also our own task number and the destination task number.
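
		A short sketch (hedged; ``ids`` as returned by :meth:`spawnFunction`)::

		    ok=vm.sendMessage(ids[0], {'cmd': 'stop'})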
"""
return sendMPIMessage(
destination.rank, MPIMSGTAG_MESSAGE,
(MPI.taskNumber, destination.number, message)
)
def _broadcastNewPersistentObject(self, obj, oid):
"""
Broadcasts object *obj* associated with persistent object id
*id*. to all workers.
"""
# Use our custom pickler for object persistence
# mpi.pickle.dumps=base._dumps # Does not work under Windows, Python 3.7.4
# mpi.pickle.loads=base._loads
mpi.pickle.__init__(base._dumps, base._loads)
if MPI.rank!=0:
# Do nothing on workers
return
# raise PyOpusError(DbgMsg("VM", "Only master can broadcast persistent object management messages."))
for r in range(1, self.slots()):
if self.debug:
DbgMsgOut("MPI", ("Broadcasting persistent object 0x%x to rank %d" % (oid, r)))
sendMPIMessage(
r, MPIMSGTAG_ADD_PERSISTENT_OBJECT,
(obj, oid)
)
def _broadcastPersistentObjectDeletion(self, oid):
"""
Broadcasts a message to all workers that persistent object
with id *id* should be removed from teh table of persistent
objects.
"""
if MPI.rank!=0:
# Do nothing on workers
return
# raise PyOpusError(DbgMsg("VM", "Only master can broadcast persistent object management messages."))
if self.debug:
DbgMsgOut("MPI", ("Broadcasting deletion of persistent object 0x%x to workers." % oid))
for r in range(1, self.slots()):
sendMPIMessage(
r, MPIMSGTAG_REMOVE_PERSISTENT_OBJECT,
oid
)
	@staticmethod
def finalize():
"""
Asks workers to exit politely. Then calls Finalize().
See :meth:`VirtualMachine.finalize` for more information.
"""
comm=mpi.COMM_WORLD
for rank in range(1, comm.Get_size()):
comm.send(0, dest=rank, tag=MPIMSGTAG_EXIT)
mpi.Finalize()
#
# Message tags
#
MPIMSGTAG_REQUEST_NCPU=1
"""
The message tag for a cpu count and hostname request.
"""
MPIMSGTAG_NCPU=2
"""
The message tag for a cpu count and hostname response.
"""
MPIMSGTAG_EXIT=3
"""
Exit spawn loop (application termination).
"""
MPIMSGTAG_SPAWN=4
"""
Spawn a task (to worker).
"""
MPIMSGTAG_TASK_RETVAL=5
"""
The status and return value of a task (to master).
"""
MPIMSGTAG_TASK_EXIT=6
"""
A task has finished (to master).
"""
MPIMSGTAG_SET_DEBUG=7
"""
Set the debug level of worker.
"""
MPIMSGTAG_DBGMSG=8
"""
Debug message that will be printed by rank 0 process.
"""
MPIMSGTAG_TASK_EXCEPTION=9
"""
Exception forwarding from worker to spawner.
"""
MPIMSGTAG_MESSAGE=100
"""
Message sent to a task.
"""
MPIMSGTAG_SPAWNER_CALL=101
"""
Request for master to run a function.
"""
MPIMSGTAG_SPAWNER_CALL_RESULT=102
"""
Response from master with function's return value.
"""
MPIMSGTAG_ADD_PERSISTENT_OBJECT=103
"""
Add a new object to the persistent object table.
"""
MPIMSGTAG_REMOVE_PERSISTENT_OBJECT=104
"""
Remove a persistent object from the persistent object table.
"""
#
# Message handling at MPI level.
#
def sendMPIMessage(rank, msgTag, msg):
"""
Sends a message (binary string *msg*) to the task with rank *rank*.
The message is tagged with an integer *msgTag*.
Should never be called by the user if the communication methods of
:class:`MPI` are used.
"""
try:
mpi.COMM_WORLD.send(
msg,
dest=rank,
tag=msgTag
)
return True
except KeyboardInterrupt:
DbgMsgOut("MPI", "keyboard interrupt")
raise
except:
return False
def receiveMPIMessage(timeout=-1.0, msgTag=-1, fromRank=-1):
"""
Receives a message (binary string) with tag *msgTag* (integer) from a
task with rank given by *fromRank* (integer). If *fromRank* is negative
messages from all tasks are accepted. If *msgTag* is negative messages
with any tag are accepted.
If the message is not received within *timout* seconds, returns an empty
tuple. Negative values of *timeout* stands for infinite timeout.
If a message is received a tuple (*source*, *tag*, *msg*) is returned
where *source* is the rank of the sender, *tag* is the message tag, and
*msg* is the message.
Should never be called by the user if the communication methods of
:class:`MPI` are used.
"""
if msgTag<0:
tag=mpi.ANY_TAG
else:
tag=msgTag
if fromRank<0:
rank=mpi.ANY_SOURCE
else:
rank=fromRank
# Communicator
comm=mpi.COMM_WORLD
rcvd=False
st=mpi.Status()
try:
if timeout>0.0:
# This is ugly. Don't use timeout with MPI.
mark=time()
while True:
val, tag1, rank1 = checkForIncoming(msgTag=tag, fromRank=rank)
if val:
msg=comm.recv(source=rank1, tag=tag1, status=st)
rcvd=True
break
if time()-mark>timeout:
break
elif timeout==0.0:
# Non-blocking
val, tag1, rank1 = checkForIncoming(msgTag=tag, fromRank=rank)
if val:
msg=comm.recv(source=rank1, tag=tag1, status=st)
rcvd=True
else:
# Blocking
msg=comm.recv(source=rank, tag=tag, status=st)
rcvd=True
except KeyboardInterrupt:
raise
except:
return None
if rcvd:
return (st.Get_source(), st.Get_tag(), msg)
elif timeout>=0.0:
# Timeout, return empty tuple
return ()
else:
# Error, return None
return None
def checkForIncoming(msgTag=-1, fromRank=-1):
"""
Returns a tuple (*b*, *tag*, *src*). *b* is ``True`` if a message with tag
*msgTag* is waiting to be received from *fromRank*. *tag* and *rank* are
the message tag and the rank of the sender.
Negative values of *msgTag* and *fromrank* match a message with an arbitrary
tag/source.
"""
if msgTag<0:
tag=mpi.ANY_TAG
else:
tag=msgTag
if fromRank<0:
rank=mpi.ANY_SOURCE
else:
rank=fromRank
try:
st=mpi.Status()
# Should be Improbe in the future, not implemented yet in OpenMPI
res=mpi.COMM_WORLD.Iprobe(source=rank, tag=tag, status=st)
tag=st.Get_tag()
rank=st.Get_source()
return (res!=0), tag, rank
except KeyboardInterrupt:
DbgMsgOut("MPI", "keyboard interrupt")
raise
	except:
		# Return a 3-tuple so that callers can always unpack three values
		return (False, None, None)
class MPIMessagePrinter(MessagePrinter):
"""
MessagePrinter for sending a message to rank 0 for printing.
Buffers messages. Sends them when the :meth:`flush` method is invoked.
"""
def __init__(self):
MessagePrinter.__init__(self)
self.buf=[]
def write(self, message):
self.buf.append(message)
def flush(self):
txt="".join(self.buf)
self.buf=[]
		# Send the buffered text to rank 0 for printing
		mpi.COMM_WORLD.send(
			txt,
			dest=0,
			tag=MPIMSGTAG_DBGMSG
		)
#
# These functions are used only at module import
#
def updateWorkerInfo():
"""
Updates the internal information of :class:`MPI` class used by a worker
task.
"""
# Get my rank
try:
MPI.rank=mpi.COMM_WORLD.Get_rank()
except KeyboardInterrupt:
DbgMsgOut("MPI", "Keyboard interrupt.")
raise
except:
MPI.rank=None
# Get my host
MPI.host=mpi.Get_processor_name()
# Clear vmStatus (we are not a spawner)
MPI.vmStatus=None
def updateSpawnerInfo():
"""
Updates the internal information used by a spawner.
:attr:`MPI.vmStatus` is a dictionary with three members:
* ``hosts`` - information about the hosts in the virtual machine
* ``slots`` - information about the task slots in the virtual machine
* ``freeSlots`` - set of free slots
* ``usedSlots`` - set of used slots,
* ``freeHosts`` - set of objects of class :class:`HostID` corresponding
* to hosts with at least one free slot
* ``slot2host`` - a dictionary for getting the hostname from slot number
``hosts`` is a dictionary with ``hostname`` for key and members that
are dictionaries with the following members:
* ``slots`` - the set of all slots on a host
* ``freeSlots`` - the set of free slots on a host
* ``ncpu`` - the number of CPU cores in the host
``slots`` is a dictionary with rank for key and members that are
dictionaries with the following members:
* ``host`` - the ``hostname`` of the host where this slot resides
* ``taskNumber`` - the task number of the task in slot (-1=no task)
The ``ncpu`` dictionary members of the ``hosts`` dictionary are
obtained with the :meth:`updateCoreCount` method.
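
	A sketch of the resulting structure for two hosts with two slots each
	(illustrative values)::

	    MPI.vmStatus={
	        'hosts': {
	            'host1': {'slots': {0, 1}, 'freeSlots': {1}, 'ncpu': 8},
	            'host2': {'slots': {2, 3}, 'freeSlots': {2, 3}, 'ncpu': 8}
	        },
	        'slots': {
	            0: {'host': 'host1', 'taskNumber': 0, 'pid': 1000},
	            1: {'host': 'host1', 'taskNumber': -1, 'pid': 1001},
	            2: {'host': 'host2', 'taskNumber': -1, 'pid': 1002},
	            3: {'host': 'host2', 'taskNumber': -1, 'pid': 1003}
	        },
	        'usedSlots': {0},
	        'freeSlots': {1, 2, 3},
	        'freeHosts': {MPIHostID('host1'), MPIHostID('host2')},
	        'slot2host': {0: 'host1', 1: 'host1', 2: 'host2', 3: 'host2'}
	    }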
"""
MPI.taskNumber=0
MPI.nextTaskNumber=1
comm=mpi.COMM_WORLD
if comm.Get_rank()!=0:
raise PyOpusError(DbgMsg("MPI", "Task not configured as spawner."))
# Handle spawner slot
MPI.vmStatus={
'hosts': {
MPI.host: {
'slots': set([0]),
'freeSlots': set(),
'ncpu': getNumberOfCores()
}
},
'slots': {
0: {
'host': MPI.host,
'taskNumber': 0,
'pid': identify.pid
}
} ,
'usedSlots': set([0]),
'freeSlots': set(),
'freeHosts': set(),
'slot2host': { 0: MPI.host }
}
# Requests the number of CPUs and hostname from every slot, except this one
worldsize=comm.Get_size()
for rank in range(1, worldsize):
comm.send(0, dest=rank, tag=MPIMSGTAG_REQUEST_NCPU)
# Receive responses
count=0
while count<worldsize-1:
st=mpi.Status()
msg=comm.recv(source=mpi.ANY_SOURCE, tag=MPIMSGTAG_NCPU, status=st)
src=st.Get_source()
(ncpu, hostname, pid)=msg
# hosts
if hostname not in MPI.vmStatus['hosts']:
MPI.vmStatus['hosts'][hostname]={'slots': set(), 'freeSlots': set(), 'ncpu': ncpu}
MPI.vmStatus['hosts'][hostname]['freeSlots'].add(src)
MPI.vmStatus['hosts'][hostname]['slots'].add(src)
# slots
# MPI.vmStatus['slots'][src]={ 'host': hostname, 'taskNumber': -1 }
MPI.vmStatus['slots'][src]={ 'host': hostname, 'taskNumber': -1, 'pid': pid }
# Slot to host conversion
MPI.vmStatus['slot2host'][src]=hostname
count+=1
# Initial set of free slots
MPI.vmStatus['freeSlots']=set(range(1, worldsize))
# Initial set of free hosts
for hostName, desc in MPI.vmStatus['hosts'].items():
if len(desc['freeSlots'])>0:
MPI.vmStatus['freeHosts'].add(MPIHostID(hostName))
#
# Mark that we are the first imported VM
#
if base.firstVM is None:
base.firstVM=MPI
# There is no portable way to detect whether we were started using mpirun/mpiexec.
#
# Spawner and worker go separate paths here.
# This happens at first import of MPI.
#
updateWorkerInfo()
# A set holding vmIndex values for which the initial mirror-cleanup was performed
firstSpawnDone=set()
# Infinite loop for worker, nothing special for spawner.
myRank=mpi.COMM_WORLD.Get_rank()
if myRank>0:
# Worker
comm=mpi.COMM_WORLD
# Set debug message printer
DbgSetDefaultPrinter(MPIMessagePrinter())
# Turn on worker debugging
workerDebug=0
if workerDebug:
DbgMsgOut("MPIWORKER", "Entering worker loop, rank=%d on %s" % (myRank, MPI.host))
# Await spawn requests in a loop
exitCode=0
while True:
st=mpi.Status()
msg=comm.recv(source=0, tag=mpi.ANY_TAG, status=st) # mpi.ANY_SOURCE
src=st.Get_source()
tag=st.Get_tag()
if tag==MPIMSGTAG_SPAWN:
(mpi_vm, functionDesc, args, kwargs, sendBack, taskNumber)=msg
if mpi_vm.vmIndex not in firstSpawnDone:
# This is the first time this vm index is spawning
firstSpawnDone.add(mpi_vm.vmIndex)
# If we are mirroring and using persistent storage
# remove existing local storage from previous runs with the same pid/vmIndex
if mpi_vm.mirrorList is not None and mpi_vm.persistentStorage:
mpi_vm.cleanupEnvironment(mpi_vm.localStoragePath(), forceCleanup=True)
if workerDebug:
DbgMsgOut("MPIWORKER", "Spawn request received, spawning MPI task id=%d:%d on %s." % (myRank, taskNumber, MPI.host))
# Flush stdout so parent will receive it.
sys.stdout.flush()
# Prepare function
function=VirtualMachine.desc2func(functionDesc)
# Prepare working environment
taskStorage=mpi_vm.prepareEnvironment()
# Set task number
MPI.taskNumber=taskNumber
# Assume spawn failed
success=False
response=None
# Call function
try:
response=function(*args, **kwargs)
success=True
ei=None
except KeyboardInterrupt:
DbgMsgOut("MPI", "Keyboard interrupt.")
ei=exc_info()
except:
if workerDebug:
DbgMsgOut("MPIWORKER", "MPI task %d:%d on %s failed." % (myRank, taskNumber, MPI.host))
ei=exc_info()
if ei is not None:
ex=format_exception(ei[0], ei[1], ei[2])
comm.send(
(ex,),
dest=0,
tag=MPIMSGTAG_TASK_EXCEPTION
)
success=False
response=None
if workerDebug:
DbgMsgOut("MPIWORKER", "MPI task %d:%d on %s finished." % (myRank, taskNumber, MPI.host))
# Flush stdout so parent will receive it.
sys.stdout.flush()
# Clear working environment
mpi_vm.cleanupEnvironment(taskStorage)
# Default response
if sendBack is not True:
response=None
# Flush stdout so parent will receive it.
sys.stdout.flush()
# Send back response if requested
if sendBack:
comm.send(
(taskNumber, success, response),
dest=0,
tag=MPIMSGTAG_TASK_RETVAL
)
if workerDebug:
DbgMsgOut("MPIWORKER", "MPI task %d:%d on %s results message sent." % (myRank, taskNumber, MPI.host))
# Flush stdout so parent will receive it.
sys.stdout.flush()
# Send back task exit message
comm.send(
taskNumber,
dest=0,
tag=MPIMSGTAG_TASK_EXIT
)
if workerDebug:
DbgMsgOut("MPIWORKER", "MPI task %d:%d on %s exit message sent." % (myRank, taskNumber, MPI.host))
# Flush stdout so parent will receive it.
sys.stdout.flush()
elif tag==MPIMSGTAG_REQUEST_NCPU:
if workerDebug:
DbgMsgOut("MPIWORKER", "CPU thread count request received.")
# Flush stdout so parent will receive it.
sys.stdout.flush()
comm.send((getNumberOfCores(), MPI.host, identify.pid), dest=src, tag=MPIMSGTAG_NCPU)
elif tag==MPIMSGTAG_EXIT:
if workerDebug:
DbgMsgOut("MPIWORKER", "Worker loop exit request received, rank=%d on %s." % (myRank, MPI.host))
# Flush stdout so parent will receive it.
sys.stdout.flush()
break
		elif tag==MPIMSGTAG_SET_DEBUG:
			# Update the worker debug level
			workerDebug=msg
			if workerDebug:
				DbgMsgOut("MPIWORKER", "Setting debug level to %d, rank=%d on %s" % (workerDebug, myRank, MPI.host))
# Flush stdout so parent will receive it.
sys.stdout.flush()
elif tag==MPIMSGTAG_ADD_PERSISTENT_OBJECT:
obj, oid = msg
VirtualMachine._addPersistentObject(obj, oid)
if workerDebug:
DbgMsgOut("MPIWORKER", "Adding persistent object 0x%x, rank=%d on %s" % (oid, myRank, MPI.host))
# Use our custom pickler for object persistence
# mpi.pickle.dumps=base._dumps # Does not work under Windows, Python 3.7.4
# mpi.pickle.loads=base._loads
mpi.pickle.__init__(base._dumps, base._loads)
elif tag==MPIMSGTAG_REMOVE_PERSISTENT_OBJECT:
oid=msg
if workerDebug:
DbgMsgOut("MPIWORKER", "Removing persistent object 0x%x, rank=%d on %s" % (oid, myRank, MPI.host))
VirtualMachine._removePersistentObject(oid)
# At this point all MPIMSGTAG_MESSAGE messages are dropped.
# They are processed only in spawned functions.
if workerDebug:
DbgMsgOut("MPIWORKER", "Leaving worker loop, rank=%d on %s, exit code=%d." % (myRank, MPI.host, exitCode))
# Flush stdout so parent will receive it.
sys.stdout.flush()
# Worker exits here
mpi.Finalize()
sys.exit(exitCode)
else:
# Spawner
updateSpawnerInfo()