# -*- coding: utf-8 -*-
# written by Ralf Biehl at the Forschungszentrum Jülich ,
# Jülich Center for Neutron Science 1 and Institute of Complex Systems 1
# Jscatter is a program to read, analyse and plot data
# Copyright (C) 2015-2024 Ralf Biehl
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Functions for parallel computations on a single multicore machine using the standard library multiprocessing
and some related stuff.
"""
from math import log, floor, ceil, fmod
import scipy.constants as constants
import multiprocessing as mp
from multiprocessing import shared_memory
import numpy as np
try:
from .libs import fscatter
useFortran = True
except ImportError:
useFortran = False
_Nrand = np.random.random_integers
__all__ = ['haltonSequence', 'randomPointsOnSphere', 'randomPointsInCube',
'fibonacciLatticePointsOnSphere', 'sphereAverage', 'doForList',
'shared_create', 'shared_recover', 'shared_close']
[docs]
def haltonSequence(size, dim, skip=0):
"""
Pseudo random numbers from the Halton sequence in interval [0,1].
To use them as coordinate points transpose the array.
Parameters
----------
size : int
Samples from the sequence
dim : int
Dimensions
skip : int
Number of points to skip in Halton sequence .
Returns
-------
array
Notes
-----
The visual difference between pseudorandom and random in 2D.
See [2]_ for more details.
.. image:: ../../examples/images/comparisonRandom-Pseudorandom.jpg
:align: center
:height: 300px
:alt: comparisonRandom-Pseudorandom
Examples
--------
::
import jscatter as js
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
for i,color in enumerate(['b','g','r','y']):
# create halton sequence and shift it to needed shape
pxyz=js.formel.haltonSequence(400,3).T*2-1
ax.scatter(pxyz[:,0],pxyz[:,1],pxyz[:,2],color=color,s=20)
ax.set_xlim([-1,1])
ax.set_ylim([-1,1])
ax.set_zlim([-1,1])
plt.tight_layout()
plt.show(block=False)
References
----------
.. [1] https://mail.python.org/pipermail/scipy-user/2013-June/034741.html
Author: Sebastien Paris, Josef Perktold translation from c
.. [2] https://en.wikipedia.org/wiki/Low-discrepancy_sequence
"""
if useFortran:
return fscatter.pseudorandom.halton_sequence(skip + 1, skip + size, dim)
else:
size = size + skip
h = np.empty(size * dim)
h.fill(np.nan)
p = np.empty(size)
p.fill(np.nan)
P = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101]
logsize = log(size + 1)
for i in range(dim):
b = P[i]
n = int(ceil(logsize / log(b)))
for t in range(n):
p[t] = pow(b, -(t + 1))
for j in range(size):
d = j + 1
sum_ = fmod(d, b) * p[0]
for t in range(1, n):
d = floor(d / b)
sum_ += fmod(d, b) * p[t]
h[j * dim + i] = sum_
return h.reshape(size, dim).T[:, skip:]
[docs]
def randomPointsOnSphere(NN, r=1, skip=0):
r"""
N quasi random points on sphere of radius r based on low-discrepancy sequence.
For numerical integration quasi random numbers are better than random samples as
the error drops faster [1]_. Here we use the Halton sequence to generate the sequence.
Skipping points makes the sequence additive and does not repeat points.
Parameters
----------
NN : int
Number of points to generate.
r : float
Radius of sphere
skip : int
Number of points to skip in Halton sequence .
Returns
-------
array of [r,phi,theta] pairs in radians
References
----------
.. [1] https://en.wikipedia.org/wiki/Low-discrepancy_sequence
Examples
--------
A random sequence of points on sphere surface.
::
import jscatter as js
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
for i,color in enumerate(['b','g','r','y']):
points=js.formel.randomPointsOnSphere(400,skip=400*i)
points=points[points[:,1]>0,:]
pxyz=js.formel.rphitheta2xyz(points)
ax.scatter(pxyz[:,0],pxyz[:,1],pxyz[:,2],color=color,s=20)
ax.set_xlim([-1,1])
ax.set_ylim([-1,1])
ax.set_zlim([-1,1])
fig.axes[0].set_title('random points on sphere (half shown)')
plt.tight_layout()
plt.show(block=False)
#fig.savefig(js.examples.imagepath+'/randomPointsOnSphere.jpg')
.. image:: ../../examples/images/randomPointsOnSphere.jpg
:align: center
:height: 300px
:alt: randomPointsOnSphere
"""
NN = abs(NN)
seq = haltonSequence(NN + skip, 2)
return np.c_[(r * np.ones_like(seq[0]),
2 * np.pi * seq[0] - np.pi,
np.arccos(2 * seq[1] - 1))]
[docs]
def randomPointsInCube(NN, skip=0, dim=3):
r"""
N quasi random points in cube of edge 1 based on low-discrepancy sequence.
For numerical integration quasi random numbers are better than random samples as
the error drops faster [1]_. Here we use the Halton sequence to generate the sequence.
Skipping points makes the sequence additive and does not repeat points.
Parameters
----------
NN : int
Number of points to generate.
skip : int
Number of points to skip in Halton sequence .
dim : int, default 3
Dimension of the cube.
Returns
-------
array of [x,y,z]
References
----------
.. [1] https://en.wikipedia.org/wiki/Low-discrepancy_sequence
Examples
--------
The visual difference between pseudorandom and random in 2D.
See [1]_ for more details.
::
import jscatter as js
import matplotlib.pyplot as pyplot
fig = pyplot.figure(figsize=(10, 5))
fig.add_subplot(1, 2, 1, projection='3d')
fig.add_subplot(1, 2, 2, projection='3d')
js.sf.randomLattice([2,2],3000).show(fig=fig, ax=fig.axes[0])
fig.axes[0].set_title('random lattice')
js.sf.pseudoRandomLattice([2,2],3000).show(fig=fig, ax=fig.axes[1])
fig.axes[1].set_title('pseudo random lattice \n less holes more homogeneous')
fig.axes[0].view_init(elev=85, azim=10)
fig.axes[1].view_init(elev=85, azim=10)
#fig.savefig(js.examples.imagepath+'/comparisonRandom-Pseudorandom.jpg')
.. image:: ../../examples/images/comparisonRandom-Pseudorandom.jpg
:align: center
:height: 300px
:alt: comparisonRandom-Pseudorandom
Random cubes of random points in cube.
::
# random cubes of random points in cube
import jscatter as js
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
N=30
cubes=js.formel.randomPointsInCube(20)*3
for i,color in enumerate(['b','g','r','y','k']*3):
points=js.formel.randomPointsInCube(N,skip=N*i).T
pxyz=points*0.3+cubes[i][:,None]
ax.scatter(pxyz[0,:],pxyz[1,:],pxyz[2,:],color=color,s=20)
ax.set_xlim([0,3])
ax.set_ylim([0,3])
ax.set_zlim([0,3])
plt.tight_layout()
plt.show(block=False)
#fig.savefig(js.examples.imagepath+'/randomRandomCubes.jpg')
.. image:: ../../examples/images/randomRandomCubes.jpg
:align: center
:height: 300px
:alt: randomRandomCubes
"""
return haltonSequence(abs(NN), dim, skip).T
[docs]
def fibonacciLatticePointsOnSphere(NN, r=1):
"""
Fibonacci lattice points on a sphere with radius r (default r=1)
This can be used to integrate efficiently over a sphere with well distributed points.
Parameters
----------
NN : integer
number of points = 2*N+1
r : float, default 1
radius of sphere
Returns
-------
list of [r,phi,theta] pairs in radians
phi azimuth -pi<phi<pi; theta polar angle 0<theta<pi
References
----------
.. [1] Measurement of Areas on a Sphere Using Fibonacci and Latitude–Longitude Lattices
Á. González Mathematical Geosciences 42, 49-64 (2009)
Examples
--------
::
import jscatter as js
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
points=js.formel.fibonacciLatticePointsOnSphere(1000)
pp=list(filter(lambda a:(a[1]>0) & (a[1]<np.pi/2) & (a[2]>0) & (a[2]<np.pi/2),points))
pxyz=js.formel.rphitheta2xyz(pp)
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(pxyz[:,0],pxyz[:,1],pxyz[:,2],color="k",s=20)
ax.set_xlim([-1,1])
ax.set_ylim([-1,1])
ax.set_zlim([-1,1])
plt.tight_layout()
plt.show(block=False)
points=js.formel.fibonacciLatticePointsOnSphere(1000)
pp=list(filter(lambda a:(a[2]>0.3) & (a[2]<1) ,points))
v=js.formel.rphitheta2xyz(pp)
R=js.formel.rotationMatrix([1,0,0],np.deg2rad(30))
pxyz=np.dot(R,v.T).T
#points in polar coordinates
prpt=js.formel.xyz2rphitheta(np.dot(R,pxyz.T).T)
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(pxyz[:,0],pxyz[:,1],pxyz[:,2],color="k",s=20)
ax.set_xlim([-1,1])
ax.set_ylim([-1,1])
ax.set_zlim([-1,1])
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
plt.tight_layout()
plt.show(block=False)
"""
NN = int(NN)
return np.c_[(r * np.ones(2 * NN + 1),
np.mod((2 * np.pi * np.r_[-NN:NN + 1] / constants.golden) + np.pi, 2 * np.pi) - np.pi,
np.arcsin(2 * np.r_[-NN:NN + 1] / (2 * NN + 1.)) + np.pi / 2)]
def rphitheta2xyz(RPT):
"""
Transformation spherical coordinates [r,phi,theta] to cartesian coordinates [x,y,z]
Parameters
----------
RPT : array Nx3
| dim Nx3 with [r,phi,theta] coordinates
| r : float length
| phi : float azimuth -pi < phi < pi
| theta : float polar angle 0 < theta < pi
Returns
-------
Array with same dimension as RPT.
"""
rpt = np.array(RPT, ndmin=2)
xyz = np.zeros(rpt.shape)
xyz[:, 0] = rpt[:, 0] * np.cos(rpt[:, 1]) * np.sin(rpt[:, 2])
xyz[:, 1] = rpt[:, 0] * np.sin(rpt[:, 1]) * np.sin(rpt[:, 2])
xyz[:, 2] = rpt[:, 0] * np.cos(rpt[:, 2])
return np.array(xyz.squeeze(), ndmin=np.ndim(RPT))
# noinspection PyIncorrectDocstring
[docs]
def sphereAverage(funktion, relError=300, *args, **kwargs):
"""
Vectorized spherical average of funktion, parallel or non-parallel.
A Fibonacci lattice or Monte Carlo integration with pseudo random grid is used.
Parameters
----------
funktion : function
Function to evaluate.
Funktion first argument gets cartesian coordinate [x,y,z] of point on unit sphere.
relError : float, default 300
Determines how points on sphere are selected
- >1 Fibonacci Lattice with relError*2+1 points
- 0<1 Pseudo random points on sphere (see randomPointsOnSphere).
Stops if relative improvement in mean is less than relError
(uses steps of 40 or (20ncpu) new points).
Final error is (stddev of N points) /sqrt(N) as for Monte Carlo methods
even if it is not a correct 1-sigma error in this case.
This has to be used with care as convergence might be slow if *std* is
intrinsically large in calculated values.
ncpu : int, optional
Number of cpus in the pool if started as main process (otherwise single process).
- not given or 0 -> default, all cpus are used
- int>0 min (ncpu, mp.cpu_count)
- int<0 ncpu not to use
Please check if multiprocessing is really useful.
For simple calculations it's faster not to use it (avoiding pickle overhead).
The below first example is around 10times faster on a single core (ncpu=1).
Think about passPoints.
passPoints : bool
If the function accepts a Nx3 array of points these will be passed instead of single points.
The function return value should index the points result in axis 0.
This might speed up dependent on the function (e.g. formfactor prism) .
Sets ncpu=1 .
arg,kwargs :
forwarded to function
Returns
-------
array
Values from function and appended Monte Carlo error estimates.
- If an array of values is returned in function, the result is also an array with double size.
To separate use `values, error = res.reshape(2,-1)`
- If list of array is returned a list is returned with values and error.
Notes
-----
- Works also on single core machines and with single process (avoiding creation of a pool of workers).
- For integration over a continuous function as a form factor in scattering the random
points are not statistically independent. Think of neighbouring points on an isosurface
which are correlated and therefore the standard deviation is biased. In this case the
Fibonacci lattice is the better choice as the standard deviation in a random sample
is not a measure of error but more a measure of the differences on the isosurface.
Examples
--------
Parallel sphere average
Conversion to rpt coordinates and sum results in average value of these coordinate in result.
Error of radius is machine precision, phi,theta errors are typical Monte Carlo error
**The function should be used in a module** to work on Windows. Usage in a module is recomended like demonstarted in
:py:func:`doForList`
::
import jscatter as js
def f(x,r):
# return a list of values to integrate
return [js.formel.xyz2rphitheta(x)*r]
val, error = js.formel.sphereAverage(f,relError=500,r=1)
# => [1, 0, pi/2 ]
js.formel.sphereAverage(f,relError=0.01,r=1)
::
import jscatter as js
import numpy as np
def fp(singlepoint):
# return a list of values to integrate
return js.formel.xyz2rphitheta(singlepoint)[1:]
val,error = js.formel.sphereAverage(fp,relError=500).reshape(2,-1)
val,error = js.formel.sphereAverage(fp,relError=0.1).reshape(2,-1)
Pass all points to one process, not parallel.
::
def fps(allpoints,r):
res = js.formel.xyz2rphitheta(allpoints)
return res
val,error = js.formel.sphereAverage(fps,r=1,passPoints=1).reshape(2,-1)
"""
passPoints = kwargs.pop('passPoints', False)
if relError < 0:
relError = abs(relError)
if relError == 0:
print('Try again with relError > 0')
return
ncpu = mp.cpu_count()
# except force to something else
if 'ncpu' in kwargs:
if kwargs['ncpu'] < 0:
ncpu = max(ncpu + kwargs['ncpu'], 1)
elif kwargs['ncpu'] > 0:
ncpu = min(ncpu, kwargs['ncpu'])
del kwargs['ncpu']
if 'debug' in kwargs:
_ = kwargs.pop('debug', None)
ncpu=1
if (ncpu == 1
or mp.current_process().name != 'MainProcess'
or passPoints
or 'fork' not in mp.get_all_start_methods()):
# non parallel
if 0 < relError < 1:
steps = 40
points = rphitheta2xyz(randomPointsOnSphere(NN=steps, skip=0))
npoints = steps
if passPoints:
results = np.array(funktion(points, *args, **kwargs), ndmin=1)
else:
results = np.r_[[np.array(funktion(point, *args, **kwargs), ndmin=1) for point in points]]
prevmean = results.mean(axis=0).real
while 1:
points = rphitheta2xyz(randomPointsOnSphere(NN=steps, skip=npoints))
npoints += steps
if passPoints:
result = np.array(funktion(points, *args, **kwargs), ndmin=1)
else:
result = np.r_[[np.array(funktion(point, *args, **kwargs), ndmin=1) for point in points]]
results = np.r_[results, result]
mean = results.mean(axis=0).real
# var = results.var(axis=0,ddof=1)**0.5
if np.all(abs(mean - prevmean) < relError * abs(mean)):
break
prevmean = mean
elif relError >= 1:
qfib = fibonacciLatticePointsOnSphere(max(relError, 7), 1)
points = rphitheta2xyz(qfib) # to cartesian
if passPoints:
results = np.array(funktion(points, *args, **kwargs), ndmin=2)
else:
results = np.r_[[np.array(funktion(point, *args, **kwargs), ndmin=1) for point in points]]
return np.r_[results.mean(axis=0), results.std(axis=0) / np.sqrt(results.shape[0] - 1)]
else:
mode = 'fork' if 'fork' in mp.get_all_start_methods() and 0 else 'spawn'
with mp.get_context(mode).Pool(processes=ncpu) as pool:
steps = ncpu * 20
if 0 < abs(relError) < 1:
points = rphitheta2xyz(randomPointsOnSphere(NN=steps))
npoints = steps
jobs = [pool.apply_async(funktion, (point,) + args, kwargs) for point in points]
result = [np.asarray(job.get(0xFFFF)) for job in jobs]
prevmean = np.r_[result].mean(axis=0).real
# prevstd =result.std(axis=0).real
while 1:
points = rphitheta2xyz(randomPointsOnSphere(NN=steps, skip=npoints))
npoints += steps
jobs = [pool.apply_async(funktion, (point,) + args, kwargs) for point in points]
result += [np.asarray(job.get(0xFFFF)) for job in jobs]
results = np.array(result)
mean = results.mean(axis=0).real
# std=results.std(axis=0).real
if np.all(abs(1 - abs(prevmean / mean)) < relError):
# abs(results[:,0].std()/results[:,0].mean()/np.sqrt(results.shape[0]-1))<relError
# and results.shape[0]>42:
break
prevmean = mean
# prevstd = std
elif relError > 1:
qfib = fibonacciLatticePointsOnSphere(relError, 1)
points = rphitheta2xyz(qfib) # to cartesian
jobs = [pool.apply_async(funktion, (point,) + args, kwargs) for point in points]
results = np.r_[[np.asarray(job.get(0xFFFF)) for job in jobs]]
return np.r_[results.mean(axis=0), results.std(axis=0) / np.sqrt(np.shape(results[:, 0])[0])]
# noinspection PyIncorrectDocstring
[docs]
def doForList(funktion, looplist, *args, **kwargs):
"""
Parallel execution of a function in a pool of workers to speed up (multiprocessing).
Apply function with values in looplist in a pool of workers in parallel using multiprocessing.
Like multiprocessing map_async but distributes automatically all given arguments.
Parameters
----------
funktion : function
Function to process with arguments (args, loopover[i]=looplist[j,i], kwargs)
Return value of function should contain parameters or at least the loopover value to allow a check, if desired.
loopover : list of string, default= None
Names of arguments to use for (sync) looping over with values in looplist.
- If not given the first funktion argument is used.
- If loopover is single argument this gets looplist[i,:]
- Two arguments to loopover : ``loopover=['q', 'iq']``
looplist : list or list of tuples with len(loopover)
List of values to loop over.
- single argument just use a list ``[0.01,0.1,0.2,0.3,0.4]``
- for above 'q', 'iq' maybe ``looplist=[(q, i) for i, q in enumerate([0.01,0.1,0.2,0.3,0.4])]``
ncpu : int, optional
Number of cpus in the pool if started as main process (otherwise single process).
- not given or 0 -> all cpus are used
- int>0 min (ncpu, mp.cpu_count)
- int<0 ncpu not to use
cb : None, function
Callback after each calculation.
debug : int
debug > 0 allows serial output for testing.
output : bool
If False no output is shown.
Returns
-------
list : list of function return values as [result1,result2,.....]
The order of return values is not explicitly synced to looplist.
The return array may be prepended with the value looplist[i] as reference.
E.g.::
def f(x,a,b,c,d):
result = x+a+b+c+d
return [x, result]
Notes
-----
Functions for parallel computations on a single multicore machine using the standard library multiprocessing.
Not the programming details, but the way how to speed up some things.
- If your computation is already fast (e.g. <1s) go on without parallelisation.
In an optimal case you gain a speedup as the number of cpu cores.
- If you want to use a cluster with all cpus, this is not the way (you need MPI).
Parallelisation is no magic and this module is for convenience for non-specialist of parallel computing.
The main thing is to pass additional parameters to the processes (a pool of workers)
and loop only over one parameter given as list.
Opening and closing of the pool is hidden in the function.
In this way we can use a multicore machine with all cpus.
During testing, I found that shared memory does not really speed up in most cases,
if we just want to calculate a function e.g. for a list of different
Q values dependent on model parameters. Here the pickling of numpy arrays is
efficient enough compared to the computation we do.
The amount of data pickled should not be too large as each process
gets a copy and pickling needs time.
If speed is an issue and shared memory gets important i advice using Fortran with OpenMP
as used for ff.cloudScattering with parallel computation and shared memory.
For me this was easier than the different solutions around.
We use here only non-modified input data and return a new dataset,
so we don't need to care about what happens if one process changes the data needed
in another process (race conditions,...), anyway it's not shared.
Please keep this in mind and don't complain if you find a way to modify input data.
For easier debugging (to find the position of an error in the pdb debugger) use the option debug.
In this case the multiprocessing is not used and the debugger finds the error correctly.
**Shared memory**
Shared memory can be created and used with :py:func:`~jscatter.parallel.shared_create` ,
:py:func:`~jscatter.parallel.shared_recover` and :py:func:`~jscatter.parallel.shared_close`.
See notes and example in :py:func:`~jscatter.parallel.shared_create`.
Examples
--------
**Using in a module**
This is the prefered and intended way for usage that works on Linux, macOS and also on Windows.
Write a module like the following and name the file *myfunctions.py* .
::
import jscatter as js
import numpy as np
def f(x,a,b,c,d):
res=x+a+b+c+d
return [x,res]
def f2(x,a,b,c,d):
res=x[0]+x[1]+a+b+c+d
return [x[0],res]
def mycomplicatedModel(N,a,b,c,d):
# using a list of 2 values for x (is first argument)
loop = np.arange(N).reshape(-1,2) # has 2 values in second dimension
res = js.formel.doForList(f2,looplist=loop,a=a,b=b,c=c,d=d)
return res
Create a working script where you use your functions defined in above module or do this on the command line.
::
import myfunctions as mf
res = mf.mycomplicatedModel(100,a=1,b=2,c=3,d=11)
**Using in a simple script**
Not recommended! Already repeating is only possible with *__spec__=None*
This is what you find looking for multiprocessing in Windows as a typical usage pattern
This is only neccessary on Windows but not on Linux or macOS
Take the following and paste it to *script.py*
In a python shell do *run scripy.py*
The calculation is encapsulated in the *if* clause and only executed on the main level.
This *if* line is not needed for Linux or macOS
::
import jscatter as js
import numpy as np
def f(x,a,b,c,d):
res=x+a+b+c+d
return [x,res]
def f2(x,a,b,c,d):
res=x[0]+x[1]+a+b+c+d
return [x[0],res]
if __name__ == '__main__':
__spec__ = None
# loop over first argument, here x
res = js.formel.doForList(f,looplist=np.arange(100),a=1,b=2,c=3,d=11)
# loop over 'd' ignoring the given d=11 (which can be omitted here)
res = js.formel.doForList(f,looplist=np.arange(100),loopover='d',x=0,a=1,b=2,c=3,d=11)
# using a list of 2 values for x (is first argument)
loop = np.arange(100).reshape(-1,2) # has 2 values fin second dimension
res = js.formel.doForList(f2,looplist=loop,a=1,b=2,c=3,d=11)
# looping over several variables in sync
loop = np.arange(100).reshape(-1,2)
res = js.formel.doForList(f2,looplist=loop,loopover=['a','b'],x=[100,200],a=1,b=2,c=3,d=11)
"""
output = kwargs.pop('output', True)
looplen = len(looplist)
ncpu = mp.cpu_count()
if 'ncpu' in kwargs:
if kwargs['ncpu'] < 0:
ncpu = max(ncpu + kwargs['ncpu'], 1)
elif kwargs['ncpu'] > 0:
ncpu = min(ncpu, kwargs['ncpu'])
del kwargs['ncpu']
# use not more cpu than elemtents in looplist
ncpu = min(ncpu, looplen)
# check for callback
cb = kwargs.pop('cb', None)
# not given defaults to first varname in funktion
loopover = kwargs.pop('loopover', funktion.__code__.co_varnames[0])
if (('debug' in kwargs and kwargs['debug'] > 0)
or ncpu == 1
or mp.current_process().name != 'MainProcess')\
or 'fork' not in mp.get_all_start_methods():
# in sequence for testing purposes
_ = kwargs.pop('debug', None)
res = []
if output:
print('start NO pool')
if isinstance(loopover, str):
for Q in looplist:
res.append(funktion(*args, **dict(kwargs, **{loopover: Q})))
if cb is not None: cb(res[-1])
elif isinstance(loopover, (list, tuple)) and (len(loopover) == 1):
for Q in looplist:
res.append(funktion(*args, **dict(kwargs, **{loopover[0]: Q})))
if cb is not None: cb(res[-1])
else:
for Q in looplist:
res.append(funktion(*args, **dict(kwargs, **dict(zip(loopover, Q)))))
if cb is not None: cb(res[-1])
else: # in parallel for production run
_ = kwargs.pop('debug', None)
if output:
print('start pool of ', ncpu)
mode = 'fork' if 'fork' in mp.get_all_start_methods() else 'spawn'
with mp.get_context(mode).Pool(ncpu) as pool:
# we need to create a temporary dict that is given to a job and is not only a view to the updated kwargs
# this is needed as the view changes to each new value in the loop even in the jobs
if isinstance(loopover, (list, tuple)) and (len(loopover) == 1):
loopover = loopover[0]
if isinstance(loopover, str):
# a single parameter name
jobs = [pool.apply_async(funktion, args, dict(kwargs, **{loopover: Q}), callback=cb)
for Q in looplist]
else:
# several parameter names to sync loop
jobs = [pool.apply_async(funktion, args, dict(kwargs, **dict(zip(loopover, Q))), callback=cb)
for Q in looplist]
# get results
res = [job.get() for job in jobs]
if output:
print('closed pool again')
return res
[docs]
def shared_create(ar):
"""
Create shared memory array before sending into pool of workers or usage in doForList .
Creation of shared memory for numpy arrays as described in
https://docs.python.org/3/library/multiprocessing.shared_memory.html#module-multiprocessing.shared_memory
Parameters
----------
ar : array
Numpy array to copy into a shared memory array.
Returns
-------
sharedMemory, tuple with (namestring, ar.shape, ar.dtype)
- sharedMemory is the shared memory
- tuple is information to recover the original array, name is a unique identifier
The tuple needs to be passed to the function in doForList instead of ar.
Examples
--------
Take the following and paste it to *script.py*
In a python shell do *run scripy.py*
In this way the fucntions are defined and can be accessed.
Using this in imported scripts is not necessary.
::
import jscatter as js
import numpy as np
def model(q, ar_id):
# recover the array from shared memory
ar_sh, ar = js.formel.shared_recover(ar_id)
# here we do a difficult calculation for a single q value but the whole array
sum = np.sum(q*ar)
ar_sh.close()
return sum
if __name__ == '__main__':
__spec__ = None
qlist=np.r_[0.1:10:0.1]
# example grid with about 1e6 points
cloud = js.ff.superball(1, 10, p=2, nGrid=100, returngrid=True).XYZ
# transfer large grid to shared memory
cloud_sh, cloud_id = js.formel.shared_create(cloud)
# do the calculation for a qlist values
res = js.formel.doForList(model, qlist, loopover='q', ar=cloud_id)
# always free the shared memory (close and unlink)
js.formel.shared_close(cloud_sh)
Notes
-----
This function (with
:py:func:`~jscatter.parallel.shared_recover` and :py:func:`~jscatter.parallel.shared_close`) is more for experts.
The speedup e.g. in cloudScattering is not huge as the main time is consumed in calculations.
Also creating shared memory (~8ms for 1e6x3 array) takes around the same time as
numpy array pickling (~3ms) for transfer to a pool of workers.
Inside the pool the recover is much faster (~ 47µs) compared to unpickling (1.2.ms)
The speedup will be more important if e.g. the qlist (see example) is huge and the calculations are short.
In this case data transfer will be more important than the calculation.
"""
assert isinstance(ar, np.ndarray), 'This method accepts only numpy arrays.'
assert mp.current_process().name == 'MainProcess', 'shared memory creation only in main process. '
# create shared memory
shm = shared_memory.SharedMemory(create=True, size=ar.nbytes)
# create array using the previous buffer
shared_ar = np.ndarray(ar.shape, dtype=ar.dtype, buffer=shm.buf)
# fill with data
shared_ar[:] = ar[:]
return shm, (shm.name, ar.shape, ar.dtype)
[docs]
def shared_recover(shm_id):
"""
Recover shared memory array inside a function evaluated in a pool of workers.
Parameters
----------
shm_id : tuple created by shared_create
Returns
-------
sharedMemory, ndarray
"""
# recover the shared memory bu name
shm = shared_memory.SharedMemory(name=shm_id[0])
# create array using the buffer
ar = np.ndarray(shm_id[1], dtype=shm_id[2], buffer=shm.buf)
return shm, ar
[docs]
def shared_close(shm):
"""
Close and unlink shared memory array.
Parameters
----------
shm : sharedMemory
SharedMemory created by shared_create.
"""
# access buffer using its name
shm.close()
if mp.current_process().name == 'MainProcess':
shm.unlink()