Можно ли векторизовать рекурсивный расчет массива NumPy, где каждый элемент зависит от предыдущего?

T(i) = Tm(i) + (T(i-1)-Tm(i))**(-tau(i)) 

Tm и tau – векторы NumPy той же длины, которые были предварительно рассчитаны, и желание создать новый вектор T i включен только для указания индекса элемента для желаемого.

Нужен ли цикл for для этого случая?

Вы могли бы подумать, что это сработает:

 import numpy as np n = len(Tm) t = np.empty(n) t[0] = 0 # or whatever the initial condition is t[1:] = Tm[1:] + (t[0:n-1] - Tm[1:])**(-tau[1:]) 

но это не так: вы не можете на самом деле сделать рекурсию в numpy таким образом (поскольку numpy вычисляет всю RHS, а затем присваивает ее LHS).

Поэтому, если вы не можете придумать нерекурсивную версию этой формулы, вы застряли в явном цикле:

 tt = np.empty(n) tt[0] = 0. for i in range(1,n): tt[i] = Tm[i] + (tt[i-1] - Tm[i])**(-tau[i]) 

В некоторых случаях такая recursion возможна, а именно, если для формулы рекурсии имеется NumPy ufunc, например

 c = numpy.arange(10.) numpy.add(c[:-1], c[1:], c[1:]) 

Это вычисляет накопленные суммы по c на месте, используя выходной параметр numpy.add . Он не может быть записан как

 c[1:] = c[:-1] + c[1:] 

потому что теперь результат добавления является временным, который копируется в c[1:] после завершения вычисления.

Самое естественное, что нужно попробовать, это определить свой собственный ufunc:

 def f(T, Tm, tau): return Tm + (T - Tm)**(-tau) uf = numpy.frompyfunc(f, 3, 1) 

Но по причинам, которые выходят за frameworks меня, следующее не работает:

 uf(T[:-1], Tm[1:], tau[1:], T[1:]) 

По-видимому, результат не записывается непосредственно в T[1:] , но сохраняется во временном и скопированном после завершения операции. Даже если бы это сработало, я бы не ожидал ускорения от этого по сравнению с обычным циклом, так как ему нужно вызвать функцию Python для каждой записи.

Если вы действительно хотите избежать цикла Python, вам, вероятно, нужно обратиться за Cython или ctypes.

Я решил создать несколько тестов исходной проблемы, потому что мне также нужно было вычислить что-то рекурсивно, что невозможно было бы векторизовать. Я использовал abs () для базы экспонентов, потому что результат не определен, когда база отрицательна. Пример вычисления посредством нормального цикла массива Numpy:

 alen=100000 Tm = np.random.normal(size=alen).astype('float64') tau = np.random.normal(size=alen).astype('float64') def rec_numpy_loop(Tm,tau,alen): T=np.empty(alen) T[0]=0.0 for i in range(1,alen): T[i] = Tm[i] + (abs(T[i-1] - Tm[i]))**(-tau[i]) return T 

Результаты состоят в том, что компиляция функции в виде модуля Cython очень быстро выполняется как в исполнении (в 14 раз быстрее, чем любая реализация Python), так и при написании кода.

Другие интересные факты: Различные реализации Numpy немного отличаются от самой быстрой реализации, используя нотации a.item () и a.itemset (). Странно, но добавление списков работает наравне с добавлением предварительно распределенных массивов Numpy. Воспроизведение с memviews в Cython не дало значительного повышения производительности кода. Код Fortran был очень коротким, но почти невозможно отладить, и в итоге Cython появился быстрее Fortran с f2py, хотя и незначительно. C потребовалось много времени, потому что у него слишком много шаблонов, и в конце он работает с той же скоростью, что и Cython. Чистый C с чистыми C-массивами был в два раза быстрее, чем что-либо, вызванное Python.

Я не профессионал C / C ++, поэтому, возможно, можно написать более быструю программу.

 Looping over Numpy array: In [57]: %timeit -o rec_numpy_loop(Tm,tau,alen) 10 loops, best of 3: 87.9 ms per loop Using a.item() and a.itemset() with Numpy: In [58]: %timeit -o rec_numpy_loop_item(Tm,tau,alen) 10 loops, best of 3: 74.3 ms per loop Using np.fromiter() from Numpy: In [59]: %timeit -o rec_numpy_iter(Tm,tau,alen) 10 loops, best of 3: 78.1 ms per loop Using Numpy arrays with data but appending to a List: In [60]: %timeit -o rec_py_loop(Tm,tau,alen) 10 loops, best of 3: 91 ms per loop Calling a function compiled to Cython: In [61]: %timeit -o rec_cy_loop(Tm,tau,alen) 100 loops, best of 3: 6.46 ms per loop Using Memviews in Cython: In [62]: %timeit -o rec_cy_loop_memviews(Tm,tau,alen) 100 loops, best of 3: 6.26 ms per loop Using Memviews both for looping and as input variables in Cython: In [63]: %timeit -o rec_cy_loop_memviews_w_input(Tm,tau,alen) 100 loops, best of 3: 6.53 ms per loop Calling a fortran function compiled using f2py: In [64]: %timeit -o rec_fortran(Tm,tau,alen) 100 loops, best of 3: 6.78 ms per loop Calling a function compiled as C extension of Python using Numpy arrays: In [65]: %timeit -o rec_c(Tm,tau,alen) 100 loops, best of 3: 6.22 ms per loop Doing everything in C using dynamic C arrays: 1000 loops,best of 3: 2.751 ms per loop 

Код Python:

 import timeit import numpy as np from rec_cy_loop import rec_cy_loop #python setup_rec_cy_loop.py build_ext --inplace from rec_cy_loop_memviews import rec_cy_loop_memviews from rec_cy_loop_memviews_w_input import rec_cy_loop_memviews_w_input from rec_c import rec_c #python setup.py build_ext --inplace from rec_fortran import rec_fortran #f2py -c rec.f95 -m rec_fortran alen=100000 Tm = np.random.normal(size=alen).astype('float64') tau = np.random.normal(size=alen).astype('float64') def rec_numpy_loop(Tm,tau,alen): T=np.empty(alen) T[0]=0.0 for i in range(1,alen): T[i] = Tm[i] + (abs(T[i-1] - Tm[i]))**(-tau[i]) return T def rec_numpy_loop_item(Tm,tau,alen): T=np.empty(alen) Ti=T.item Tis=T.itemset Tmi=Tm.item taui=tau.item Tis(0,0.0) for i in range(1,alen): Tis(i,Tmi(i) + (abs(Ti(i-1) - Tmi(i)))**(-taui(i))) return T def it(Tm,tau): T=0.0 i=0 while True: yield T i+=1 T=Tm[i] + (abs(T - Tm[i]))**(-tau[i]) def rec_numpy_iter(Tm,tau,alen): return np.fromiter(it(Tm,tau), np.float64, alen) def rec_py_loop(Tm,tau,alen): T = [0.0] for i in range(1,alen): T.append(Tm[i] + (abs(T[i-1] - Tm[i]))**(-tau[i])) return np.array(T) T=rec_numpy_loop(Tm,tau,alen) Titer=rec_numpy_loop_item(Tm,tau,alen) np.sum(np.abs(Titer-T)) Titer=rec_numpy_iter(Tm,tau,alen) np.sum(np.abs(Titer-T)) Titer=rec_py_loop(Tm,tau,alen) np.sum(np.abs(Titer-T)) Titer=rec_cy_loop(Tm,tau,alen) np.sum(np.abs(Titer-T)) Titer=rec_cy_loop_memviews(Tm,tau,alen) np.sum(np.abs(Titer-T)) Titer=rec_cy_loop_memviews_w_input(Tm,tau,alen) np.sum(np.abs(Titer-T)) Titer=rec_fortran(Tm,tau,alen) np.sum(np.abs(Titer-T)) Titer=rec_c(Tm,tau,alen) np.sum(np.abs(Titer-T)) %timeit -o rec_numpy_loop(Tm,tau,alen) %timeit -o rec_numpy_loop_item(Tm,tau,alen) %timeit -o rec_numpy_iter(Tm,tau,alen) %timeit -o rec_py_loop(Tm,tau,alen) %timeit -o rec_cy_loop(Tm,tau,alen) %timeit -o rec_cy_loop_memviews(Tm,tau,alen) %timeit -o rec_cy_loop_memviews_w_input(Tm,tau,alen) %timeit -o rec_fortran(Tm,tau,alen) %timeit -o rec_c(Tm,tau,alen) 

Cython rec_cy_loop:

 cdef extern from "math.h": double exp(double m) import cython import numpy as np cimport numpy as np from numpy cimport ndarray @cython.boundscheck(False) @cython.wraparound(False) @cython.infer_types(True) def rec_cy_loop(ndarray[np.float64_t, ndim=1] Tm,ndarray[np.float64_t, ndim=1] tau,int alen): cdef ndarray[np.float64_t, ndim=1] T=np.empty(alen) cdef int i T[0]=0.0 for i in range(1,alen): T[i] = Tm[i] + (abs(T[i-1] - Tm[i]))**(-tau[i]) return T 

Cython rec_cy_loop_memviews:

 cdef extern from "math.h": double exp(double m) import cython import numpy as np cimport numpy as np from numpy cimport ndarray @cython.boundscheck(False) @cython.wraparound(False) @cython.infer_types(True) def rec_cy_loop_memviews(ndarray[np.float64_t, ndim=1] Tm,ndarray[np.float64_t, ndim=1] tau,int alen): cdef ndarray[np.float64_t, ndim=1] T=np.empty(alen) cdef int i cdef double[::1] T2 = T cdef double[::1] Tm2 = Tm cdef double[::1] tau2 = tau T2[0]=0.0 for i in range(1,alen): T2[i] = Tm2[i] + (abs(T2[i-1] - Tm2[i]))**(-tau2[i]) return T2 

Cython rec_cy_loop_memviews_w_input:

 cdef extern from "math.h": double exp(double m) import cython import numpy as np cimport numpy as np from numpy cimport ndarray @cython.boundscheck(False) @cython.wraparound(False) @cython.infer_types(True) def rec_cy_loop_memviews_w_input(double[::1] Tm,double[::1] tau,int alen): cdef double[::1] T=np.empty(alen) cdef int i T[0]=0.0 for i in range(1,alen): T[i] = Tm[i] + (abs(T[i-1] - Tm[i]))**(-tau[i]) return T 

Функция Fortran, скомпилированная через f2py:

 subroutine rec_fortran(tm,tau,alen,result) integer*8, intent(in) :: alen real*8, dimension(alen), intent(in) :: tm real*8, dimension(alen), intent(in) :: tau real*8, dimension(alen) :: res real*8, dimension(alen), intent(out) :: result res(1)=0 do i=2,alen res(i) = tm(i) + (abs(res(i-1) - tm(i)))**(-tau(i)) end do result=res end subroutine rec_fortran 

Чистый C:

 #include  #include  #include  #include  #include  double randn() { double u = rand(); if (u > 0.5) { return sqrt(-1.57079632679*log(1.0 - pow(2.0 * u - 1, 2))); } else { return -sqrt(-1.57079632679*log(1.0 - pow(1 - 2.0 * u,2))); } } void rec_pure_c(double *Tm, double *tau, int alen, double *T) { for (int i = 1; i < alen; i++) { T[i] = Tm[i] + pow(fabs(T[i - 1] - Tm[i]), (-tau[i])); } } int main() { int N = 100000; double *Tm= calloc(N, sizeof *Tm); double *tau = calloc(N, sizeof *tau); double *T = calloc(N, sizeof *T); double time = 0; double sumtime = 0; for (int i = 0; i < N; i++) { Tm[i] = randn(); tau[i] = randn(); } LARGE_INTEGER StartingTime, EndingTime, ElapsedMicroseconds; LARGE_INTEGER Frequency; for (int j = 0; j < 1000; j++) { for (int i = 0; i < 3; i++) { QueryPerformanceFrequency(&Frequency); QueryPerformanceCounter(&StartingTime); rec_pure_c(Tm, tau, N, T); QueryPerformanceCounter(&EndingTime); ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart; ElapsedMicroseconds.QuadPart *= 1000000; ElapsedMicroseconds.QuadPart /= Frequency.QuadPart; if (i == 0) time = (double)ElapsedMicroseconds.QuadPart / 1000; else { if (time > (double)ElapsedMicroseconds.QuadPart / 1000) time = (double)ElapsedMicroseconds.QuadPart / 1000; } } sumtime += time; } printf("1000 loops,best of 3: %.3f ms per loop\n",sumtime/1000); free(Tm); free(tau); free(T); } 

Я наткнулся на этот старый вопрос и публикую свои выводы и векторизованное решение вопроса!

Принятый ответ я бы выполнил следующим образом:

 import numpy as np np.random.seed(0) n = 100000 Tm = np.random.uniform(1, 10, size=n).astype('f') tau = np.random.uniform(-1, 0, size=n).astype('f') def calc_py(Tm_, tau_): tt = np.empty(len(Tm_)) tt[0] = Tm_[0] for i in range(1, len(Tm_)): tt[i] = (Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i])) return tt[1:] 

И векторизованное решение:

 def calc_vect(Tm_, tau_): return Tm_[1:] - (Tm_[:-1] + Tm_[1:]) ** (-tau_[1:]) 

Для подтверждения:

 print(calc_py(Tm, tau)-calc_vect(Tm, tau)) 

Дает выход (предположительно плавающие ошибки ошибки?):

 [ -2.39640069e-06 0.00000000e+00 -1.22639676e-11 -3.82019749e-09 -3.43394937e-06 0.00000000e+00 -1.64249059e-10 -1.27897692e-13 -6.96935984e-07 -1.13686838e-13 -7.81597009e-14 -1.56319402e-13 0.00000000e+00 -1.06581410e-14 7.70565954e-07 -3.68179363e-07 -1.42108547e-14 -2.67318768e-06 0.00000000e+00 0.00000000e+00 -2.74236818e-06 4.36147587e-07 -2.05780665e-07 -5.14934904e-08] 

Однако можно использовать Numba для «компиляции» версии python с такой же производительностью, как и векторизация:

 from numba import jit, float32 @jit(float32[:](float32[:], float32[:]), nopython=False, nogil=True) def calc(Tm_, tau_): tt = np.empty(len(Tm_)) tt[0] = Tm_[0] for i in range(1,len(Tm_)): tt[i] = (Tm_[i] - (tt[i-1] + Tm_[i]) ** (-tau_[i])) return tt[1:] 

Timeit Результаты трех решений:

 Python: 118.34983052355095 Vectorized: 0.9753564222721991 Numba: 0.6740745080564352 

Чтобы основываться на ответе NPE, я согласен, что где-то должен быть цикл. Возможно, ваша цель – избежать накладных расходов, связанных с циклом Python for? В этом случае numpy.fromiter выбивает цикл for, но только немного:

Используя очень простое рекуррентное соотношение,

 x[i+1] = x[i] + 0.1 

я получил

 #FOR LOOP def loopit(n): x = [0.0] for i in range(n-1): x.append(x[-1] + 0.1) return np.array(x) #FROMITER #define an iterator (a better way probably exists -- I'm a novice) def it(): x = 0.0 while True: yield x x += 0.1 #use the iterator with np.fromiter def fi_it(n): return np.fromiter(it(), np.float, n) %timeit -n 100 loopit(100000) #100 loops, best of 3: 31.7 ms per loop %timeit -n 100 fi_it(100000) #100 loops, best of 3: 18.6 ms per loop 

Интересно, что предварительное выделение массива numpy приводит к существенной потере производительности. Это для меня загадка, хотя я бы предположил, что должно быть больше накладных расходов, связанных с доступом к элементу массива, чем с добавлением в список.

 def loopit(n): x = np.zeros(n) for i in range(n-1): x[i+1] = x[i] + 0.1 return x %timeit -n 100 loopit(100000) #100 loops, best of 3: 50.1 ms per loop 
Давайте будем гением компьютера.