Source code for spux.executors.mpi4py.connectors.spawn
# # # # # # # # # # # # # # # # # # # # # # # # # #
# Connector class for spawning workers directly from the manager at the OS level
# using mpi4py bindings and MPI backend for distributed memory paralellization
#
# Jonas Sukys
# Eawag, Switzerland
# jonas.sukys@eawag.ch
# All rights reserved.
# # # # # # # # # # # # # # # # # # # # # # # # # #
from mpi4py import MPI
import os
import sys
[docs]class Spawn (object):
"""Class to establish workers MPI processes by spawning of new processes."""
def __init__ (self, verbosity = 0):
self.verbosity = verbosity
# connect manager with the number of requested workers by returning a port needed to connect to an inter-communicator
[docs] def bootup (self, contract, task, resource, root=0, verbosity=0):
"""Return means of inter-communication along a possible hierarchy of processes."""
directory, filename = os.path.split (os.path.realpath (__file__))
worker = os.path.join (directory, "worker.py")
info = MPI.Info.Create ()
info.Set ('wdir', os.getcwd ())
if verbosity:
print ("Spawning workers:", resource ['workers'])
workers = MPI.COMM_SELF.Spawn (sys.executable, args=[worker], maxprocs=resource ['workers'], info=info)
# n = MPI.COMM_SELF.Get_attr (MPI.UNIVERSE_SIZE)
# broadcast contract to workers
workers.bcast (contract, root=MPI.ROOT)
# open a port for workers to connect to
port = MPI.Open_port ()
# broadcast port, task template and the connector to workers
workers.bcast ((port, task, self), root=MPI.ROOT)
# disconnect from workers
workers.Disconnect ()
workers = None
return port
[docs] def barrier (self):
return None
[docs] def init (self, resources):
return None
[docs] @staticmethod
def shutdown (port, verbosity):
"""Finalize connector."""
MPI.Close_port (port)
[docs] @staticmethod
def connect (port, peers):
"""Establish connection on worker side."""
manager = peers.Connect (port)
return manager
[docs] @staticmethod
def accept (port, verbosity):
"""Establish connection."""
workers = MPI.COMM_SELF.Accept (port)
return workers
[docs] @staticmethod
def disconnect (workers, verbosity):
"""Interrupt connection."""
workers.Disconnect ()