Source code for spux.executors.balancers.balancer

# # # # # # # # # # # # # # # # # # # # # # # # # #
# Balancer base class
#
# Jonas Sukys
# Eawag, Switzerland
# jonas.sukys@eawag.ch
# All rights reserved.
# # # # # # # # # # # # # # # # # # # # # # # # # #

import math
import numpy

[docs]class Balancer (object): """Base class for balancing network traffic due to killing and cloning (resampling) of particles.""" verbosity = 0 # compute sources from routings
[docs] def sources (self, routings): """Compute sources for the particles to be resampled according to the specified routings.""" # determine the total number of particles ceiling = numpy.max ( [ numpy.max ( [ reindex for index, source, destination, reindex in routing ] ) for routing in routings ] ) count = ceiling + 1 sources = numpy.empty (count, dtype=int) for routing in routings: for index, source, destination, reindex in routing: sources [reindex] = index return sources
# compute traffic from routings
[docs] def traffic (self, routings): """Compute network traffic (moves, copies, etc.) from routing of particles.""" # determine the total number of particles ceiling = numpy.max ( [ numpy.max ( [ reindex for index, source, destination, reindex in routing ] ) for routing in routings ] ) + 1 count = ceiling + 1 moves = [{} for address, routing in enumerate (routings)] costs = [{} for address, routing in enumerate (routings)] copys = [{} for address, routing in enumerate (routings)] inits = [0 for address, routing in enumerate (routings)] kills = [0 for address, routing in enumerate (routings)] for address, routing in enumerate (routings): for index, source, destination, reindex in routing: moves [address][index] = 0 costs [address][index] = 0 copys [address][index] = 0 for address, routing in enumerate (routings): for index, source, destination, reindex in routing: if source is None: inits [address] += 1 continue if destination is None: kills [address] += 1 continue if destination == address: if source == address: copys [address][index] += 1 continue if source != address: # if particle has not been moved yet, move it if moves [address][index] == 0: moves [address][index] = 1 costs [address][index] += math.fabs (source - destination) continue # particles are moved only once, then always copied if moves [address][index] == 1: copys [address][index] += 1 continue # first copy is not performed for address, copy in enumerate (copys): for index in copy: if copy [index] > 0: copy [index] -= 1 moves_total = numpy.sum ( [ numpy.sum (list (moves[address].values())) for address, routing in enumerate (routings) ] ) costs_total = numpy.sum ( [ numpy.sum (list (costs[address].values())) for address, routing in enumerate (routings) ] ) copys_total = numpy.sum ( [ numpy.sum (list (copys[address].values())) for address, routing in enumerate (routings) ] ) traffic = {} traffic ["init"] = numpy.sum (inits) / float(count) traffic ["move"] = moves_total / float(count) traffic ["cost"] = costs_total / float(count) traffic ["copy"] = copys_total / float(count) traffic ["kill"] = numpy.sum (kills) / float(count) return traffic