- Код: Выделить всё
- fCompletion:=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,Length(fThreads));
Задачи в этот порт пихаются:
- Код: Выделить всё
- PostQueuedCompletionStatus(fCompletion,0,0,POverlapped(Data));
А рабочий поток получает их так:
- Код: Выделить всё
- while(GetQueuedCompletionStatus(fCompletion,T,K,Ovr,INFINITE))do begin
 .. // выполнение реальной работы
 end;
Тем не менее, несмотря на то, что у меня такой вариант вполне себе работает (в многопоточной загрузке файлов из интернета), я не могу сказать, что на 100% точно уверен, как это работает. Потому, у меня используется другой вариант, который мне более понятен:
- Код: Выделить всё
- // Реализует пул потоков на базе массива
 // Пример функции потока (одной из пула):
 // function Foo(P:pointer):integer;
 // var
 // Data:PTaskInfo absolute P;
 // Obj:TObject;
 // begin
 // repeat
 // Obj:=Data.Queue.Pop;
 // if Obj=nil then break;
 // ... выполняем нашу работу
 // until false;
 // Result:=0;
 // end;
 unit ThreadQueue;
 interface
 Uses
 Windows;
 Type
 TThreadQueue=class;
 // это запись, которая передаётся каждому потоку в пуле
 PTaskInfo=^TTaskInfo;
 TTaskInfo=record
 ThreadInfo:pointer;
 ThreadNo:integer;
 Queue:TThreadQueue;
 end;
 // собственно, пул
 TThreadQueue=class
 private
 fThreads:array of cardinal;
 fTasks:array of TObject;
 fMaxTask:integer;
 fSemaphore,fKillEvent,fPauseEvent:cardinal;
 fRealObjs:boolean;
 fCritical:TRTLCriticalSection;
 public
 // создаёт пул. Параметры:
 // ThreadCount - количество потоков в пуле. Должно быть не меньше 2 (иначе нет смысла в пуле)
 // TaskCount - максимальное количество задач в буфере... можно ставить довольно большим, например, 1000
 // ThreadFunc - функция потока (на базе которой будут созданы все потоки пула)
 // aThreadInfo - дополнительная информация, передаваемая пулу
 // TasksAreRealObj - если установлена в true, деструктор вызовет .Free для всех задач, которые остались в буфере
 // НЕ устанавливайте его в true, если собиратесь кормить потоки фиктивными объектами, например, TObject(1)
 Constructor Create(ThreadCount,TaskCount:integer;ThreadFunc:TThreadFunc;aThreadInfo:pointer=nil;TasksAreRealObj:boolean=true);
 Destructor Destroy;override;
 // функция только для потока. Вызывайте её, чтобы получить новую задачу.
 // если задач нет - функция автоматически уходит в ожидание
 function Pop(Wait:cardinal=INFINITE):TObject;
 // функция только для вызывающего приложения. Вызывайте её, чтобы отдать пулу новую задачу.
 // НЕ пытайтесь положить в пул задачу nil.
 function Push(S:TObject):boolean;
 // останавливает выдачу задач в пул
 procedure SetPaused(Pause:boolean);
 end;
 implementation
 { TThreadQueue }
 Constructor TThreadQueue.Create(ThreadCount,TaskCount:integer;ThreadFunc:TThreadFunc;aThreadInfo:pointer=nil;TasksAreRealObj:boolean=true);
 var
 I:integer;
 P:PTaskInfo;
 ThId:cardinal;
 begin
 InitializeCriticalSection(fCritical);
 fRealObjs:=TasksAreRealObj;
 // сообщение выставляется при завершении работы
 fKillEvent:=CreateEvent(nil,true,false,nil);
 // максимальная длина очереди запросов. Сигнал, если есть сообщения.
 if ThreadCount>10 then ThreadCount:=10;
 if ThreadCount<2 then ThreadCount:=2;
 if TaskCount>1000 then fMaxTask:=1000
 else if TaskCount<ThreadCount then
 fMaxTask:=ThreadCount
 else
 fMaxTask:=TaskCount;
 // изначально, семафор обнулён, никто задания не получит
 fSemaphore:=CreateSemaphore(nil,0,fMaxTask,nil);
 // изначально пул НЕ стоит на паузе
 fPauseEvent:=CreateEvent(nil,true,true,nil);
 // генерируем потоки. Пока они могут инициализировать свои внутренние структуры,
 // а потом встанут колом на Pop, потому что задач пока нет.
 SetLength(fThreads,ThreadCount);
 For I:=Low(fThreads) to High(fThreads) do begin
 New(P);
 P^.Queue:=self;
 P^.ThreadInfo:=aThreadInfo;
 P^.ThreadNo:=I;
 fThreads[i]:=BeginThread(nil,64*1024,ThreadFunc,P,0,ThId);
 end;
 end;
 Destructor TThreadQueue.Destroy;
 var
 LockTime,CurTime:cardinal;
 S:string;
 I:integer;
 begin
 // сообщаем потокам, что надо бы добровольно откинуть копыта
 SetEvent(fKillEvent);
 // снимаемся с паузы, на случай, если это было так
 SetEvent(fPauseEvent);
 // максимальное время ожидания - 2 секунды... пока хардкод
 // в течении этого времени ВСЕ потоки должны откинуться
 LockTime:=GetTickCount+2000;
 for I:=Low(fThreads) to High(fThreads) do begin
 CurTime:=GetTickCount;
 if LockTime>CurTime then
 CurTime:=LockTime-CurTime
 else
 CurTime:=0;
 // ожидаем завершения конкретного потока
 if WaitForSingleObject(fThreads[I],CurTime)<>WAIT_OBJECT_0 then begin
 // выводим предупреждение, если поток откидываться добровольно не стал
 Str(I,S);
 OutputDebugString(PChar('Killed thread number '+S));
 TerminateThread(fThreads[I],1);
 end;
 CloseHandle(fThreads[I]);
 end;
 // типовое освобождение ресурсов
 if fRealObjs then
 For I:=Low(fTasks) to High(fTasks) do
 fTasks[i].Free;
 CloseHandle(fKillEvent);
 CloseHandle(fSemaphore);
 CloseHandle(fPauseEvent);
 DeleteCriticalSection(fCritical);
 end;
 procedure TThreadQueue.SetPaused(Pause:boolean);
 begin
 if Pause then
 ResetEvent(fPauseEvent)
 else
 SetEvent(fPauseEvent);
 end;
 function TThreadQueue.Pop(Wait:cardinal=INFINITE):TObject;
 var
 Events:array[0..1]of cardinal;
 I:integer;
 begin
 // ждём снятия с паузы
 if WaitForSingleObject(fPauseEvent,Wait)=WAIT_TIMEOUT then begin
 Result:=nil;
 exit;
 end;
 events[0]:=fKillEvent;
 events[1]:=fSemaphore;
 // ждём снятия с семафора, что означает - есть задачи
 if WaitForMultipleObjects(2,@Events,false,Wait)<>WAIT_OBJECT_0+1 then begin
 Result:=nil;
 exit;
 end;
 // входим в критическую секцию, нам нужно отредактировать список задач
 EnterCriticalSection(fCritical);
 if Length(fTasks)<=0 then begin
 // такого быть не должно, но что-то подсказывает...
 OutputDebugString('No tasks, but sema in signal state');
 LeaveCriticalSection(fCritical);
 Result:=nil;
 exit;
 end;
 // выталкиваем первую задачу в очереди и смещаем остальные в конец
 Result:=fTasks[0];
 For I:=Low(fTasks) to High(fTasks)-1 do
 fTasks[i]:=fTasks[i+1];
 SetLength(fTasks,pred(Length(fTasks)));
 // всё
 LeaveCriticalSection(fCritical);
 end;
 function TThreadQueue.Push(S:TObject):boolean;
 begin
 // nil добавлять нельзя, это признак смерти потока
 if S=nil then begin
 Result:=false;
 exit;
 end;
 // входим в критическую секцию, нам надо отредактировать перечень задач
 EnterCriticalSection(fCritical);
 if Length(fTasks)>=fMaxTask then begin
 LeaveCriticalSection(fCritical);
 Result:=false;
 exit;
 end;
 // добавляем задачу в конец
 SetLength(fTasks,succ(Length(fTasks)));
 fTasks[High(fTasks)]:=S;
 LeaveCriticalSection(fCritical);
 // поднимаем семафор. Если есть готовые к исполнению потоки - они тут же получат задачу
 ReleaseSemaphore(fSemaphore,1,nil);
 Result:=true;
 end;
 end.
Собственно, этот вариант использования в моих проектах используется чаще. Но многопоточную отработку я применяю в основном когда ограничен по вводу-выводу. Например, чтение/запись файлов, загрузка из интернета. При попытке распараллелить работу с данными я чаще встречался с проблемами и потому предпочитал работать в один поток (даже если это и не GUI поток, а отдельный, но один).
В задаче про поиск студентов по группам я вижу только один вариант параллеленья: пусть у нас 4 потока, делим весь массив на 4 части, каждую отдаём своему потоку, ждём завершения всех, результат суммируем. Какого-то пула потоков тут я не вижу. Пул требуется когда заранее число исполняемых задач неизвестно, а порождение и уничтожение потока в Windows (да и Linux, наверное) - штука довольно дорогая, потому имеет смысл держать несколько "горячих" потоков, которые будут исполнять задачи без непрерывного порождения/смерти.



