Setting the output matrix on numpy's binomial function

I want to use the following example in my project; it fills an array with random values using multiple threads: https://numpy.org/doc/1.18/reference/random/multithreading.html

However, I want to use a binomial distribution rather than the standard normal. My problem is that the numpy.random.Generator.binomial method has no "out" argument for placing the result (unlike the standard_normal method). This means I would have to copy the output matrix it returns into my own matrix, which hurts performance considerably.
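
As a minimal illustration of the difference (the buffer size and parameters below are made up):

import numpy as np

rg = np.random.Generator(np.random.PCG64(42))
buf = np.empty(1000)  # preallocated float64 buffer

# standard_normal can fill a slice of the buffer in place via `out`:
rg.standard_normal(out=buf[:500])

# binomial has no `out`, so its result is allocated separately and then
# copied (and cast to float64) when assigned into the target slice:
buf[500:] = rg.binomial(1, 0.3, size=500)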

Is there an alternative approach that works around this?

If it helps, what I actually need is a Bernoulli distribution, i.e. a binomial distribution with n=1 (but arbitrary p).


紫衣仙女
3 Answers

蓝山帝景

The following code works with random generators both with and without support for the out parameter. While using the out parameter typically speeds up execution, you can still get some benefit from the parallel execution even without it.

import os
import concurrent.futures
import numpy as np


def _rg_size(bit_gen, dist, num, *args, **kwargs):
    return getattr(np.random.Generator(bit_gen), dist)(size=num, *args, **kwargs)


def _rg_out(bit_gen, dist, out, *args, **kwargs):
    return getattr(np.random.Generator(bit_gen), dist)(out=out, *args, **kwargs)


def splitter(num, num_chunks):
    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
    while num > chunk_size:
        num -= chunk_size
        yield chunk_size
    yield num


def slicing_splitter(num, num_chunks):
    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
    i = 0
    remaining = num
    while remaining > chunk_size:
        remaining -= chunk_size
        yield slice(i, i + chunk_size)
        i += chunk_size
    yield slice(i, num)


def new_rgs(rg):
    while True:
        new_rg = rg.jumped()
        yield new_rg
        rg = new_rg


def glue(arrs, size, num_arrs=None):
    if num_arrs is None and hasattr(arrs, '__len__'):
        num_arrs = len(arrs)
    slicings = slicing_splitter(size, num_arrs)
    arrs = iter(arrs)
    arr = next(arrs)
    slicing = next(slicings)
    out = np.empty(size, dtype=arr.dtype)
    out[slicing] = arr
    for arr, slicing in zip(arrs, slicings):
        out[slicing] = arr
    return out


def parallel_rand_gen(
        num=None,
        dist='standard_normal',
        bit_gen=None,
        seed=None,
        out=None,
        num_workers=None,
        split_factor=1,
        *args,
        **kwargs):
    if num_workers is None:
        num_workers = min(32, os.cpu_count() + 1)
    if bit_gen is None:
        bit_gen = np.random.PCG64(seed)
    if out is not None:
        shape = out.shape
        out = out.ravel()
        num = out.size
    elif num is None:
        raise ValueError('Either `num` or `out` must be specified.')
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        num_splits = split_factor * num_workers
        if out is None:
            futures = [
                executor.submit(_rg_size, rg, dist, n, *args, **kwargs)
                for rg, n in zip(new_rgs(bit_gen), splitter(num, num_splits))]
            concurrent.futures.wait(futures)
            result = (future.result() for future in futures)
            # out = np.concatenate(tuple(result))  # slower alternative
            out = glue(result, num, num_splits)
        else:
            futures = [
                executor.submit(_rg_out, rg, dist, out[slicing], *args, **kwargs)
                for rg, slicing in zip(new_rgs(bit_gen), slicing_splitter(num, num_splits))]
            concurrent.futures.wait(futures)
            out = out.reshape(shape)
    return out


print(parallel_rand_gen(17))
# [ 0.96710075  2.2935126   0.35537793  0.5825714   2.14440658  0.64483092
#   0.54062617  0.44907003  0.11947266 -0.60748694 -0.52509115  0.62924905
#   0.51714721  0.29864705 -0.46105766 -0.271093    0.33055528]

For standard_normal this gives:

n = 10000001
%timeit parallel_rand_gen(n)
# 89.3 ms ± 1.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit out = np.empty(n); parallel_rand_gen(out=out)
# 74.6 ms ± 1.66 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.standard_normal(n)
# 181 ms ± 7.45 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

And, for binomial, this gives:

n = 10000001
%timeit parallel_rand_gen(n, 'binomial', n=100, p=0.5)
# 480 ms ± 15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.binomial(100, 0.5, n)
# 1.17 s ± 35.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

(Tested on a 4-core, 6-year-old laptop.)
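
For the Bernoulli case from the question, the same function should work by passing the binomial parameters through to the generator; a hedged, untested sketch (num and p here are placeholder values):

num = 10_000_000
# Bernoulli(p) samples, i.e. binomial with n=1; since `binomial` has no `out`,
# this goes through the parallel `size` path of `parallel_rand_gen` above.
bern = parallel_rand_gen(num, 'binomial', n=1, p=0.3)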

梵蒂冈之花

Based on the example you provided, I created this code:

from numpy.random import Generator, PCG64
import multiprocessing
import concurrent.futures
import numpy as np
# to calculate the bernoulli randomness
from scipy.stats import bernoulli
# use this to see the results
import matplotlib.pyplot as plt
# benchmark the multi threading
from time import time


class MultithreadedRNG(object):
    def __init__(self, n, seed=None, number_of_threads=None):
        rg = PCG64(seed)
        if number_of_threads is None:
            number_of_threads = multiprocessing.cpu_count()
        self.number_of_threads = number_of_threads

        self._random_generators = [rg]
        last_rg = rg
        for _ in range(0, number_of_threads - 1):
            new_rg = last_rg.jumped()
            self._random_generators.append(new_rg)
            last_rg = new_rg

        self.n = n
        self.executor = concurrent.futures.ThreadPoolExecutor(number_of_threads)  # use this object to multithread
        self.value_array = np.empty(n)  # reserve the array memory
        self.step = np.ceil(n / number_of_threads).astype(np.int_)  # round up to get the number of steps

    def _thread_fill(self, rg, out, first, last):
        p = 0.3
        # x = np.random.randn(N_points)  # this uses a normal distribution
        self.value_array[first:last] = bernoulli.rvs(p, size=len(out[first:last]))
        # self.value_array[first:last] = np.random.standard_normal(len(out[first:last]))

    def fill(self):
        futures = {}
        for i in range(self.number_of_threads):
            args = (
                self._thread_fill,
                self._random_generators[i],
                self.value_array,
                i * self.step,
                (i + 1) * self.step
                )
            # this is a simple object to signal completion
            futures[self.executor.submit(*args)] = i
        # wait for all the workers to finish
        concurrent.futures.wait(futures)

    def __del__(self):
        self.executor.shutdown(False)


if __name__ == "__main__":
    arr_size = 1000000

    # populate using multiple threads
    mrng = MultithreadedRNG(arr_size, seed=0)
    multi_thread_time1 = time()
    mrng.fill()
    mrng.__del__()
    print("Multi thread time: ", time() - multi_thread_time1)

    # populate using a single thread
    single_thread_time1 = time()
    vec = np.random.standard_normal(arr_size)
    print("Single thread time: ", time() - single_thread_time1)

    # see the results
    print("Results: ", mrng.value_array)

    fig, axs = plt.subplots(2, sharex=False)
    axs[0].hist(vec, bins=30)
    axs[0].set_title('Standard distribution')
    axs[1].hist(mrng.value_array, bins=30)
    axs[1].set_title('Bernoulli distribution')
    fig.tight_layout()
    plt.show()

You can then change the "p" value of the Bernoulli distribution.
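
Note that bernoulli.rvs draws from SciPy's own (global) random state and ignores the per-thread rg bit generators, so the jumped generators built in __init__ do not actually contribute here. A hedged sketch of a fill function that draws from the per-thread generator instead (the name fill_bernoulli and the default p are my own choices):

from numpy.random import Generator
import numpy as np

def fill_bernoulli(bit_gen, out, first, last, p=0.3):
    # Draw Bernoulli(p) samples (binomial with n=1) from this thread's
    # bit generator and write them into the preallocated slice of `out`.
    view = out[first:last]
    view[...] = Generator(bit_gen).binomial(1, p, size=view.size)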

GCT1015

numpy.random.Generator.random does have an out argument, if you are happy drawing uniform floats in [0.0, 1.0). Then you can just use:

def bernoulli(shape, p):
    U = uniform(shape)  # any [0.0, 1.0) filler; can use multithreading via `out`
    return U < p        # should be fast enough
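
To make that concrete, here is a minimal single-threaded sketch (the function name bernoulli_inplace and the default bit generator are assumptions on my part); each worker thread could apply the same idea to its own slice of a shared buffer:

import numpy as np

def bernoulli_inplace(out_floats, p, bit_gen=None):
    # Fill a preallocated float64 buffer in place with uniforms on [0.0, 1.0),
    # then threshold at p to get boolean Bernoulli(p) samples.
    rg = np.random.Generator(bit_gen if bit_gen is not None else np.random.PCG64())
    rg.random(out=out_floats)  # Generator.random accepts an `out` argument
    return out_floats < p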