10.2.4. Measuring the message delay and throughput

File 03-messaging.py in folder demo/parallel/vm/

# Measures the message delay and average transfer speed for messages of various sizes 

import sys
from pyopus.parallel.mpi import MPI as VM
from pyopus.parallel.base import MsgTaskExit, MsgTaskResult
import funclib
import os, time, sys
import numpy as np

if __name__=='__main__':
	# Demo entry point: spawn bounceBack() on one host, exchange messages of 
	# increasing size with it, and print the one-way delay and throughput. 
	
	# Set the working directory on the worker to be the same as on the spawner. 
	vm=VM(startupDir=os.getcwd(), debug=1)
	
	# Pick the first host that differs from the local one. Start from the local 
	# host so that hostID is defined even when vm.hosts() yields nothing. 
	myHostID=vm.hostID()
	hostID=myHostID
	for candidateID in vm.hosts():
		if candidateID!=myHostID:
			hostID=candidateID
			break
	
	# No remote host was found, so the measurement stays on the local host. 
	if hostID==myHostID:
		print("\nWarning. Measuring local communication speed.")
	
	# Message payload sizes in bytes (one uint8 element per byte). 
	dataSizes=[0, 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000]
	
	# Spawn a single bounceBack() task on the chosen host. 
	taskIDs=vm.spawnFunction(funclib.bounceBack, kwargs={'vm': vm}, targetList=[hostID], count=1)
	
	# Check if it succeeded
	if len(taskIDs)<1:
		print("Failed to spawn bounceBack().")
		# sys.exit() instead of the site-provided exit() helper, which is 
		# not guaranteed to be available in every interpreter setup. 
		sys.exit(-1)
		
	taskID=taskIDs[0]
	
	print("Task layout:")
	print(vm.formatSpawnerConfig())
	
	print("Measuring message delivery time and data throughput to "+str(hostID)+".")
	print("Bounce back task: "+str(taskID))
	
	# Go through data sizes
	total_time=0
	oldDataSize=0
	for dataSize in dataSizes:
		# Random payload of dataSize bytes. 
		data=np.random.randint(0, 256, size=dataSize).astype(np.uint8)
		
		# Choose the number of send/receive cycles for this size. 
		if total_time>0 and oldDataSize>0:
			# Rescale the previous cycle rate (repeats/total_time) by the size 
			# ratio and a factor of 2.0, never going below one cycle. 
			# NOTE(review): the factor 2.0 is a heuristic; the run time 
			# it actually targets should be confirmed. 
			repeats=max(1, int(repeats/total_time*2.0*oldDataSize/dataSize))
		else:
			# No usable previous measurement (first size, or previous size 
			# was zero bytes). 
			repeats=1000
		
		# Warm up with the same number of cycles; results are discarded. 
		for _ in range(repeats):
			vm.sendMessage(taskID, data)
			vm.receiveMessage()
			
		# Timed run. perf_counter() is a high-resolution clock meant for 
		# measuring intervals, unlike the wall clock time.time(). 
		mark=time.perf_counter()
		for _ in range(repeats):
			vm.sendMessage(taskID, data)
			vm.receiveMessage()
		total_time=time.perf_counter()-mark
		
		# Every cycle is a round trip (send plus bounced reply), so the 
		# one-way time is half of the time per cycle. 
		dt=total_time/2.0/repeats
		oldDataSize=dataSize
		# Payload bits per second. 
		tp=dataSize*8/dt
		
		# Print result
		print("Data size %9.3fkB, iterations=%6d, time=%7.0fus, speed=%5.3fMb/s" % (
			dataSize/1000.0, repeats, dt*1e6, tp/1e6
		))
		sys.stdout.flush()
		
	# Send None (this makes bounceBack() exit). 
	vm.sendMessage(taskID, None)
	
	# Wait for the MsgTaskExit message, discarding anything else. 
	while True:
		(src, msg)=vm.receiveMessage()
		
		if isinstance(msg, MsgTaskExit):
			break
	
	vm.finalize()

The bounceBack() function is defined in the funclib module (file funclib.py in folder demo/parallel/vm/). This function bounces back every received message to its source and exits when it receives None.

def bounceBack(vm=None):
	"""
	Echo every received message back to its sender until None arrives. 
	
	*vm* is the virtual machine object used for communication. It must 
	provide ``receiveMessage()`` and ``sendMessage(destination, message)``. 
	
	Anything returned by ``receiveMessage()`` that is ``None`` or is not a 
	2-element (source, message) pair is ignored (treated as an error or a 
	timeout) and the function keeps waiting. A message that is ``None`` ends 
	the loop and is not echoed. Returns ``None``. 
	"""
	while True:
		received=vm.receiveMessage()
		
		# Error or timeout, nothing to bounce. Wait for the next message. 
		if received is None or len(received)!=2:
			continue
		
		sender, payload=received
		
		# A None payload is the request to stop. 
		if payload is None:
			return
		
		# Return the payload to where it came from. 
		vm.sendMessage(sender, payload)

The output for communication between processes on the same host:

Warning. Measuring local communication speed.
Task layout:
localhost ncpu=8 free slots: []
        slot=   0 task=   0
        slot=   1 task=   1

Measuring message delivery time and data throughput to localhost.
Bounce back task: 1:1
localhost_2538_0 VM: Changing working directory to '/mnt/data/Data/pytest/demo/parallel/vm'.
Data size     0.000kB, iterations= 10000, time=     37us, speed=0.000Mb/s
Data size     0.001kB, iterations= 10000, time=     37us, speed=0.216Mb/s
Data size     0.010kB, iterations=  2701, time=     38us, speed=2.118Mb/s
Data size     0.100kB, iterations=  2647, time=     38us, speed=21.308Mb/s
Data size     1.000kB, iterations=  2663, time=     40us, speed=199.249Mb/s
Data size    10.000kB, iterations=  2490, time=     48us, speed=1661.765Mb/s
Data size   100.000kB, iterations=  2077, time=    100us, speed=8023.758Mb/s
Data size  1000.000kB, iterations=  1002, time=    755us, speed=10601.659Mb/s
Data size 10000.000kB, iterations=   132, time=  12370us, speed=6467.453Mb/s
calypso_2537_0 MPI: Task 1:1 exit detected.