Source code for spux.executors.mpi4py.connectors.split

# # # # # # # # # # # # # # # # # # # # # # # # # #
# Connector class for subdivision of MPI.COMM_WORLD into manager and workers
# using mpi4py bindings and MPI backend for distributed memory parallelization
#
# Jonas Sukys
# Eawag, Switzerland
# jonas.sukys@eawag.ch
# All rights reserved.
# # # # # # # # # # # # # # # # # # # # # # # # # #

from mpi4py import MPI
import sys, socket
from .utils import universe_address

# compute worker ranks for the current level of resources
def get_ranks (resource, root=None, manager=0):
    """Compute worker ranks for the current level of resources."""

    stride = resource ['resources']
    count = resource ['workers']
    ranks = [manager + i * stride for i in range (count)]
    if root is not None:
        offset = 0
        for level in root:
            if level ['address'] is not None:
                offset += 1 + level ['address'] * level ['resources']
        ranks = [offset + rank for rank in ranks]
    return ranks
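
# Illustrative example (hypothetical numbers): for a resource level
# {'resources' : 4, 'workers' : 3}, workers are placed every 'resources' ranks
# starting at the manager rank, i.e.
#   get_ranks ({'resources' : 4, 'workers' : 3})            -> [0, 4, 8]
#   get_ranks ({'resources' : 4, 'workers' : 3}, manager=1) -> [1, 5, 9]
# Each enclosing level in 'root' with a non-None 'address' additionally shifts
# these ranks by 1 + address * resources of that level.
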
class Split (object):
    """Class to establish worker MPI processes using server/client mode through ports."""

    def __init__ (self, verbosity=0):

        self.size = MPI.COMM_WORLD.Get_size ()
        self.rank = universe_address ()
        self.verbosity = verbosity
        self.root = ( self.rank == 0 )
        if verbosity:
            hostnm = 'unknown'
            try:
                hostnm = socket.gethostname()
            except Exception:
                pass
            print("I AM RANK: ",self.rank,"OF: ",self.size, " on host: ", hostnm)
            mpiver = MPI.Get_version()
            print("using MPI version:",mpiver)
            mpilibver = MPI.Get_library_version()
            print("using MPI libversion:",mpilibver)
        self.cwrank = self.rank # keep the initial rank handy (self.rank is redefined after pool splits)
        sys.stdout.flush()

    # barrier for pool slots: get resources, split pool accordingly, and wait for incoming tasks
    def barrier (self):
        """Split workers from manager, split workers into pools, wait for tasks."""

        if not self.root:

            # get bcast resources from the pool root
            self.resources = MPI.COMM_WORLD.bcast (None)

            # split away pool root from the pool slots
            self.pool = MPI.COMM_WORLD.Split (self.root)
            self.rank = self.pool.Get_rank ()

            # split pool (recursively) - sets self.peers intra-communicators
            self.split ()

            # barrier for completion
            MPI.COMM_WORLD.Barrier ()

            # wait for port and establish connection
            port = MPI.COMM_WORLD.recv (source=MPI.ANY_SOURCE)
            manager = self.connect (port, self.peers)

            # get contract and work according to it
            contract = manager.bcast (None)
            contract (manager, self.peers)

            # exit once finished
            sys.exit ()
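
    # Note: each step of the worker-side barrier () above has a matching manager-side
    # call: the resources bcast and the COMM_WORLD.Split pair with init () below,
    # the recv of the port pairs with the isend in bootup (), connect () pairs with
    # the manager's accept (), and the contract bcast pairs with the bcast in bootup ().
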
    # initialization for pool root: bcast resources and split away from pool slots
    def init (self, resources):
        """Initialization for manager: bcast resources and split away from pool slots."""

        self.resources = resources

        # manager sends out requests for communicators
        if self.root:

            # bcast resources throughout the pool
            MPI.COMM_WORLD.bcast (self.resources)

            # split away pool root from the pool slots
            MPI.COMM_WORLD.Split (self.root)

            # barrier for completion
            MPI.COMM_WORLD.Barrier ()
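
    # For illustration (hypothetical values): judging from its use in split () and
    # get_ranks (), 'resources' is a list with one entry per nesting level of the
    # executor hierarchy, each entry being a dictionary with (at least) the keys
    # 'manager', 'workers' and 'resources', e.g.
    #   resources = [ {'manager' : 1, 'workers' : 4, 'resources' : 2},
    #                 {'manager' : 0, 'workers' : 2, 'resources' : 1} ]
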
    # split pool (recursively)
    def split (self):
        """Split workers recursively into several pools of workers."""

        # if no additional resources are needed, skip the splitting
        if self.resources [0] ['manager'] == 0 and self.resources [0] ['workers'] == 1:
            del self.resources [0]
            if len (self.resources) > 0:
                self.split ()
            return

        # get ranks according to the requested resources
        ranks = get_ranks (self.resources [0])

        # worker ranks form pool intra-communicator
        if self.rank in set (ranks):
            self.peers = self.pool.Split (color=len(ranks))
            self.rank = self.peers.Get_rank ()
            if (self.peers == MPI.COMM_NULL):
                print("Fatal. Exiting split without a self.peers intracomm (1). This is a bug.")
                self.peers.Abort()
                self.pool.Abort()
            return

        # remaining groups of pool slots (inbetween workers) form separate pools
        else:
            if self.resources [0] ['resources'] == 1:
                if (len(self.resources) != 1):
                    self.peers.Abort()
                    self.pool.Abort()
                if (self.peers == MPI.COMM_NULL):
                    print("Fatal. Exiting split without a self.peers intracomm (2). This is a bug.")
                    self.peers.Abort()
                    self.pool.Abort()
                return
            color = self.rank // self.resources [0] ['resources']
            if ( color == len(ranks) ):
                print("Fatal. Color of worker is equal to color of level-managers. This is a bug or you are using too many resources.")
                print(color,len(ranks),ranks,self.rank)
                self.pool.Abort()
            self.pool = self.pool.Split (color)
            self.rank = self.pool.Get_rank ()
            del self.resources [0]
            if len (self.resources) > 0:
                self.split ()
            else:
                if (self.peers == MPI.COMM_NULL):
                    print("Fatal. Exiting split without a self.peers intracomm (3). This is a bug.")
                    self.peers.Abort()
                    self.pool.Abort()
                print("Exiting cause self.resources has length 0: ",self.peers)
                print("rank ",self.rank," getting out with color: ", len(ranks) )
                print("rank ",self.rank," has resources at exit: ",self.resources)
                sys.stdout.flush()
            return
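
    # Illustrative walk-through (hypothetical numbers): for a pool of 6 slots and
    # self.resources [0] = {'manager' : 1, 'workers' : 2, 'resources' : 3},
    # get_ranks () selects pool ranks 0 and 3 as workers; these form the 'peers'
    # intra-communicator (color = len(ranks) = 2) and return. The remaining ranks
    # 1, 2, 4 and 5 are grouped by color = rank // 3 into two sub-pools of two
    # slots each, the consumed level is dropped from self.resources, and split ()
    # recurses within each sub-pool.
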
    # connect manager with the number of requested workers by returning a port needed to connect to an inter-communicator
    def bootup (self, contract, task, resource, root, verbosity):
        """Inter-connect manager with the number of requested workers by returning a port."""

        # get ranks according to specified resources
        ranks = get_ranks (resource, root, manager=1)

        # open a port for workers to connect to
        port = MPI.Open_port ()

        # contact specified ranks and form the inter-comm
        requests = []
        for rank in ranks:
            requests += [ MPI.COMM_WORLD.isend (port, dest=rank) ]
        MPI.Request.waitall (requests)
        workers = self.accept (port, verbosity)

        # broadcast contract to all workers
        workers.bcast (contract, root=MPI.ROOT)

        # broadcast port, task template and the connector to workers
        workers.bcast ((port, task, self), root=MPI.ROOT)

        # disconnect from workers
        workers.Disconnect ()
        workers = None

        return port
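
    # Note: the single MPI.COMM_SELF.Accept () issued via accept () matches the
    # collective Connect () that the workers issue on their 'peers' intra-communicator
    # in connect (), so one Accept yields one inter-communicator spanning the manager
    # and the whole worker pool.
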
    @staticmethod
    def shutdown (port, verbosity):
        """Finalize connector."""

        MPI.Close_port (port)

    @staticmethod
    def connect (port, peers):
        """Establish connection on the worker side."""

        manager = peers.Connect (port)
        return manager

    @staticmethod
    def accept (port, verbosity):
        """Establish connection on the manager side."""

        workers = MPI.COMM_SELF.Accept (port)
        return workers

    @staticmethod
    def disconnect (workers, verbosity):
        """Interrupt connection."""

        workers.Disconnect ()
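
# Minimal usage sketch (an assumption about how the surrounding spux executors drive
# this connector; 'my_resources', 'my_contract' and 'my_task' are made-up placeholders,
# not names from spux):
#
#   connector = Split (verbosity=1)
#   if connector.root:
#       connector.init (my_resources)
#       port = connector.bootup (my_contract, my_task, my_resources [0], None, 1)
#       # ... communicate with the inter-connected workers via the opened port ...
#       connector.shutdown (port, 1)
#   else:
#       connector.barrier ()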