
Author Topic: My threadpool engine and scalability  (Read 3657 times)

aminer

  • Hero Member
  • *****
  • Posts: 956
My threadpool engine and scalability
« on: October 01, 2012, 05:57:13 pm »

Hello,

If you take a look at my threadpool engine:

http://pages.videotron.com/aminer/

you will notice that the execute() method does not scale, because every
call performs a LockedIncLong(balance1) on the shared counter, which is expensive.

Here is the execute() method:

==

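function TThreadPool.execute(func:TMyProc;const Context: Pointer): Boolean;

var
  local_balance:long;
  params:  PParams;

begin
  // package the procedure and its context into a one-element task
  new(params);
  setlength(params^,1);
  params^[0].proc:=func;
  params^[0].data:=context;

  // every call does a locked increment on the shared counter balance1
  local_balance:=LockedIncLong(balance1) mod FThreadCount;
  // retry, yielding the time slice, until the chosen queue accepts the task
  while not Queues[local_balance].push(tobject(params)) do sleep(0);
  // wake the worker that owns this queue
  events[local_balance].setevent;
  result:=true;
end;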

===
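To make the cost concrete, here is a minimal sketch of the round-robin selection that execute() performs, assuming Free Pascal. `ThreadCount`, `Balance` and `NextQueue` are hypothetical stand-ins for FThreadCount, balance1 and the selection line; `InterlockedIncrement` from the System unit is used in place of LockedIncLong, whose exact return convention is not shown in this thread.

```pascal
program RoundRobinSketch;
{$mode objfpc}

const
  ThreadCount = 4; // hypothetical pool size, stands in for FThreadCount

var
  Balance: LongInt = 0; // shared counter, plays the role of balance1

// Returns the queue index for the next task. Every producer goes through
// the same atomic increment, so all of them contend on one memory location.
function NextQueue: LongInt;
begin
  Result := InterlockedIncrement(Balance) mod ThreadCount;
end;

var
  i: Integer;
begin
  for i := 1 to 8 do
    Write(NextQueue(), ' ');
  WriteLn; // prints: 1 2 3 0 1 2 3 0
end.
```

The distribution over the queues is even, but the increment is paid on every submitted task, which is the part that limits scaling when many threads submit at once.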

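Now, to be able to scale the execute() method, we have to add a getindex()
method like this:

procedure TThreadPool.getindex(var index:long);
begin
  // index = 0 means that no queue has been chosen yet
  // note: every caller that starts from 0 gets 1 mod FThreadCount here;
  // Reply #5 below uses LockedIncLong(balance1) instead
  if index = 0 then
  begin
    inc(index);
    index:=index mod FThreadCount;
  end;
end;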


and also add an execute1() method that scales perfectly, like this:

==
function TThreadPool.execute1(func:TMyProc;const Context:
Pointer;index:long): Boolean;

var
  local_balance:long;
  params:  PParams;

begin
  new(params);
  setlength(params^,1);
  params^[0].proc:=func;
  params^[0].data:=context;

  // push to the queue chosen by the caller, no shared counter involved
  while not Queues[index].push(tobject(params)) do sleep(0);
  // mistake: local_balance is never assigned, this should be
  // events[index] (corrected in Reply #4 below)
  events[local_balance].setevent;
  result:=true;
end;

==

so we have to call the getindex() method from multiple threads like this:

// count must be initialized to zero beforehand:

ThreadPool.getindex(count);
ThreadPool.execute1(myproc,data,count);

and now the execute1() method will scale perfectly.


I will try to update my threadpool engine soon.



Thank you,
Amine Moulay Ramdane.



aminer

Re: My threadpool engine and scalability
« Reply #1 on: October 01, 2012, 06:08:02 pm »
After adding the getindex() and execute1() methods, this threadpool
engine can also be used as a scalable queue (not a strict FIFO queue,
but still useful).

Amine Moulay Ramdane


aminer

Re: My threadpool engine and scalability
« Reply #2 on: October 01, 2012, 06:19:31 pm »

I mean this threadpool engine will become scalable because I am
using multiple lock-free FIFO queues, so that there is less contention,
and also work-stealing. So after adding the getindex() and
execute1() methods, this threadpool engine will become scalable.
It can also be used as a scalable queue
(not a strict FIFO queue, but still useful).
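As an illustration of why several FIFO queues together are not one strict FIFO queue, here is a small single-threaded sketch, assuming Free Pascal. `QueueCount`, `ItemCount` and `DrainOrder` are hypothetical names, and plain arrays stand in for the lock-free queues. Items are pushed round-robin and then consumed one queue after the other.

```pascal
program NonStrictFifoSketch;
{$mode objfpc}{$H+}

uses
  SysUtils;

const
  QueueCount = 2; // hypothetical number of queues
  ItemCount = 6;  // hypothetical number of pushed items

// Pushes items 1..ItemCount round-robin into QueueCount FIFO queues, then
// drains queue 0 followed by queue 1 and returns the order of consumption.
function DrainOrder: string;
var
  Queues: array[0..QueueCount - 1, 0..ItemCount - 1] of Integer;
  Len: array[0..QueueCount - 1] of Integer;
  q, i: Integer;
begin
  Result := '';
  for q := 0 to QueueCount - 1 do
    Len[q] := 0;
  // producer side: item i goes to queue (i mod QueueCount)
  for i := 1 to ItemCount do
  begin
    q := i mod QueueCount;
    Queues[q, Len[q]] := i;
    Inc(Len[q]);
  end;
  // consumer side: each queue is read front to back
  for q := 0 to QueueCount - 1 do
    for i := 0 to Len[q] - 1 do
      Result := Result + IntToStr(Queues[q, i]) + ' ';
  Result := Trim(Result);
end;

begin
  WriteLn(DrainOrder); // prints: 2 4 6 1 3 5
end.
```

Within each queue the order is preserved (2 4 6 and 1 3 5), but the global submission order 1..6 is not.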

This threadpool engine is useful, and that is what matters.

Amine Moulay Ramdane


aminer

Re: My threadpool engine and scalability
« Reply #3 on: October 01, 2012, 06:31:41 pm »

Hello,

Of course I can change and simplify the interface of my threadpool engine
so that there is no getindex() method, just an execute1() method like this:

==

 function TThreadPool.execute1(func:TMyProc;const Context:
 Pointer;index:long): Boolean;

 var
   local_balance:long;
   params:  PParams;

 begin
   new(params);
   setlength(params^,1);
   params^[0].proc:=func;
   params^[0].data:=context;

   // index = 0 means that no queue has been chosen yet
   if index = 0 then
   begin
     inc(index);
     index:=index mod FThreadCount;
   end;

   while not Queues[index].push(tobject(params)) do sleep(0);
   // mistake: local_balance is never assigned, this should be
   // events[index] (corrected in Reply #4 below)
   events[local_balance].setevent;
   result:=true;
 end;

=




Amine Moulay Ramdane.

aminer

Re: My threadpool engine and scalability
« Reply #4 on: October 01, 2012, 06:36:17 pm »


Hello,

I have corrected a mistake in the execute1() method; here it is again:

==

 function TThreadPool.execute1(func:TMyProc;const Context:
 Pointer;index:long): Boolean;

 var
   params:  PParams;

 begin
   new(params);
   setlength(params^,1);
   params^[0].proc:=func;
   params^[0].data:=context;

   // index = 0 means that no queue has been chosen yet
   if index = 0 then
   begin
     inc(index);
     index:=index mod FThreadCount;
   end;

   while not Queues[index].push(tobject(params)) do sleep(0);
   // signal the event that belongs to the queue just pushed to
   events[index].setevent;
   result:=true;
 end;

=


Amine Moulay Ramdane.


aminer

Re: My threadpool engine and scalability
« Reply #5 on: October 01, 2012, 06:43:12 pm »


Hello,

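Sorry, here is the final execute1() method. There is no need
for getindex(); I have to use LockedIncLong(), but
it will be called only once:

==

 function TThreadPool.execute1(func:TMyProc;const Context:
 Pointer;var index:long): Boolean;

 var
   params:  PParams;

 begin
   new(params);
   setlength(params^,1);
   params^[0].proc:=func;
   params^[0].data:=context;

   // index = 0 means the caller has no queue yet. index is passed by
   // reference so that the chosen queue is kept for the caller's next
   // calls; by value, LockedIncLong would run on every call.
   // note: if the result of mod is 0, the next call increments again
   if index = 0 then
     index:=LockedIncLong(balance1) mod FThreadCount;

   while not Queues[index].push(tobject(params)) do sleep(0);
   events[index].setevent;
   result:=true;
 end;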

=
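The "called only once" idea can be sketched on its own as follows, assuming Free Pascal. `EnsureIndex`, `AtomicOps`, `ThreadCount` and `Balance` are hypothetical names, `InterlockedIncrement` stands in for LockedIncLong, and -1 is used as the "not assigned" marker instead of 0 so that queue 0 remains a valid assignment. This is an illustration of the technique, not the engine's actual code.

```pascal
program StickyIndexSketch;
{$mode objfpc}

const
  ThreadCount = 4; // hypothetical pool size, stands in for FThreadCount

var
  Balance: LongInt = 0;   // shared counter, plays the role of balance1
  AtomicOps: LongInt = 0; // illustration only: counts shared increments

// Assigns a queue index on the first call and keeps it afterwards.
// Index must be passed by reference, otherwise the assignment is lost.
procedure EnsureIndex(var Index: LongInt);
begin
  if Index < 0 then
  begin
    Index := InterlockedIncrement(Balance) mod ThreadCount;
    Inc(AtomicOps);
  end;
end;

var
  MyIndex: LongInt = -1; // in a real producer: one such variable per thread
  i: Integer;
begin
  for i := 1 to 1000 do
  begin
    EnsureIndex(MyIndex);
    // a real producer would push its task to Queues[MyIndex] here
  end;
  // prints: queue=1 shared increments=1
  WriteLn('queue=', MyIndex, ' shared increments=', AtomicOps);
end.
```

One thousand submissions cost a single increment on the shared counter. The trade-off is that a producer always feeds the same queue, so balancing then relies on the work-stealing mentioned earlier in the thread.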


Thank you,
Amine Moulay Ramdane.



User137

  • Hero Member
  • *****
  • Posts: 1791
    • Nxpascal home
Re: My threadpool engine and scalability
« Reply #6 on: October 02, 2012, 01:48:32 am »
You are repeating the same code block in many messages now. Just noting that it is possible to edit your earlier posts, for example so that the first post always has the latest version. You can still do that; it would clean up the thread quite a bit.

Also, I would suggest planning your message a little beforehand. I cannot tell what the purpose of the message is: presenting finished code that does something very well, or code with problems that need solving.

 
