Как отдать приоритет привилегированной теме при блокировке мьютекса?

Прежде всего: я полностью новичок в mutex / многопоточном программировании, поэтому извините за любую ошибку заранее …

У меня есть программа, которая запускает несколько streamов. Потоки (как правило, один на процессор ядра) выполняют множество вычислений и «размышлений», а затем иногда решают вызвать конкретный (общий) метод, который обновляет некоторые статистические данные. Параллелизм при обновлении статистики управляется с помощью мьютекса:

stats_mutex.lock(); common_area->update_thread_stats( ... ); stats_mutex.unlock(); 

Теперь к проблеме. Из всех этих streamов есть один конкретный stream, который требует почти
приоритет в реальном времени, потому что это единственный stream, который фактически работает.

С «почти приоритетом в реальном времени» я имею в виду:

Предположим, что stream t0 является «привилегированным» и t1 …. t15 являются нормальными. Что происходит сейчас:

  • Поток t1 получает блокировку.
  • Потоки t2, t3, t0 вызовите метод lock () и дождитесь его успеха.
  • Тема t1 вызывает разблокировку ()
  • Один (случайно, насколько мне известно) из streamов t2, t3, t0 удается получить блокировку, а остальные продолжают ждать.

Что мне нужно:

  • Поток t1 получает блокировку.
  • Потоки t2, t3, t0 вызовите метод lock () и дождитесь его успеха.
  • Тема t1 вызывает разблокировку ()
  • Тема t0 приобретает блокировку, поскольку она является привилегированной

Итак, какой лучший (возможно, самый простой) способ сделать это?

То, что я думал, это иметь переменную bool под названием «privileged_needs_lock».

Но я думаю, мне нужен другой мьютекс, чтобы управлять доступом к этой переменной … Я не знаю, правильно ли это …

Дополнительная информация:

  • мои streamи используют C ++ 11 (с gcc 4.6.3)
  • код должен работать как на Linux, так и на Windows (но тестируется только на Linux в данный момент).
  • производительность механизма блокировки не является проблемой (моя проблема с производительностью заключается в вычислении внутренних streamов, а число streamов всегда будет низким, один или два на процессорный kernel ​​максимум)

Любая идея ценится. благодаря


Нижеследующее решение работает (три способа mutex):

 #include  #include  #include "unistd.h" std::mutex M; std::mutex N; std::mutex L; void lowpriolock(){ L.lock(); N.lock(); M.lock(); N.unlock(); } void lowpriounlock(){ M.unlock(); L.unlock(); } void highpriolock(){ N.lock(); M.lock(); N.unlock(); } void highpriounlock(){ M.unlock(); } void hpt(const char* s){ using namespace std; //cout << "hpt trying to get lock here" << endl; highpriolock(); cout << s << endl; sleep(2); highpriounlock(); } void lpt(const char* s){ using namespace std; //cout << "lpt trying to get lock here" << endl; lowpriolock(); cout << s << endl; sleep(2); lowpriounlock(); } int main(){ std::thread t0(lpt,"low prio t0 working here"); std::thread t1(lpt,"low prio t1 working here"); std::thread t2(hpt,"high prio t2 working here"); std::thread t3(lpt,"low prio t3 working here"); std::thread t4(lpt,"low prio t4 working here"); std::thread t5(lpt,"low prio t5 working here"); std::thread t6(lpt,"low prio t6 working here"); std::thread t7(lpt,"low prio t7 working here"); //std::cout << "All threads created" << std::endl; t0.join(); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); t7.join(); return 0; } 

Попробовал решение ниже, как было предложено, но оно не работает (скомпилируйте с помощью «g ++ -std = c ++ 0x -o test test.cpp -lpthread»):

 #include  #include  #include "time.h" #include "pthread.h" std::mutex l; void waiter(){ l.lock(); printf("Here i am, waiter starts\n"); sleep(2); printf("Here i am, waiter ends\n"); l.unlock(); } void privileged(int id){ usleep(200000); l.lock(); usleep(200000); printf("Here i am, privileged (%d)\n",id); l.unlock(); } void normal(int id){ usleep(200000); l.lock(); usleep(200000); printf("Here i am, normal (%d)\n",id); l.unlock(); } int main(){ std::thread tw(waiter); std::thread t1(normal,1); std::thread t0(privileged,0); std::thread t2(normal,2); sched_param sch; int policy; pthread_getschedparam(t0.native_handle(), &policy, &sch); sch.sched_priority = -19; pthread_setschedparam(t0.native_handle(), SCHED_FIFO, &sch); pthread_getschedparam(t1.native_handle(), &policy, &sch); sch.sched_priority = 18; pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch); pthread_getschedparam(t2.native_handle(), &policy, &sch); sch.sched_priority = 18; pthread_setschedparam(t2.native_handle(), SCHED_FIFO, &sch); tw.join(); t1.join(); t0.join(); t2.join(); return 0; } 

Я могу думать о трех методах, используя только примитивы для нарезания резьбы:

Тройной мьютекс

Здесь будут работать три мьютекса:

  • mutex данных (‘M’)
  • mutex («N») и
  • Мьютекс доступа с низким приоритетом (‘L’)

Шаблоны доступа:

  • Низкоприоритетные streamи: блокировка L, блокировка N, блокировка M, разблокировка N, {do stuff}, разблокировка M, разблокировка L
  • Высокоприоритетный stream: блокировка N, блокировка M, разблокировка N, {do stuff}, разблокировка M

Таким образом, доступ к данным защищен, а stream с высоким приоритетом может опережать низкоприоритетные streamи при доступе к нему.

Mutex, переменная состояния, атомный флаг

Первоначальный способ сделать это – с переменной состояния и атомарной:

  • Mutex M;
  • Condvar C;
  • атомный bool hpt_waiting;

Шаблоны доступа к данным:

  • Низкоприоритетный stream: блокировка M, while (hpt_waiting) wait C on M, {do stuff}, трансляция C, разблокировка M
  • Высокоприоритетный stream: hpt_waiting: = true, lock M, hpt_waiting: = false, {do stuff}, broadcast C, unlock M

Mutex, переменная условия, два неатомных флага

В качестве альтернативы вы можете использовать два неатомных bools с condvar; в этом методе mutex / condvar защищает флаги, а данные защищаются не мьютексом, а флагом:

  • Mutex M;
  • Condvar C;
  • bool data_held, hpt_waiting;

  • Низкоприоритетный stream: блокировка M, while (hpt_waiting или data_held) wait C on M, data_held: = true, unlock M, {do stuff}, lock M, data_held: = false, broadcast C, unlock M

  • Высокоприоритетный stream: lock M, hpt_waiting: = true, while (data_held) wait C on M, data_held: = true, {do stuff}, lock M, data_held: = false, hpt_waiting: = false, broadcast C, unlock M

Поместите запрос streamов в очередь приоритета. Привилегированный stream может получить первый доступ к данным, когда он свободен.

Один из способов сделать это будет с помощью массива ConcurrentQueues [privilegeLevel], блокировки и некоторых событий.

Любой stream, который хочет, чтобы данные вошли в блокировку. Если данные свободны, (логически), он получает объект данных и выходит из блокировки. Если данные используются другим streamом, запрашивающий stream подталкивает событие к одной из параллельных очередей, в зависимости от его уровня привилегий, выходит из блокировки и ждет события.

Когда stream хочет освободить свою собственность на объект данных, он получает блокировку и выполняет итерацию массива ConcurrentQueues с наивысшей привилегии в конце вниз, ища событие (например, количество очередей> 0). Если он находит один, он сигнализирует об этом и выходит из замка, если нет, он устанавливает логическое значение «dataFree» и выходит из блокировки.

Когда stream, ожидающий события для доступа к данным, готов к работе, он может получить доступ к объекту данных.

I thnk, который должен работать. Пожалуйста, другие разработчики, проверьте этот дизайн и посмотрите, можете ли вы думать о любых гонках и т. Д.? Я по-прежнему несколько переживаю от «перегрузки гостеприимства» после поездки в ЧР ..

Редактировать – возможно, даже не нужны параллельные очереди из-за явного блокировки всех них. Любая старая очередь будет делать.

 #include  #include  #include  #include  class priority_mutex { std::condition_variable cv_; std::mutex gate_; bool locked_; std::thread::id pr_tid_; // priority thread public: priority_mutex() : locked_(false) {} ~priority_mutex() { assert(!locked_); } priority_mutex(priority_mutex&) = delete; priority_mutex operator=(priority_mutex&) = delete; void lock(bool privileged = false) { const std::thread::id tid = std::this_thread::get_id(); std::unique_lock lk(gate_); if (privileged) pr_tid_ = tid; cv_.wait(lk, [&]{ return !locked_ && (pr_tid_ == std::thread::id() || pr_tid_ == tid); }); locked_ = true; } void unlock() { std::lock_guard lk(gate_); if (pr_tid_ == std::this_thread::get_id()) pr_tid_ = std::thread::id(); locked_ = false; cv_.notify_all(); } }; 

УВЕДОМЛЕНИЕ. Этот priority_mutex обеспечивает несправедливое планирование streamов. Если привилегированный stream часто получает блокировку, другие непривилегированные streamи могут быть почти не запланированы.

Пример использования:

 #include  priority_mutex mtx; void privileged_thread() { //... { mtx.lock(true); // acquire 'priority lock' std::unique_lock lk(mtx, std::adopt_lock); // update shared state, etc. } //... } void normal_thread() { //... { std::unique_lock lk(mtx); // acquire 'normal lock' // do something } //... } 

В linux вы можете проверить этого человека: pthread_setschedparam, а также man sched_setscheduler

pthread_setschedparam (stream pthread_t, int policy, const struct sched_param * param);

Проверьте это также на c ++ 2011: http://msdn.microsoft.com/en-us/library/system.threading.thread.priority.aspx#Y78

Попробуйте что-то вроде следующего. Вы можете сделать class streamобезопасным синглетоном, и вы даже можете сделать его функтором.

 #include  #include  #include  class ThreadPrioFun { typedef std::multimap priomap_t; public: ThreadPrioFun() { pthread_mutex_init(&mtx, NULL); } ~ThreadPrioFun() { pthread_mutex_destroy(&mtx); } void fun(int prio, sem_t* pSem) { pthread_mutex_lock(&mtx); bool bWait = !(pm.empty()); priomap_t::iterator it = pm.insert(std::pair(prio, pSem) ); pthread_mutex_unlock(&mtx); if( bWait ) sem_wait(pSem); // do the actual job // .... // pthread_mutex_lock(&mtx); // done, remove yourself pm.erase(it); if( ! pm.empty() ) { // let next guy run: sem_post((pm.begin()->second)); } pthread_mutex_unlock(&mtx); } private: pthread_mutex_t mtx; priomap_t pm; }; 

У pthreads есть приоритеты streamов:

 pthread_setschedprio( (pthread_t*)(&mThreadId), wpri ); 

Если несколько streamов спадают в ожидании блокировки, планировщик сначала пробудит stream с наивысшим приоритетом.

Поскольку приоритеты streamов не работают для вас:

Создайте 2 мьютекса, обычный замок и блокировку приоритета.

Обычные streamи должны сначала заблокировать нормальный замок, а затем блокировку приоритета. Приоритетный stream должен блокировать блокировку приоритета:

 Mutex mLock; Mutex mPriLock; doNormal() { mLock.lock(); pthread_yield(); doPriority(); mLock.unlock(); } doPriority() { mPriLock.lock(); doStuff(); mPriLock.unlock(); } 

Изменен слегка ответ ecatmur , добавив четвертый мьютекс для одновременного обращения с несколькими высокоприоритетными streamами (обратите внимание, что это не было обязательным в моем первоначальном вопросе):

 #include  #include  #include "unistd.h" std::mutex M; //data access mutex std::mutex N; // 'next to access' mutex std::mutex L; //low priority access mutex std::mutex H; //hptwaiting int access mutex int hptwaiting=0; void lowpriolock(){ L.lock(); while(hptwaiting>0){ N.lock(); N.unlock(); } N.lock(); M.lock(); N.unlock(); } void lowpriounlock(){ M.unlock(); L.unlock(); } void highpriolock(){ H.lock(); hptwaiting++; H.unlock(); N.lock(); M.lock(); N.unlock(); } void highpriounlock(){ M.unlock(); H.lock(); hptwaiting--; H.unlock(); } void hpt(const char* s){ using namespace std; //cout << "hpt trying to get lock here" << endl; highpriolock(); cout << s << endl; usleep(30000); highpriounlock(); } void lpt(const char* s){ using namespace std; //cout << "lpt trying to get lock here" << endl; lowpriolock(); cout << s << endl; usleep(30000); lowpriounlock(); } int main(){ std::thread t0(lpt,"low prio t0 working here"); std::thread t1(lpt,"low prio t1 working here"); std::thread t2(hpt,"high prio t2 working here"); std::thread t3(lpt,"low prio t3 working here"); std::thread t4(lpt,"low prio t4 working here"); std::thread t5(lpt,"low prio t5 working here"); std::thread t6(hpt,"high prio t6 working here"); std::thread t7(lpt,"low prio t7 working here"); std::thread t8(hpt,"high prio t8 working here"); std::thread t9(lpt,"low prio t9 working here"); std::thread t10(lpt,"low prio t10 working here"); std::thread t11(lpt,"low prio t11 working here"); std::thread t12(hpt,"high prio t12 working here"); std::thread t13(lpt,"low prio t13 working here"); //std::cout << "All threads created" << std::endl; t0.join(); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); t7.join(); t8.join(); t9.join(); t10.join(); t11.join(); t12.join(); t13.join(); return 0; } в #include  #include  #include "unistd.h" std::mutex M; //data access mutex std::mutex N; // 'next to access' mutex std::mutex L; //low priority access mutex std::mutex H; //hptwaiting int access mutex int hptwaiting=0; void lowpriolock(){ L.lock(); while(hptwaiting>0){ N.lock(); N.unlock(); } N.lock(); M.lock(); N.unlock(); } void lowpriounlock(){ M.unlock(); L.unlock(); } void highpriolock(){ H.lock(); hptwaiting++; H.unlock(); N.lock(); M.lock(); N.unlock(); } void highpriounlock(){ M.unlock(); H.lock(); hptwaiting--; H.unlock(); } void hpt(const char* s){ using namespace std; //cout << "hpt trying to get lock here" << endl; highpriolock(); cout << s << endl; usleep(30000); highpriounlock(); } void lpt(const char* s){ using namespace std; //cout << "lpt trying to get lock here" << endl; lowpriolock(); cout << s << endl; usleep(30000); lowpriounlock(); } int main(){ std::thread t0(lpt,"low prio t0 working here"); std::thread t1(lpt,"low prio t1 working here"); std::thread t2(hpt,"high prio t2 working here"); std::thread t3(lpt,"low prio t3 working here"); std::thread t4(lpt,"low prio t4 working here"); std::thread t5(lpt,"low prio t5 working here"); std::thread t6(hpt,"high prio t6 working here"); std::thread t7(lpt,"low prio t7 working here"); std::thread t8(hpt,"high prio t8 working here"); std::thread t9(lpt,"low prio t9 working here"); std::thread t10(lpt,"low prio t10 working here"); std::thread t11(lpt,"low prio t11 working here"); std::thread t12(hpt,"high prio t12 working here"); std::thread t13(lpt,"low prio t13 working here"); //std::cout << "All threads created" << std::endl; t0.join(); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); t7.join(); t8.join(); t9.join(); t10.join(); t11.join(); t12.join(); t13.join(); return 0; } 

Как вы думаете? Это нормально? Это правда, что семафор мог справиться с этим лучше, но мьютексы мне гораздо легче управлять.

  • Вызов pthread_cond_signal без блокировки мьютекса
  • Запуск одного экземпляра приложения с помощью Mutex
  • Каков правильный способ создания приложения с одним экземпляром?
  • Давайте будем гением компьютера.