猿问

有没有办法矢量化这种动态时间扭曲算法?

动态时间扭曲算法提供了两个速度可能变化的时间序列之间的距离概念。如果我有 N 个序列要相互比较,我可以通过成对应用该算法来构造一个具有核对角线的 NXN 对称矩阵。然而,对于长二维序列来说,这非常慢。因此,我尝试对代码进行矢量化以加速该矩阵计算。重要的是,我还想提取定义最佳对齐的索引。

到目前为止我的成对比较代码:

import math

import numpy as np


seq1 = np.random.randint(100, size=(100, 2)) #Two dim sequences

seq2 = np.random.randint(100, size=(100, 2))


def seqdist(seq1, seq2):                      # dynamic time warping function


    ns = len(seq1)

    nt = len(seq2)


    D = np.zeros((ns+1, nt+1))+math.inf

    D[0, 0] = 0

    cost = np.zeros((ns,nt))


    for i in range(ns):

       for j in range(nt): 


          cost[i,j] = np.linalg.norm(seq1[i,:]-seq2[j,:])

          D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])


    d = D[ns,nt]                            # distance


    matchidx = [[ns-1, nt-1]]              # backwards optimal alignment computation 

    i = ns

    j = nt

    for k in range(ns+nt+2):

        idx = np.argmin([D[i-1, j], D[i, j-1], D[i-1, j-1]])

        if idx == 0 and i > 1 and j > 0:

           matchidx.append([i-2, j-1])

           i -= 1

        elif idx == 1 and i > 0 and j > 1:

             matchidx.append([i-1, j-2])

             j -= 1

        elif idx == 2 and i > 1 and j > 1:

             matchidx.append([i-2, j-2])

             i -= 1

             j -= 1

        else:

             break


    matchidx.reverse()


    return d, matchidx


[d,matchidx] = seqdist(seq1,seq2) #try it


斯蒂芬大帝
浏览 113回答 1
1回答

阿晨1998

这是对代码的一种重写,使其更适合numba.jit. 这并不完全是矢量化解决方案,但我发现该基准测试速度提高了 230 倍。from numba import jitfrom scipy import spatial@jitdef D_from_cost(cost, D):&nbsp; # operates on D inplace&nbsp; ns, nt = cost.shape&nbsp; for i in range(ns):&nbsp; &nbsp; for j in range(nt):&nbsp; &nbsp; &nbsp; D[i+1, j+1] = cost[i,j]+min(D[i, j+1], D[i+1, j], D[i, j])&nbsp; &nbsp; &nbsp; # avoiding the list creation inside mean enables better jit performance&nbsp; &nbsp; &nbsp; # D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])@jitdef get_d(D, matchidx):&nbsp; ns = D.shape[0] - 1&nbsp; nt = D.shape[1] - 1&nbsp; d = D[ns,nt]&nbsp; matchidx[0,0] = ns - 1&nbsp; matchidx[0,1] = nt - 1&nbsp; i = ns&nbsp; j = nt&nbsp; for k in range(1, ns+nt+3):&nbsp; &nbsp; idx = 0&nbsp; &nbsp; if not (D[i-1,j] <= D[i,j-1] and D[i-1,j] <= D[i-1,j-1]):&nbsp; &nbsp; &nbsp; if D[i,j-1] <= D[i-1,j-1]:&nbsp; &nbsp; &nbsp; &nbsp; idx = 1&nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; idx = 2&nbsp; &nbsp; if idx == 0 and i > 1 and j > 0:&nbsp; &nbsp; &nbsp; # matchidx.append([i-2, j-1])&nbsp; &nbsp; &nbsp; matchidx[k,0] = i - 2&nbsp; &nbsp; &nbsp; matchidx[k,1] = j - 1&nbsp; &nbsp; &nbsp; i -= 1&nbsp; &nbsp; elif idx == 1 and i > 0 and j > 1:&nbsp; &nbsp; &nbsp; # matchidx.append([i-1, j-2])&nbsp; &nbsp; &nbsp; matchidx[k,0] = i-1&nbsp; &nbsp; &nbsp; matchidx[k,1] = j-2&nbsp; &nbsp; &nbsp; j -= 1&nbsp; &nbsp; elif idx == 2 and i > 1 and j > 1:&nbsp; &nbsp; &nbsp; # matchidx.append([i-2, j-2])&nbsp; &nbsp; &nbsp; matchidx[k,0] = i-2&nbsp; &nbsp; &nbsp; matchidx[k,1] = j-2&nbsp; &nbsp; &nbsp; i -= 1&nbsp; &nbsp; &nbsp; j -= 1&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; break&nbsp; return d, matchidx[:k]def seqdist2(seq1, seq2):&nbsp; ns = len(seq1)&nbsp; nt = len(seq2)&nbsp; cost = spatial.distance_matrix(seq1, seq2)&nbsp; # initialize and update D&nbsp; D = np.full((ns+1, nt+1), np.inf)&nbsp; D[0, 0] = 0&nbsp; D_from_cost(cost, D)&nbsp; matchidx = np.zeros((ns+nt+2,2), dtype=np.int)&nbsp; d, matchidx = get_d(D, matchidx)&nbsp; return d, matchidx[::-1].tolist()assert seqdist2(seq1, seq2) == seqdist(seq1, seq2)%timeit seqdist2(seq1, seq2) # 1000 loops, best of 3: 365 µs per loop%timeit seqdist(seq1, seq2)&nbsp; # 10 loops, best of 3: 86.1 ms per loop以下是一些变化:cost是使用 计算的spatial.distance_matrix。的定义idx被一堆丑陋的 if 语句取代,这使得编译的代码更快。min([D[i, j+1], D[i+1, j], D[i, j]])替换为min(D[i, j+1], D[i+1, j], D[i, j]),即我们不取列表的最小值,而是取三个值的最小值。这导致了令人惊讶的加速jit。matchidx被预先分配为 numpy 数组,并在输出之前截断为正确的大小。
随时随地看视频慕课网APP

相关分类

Python
我要回答