如何在python中矢量化包含eigvalsh的复杂代码

我有以下代码(对不起,它不是太小,我已经尝试从原始代码中减少它)。


基本上,我在运行以下eval_s()方法/函数时遇到了性能问题:


1) 找到 4x4 厄米矩阵的 4 个特征值 eigvalsh()


2)将特征值的倒数和成一个变量result


3) 我对许多由 参数化的矩阵重复步骤 1 和 2 x,y,z,将累积和存储在 中result。


我在第 3 步中重复计算(查找特征值和求和)的次数取决于ksep我的代码中的一个变量,我需要这个数字在我的实际代码中增加(即,ksep必须减少)。但是计算中eval_s()有一个 for 循环x,y,z,我猜这确实会减慢速度。[试着ksep=0.5明白我的意思。]


有没有办法向量化我的示例代码中指示的方法(或者一般来说,涉及查找参数化矩阵的特征值的函数)?


代码:


import numpy as np

import sympy as sp

import itertools as it

from sympy.abc import x, y, z



class Solver:

    def __init__(self, vmat):

        self._vfunc = sp.lambdify((x, y, z),

                                  expr=vmat,

                                  modules='numpy')

        self._q_count, self._qs = None, []  # these depend on ksep!


    ################################################################

    # How to vectorize this?

    def eval_s(self, stiff):

        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"

        result = 0

        for k in self._qs:

            evs = np.linalg.eigvalsh(self._vfunc(*k))

            result += np.sum(np.divide(1., (stiff + evs)))

        return result.real - 4 * self._q_count

    ################################################################


    def populate_qs(self, ksep: float = 1.7):

        self._qs = [(kx, ky, kz) for kx, ky, kz

                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),

                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),

                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))]

        self._q_count = len(self._qs)

ps 代码的 sympy 部分可能看起来很奇怪,但它在我的原始代码中起到了作用。


阿波罗的战车
浏览 166回答 2
2回答

三国纷争

您可以,方法如下:def eval_s_vectorized(self, stiff):    assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"    mats = np.stack([self._vfunc(*k) for k in self._qs], axis=0)    evs = np.linalg.eigvalsh(mats)    result = np.sum(np.divide(1., (stiff + evs)))    return result.real - 4 * self._q_count这仍然使 Sympy 表达式的评估未向量化。这部分矢量化有点棘手,主要是因为1输入矩阵中的s。您可以通过修改来制作代码的完全矢量化版本,Solver以便用数组常量替换标量常量vmat:import itertools as itimport numpy as npimport sympy as spfrom sympy.abc import x, y, zfrom sympy.core.numbers import Numberfrom sympy.utilities.lambdify import implemented_functionxones = implemented_function('xones', lambda x: np.ones(len(x)))lfuncs = {'xones': xones}def vectorizemat(mat):    ret = mat.copy()    # get the first element of the set of symbols that mat uses    for x in mat.free_symbols: break    for i,j in it.product(*(range(s) for s in mat.shape)):        if isinstance(mat[i,j], Number):            ret[i,j] = xones(x) * mat[i,j]    return retclass Solver:    def __init__(self, vmat):        self._vfunc = sp.lambdify((x, y, z),                                  expr=vectorizemat(vmat),                                  modules=[lfuncs, 'numpy'])        self._q_count, self._qs = None, []  # these depend on ksep!    def eval_s_vectorized_completely(self, stiff):        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"        evs = np.linalg.eigvalsh(self._vfunc(*self._qs.T).T)        result = np.sum(np.divide(1., (stiff + evs)))        return result.real - 4 * self._q_count    def populate_qs(self, ksep: float = 1.7):        self._qs = np.array([(kx, ky, kz) for kx, ky, kz                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))])        self._q_count = len(self._qs)测试/时间对于小型ksep矢量化版本比原始版本快约 2 倍,完全矢量化版本比原始版本快约 20 倍:# old version for ksep=.3import timeitprint(timeit.timeit("test()", setup="from __main__ import test", number=10))-85240.46154500882-85240.46154500882-85240.46154500882-85240.46154500882-85240.46154500882-85240.46154500882-85240.46154500882-85240.46154500882-85240.46154500882-85240.46154500882118.42847006605007# vectorized version for ksep=.3import timeitprint(timeit.timeit("test()", setup="from __main__ import test", number=10))-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.4615449836764.95763925800566# completely vectorized version for ksep=.3import timeitprint(timeit.timeit("test()", setup="from __main__ import test", number=10))-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.461544983675.648927717003971矢量化版本结果中的舍入误差与原始结果略有不同。这大概是由于result计算总和的方式不同造成的。

撒科打诨

已经完成了大部分工作。以下是如何在 20 倍的基础上再获得 2 倍的加速。手动做线性代数。当我尝试这样做时,我很震惊 numpy 在小矩阵上是多么浪费:>>> from timeit import timeit# using eigvalsh>>> print(timeit("test(False, 0.1)", setup="from __main__ import test", number=3))-2301206.495955009-2301206.495955009-2301206.49595500955.794611917983275>>> print(timeit("test(False, 0.3)", setup="from __main__ import test", number=5))-85240.46154498367-85240.46154498367-85240.46154498367-85240.46154498367-85240.461544983673.400342195003759# by hand>>> print(timeit("test(True, 0.1)", setup="from __main__ import test", number=3))-2301206.495955076-2301206.495955076-2301206.49595507626.67294767702697>>> print(timeit("test(True, 0.3)", setup="from __main__ import test", number=5))-85240.46154498379-85240.46154498379-85240.46154498379-85240.46154498379-85240.461544983791.5047460949863307请注意,部分加速可能被共享代码掩盖了,仅在线性代数上似乎更多,尽管我没有仔细检查。一个警告:我在矩阵的 2by2 分割上使用 Schur 补码来计算逆矩阵的对角元素。如果 Schur 补集不存在,即如果左上角或右下角子矩阵不可逆,这将失败。这是修改后的代码:import itertools as itimport numpy as npimport sympy as spfrom sympy.abc import x, y, zfrom sympy.core.numbers import Numberfrom sympy.utilities.lambdify import implemented_functionxones = implemented_function('xones', lambda x: np.ones(len(x)))lfuncs = {'xones': xones}def vectorizemat(mat):&nbsp; &nbsp; ret = mat.copy()&nbsp; &nbsp; for x in mat.free_symbols: break&nbsp; &nbsp; for i,j in it.product(*(range(s) for s in mat.shape)):&nbsp; &nbsp; &nbsp; &nbsp; if isinstance(mat[i,j], Number):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ret[i,j] = xones(x) * mat[i,j]&nbsp; &nbsp; return retclass Solver:&nbsp; &nbsp; def __init__(self, vmat):&nbsp; &nbsp; &nbsp; &nbsp; vmat = vectorizemat(vmat)&nbsp; &nbsp; &nbsp; &nbsp; self._vfunc = sp.lambdify((x, y, z),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; expr=vmat,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; modules=[lfuncs, 'numpy'])&nbsp; &nbsp; &nbsp; &nbsp; self._q_count, self._qs = None, []&nbsp; # these depend on ksep!&nbsp; &nbsp; def eval_s_vectorized_completely(self, stiff):&nbsp; &nbsp; &nbsp; &nbsp; assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"&nbsp; &nbsp; &nbsp; &nbsp; mats = self._vfunc(*self._qs.T).T&nbsp; &nbsp; &nbsp; &nbsp; evs = np.linalg.eigvalsh(mats)&nbsp; &nbsp; &nbsp; &nbsp; result = np.sum(np.divide(1., (stiff + evs)))&nbsp; &nbsp; &nbsp; &nbsp; return result.real - 4 * self._q_count&nbsp; &nbsp; def eval_s_pp(self, stiff):&nbsp; &nbsp; &nbsp; &nbsp; assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"&nbsp; &nbsp; &nbsp; &nbsp; mats = self._vfunc(*self._qs.T).T&nbsp; &nbsp; &nbsp; &nbsp; np.einsum('...ii->...i', mats)[...] += stiff&nbsp; &nbsp; &nbsp; &nbsp; (A, B), (C, D) = mats.reshape(-1, 2, 2, 2, 2).transpose(1, 3, 0, 2, 4)&nbsp; &nbsp; &nbsp; &nbsp; res = 0&nbsp; &nbsp; &nbsp; &nbsp; for AA, BB, CC, DD in ((A, B, C, D), (D, C, B, A)):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (a, b), (c, d) = DD.transpose(1, 2, 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; rdet = 1 / (a*d - b*b)[:, None]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; iD = DD[..., ::-1, ::-1].copy()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; iD.reshape(-1, 4)[..., 1:3] *= -rdet&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; np.einsum('...ii->...i', iD)[...] *= rdet&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (Aa, Ab), (Ac, Ad) = AA.transpose(1, 2, 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (Ba, Bb), (Bc, Bd) = BB.transpose(1, 2, 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (Da, Db), (Dc, Dd) = iD.transpose(1, 2, 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; a = Aa - Ba*Ba*Da - 2*Bb*Ba*Db - Bb*Bb*Dd&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; d = Ad - Bd*Bd*Dd - 2*Bc*Bd*Db - Bc*Bc*Da&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; b = Ab - Ba*Bc*Da - Ba*Bd*Db - Bb*Bd*Dd - Bb*Bc*Dc&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; res += ((a + d) / (a*d - b*b)).sum()&nbsp; &nbsp; &nbsp; &nbsp; return res - 4 * self._q_count&nbsp; &nbsp; def populate_qs(self, ksep: float = 1.7):&nbsp; &nbsp; &nbsp; &nbsp; self._qs = np.array([(kx, ky, kz) for kx, ky, kz&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; np.arange(-3*np.pi, 3.01*np.pi, ksep),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; np.arange(-3*np.pi, 3.01*np.pi, ksep))])&nbsp; &nbsp; &nbsp; &nbsp; self._q_count = len(self._qs)def test(manual=False, ksep=0.3):&nbsp; &nbsp; vmat = sp.Matrix([[1, sp.cos(x/4+y/4), sp.cos(x/4+z/4), sp.cos(y/4+z/4)],&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [sp.cos(x/4+y/4), 1, sp.cos(y/4-z/4), sp.cos(x/4 - z/4)],&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [sp.cos(x/4+z/4), sp.cos(y/4-z/4), 1, sp.cos(x/4-y/4)],&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [sp.cos(y/4+z/4), sp.cos(x/4-z/4), sp.cos(x/4-y/4), 1]]) * 2&nbsp; &nbsp; solver = Solver(vmat)&nbsp; &nbsp; solver.populate_qs(ksep=ksep)&nbsp; # <---- Performance starts to worsen (in eval_s) when ksep is reduced!&nbsp; &nbsp; if manual:&nbsp; &nbsp; &nbsp; &nbsp; print(solver.eval_s_pp(0.65))&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; print(solver.eval_s_vectorized_completely(0.65))
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python