Самая быстрая встроенная спин-блокировка
Я пишу многопоточное приложение в c ++, где производительность имеет решающее значение. Мне нужно использовать много блокировок при копировании небольших структур между streamами, для этого я выбрал использование штифтов.
Я провел некоторое исследование и тестирование скорости на этом, и я обнаружил, что большинство реализаций примерно одинаково быстрые:
- Microsoft CRITICAL_SECTION, с установленным значением SpinCount до 1000, насчитывает около 140 единиц времени
- Реализация этого алгоритма с помощью Microsoft InterlockedCompareExchange насчитывает около 95 единиц времени
- Ive также попытался использовать некоторую встроенную сборку с
__asm {}
используя что-то вроде этого кода, и она насчитывает около 70 единиц времени, но я не уверен, что был создан правильный барьер памяти.
Изменить: время, указанное здесь, – это время, затрачиваемое на 2 streamа для блокировки и разблокировки спин-блокировки в 1 000 000 раз.
- Что означают скобки в x86 asm?
- Почему для x86 (-64) для подписчиков и без знака умножаются разные инструкции?
- x86 Инструкция MUL от VS 2008/2010
- Режимы адресации на языке сборки (IA-32 NASM)
- Почему EDX должен быть 0, прежде чем использовать инструкцию DIV?
Я знаю, что это не так много, но поскольку спин-блокировка – это сильно используемый объект, можно подумать, что программисты согласились бы на самый быстрый способ сделать спин-блокировку. Однако поиск в Google приводит к множеству различных подходов. Я бы подумал, что вышеупомянутый метод будет самым быстрым, если он будет реализован с использованием встроенной сборки и с использованием команды CMPXCHG8B
вместо сравнения 32-битных регистров. Кроме того, необходимо учитывать барьеры памяти, это может быть сделано LOCK CMPXHG8B (я думаю?) , Что гарантирует «исключительные права» на общую память между ядрами. Наконец [некоторые подсказывают], что для оживленных ожиданий должен сопровождаться NOP: REP , который позволит процессорам Hyper-threading переключиться на другой stream, но я не уверен, верно это или нет?
Из моего теста производительности на разные шпиндельные блоки видно, что нет большой разницы, но для чисто академических целей я хотел бы знать, какой из них самый быстрый. Однако, поскольку у меня очень ограниченный опыт на языке ассемблера и с барьерами памяти, я был бы счастлив, если бы кто-нибудь смог написать код сборки для последнего примера, который я предоставил LOCK CMPXCHG8B и соответствующие барьеры памяти в следующем шаблоне:
__asm { spin_lock: ;locking code. spin_unlock: ;unlocking code. }
- Векторизация с неуравновешенными буферами: использование VMASKMOVPS: создание маски из подсчета несоосности? Или не использовать этот insn вообще
- Есть ли у блокировки xchg то же поведение, что и mfence?
- Как написать самомодифицирующийся код в сборке x86
- clflush для аннулирования строки кэша через функцию C
- Какова цель XORing регистрации с собой?
- Как отключить компьютер из автономной среды?
- Печать шестнадцатеричных цифр со сборкой
- Код C ++ для проверки гипотезы Collatz быстрее, чем assembly вручную - почему?
Просто посмотрите здесь: x86 spinlock с помощью cmpxchg
И благодаря Кори Нельсон
__asm{ spin_lock: xorl %ecx, %ecx incl %ecx spin_lock_retry: xorl %eax, %eax lock; cmpxchgl %ecx, (lock_addr) jnz spin_lock_retry ret spin_unlock: movl $0 (lock_addr) ret }
И еще один источник говорит: http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm
lock cmpxchg8b qword ptr [esi] is replaceable with the following sequence try: lock bts dword ptr [edi],0 jnb acquired wait: test dword ptr [edi],1 je try pause ; if available jmp wait acquired: cmp eax,[esi] jne fail cmp edx,[esi+4] je exchange fail: mov eax,[esi] mov edx,[esi+4] jmp done exchange: mov [esi],ebx mov [esi+4],ecx done: mov byte ptr [edi],0
И вот обсуждение блокировок и блокировок: http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html
Хотя уже есть принятый ответ, есть несколько вещей, в которых пропущенные данные могут быть использованы для улучшения всех ответов, взятых из этой статьи Intel, во всех вышеперечисленных операциях с быстрой блокировкой :
- Вращение по нестабильной считываемой, а не атомной инструкции, позволяет избежать ненужного блокирования шины, особенно на высокоспорных замках.
- Используйте откат для сильно оспариваемых замков
- Встройте блокировку, желательно с встроенными для компиляторов, где inline asm вреден (в основном MSVC).
В Википедии есть хорошая статья о спинлоках, вот реализация x86
http://en.wikipedia.org/wiki/Spinlock#Example_implementation
Обратите внимание, что их реализация не использует префикс «блокировки», поскольку он избыточен на x86 для инструкции «xchg» – он неявно имеет семантику блокировки, как обсуждалось в этом обсуждении в Stackoverflow:
На многоядерном x86, LOCK необходимо как префикс для XCHG?
REP: NOP является псевдонимом для инструкции PAUSE, вы можете узнать больше об этом здесь.
Как инструкция x86 pause работает в spinlock * и *, может ли она использоваться в других сценариях?
По вопросу о барьерах памяти, вот все, что вы, возможно, захотите узнать
Memory Barriers: аппаратное обеспечение для программных хакеров Paul E. McKenney
http://irl.cs.ucla.edu/~yingdi/paperreading/whymb.2010.06.07c.pdf
Я обычно не из тех, кто жалуется на кого-то, стремящегося добиться быстрого кода: это обычно очень хорошее упражнение, которое приводит к лучшему пониманию программирования и более быстрого кода.
Я тоже не буду спорить, но я могу однозначно заявить, что вопрос о быстрой спин-блокировке 3 инструкции длинный или еще несколько – по крайней мере, в архитектуре x86 – бесполезная погоня.
Вот почему:
Вызов спин-блокировки с типичной кодовой последовательностью
lock_variable DW 0 ; 0 <=> free mov ebx,offset lock_variable mov eax,1 xchg eax,[ebx] ; if eax contains 0 (no one owned it) you own the lock, ; if eax contains 1 (someone already does) you don't
Освобождение спин-блокировки тривиально
mov ebx,offset lock_variable mov dword ptr [ebx],0
Инструкция xchg поднимает контакт блокировки на процессоре, что означает, что я хочу, чтобы шина находилась в течение следующих нескольких тактов. Этот сигнал прокладывает свой путь через кеши и вплоть до самого медленного устройства для шинирования, которое обычно является шиной PCI. Когда все устройства управления шиной завершены, сигнал блокировки (блокировки) посылается обратно. Затем происходит фактический обмен. Проблема в том, что последовательность lock / locka занимает ОЧЕНЬ долгое время. Шина PCI может работать на частоте 33 МГц с несколькими циклами задержки. На CPU с тактовой частотой 3,3 ГГц, что означает, что каждый цикл шины PCI занимает около ста циклов ЦП.
Как правило, я предполагаю, что для блокировки потребуется от 300 до 3000 циклов ЦП, и, в конце концов, я не знаю, смогу ли я иметь замок. Таким образом, несколько циклов, которые вы можете сохранить с помощью «быстрой» спин-блокировки, станут миражем, потому что никакой блокировки не будет похож на следующий, это будет зависеть от вашей ситуации в автобусе за это короткое время.
________________РЕДАКТИРОВАТЬ________________
Я просто прочитал, что спин-блокировка – это «сильно используемый объект». Ну, вы, очевидно, не понимаете, что спин-блокировка потребляет огромное количество циклов процессора за время, когда оно вызывается. Или, говоря другим способом, каждый раз, когда вы вызываете его, вы теряете значительную часть своих возможностей обработки.
Трюк при использовании спин-локов (или их более крупного брата, критический раздел) заключается в том, чтобы использовать их как можно более экономно, сохраняя при этом целевую программную функцию. Использование их повсюду легко, и в итоге вы получите тусклую производительность.
Дело не только в написании быстрого кода, но и в организации ваших данных. Когда вы пишете «копирование небольших структур между streamами», вы должны понимать, что блокировка может занять сотни раз дольше, чем фактическое копирование.
________________РЕДАКТИРОВАТЬ________________
Когда вы вычисляете среднее время блокировки, оно, вероятно, будет говорить очень мало, поскольку оно измеряется на вашем компьютере, что может не быть целью (которая может иметь совершенно разные характеристики использования шины). Для вашей машины среднее значение будет составлено из отдельных очень быстрых времен (когда деятельность по обучению шинам не мешала) вплоть до очень медленных времен (когда вмешательство мастеринга шины было значительным).
Вы можете ввести код, который определяет самые быстрые и медленные случаи, и рассчитать коэффициент, чтобы увидеть, насколько сильно время спинблока может меняться.
________________РЕДАКТИРОВАТЬ________________
Май 2016 года.
Питер Кордес выдвинул идею о том, что «настройка блокировки в неподтвержденном случае может иметь смысл», и время блокировки многих сотен тактовых циклов не происходит на современных процессорах, за исключением ситуаций, когда переменная блокировки смещена. Я начал задаваться вопросом, может ли моя предыдущая тестовая программа, написанная в 32-битном Watcom C, мешать WOW64, поскольку она работала на 64-битной ОС: Windows 7.
Поэтому я написал 64-битную программу и скомпилировал ее с gcc 5.3 TDM. Программа использует неявный вариант команды блокировки шины «XCHG r, m» для блокировки и простое назначение «MOV m, r» для разблокировки. В некоторых вариантах блокировки переменная блокировки была предварительно протестирована, чтобы определить, возможно ли даже пытаться заблокировать (используя простой сэмпл «CMP r, m», возможно, не задумываясь за пределами L3). Вот:
// compiler flags used: // -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0 #define CLASSIC_BUS_LOCK #define WHILE_PRETEST //#define SINGLE_THREAD typedef unsigned char u1; typedef unsigned short u2; typedef unsigned long u4; typedef unsigned int ud; typedef unsigned long long u8; typedef signed char i1; typedef short i2; typedef long i4; typedef int id; typedef long long i8; typedef float f4; typedef double f8; #define usizeof(a) ((ud)sizeof(a)) #define LOOPS 25000000 #include #include #ifndef bool typedef signed char bool; #endif u8 CPU_rdtsc (void) { ud tickl, tickh; __asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh)); return ((u8)tickh << 32)|tickl; } volatile u8 bus_lock (volatile u8 * block, u8 value) { __asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory"); return value; } void bus_unlock (volatile u8 * block, u8 value) { __asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory"); } void rfence (void) { __asm__ __volatile__( "lfence" : : : "memory"); } void rwfence (void) { __asm__ __volatile__( "mfence" : : : "memory"); } void wfence (void) { __asm__ __volatile__( "sfence" : : : "memory"); } volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer) { return (bool)(*lockVariablePointer == 0ull); } volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer) { return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull); } void LOCK_spinlockLeave (volatile u8 *lockVariablePointer) { *lockVariablePointer = 0ull; } static volatile u8 lockVariable = 0ull, lockCounter = 0ull; static volatile i8 threadHold = 1; static u8 tstr[4][32]; /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */ struct LOCKING_THREAD_STRUCTURE { u8 numberOfFailures, numberOfPreTests; f8 clocksPerLock, failuresPerLock, preTestsPerLock; u8 threadId; HANDLE threadHandle; ud idx; } *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]}; DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp) { ud n = LOOPS; u8 clockCycles; SetThreadAffinityMask (ltsp->threadHandle, 1ull<idx); while (threadHold) {} clockCycles = CPU_rdtsc (); while (n) { Sleep (0); #ifdef CLASSIC_BUS_LOCK while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;} #else #ifdef WHILE_PRETEST while (1) { do { ++ltsp->numberOfPreTests; } while (!LOCK_spinlockPreTestIfFree (&lockVariable)); if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } #else while (1) { ++ltsp->numberOfPreTests; if (LOCK_spinlockPreTestIfFree (&lockVariable)) { if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } } #endif #endif ++lockCounter; LOCK_spinlockLeave (&lockVariable); #ifdef CLASSIC_BUS_LOCK while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;} #else #ifdef WHILE_PRETEST while (1) { do { ++ltsp->numberOfPreTests; } while (!LOCK_spinlockPreTestIfFree (&lockVariable)); if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } #else while (1) { ++ltsp->numberOfPreTests; if (LOCK_spinlockPreTestIfFree (&lockVariable)) { if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } } #endif #endif --lockCounter; LOCK_spinlockLeave (&lockVariable); n-=2; } clockCycles = CPU_rdtsc ()-clockCycles; ltsp->clocksPerLock = (f8)clockCycles/ (f8)LOOPS; ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS; ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS; //rwfence (); ltsp->idx = 4u; ExitThread (0); return 0; } int main (int argc, char *argv[]) { u8 processAffinityMask, systemAffinityMask; memset (tstr, 0u, usizeof(tstr)); lts[0]->idx = 3; lts[1]->idx = 2; lts[2]->idx = 1; lts[3]->idx = 0; GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask); SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS); SetThreadAffinityMask (GetCurrentThread (), 1ull); lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)<s[0]->threadId); #ifndef SINGLE_THREAD lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)<s[1]->threadId); lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)<s[2]->threadId); lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)<s[3]->threadId); #endif SetThreadAffinityMask (GetCurrentThread (), processAffinityMask); threadHold = 0; #ifdef SINGLE_THREAD while (lts[0]->idx<4u) {Sleep (1);} #else while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);} #endif printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock); printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock); printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock); printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock); printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+ lts[1]->clocksPerLock+ lts[2]->clocksPerLock+ lts[3]->clocksPerLock)/ 4., (lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4., (lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.); printf ("LC:%u\n", (ud)lockCounter); return 0; }
Программа запускалась на компьютере под управлением DELL i5-4310U с DDR3-800, 2 ядрами / 2 HT с 2,7 ГГц и общим кэшем L3.
Начнем с того, что влияние WOW64 было незначительным.
Один stream, выполняющий незащищенные блокировки / разблокировки, смог сделать это каждые 110 циклов. Настройка неуправляемой блокировки бесполезна: любой код, добавленный для улучшения одной инструкции XCHG, будет только замедлять работу.
С четырьмя HT, бомбардируя переменную блокировки с попытками блокировки, ситуация радикально меняется. Время, необходимое для достижения успешной блокировки, переходит в 994 цикла, из которых значительная часть может быть отнесена к неудачным попыткам блокировки 2.2. Иными словами, в ситуации с высоким уровнем конкуренции в среднем 3.2 блокировки должны быть предприняты для достижения успешного блокирования. Очевидно, что 110 циклов не стали 110 * 3,2, но ближе к 110 * 9. Таким образом, здесь действуют другие механизмы, как и тесты на старой машине. Кроме того, в среднем 994 цикла охватывают диапазон от 716 до 1157
Варианты блокировки, выполняющие предварительное тестирование, требовали около 95% циклов, которые были запрошены с помощью простейшего варианта (XCHG). В среднем они выполнили бы 17 CMP, чтобы найти возможность выполнить 1.75 блокировок, из которых 1 был успешным. Я рекомендую использовать предварительный тест не только потому, что он быстрее: он накладывает меньше нагрузки на механизм блокировки шины (3.2-1.75 = 1.45 меньше попыток блокировки), хотя это немного увеличивает сложность.
Просто спрашиваю:
Прежде чем вы будете копать эту глубину в прядильную и почти незаметную структуру данных:
У вас – в ваших тестах и вашем приложении – убедитесь, что конкурирующие streamи гарантированно работают на разных ядрах?
Если нет, вы можете закончить программу, которая отлично работает на вашей машине разработки, но всасывает / терпит неудачу в поле, потому что один stream должен быть как шкафчиком, так и разблокирующим вашей спин-блокировкой.
Чтобы дать вам рисунок: в Windows у вас есть стандартный срез времени 10 миллисекунд. Если вы не убедитесь, что задействованы два физических streamа в блокировке / разблокировке, вы получите около 500 блокировок / разблокировок в секунду, и этот результат будет очень мех