TThreadedQueue не способен к нескольким потребителям?
Попытка использовать TThreadedQueue (Generics.Collections) в единой схеме множественного потребления. (Delphi XE). Идея состоит в том, чтобы вставлять объекты в очередь и пропускать несколько рабочих streamов в очередь.
Однако он работает не так, как ожидалось. Когда два или более рабочих streamа вызывают PopItem, нарушения доступа выводятся из TThreadedQueue.
Если вызов PopItem сериализуется с критическим разделом, все в порядке.
- Delphi XE2: Возможно ли создать экземпляр формы FireMonkey в приложении VCL?
- Учебник стилей Delphi VCL - как изменить стиль во время выполнения
- Как динамически создавать элементы управления, совпадающие с вершиной, но после других выровненных элементов управления?
- Delphi: Нарушение доступа в конце конструктора Create ()
- Как проверить URL с помощью IdHTTP?
Конечно, TThreadedQueue должен иметь возможность обрабатывать несколько потребителей, так что я что-то упускаю или это чистая ошибка в TThreadedQueue?
Вот простой пример для получения ошибки.
program TestThreadedQueue; {$APPTYPE CONSOLE} uses // FastMM4 in '..\..\..\FastMM4\FastMM4.pas', Windows, Messages, Classes, SysUtils, SyncObjs, Generics.Collections; type TThreadTaskMsg = class(TObject) private threadID : integer; threadMsg : string; public Constructor Create( ID : integer; const msg : string); end; type TThreadReader = class(TThread) private fPopQueue : TThreadedQueue; fSync : TCriticalSection; fMsg : TThreadTaskMsg; fException : Exception; procedure DoSync; procedure DoHandleException; public Constructor Create( popQueue : TThreadedQueue; sync : TCriticalSection); procedure Execute; override; end; Constructor TThreadReader.Create( popQueue : TThreadedQueue; sync : TCriticalSection); begin fPopQueue:= popQueue; fMsg:= nil; fSync:= sync; Self.FreeOnTerminate:= FALSE; fException:= nil; Inherited Create( FALSE); end; procedure TThreadReader.DoSync ; begin WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); end; procedure TThreadReader.DoHandleException; begin WriteLn('Exception ->' + fException.Message); end; procedure TThreadReader.Execute; var signal : TWaitResult; begin NameThreadForDebugging('QueuePop worker'); while not Terminated do begin try {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } Sleep(20); {- Serializing calls to PopItem works } if Assigned(fSync) then fSync.Enter; try signal:= fPopQueue.PopItem( TObject(fMsg)); finally if Assigned(fSync) then fSync.Release; end; if (signal = wrSignaled) then begin try if Assigned(fMsg) then begin fMsg.threadMsg:= ''; fMsg.Free; // We are just dumping the message in this test //Synchronize( Self.DoSync); //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); end; except on E:Exception do begin end; end; end; except FException:= Exception(ExceptObject); try if not (FException is EAbort) then begin {Synchronize(} DoHandleException; //); end; finally FException:= nil; end; end; end; end; Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string); begin Inherited Create; threadID:= ID; threadMsg:= msg; end; var fSync : TCriticalSection; fThreadQueue : TThreadedQueue; fReaderArr : array[1..4] of TThreadReader; i : integer; begin try IsMultiThread:= TRUE; fSync:= TCriticalSection.Create; fThreadQueue:= TThreadedQueue.Create(1024,1,100); try {- Calling without fSync throws exceptions when two or more threads calls PopItem at the same time } WriteLn('Creating worker threads ...'); for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil); {- Calling with fSync works ! } //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync); WriteLn('Init done. Pushing items ...'); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); ReadLn; finally for i:= 1 to 4 do fReaderArr[i].Free; fThreadQueue.Free; fSync.Free; end; except on E: Exception do begin Writeln(E.ClassName, ': ', E.Message); ReadLn; end; end; end.
Обновление : ошибка в TMonitor, которая вызвала сбой TThreadedQueue, исправлена в Delphi XE2.
Обновление 2 : вышеупомянутый тест подчеркнул очередь в пустом состоянии. Дарьян Миллер обнаружил, что подчеркивание очереди в полном состоянии, все же может воспроизвести ошибку в XE2. Ошибка снова находится в TMonitor. См. Его ответ ниже для получения дополнительной информации. А также ссылку на QC101114.
Обновление 3 : с обновлением Delphi-XE2 4 было объявлено исправление для TMonitor
, которое TThreadedQueue
бы проблемы в TThreadedQueue
. Мои тесты пока не в состоянии воспроизвести какие-либо ошибки в TThreadedQueue
. Протестированные streamи одного производителя / нескольких потребителей, когда очередь пуста и заполнена. Также проверено несколько производителей / несколько потребителей. Я менял нити считывателя и писательские streamи от 1 до 100 без каких-либо сбоев. Но, зная историю, я осмеливаюсь, чтобы другие TMonitor
.
- Как определить версию приложения в одном месте для нескольких приложений?
- Библиотека Delphi JSON для XE2, доступная для сериализации объектов
- Каковы причины использования TArray вместо массива T?
- Есть ли ошибка в управлении представлением списка Delphi при использовании пользовательского чертежа?
- Разбивается ли COM в XE2 и как я могу его обойти?
- Как разбирать вложенный объект JSON в Delphi XE2?
Ну, трудно быть уверенным без большого тестирования, но, похоже, это ошибка, либо в TThreadedQueue, либо в TMonitor. Так или иначе, это RTL, а не ваш код. Вы должны указать это как отчет о контроле качества и использовать свой пример выше как код «как воспроизвести».
Я рекомендую вам использовать OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary при работе с streamами, параллелизмом и т. Д. Primoz проделал очень хорошую работу, и на сайте вы найдете много полезной документации ,
Ваш пример, похоже, отлично работает под XE2, но если мы заполняем вашу очередь, он терпит неудачу с AV на PushItem. (Проверено под обновлением XE21)
Чтобы воспроизвести, просто увеличивайте создание своей задачи с 100 до 1100 (ваша глубина очереди была установлена на 1024)
for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
Это всегда умирает для меня каждый раз в Windows 7. Сначала я попытался настойчиво протестировать его, и он потерпел неудачу в цикле 30 … затем в цикле 16 … затем на 65, так что с разными интервалами, но он последовательно терпел неудачу при некоторых точка.
iLoop := 0; while iLoop < 1000 do begin Inc(iLoop); WriteLn('Loop: ' + IntToStr(iLoop)); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); end;
Я искал class TThreadedQueue, но, похоже, не имел его в своем D2009. Я не собираюсь убивать себя над этим – поддержка streamа Delphi всегда была ошибкой .. errm … «неоптимальным», и я подозреваю, что TThreadedQueue ничем не отличается 🙂
Зачем использовать дженерики для объектов ПК (Продюсер / Потребитель)? Простой потомок TObjectQueue будет преуспевать – использовать его на протяжении десятилетий – отлично работает с несколькими производителями / потребителями:
unit MinimalSemaphorePCqueue; { Absolutely minimal PC queue based on TobjectQueue and a semaphore. The semaphore count reflects the queue count 'push' will always succeed unless memory runs out, then you're stuft anyway. 'pop' has a timeout parameter as well as the address of where any received object is to be put. 'pop' returns immediately with 'true' if there is an object on the queue available for it. 'pop' blocks the caller if the queue is empty and the timeout is not 0. 'pop' returns false if the timeout is exceeded before an object is available from the queue. 'pop' returns true if an object is available from the queue before the timeout is exceeded. If multiple threads have called 'pop' and are blocked because the queue is empty, a single 'push' will make only one of the waiting threads ready. Methods to push/pop from the queue A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. When the handle is signaled, the 'peek' method will retrieve the queued object. } interface uses Windows, Messages, SysUtils, Classes,syncObjs,contnrs; type pObject=^Tobject; TsemaphoreMailbox=class(TobjectQueue) private countSema:Thandle; protected access:TcriticalSection; public property semaHandle:Thandle read countSema; constructor create; virtual; procedure push(aObject:Tobject); virtual; function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; function peek(pResObject:pObject):boolean; virtual; destructor destroy; override; end; implementation { TsemaphoreMailbox } constructor TsemaphoreMailbox.create; begin {$IFDEF D2009} inherited Create; {$ELSE} inherited create; {$ENDIF} access:=TcriticalSection.create; countSema:=createSemaphore(nil,0,maxInt,nil); end; destructor TsemaphoreMailbox.destroy; begin access.free; closeHandle(countSema); inherited; end; function TsemaphoreMailbox.pop(pResObject: pObject; timeout: DWORD): boolean; // dequeues an object, if one is available on the queue. If the queue is empty, // the caller is blocked until either an object is pushed on or the timeout // period expires begin // wait for a unit from the semaphore result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); if result then // if a unit was supplied before the timeout, begin access.acquire; try pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end; procedure TsemaphoreMailbox.push(aObject: Tobject); // pushes an object onto the queue. If threads are waiting in a 'pop' call, // one of them is made ready. begin access.acquire; try inherited push(aObject); // shove the object onto the queue finally access.release; end; releaseSemaphore(countSema,1,nil); // release one unit to semaphore end; function TsemaphoreMailbox.peek(pResObject: pObject): boolean; begin access.acquire; try result:=(count>0); if result then pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end.
-unit MinimalSemaphorePCqueue; { Absolutely minimal PC queue based on TobjectQueue and a semaphore. The semaphore count reflects the queue count 'push' will always succeed unless memory runs out, then you're stuft anyway. 'pop' has a timeout parameter as well as the address of where any received object is to be put. 'pop' returns immediately with 'true' if there is an object on the queue available for it. 'pop' blocks the caller if the queue is empty and the timeout is not 0. 'pop' returns false if the timeout is exceeded before an object is available from the queue. 'pop' returns true if an object is available from the queue before the timeout is exceeded. If multiple threads have called 'pop' and are blocked because the queue is empty, a single 'push' will make only one of the waiting threads ready. Methods to push/pop from the queue A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. When the handle is signaled, the 'peek' method will retrieve the queued object. } interface uses Windows, Messages, SysUtils, Classes,syncObjs,contnrs; type pObject=^Tobject; TsemaphoreMailbox=class(TobjectQueue) private countSema:Thandle; protected access:TcriticalSection; public property semaHandle:Thandle read countSema; constructor create; virtual; procedure push(aObject:Tobject); virtual; function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; function peek(pResObject:pObject):boolean; virtual; destructor destroy; override; end; implementation { TsemaphoreMailbox } constructor TsemaphoreMailbox.create; begin {$IFDEF D2009} inherited Create; {$ELSE} inherited create; {$ENDIF} access:=TcriticalSection.create; countSema:=createSemaphore(nil,0,maxInt,nil); end; destructor TsemaphoreMailbox.destroy; begin access.free; closeHandle(countSema); inherited; end; function TsemaphoreMailbox.pop(pResObject: pObject; timeout: DWORD): boolean; // dequeues an object, if one is available on the queue. If the queue is empty, // the caller is blocked until either an object is pushed on or the timeout // period expires begin // wait for a unit from the semaphore result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); if result then // if a unit was supplied before the timeout, begin access.acquire; try pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end; procedure TsemaphoreMailbox.push(aObject: Tobject); // pushes an object onto the queue. If threads are waiting in a 'pop' call, // one of them is made ready. begin access.acquire; try inherited push(aObject); // shove the object onto the queue finally access.release; end; releaseSemaphore(countSema,1,nil); // release one unit to semaphore end; function TsemaphoreMailbox.peek(pResObject: pObject): boolean; begin access.acquire; try result:=(count>0); if result then pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end.
-unit MinimalSemaphorePCqueue; { Absolutely minimal PC queue based on TobjectQueue and a semaphore. The semaphore count reflects the queue count 'push' will always succeed unless memory runs out, then you're stuft anyway. 'pop' has a timeout parameter as well as the address of where any received object is to be put. 'pop' returns immediately with 'true' if there is an object on the queue available for it. 'pop' blocks the caller if the queue is empty and the timeout is not 0. 'pop' returns false if the timeout is exceeded before an object is available from the queue. 'pop' returns true if an object is available from the queue before the timeout is exceeded. If multiple threads have called 'pop' and are blocked because the queue is empty, a single 'push' will make only one of the waiting threads ready. Methods to push/pop from the queue A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. When the handle is signaled, the 'peek' method will retrieve the queued object. } interface uses Windows, Messages, SysUtils, Classes,syncObjs,contnrs; type pObject=^Tobject; TsemaphoreMailbox=class(TobjectQueue) private countSema:Thandle; protected access:TcriticalSection; public property semaHandle:Thandle read countSema; constructor create; virtual; procedure push(aObject:Tobject); virtual; function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; function peek(pResObject:pObject):boolean; virtual; destructor destroy; override; end; implementation { TsemaphoreMailbox } constructor TsemaphoreMailbox.create; begin {$IFDEF D2009} inherited Create; {$ELSE} inherited create; {$ENDIF} access:=TcriticalSection.create; countSema:=createSemaphore(nil,0,maxInt,nil); end; destructor TsemaphoreMailbox.destroy; begin access.free; closeHandle(countSema); inherited; end; function TsemaphoreMailbox.pop(pResObject: pObject; timeout: DWORD): boolean; // dequeues an object, if one is available on the queue. If the queue is empty, // the caller is blocked until either an object is pushed on or the timeout // period expires begin // wait for a unit from the semaphore result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); if result then // if a unit was supplied before the timeout, begin access.acquire; try pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end; procedure TsemaphoreMailbox.push(aObject: Tobject); // pushes an object onto the queue. If threads are waiting in a 'pop' call, // one of them is made ready. begin access.acquire; try inherited push(aObject); // shove the object onto the queue finally access.release; end; releaseSemaphore(countSema,1,nil); // release one unit to semaphore end; function TsemaphoreMailbox.peek(pResObject: pObject): boolean; begin access.acquire; try result:=(count>0); if result then pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end.
Я не думаю, что TThreadedQueue должен поддерживать нескольких потребителей. Это FIFO, в соответствии с файлом справки. У меня создается впечатление, что есть одна нить, а другая (только одна!) Появляется.