蓝山帝景
The following code works with random generators for distributions that do and do not support the `out` parameter. Although using `out` is usually faster, you still get some benefit from parallel execution even without it.

```python
import os
import concurrent.futures

import numpy as np


def _rg_size(bit_gen, dist, num, *args, **kwargs):
    # Draw `num` samples from `dist` with a fresh Generator around `bit_gen`.
    return getattr(np.random.Generator(bit_gen), dist)(*args, size=num, **kwargs)


def _rg_out(bit_gen, dist, out, *args, **kwargs):
    # Fill a preallocated array through the distribution's `out` parameter.
    return getattr(np.random.Generator(bit_gen), dist)(*args, out=out, **kwargs)


def splitter(num, num_chunks):
    # Yield chunk sizes that add up to `num`.
    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
    while num > chunk_size:
        num -= chunk_size
        yield chunk_size
    yield num


def slicing_splitter(num, num_chunks):
    # Yield slices covering range(num) in chunks.
    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
    i = 0
    remaining = num
    while remaining > chunk_size:
        remaining -= chunk_size
        yield slice(i, i + chunk_size)
        i += chunk_size
    yield slice(i, num)


def new_rgs(rg):
    # Yield an endless sequence of independent bit generators via jumped().
    while True:
        new_rg = rg.jumped()
        yield new_rg
        rg = new_rg


def glue(arrs, size, num_arrs=None):
    # Copy a sequence of arrays into one newly allocated array.
    if num_arrs is None and hasattr(arrs, '__len__'):
        num_arrs = len(arrs)
    slicings = slicing_splitter(size, num_arrs)
    arrs = iter(arrs)
    arr = next(arrs)
    slicing = next(slicings)
    out = np.empty(size, dtype=arr.dtype)
    out[slicing] = arr
    for arr, slicing in zip(arrs, slicings):
        out[slicing] = arr
    return out


def parallel_rand_gen(
        num=None,
        dist='standard_normal',
        bit_gen=None,
        seed=None,
        out=None,
        num_workers=None,
        split_factor=1,
        *args,
        **kwargs):
    if num_workers is None:
        num_workers = min(32, os.cpu_count() + 1)
    if bit_gen is None:
        bit_gen = np.random.PCG64(seed)
    if out is not None:
        shape = out.shape
        out = out.ravel()
        num = out.size
    elif num is None:
        raise ValueError('Either `num` or `out` must be specified.')
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        num_splits = split_factor * num_workers
        if out is None:
            futures = [
                executor.submit(_rg_size, rg, dist, n, *args, **kwargs)
                for rg, n in zip(new_rgs(bit_gen), splitter(num, num_splits))]
            concurrent.futures.wait(futures)
            result = (future.result() for future in futures)
            # out = np.concatenate(tuple(result))  # slower alternative
            out = glue(result, num, num_splits)
        else:
            futures = [
                executor.submit(_rg_out, rg, dist, out[slicing], *args, **kwargs)
                for rg, slicing in zip(
                    new_rgs(bit_gen), slicing_splitter(num, num_splits))]
            concurrent.futures.wait(futures)
            out = out.reshape(shape)
    return out


print(parallel_rand_gen(17))
# [ 0.96710075  2.2935126   0.35537793  0.5825714   2.14440658  0.64483092
#   0.54062617  0.44907003  0.11947266 -0.60748694 -0.52509115  0.62924905
#   0.51714721  0.29864705 -0.46105766 -0.271093    0.33055528]
```

For `standard_normal`, this gives:

```python
n = 10000001
%timeit parallel_rand_gen(n)
# 89.3 ms ± 1.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit out = np.empty(n); parallel_rand_gen(out=out)
# 74.6 ms ± 1.66 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.standard_normal(n)
# 181 ms ± 7.45 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
```

And for `binomial`, this gives:

```python
n = 10000001
%timeit parallel_rand_gen(n, 'binomial', n=100, p=0.5)
# 480 ms ± 15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.binomial(100, 0.5, n)
# 1.17 s ± 35.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

(Tested on a 4-core, 6-year-old laptop.)
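As a quick usage sketch (hedged: it only calls `parallel_rand_gen` as defined above plus standard NumPy, nothing else is assumed), a preallocated buffer can be refilled in place through `out`, while a distribution without `out` support such as `uniform` automatically goes through the size-based path:

```python
import numpy as np

# Illustrative only; `parallel_rand_gen` is the function defined above.
buf = np.empty(1_000_000)
parallel_rand_gen(out=buf, seed=42)  # standard_normal supports `out`:
                                     # each worker fills its own slice in place

# `uniform` has no `out` parameter, so each worker returns a chunk that
# `glue` copies into one array; extra arguments are forwarded to the
# distribution method.
u = parallel_rand_gen(1_000_000, 'uniform', low=0.0, high=10.0)
```

With a fixed `seed`, repeated calls reproduce the same values, since the jumped streams are derived deterministically from the seeded `PCG64`.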
梵蒂冈之花
Based on the example you provided, I created this code:

```python
from numpy.random import Generator, PCG64
import multiprocessing
import concurrent.futures
import numpy as np
# to calculate the Bernoulli randomness
from scipy.stats import bernoulli
# use this to see the results
import matplotlib.pyplot as plt
# benchmark the multithreading
from time import time


class MultithreadedRNG(object):
    def __init__(self, n, seed=None, number_of_threads=None):
        rg = PCG64(seed)
        if number_of_threads is None:
            number_of_threads = multiprocessing.cpu_count()
        self.number_of_threads = number_of_threads

        # one jumped (independent) bit generator per thread
        self._random_generators = [rg]
        last_rg = rg
        for _ in range(0, number_of_threads - 1):
            new_rg = last_rg.jumped()
            self._random_generators.append(new_rg)
            last_rg = new_rg

        self.n = n
        # use this object to multithread
        self.executor = concurrent.futures.ThreadPoolExecutor(number_of_threads)
        # reserve the array memory
        self.value_array = np.empty(n)
        # round up to get the chunk size per thread
        self.step = np.ceil(n / number_of_threads).astype(np.int_)

    def _thread_fill(self, rg, out, first, last):
        p = 0.3
        # pass the thread's own jumped stream to scipy so the threads
        # do not share global random state
        self.value_array[first:last] = bernoulli.rvs(
            p, size=len(out[first:last]), random_state=Generator(rg))
        # alternative: fill with a normal distribution instead
        # self.value_array[first:last] = np.random.standard_normal(len(out[first:last]))

    def fill(self):
        futures = {}
        for i in range(self.number_of_threads):
            args = (
                self._thread_fill,
                self._random_generators[i],
                self.value_array,
                i * self.step,
                (i + 1) * self.step,
            )
            # this is a simple object that signals completion
            futures[self.executor.submit(*args)] = i
        # wait for all the threads to finish
        concurrent.futures.wait(futures)

    def __del__(self):
        self.executor.shutdown(False)


if __name__ == "__main__":
    arr_size = 1000000

    # populate using multiple threads
    mrng = MultithreadedRNG(arr_size, seed=0)
    multi_thread_time1 = time()
    mrng.fill()
    mrng.__del__()
    print("Multi thread time: ", time() - multi_thread_time1)

    # populate using a single thread
    single_thread_time1 = time()
    vec = np.random.standard_normal(arr_size)
    print("Single thread time: ", time() - single_thread_time1)

    # see the results
    print("Results: ", mrng.value_array)
    fig, axs = plt.subplots(2, sharex=False)
    axs[0].hist(vec, bins=30)
    axs[0].set_title('Standard distribution')
    axs[1].hist(mrng.value_array, bins=30)
    axs[1].set_title('Bernoulli distribution')
    fig.tight_layout()
    plt.show()
```

You can then change the `p` value of the Bernoulli distribution. A sample run displays the two histograms, one for the standard normal sample and one for the Bernoulli sample.
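Changing `p` currently means editing `_thread_fill` by hand. One option, sketched below with hypothetical names building on the class above, is to make `p` a constructor argument:

```python
# Hypothetical subclass: `p` becomes a constructor argument instead of a
# constant inside _thread_fill.
class ParametrizedBernoulliRNG(MultithreadedRNG):
    def __init__(self, n, p=0.3, **kwargs):
        super().__init__(n, **kwargs)
        self.p = p

    def _thread_fill(self, rg, out, first, last):
        # each thread still draws from its own jumped stream
        self.value_array[first:last] = bernoulli.rvs(
            self.p, size=len(out[first:last]), random_state=Generator(rg))


mrng = ParametrizedBernoulliRNG(1_000_000, p=0.7, seed=0)
mrng.fill()
print(mrng.value_array.mean())  # should be close to 0.7
```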