Hello...
Please read the whole post below to better understand my new algorithm...
Yesterday I presented my algorithm for a concurrent FIFO queue,
but it was not working correctly on the pop() side.
Look at the pop():
==
function TWQueue.pop(var obj:long):boolean;
var lastTail,newtemp,newtemp1,newtemp2 : long;
begin
  if fwait then sema.wait;
  result:=true;
  repeat
    newtemp1:=tail;
    newtemp2:=newtemp1+1;
    // Queue is empty: advancing tail would go past head.
    if newtemp2>head then
      begin
        result:=false;
        exit;
      end;
    // The slot is claimed here, before the item has been read.
    if CAS(tail,newtemp1,newtemp2) then break;
    sleep(0);
  until false;
  lastTail:=newtemp1;
  // Wait until the pusher has published the item in this cell.
  repeat
    if fcount1^[lastTail and fMask].flag=1 then break;
    sleep(0);
  until false;
  obj:=getObject(lastTail);
  fcount1^[lastTail and fMask].flag:=0;
end;
==
I am updating the tail with a CAS first and getting the item after
that. That is not correct, because another popping thread can get in between, and that will not work. So I have decided to invent another algorithm that solves this problem; here it is:
We begin with the push() method. Its source code looks like this:
===
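function TWQueue.push(tm : long):boolean;
var lastHead,newtemp:long;
    i,j:integer;
begin
  // Refuse the push once the queue has reached fsize.
  if getlength >= fsize then
    begin
      result:=false;
      exit;
    end;
  // Atomically increment head; lastHead is the slot this thread owns.
  newTemp:=LockedIncLong(head);
  lastHead:=newtemp-1;
  // Spin until the cell is free (flag=0).
  repeat
    asm pause end;
  until fcount1^[lastHead and fMask].flag=0;
  setObject(lastHead,tm);
  // Publish the item so that pop() can read it.
  fcount1^[lastHead and fMask].flag:=1;
  if fwait then sema.signal;
  result:=true;
end;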
===
You have to know that "fsize" is fixed to the length of the queue
minus a "margin" that is equal to 1000. That means at most 1000 threads in total can run push() at the same time, but you can increase the margin to raise the number of threads that can
run push() at the same time...
So if getlength equals fsize-1, up to 1000 threads can cross the check,
because "if getlength >= fsize" can be false for all of them at the same time. But
even if it is false for all of them at the same time, there is a margin of 1000 threads,
so my reasoning is correct here.
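The margin argument above can be checked with a little arithmetic. The following is only a minimal sketch in Free Pascal: the helper names WorstCaseLength and FitsInArray and the array length of 65536 are assumptions made for illustration, they are not part of the queue's source.

```pascal
program MarginCheck;
{$mode objfpc}

// Hypothetical helper: the largest length the queue can reach when
// `racers` threads all pass "getlength >= fsize" while getlength = fsize-1.
function WorstCaseLength(fsize, racers: Integer): Integer;
begin
  Result := (fsize - 1) + racers;
end;

// True when that worst case still fits in the underlying array.
// fsize is taken as arrayLen - margin, as described in the post.
function FitsInArray(arrayLen, margin, racers: Integer): Boolean;
begin
  Result := WorstCaseLength(arrayLen - margin, racers) <= arrayLen;
end;

begin
  // Assumed figures: 65536 cells and a margin of 1000.
  WriteLn(FitsInArray(65536, 1000, 1000));  // 1000 racing pushers fit
  WriteLn(FitsInArray(65536, 1000, 1002));  // 1002 racing pushers overshoot
end.
```

Under these assumptions the overshoot stays inside the array as long as the number of simultaneous pushers does not exceed the margin by more than one.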
Now we look at the rest of the code...
Every cell of the array-based queue looks like this:
type cell = record
  obj:long;
  flag:long;
  {$IFDEF CPU32}
  cache:typecache2;
  {$ENDIF CPU32}
  {$IFDEF CPU64}
  cache:typecache3;
  {$ENDIF CPU64}
end;
It is padded to the size of a cache line, and the array is aligned on
64 bytes, so as to avoid false sharing...
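To make the padding idea concrete, here is a minimal Free Pascal sketch. It assumes that typecache2 and typecache3 are byte arrays sized to fill the record up to 64 bytes (their definitions are not shown in this post), and it uses the hypothetical names TLong, TPad and TCell. It does not show the 64-byte alignment of the array itself.

```pascal
program CellPadding;
{$mode objfpc}

const
  CacheLine = 64;  // assumed cache-line size in bytes

type
  // Assumption: "long" is a pointer-sized unsigned integer.
  TLong = PtrUInt;
  // Padding that fills the rest of the cache line after obj and flag.
  TPad = array[0..CacheLine - 2 * SizeOf(TLong) - 1] of Byte;
  TCell = record
    obj: TLong;
    flag: TLong;
    pad: TPad;
  end;

var
  cells: array[0..3] of TCell;
begin
  // Each cell should occupy exactly one cache line ...
  WriteLn('SizeOf(TCell) = ', SizeOf(TCell));
  // ... so neighbouring cells are one cache line apart.
  WriteLn('stride = ', PtrUInt(@cells[1]) - PtrUInt(@cells[0]));
end.
```

With this layout, two threads working on neighbouring cells do not write to the same cache line, provided the array itself starts on a 64-byte boundary.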
After that we increment the "head" with an atomic increment, like this:
newTemp:=LockedIncLong(head);
lastHead:=newtemp-1;
If fcount1^[lastHead and fMask].flag=0, that means there is no item
in the cell, so we write the item into the cell by doing this:
setObject(lastHead,tm);
After that we set the flag of the cell to 1, so that pop()
can read from the cell once it is set to 1, like this:
fcount1^[lastHead and fMask].flag:=1;
So as you have noticed, I have reasoned about and explained the push() side, and I think my reasoning is correct here.
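The push() steps explained above can be sketched in a small, single-threaded Free Pascal program. This is only an illustration: it uses the RTL's InterlockedIncrement in place of LockedIncLong, and the names PushSketch, TSlot, QueueLen and Mask are assumptions, not the queue's real code. It leaves out the spin on the flag, the capacity check and any memory-ordering concerns.

```pascal
program PushTicket;
{$mode objfpc}

const
  QueueLen = 8;             // assumed power of two
  Mask     = QueueLen - 1;  // plays the role of fMask

type
  TSlot = record
    obj, flag: LongInt;
  end;

var
  head: LongInt = 0;
  slots: array[0..QueueLen - 1] of TSlot;

// Claims the next ticket and publishes `value` in its cell.
// Returns the ticket, that is the value head had before the increment.
function PushSketch(value: LongInt): LongInt;
var
  ticket: LongInt;
begin
  // InterlockedIncrement returns the incremented value.
  ticket := InterlockedIncrement(head) - 1;
  // The real push() spins here until the cell's flag is 0.
  slots[ticket and Mask].obj := value;
  slots[ticket and Mask].flag := 1;  // publish only after the write
  Result := ticket;
end;

var
  i: Integer;
begin
  for i := 0 to 2 do
    WriteLn('ticket ', PushSketch(100 + i));
  WriteLn('head = ', head);
end.
```

Because every pusher gets a distinct ticket from the atomic increment, no two pushers write to the same cell at the same time, as long as the queue does not wrap around onto a cell that is still occupied.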
Now here is the new pop() method...
==
function TWQueue.pop(var obj:long):boolean;
var lastTail,newtemp1,newtemp2: long;
    i,t:integer;
begin
  if fwait then sema.wait;
  result:=true;
  repeat
    newtemp1:=tail;
    newtemp2:=newtemp1+1;
    // Queue is empty: advancing tail would go past head.
    if newtemp2>head then
      begin
        result:=false;
        exit;
      end;
    // Spin until the cell is published (flag=1) or tail has moved.
    repeat
      if fcount1^[newtemp1 and fMask].flag=1 then break;
      sleep(0);
    until newtemp1<>tail;
    // Read the item first, then try to claim it with the CAS.
    obj:=getObject(newtemp1);
    fcount1^[newtemp1 and fMask].flag:=0;
    if CAS(tail,newtemp1,newtemp1+1) then break;
    // The CAS failed: back off and retry.
    sleep(0);
    t:=backoff.delay;
    for i:=0 to 40*t do asm pause end;
  until false;
end;
==
So as you have noticed, we have to reason about this algorithm
to prove that everything is correct, so please follow along with me...
The very first thing to know on the pop() side is that we must increment the "tail" every time, but how can we increment the tail without going past the "head"? This is why in my invention I have used a lock-free mechanism. As in lock-free algorithms, I copy and memorize the "tail" into the newtemp1 variable and after that compute newtemp1+1. If newtemp1+1 is less than or equal to "head", that means we are correct and we have not gone past the head. After that, with a lock-free mechanism, we test with a CAS that the tail is still equal to newtemp1 in order to increment the "tail", so my reasoning is correct here.
But you have to know that the threads on the push() side set the flag to 1 in their corresponding cells after they have put their items. So on the pop() side we test with a "repeat until" that fcount1^[newtemp1 and fMask].flag equals 1; if it equals 1 we continue. After that we get our item from the cell and we set fcount1^[newtemp1 and fMask].flag to 0.
Notice with me that even if we set fcount1^[newtemp1 and fMask].flag to 0 before incrementing the tail, that is not a problem, because we have a margin of 1000
on the push() side: the threads on the push() side stop pushing when "getlength >= fsize" is true, and fsize is equal to the length of the queue minus a "margin" of 1000.
So as you have noticed, I have reasoned about my algorithm and I think
my algorithm is correct now.
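The "memorize the tail, compare with head, then CAS" step described above can be sketched on its own in Free Pascal. This is only an illustration under assumptions: CAS is built here on the RTL's InterlockedCompareExchange, and TryClaim is a hypothetical helper that shows the tail-advance part only, without the flag test and the item read that the real pop() performs before the CAS.

```pascal
program PopClaim;
{$mode objfpc}

// CAS built on the RTL primitive: true if `target` was equal to
// `expected` and has been replaced by `desired`.
function CAS(var target: LongInt; expected, desired: LongInt): Boolean;
begin
  Result := InterlockedCompareExchange(target, desired, expected) = expected;
end;

// Hypothetical helper: one attempt to advance tail without passing head.
// Returns the claimed index, or -1 when the queue is empty or when
// another thread has moved tail in the meantime.
function TryClaim(var tail: LongInt; head: LongInt): LongInt;
var
  snapshot: LongInt;
begin
  snapshot := tail;             // memorize the tail
  if snapshot + 1 > head then   // would go past the head: empty
    Exit(-1);
  if CAS(tail, snapshot, snapshot + 1) then
    Result := snapshot
  else
    Result := -1;
end;

var
  tail: LongInt = 0;
begin
  // Two items are available (head = 2); the third attempt finds the queue empty.
  WriteLn(TryClaim(tail, 2));
  WriteLn(TryClaim(tail, 2));
  WriteLn(TryClaim(tail, 2));
end.
```

In the real pop() a failed CAS leads to a back-off and a retry rather than a return value of -1.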
Finally, I have benchmarked my algorithm without using my SemaMonitor,
and it gives 2x more throughput on the pop() side than the
Chris Thomasson concurrent queue that uses the bakery algorithm.
My new algorithm scored 10 million pop() transactions per second on my 2.4 GHz quad-core, and 6.4 million push() transactions per second. That's very fast, and it scales better even if the number of threads is greater than the number of cores, and it has more parallelism than the two-lock algorithm.
You can download my new algorithm of a very fast concurrent FIFO queue
from:
http://pages.videotron.com/aminer/
Thank you,
Amine Moulay Ramdane.