Source code for spux.executors.balancers.adaptive

# # # # # # # # # # # # # # # # # # # # # # # # # #
# Adaptive balancer class
# For particle filtering based on
# Kattwinkel & Reichert, EMS 2017.
#
# Strategy: move excess work to the closest available worker
#
# Jonas Sukys
# Eawag, Switzerland
# jonas.sukys@eawag.ch
# All rights reserved.
# # # # # # # # # # # # # # # # # # # # # # # # # #

import math

import numpy

from .balancer import Balancer

class Adaptive (Balancer):
    """Derived class to establish particle routings adaptively.

    Used for particle filtering based on Kattwinkel & Reichert, EMS 2017.

    Strategy: a surviving particle stays on the worker that produced it as
    long as that worker is below the per-worker capacity limit; any excess
    particles are moved to the closest (in cyclic worker order) worker that
    still has spare capacity.
    """

    # distribute initial particles to particle ensembles for each worker - ensembles = groups to workers
    def ensembles (self, indices, workers):
        """Initially distribute particles to ensembles according to how many 'workers' are available.

        Parameters
        ----------
        indices : array-like
            Particle indices to distribute.
        workers : int
            Number of ensembles (one per worker) to create; must be >= 1.

        Returns
        -------
        list of numpy.ndarray
            'workers' contiguous, nearly equal-sized chunks of 'indices',
            as produced by numpy.array_split.

        Raises
        ------
        SystemExit
            If 'workers' is None or smaller than 1 (indicates a bug in the caller).
        """
        # test for None first: in Python 3 'None < 1' raises TypeError,
        # which would otherwise mask the intended diagnostic message
        if workers is None or workers < 1:
            import sys
            sys.exit ('Gotten self.workers < 1 or None in ensembles. This is a bug.')
        return numpy.array_split (indices, workers)

    @staticmethod
    def _closest_available (source, loads, limit):
        """Return the worker closest to 'source' (cyclically) whose load is below 'limit'.

        Workers are probed at increasing cyclic distance from 'source'; at each
        distance the worker to the right (source + d) is tried before the one to
        the left (source - d). The source worker itself is never returned.
        """
        workers = len (loads)
        for distance in range (1, workers // 2 + 1):
            right = (source + distance) % workers
            if loads [right] < limit:
                return right
            left = (source - distance) % workers
            if loads [left] < limit:
                return left
        # unreachable when called from 'routings': limit * workers >= len (indices)
        # guarantees spare capacity on some worker other than the (full) source
        raise RuntimeError ('No worker with spare capacity found in routings. This is a bug.')

    # compute routings from current particle ensembles and specified indices
    def routings (self, ensembles, indices):
        """Compute routings of particles from current particle 'ensembles' and specified 'indices'.

        Parameters
        ----------
        ensembles : sequence of iterables
            Current particle indices held by each worker; position = worker id.
        indices : iterable
            Indices of particles to keep (possibly with repetitions);
            each must be present in one of 'ensembles'.

        Returns
        -------
        (ensembles, routings)
            ensembles : list of lists
                New particle ids ('reindex', position in sorted 'indices')
                assigned to each worker.
            routings : list of lists
                Per-worker lists of tuples (index, source, destination, reindex).
                Particles that stay are listed only under their source worker;
                moved particles are listed under both source and destination.

        REMARK: does not include particle removal.
        """
        if self.verbosity:
            print ('Routings ensembles:', ensembles)
            print ('Routings indices:', indices)

        workers = len (ensembles)
        if self.verbosity:
            print ('Routings workers:', workers)

        # maximal number of particles per ensemble
        limit = math.ceil (float (len (indices)) / workers)
        if self.verbosity:
            print ('Routings limit:', limit)

        # map each particle index to the worker (ensemble position) currently holding it
        sources = {}
        for source, ensemble in enumerate (ensembles):
            for index in ensemble:
                sources [index] = source
        if self.verbosity:
            print ('Routings sources', sources)

        # new ensembles, loads and routings are rebuilt from scratch
        ensembles = [[] for worker in range (workers)]
        loads = numpy.zeros (workers)
        routings = [[] for worker in range (workers)]

        # first traversal: keep particles on their source worker up to its saturation;
        # indices are sorted so that repeated indices are consecutive (enables caching below)
        remaining = []
        for reindex, index in enumerate (sorted (indices)):
            source = sources [index]
            if loads [source] < limit:
                destination = source
                routings [source].append ((index, source, destination, reindex))
                loads [destination] += 1
                ensembles [destination].append (reindex)
            else:
                remaining.append ((reindex, index))

        if self.verbosity:
            print ('Routings intermediate loads', loads)
            print ('Routings intermediate ensembles', ensembles)
            print ('Routings intermediate routings', routings)

        # second traversal: move the overflow particles to the closest worker with capacity
        cached_index = None
        cached_destination = None
        for reindex, index in remaining:
            source = sources [index]
            # reuse the destination of a previous copy of the same particle while it
            # has room - this minimizes the number of distinct transfers
            if index == cached_index and loads [cached_destination] < limit:
                destination = cached_destination
            else:
                destination = self._closest_available (source, loads, limit)
            cached_index = index
            cached_destination = destination
            # both the sending and the receiving worker need to know about this move
            routings [source].append ((index, source, destination, reindex))
            routings [destination].append ((index, source, destination, reindex))
            loads [destination] += 1
            ensembles [destination].append (reindex)

        if self.verbosity:
            print ('Routings final loads', loads)
            print ('Routings final ensembles', ensembles)
            print ('Routings final routings', routings)

        return ensembles, routings