
Author Topic: Tthread.Queue : How to use it?  (Read 4312 times)

witenite

  • New Member
  • *
  • Posts: 41
Tthread.Queue : How to use it?
« on: December 02, 2019, 01:04:52 am »
Hi chaps,
In recent times I have been playing (and learning) a lot with regard to multi-threading. I have reviewed various options for acquiring some relatively high speed data (periodically rolls through a 16 bit value/Word approximately every 1.2ms) in a thread, and then passing said data on to the main loop. After using about 3 different methods, the best success has been achieved using WriteAsyncQueue. Using this method and an array (buffer) for the data, I fill up an array in the thread, and then pass the data on to my main loop by using WriteAsyncQueue to trigger an event. In this event I make a copy of said data, and then use it to populate a graph (as a test to verify the data makes sense, and I can review the data at leisure).

This process almost works 100% perfectly, and I have no thread variable contentions of any sort, as the data always makes sense (as part of my evaluation procedure, I am testing the data in the buffer, as well as testing for buffer overruns). However I have discovered one caveat: WriteAsyncQueue executes the triggered event as a critical section, which means in effect my worker thread is momentarily frozen while the event is processed in the main loop. This means I lose up to about 15ms of valuable data, as the worker thread fails to pick up the data every (approximately) 1.2ms. The data in question is coming from an I2C Analogue to Digital converter.

With more investigating, I came across the TThread.Queue method (as found here: https://www.freepascal.org/docs-html/rtl/classes/tthread.queue.html )

This method is not process blocking like TThread.Synchronize, however that doesn't necessarily mean it isn't running a critical section internally, similar to WriteAsyncQueue, which would have the same negative result in my case. With that said, I would like to use it, however I have been completely unsuccessful in finding a working example online, of a worker thread that triggers an event in the main loop using TThread.Queue.

Could somebody please kindly point me in the direction of a working example (or at least explain with some code in sufficient detail) as to how to go about using this method? I would also appreciate any other comments or suggestions, should you feel there is a better means of going about achieving what I am trying to do.

Thanks again guys.

jamie

  • Hero Member
  • *****
  • Posts: 6077
Re: Tthread.Queue : How to use it?
« Reply #1 on: December 02, 2019, 01:26:46 am »
I am going to assume for the moment you are using windows.

In your Thread, use postMessage(MainThreadWindowHandle, WM_USER+?,Optional1, Optional2);

Implement a circular buffer in your thread. This buffer should be allocated outside of the thread so as not to cause collisions with the memory manager. The buffer should be a wrap-around type and does not need to be large. You need the last index and the current index, basically a FIFO, and you use a binary add + AND to wrap the index around. This means a power-of-two buffer size,
like 16, 32, 64, 128, 256 etc. The index calculation is basically a MOD.

 The idea is to post a message to the main window so that you can process this message in the main thread. When the message is received, you read from the circular buffer until it is empty, then exit that method.

 In your form class.

  Procedure WMMyMessage(Var M:LMessage); message WM_USER+?;

and supply the method body for it.
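
The "binary add + AND" wrap-around mentioned above can be tried in isolation. The following is only a minimal sketch; the names BufSize and Mask are made up for illustration, and it does not touch PostMessage at all:

```pascal
program WrapDemo;
{$mode objfpc}

const
  BufSize = 16;           // must be a power of two
  Mask    = BufSize - 1;  // binary 1111

var
  Index, Step: Integer;
begin
  Index := 14;
  for Step := 1 to 4 do
  begin
    // Same result as (Index + 1) mod BufSize, but without a division.
    Index := (Index + 1) and Mask;
    WriteLn(Index);
  end;
end.
```

Starting from 14, the index steps through 15, 0, 1, 2. The trick only works when the buffer size is a power of two, because only then is size - 1 a solid run of 1 bits.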

« Last Edit: December 02, 2019, 01:33:03 am by jamie »
The only true wisdom is knowing you know nothing

jamie

  • Hero Member
  • *****
  • Posts: 6077
Re: Tthread.Queue : How to use it?
« Reply #2 on: December 02, 2019, 01:36:56 am »
P.S.

 Also remember to move the Form.handle to a normal variable somewhere, do this in the main thread before starting the secondary thread. From the secondary thread you use this reference for the form.handle when using PostMessage

 You don't want to be accessing the form's class body from the thread like that ;)

witenite

  • New Member
  • *
  • Posts: 41
Re: Tthread.Queue : How to use it?
« Reply #3 on: December 02, 2019, 04:08:00 am »
Quote from: jamie
I am going to assume for the moment you are using windows.

Doh! Sorry, I broke rule number 1: State your OS, version information etc.

I'm compiling this on my main computer (Ubuntu 18.04LTS Linux based) and cross compiling to Raspbian (which is also Linux based obviously). Is postmessage (and sendmessage for that matter) usage limited to Windows, or have they been adapted to suit other OS such as Linux too (This was another question I never found an answer for online)?

witenite

  • New Member
  • *
  • Posts: 41
Re: Tthread.Queue : How to use it?
« Reply #4 on: December 02, 2019, 04:14:34 am »
Quote from: jamie
Also remember to move the Form.handle to a normal variable somewhere, do this in the main thread before starting the secondary thread. From the secondary thread you use this reference for the form.handle when using PostMessage
I haven't done that before. I'm sure there is good reason to do so, but I don't know why. Could you briefly elaborate (or point me in the direction of more info)? Thanks.

PascalDragon

  • Hero Member
  • *****
  • Posts: 5444
  • Compiler Developer
Re: Tthread.Queue : How to use it?
« Reply #5 on: December 02, 2019, 09:32:13 am »
Quote from: witenite
This process almost works 100% perfectly, and I have no thread variable contentions of any sort, as the data always makes sense (as part of my evaluation procedure, I am testing the data in the buffer, as well as testing for buffer overruns). However I have discovered one caveat: WriteAsyncQueue executes the triggered event as a critical section, which means in effect my worker thread is momentarily frozen while the event is processed in the main loop. This means I lose up to about 15ms of valuable data, as the worker thread fails to pick up the data every (approximately) 1.2ms. The data in question is coming from an I2C Analogue to Digital converter.

With more investigating, I came across the TThread.Queue method (as found here: https://www.freepascal.org/docs-html/rtl/classes/tthread.queue.html )

This method is not process blocking like TThread.Synchronize, however that doesn't necessarily mean it isn't running a critical section internally, similar to WriteAsyncQueue, which would have the same negative result in my case. With that said, I would like to use it, however I have been completely unsuccessful in finding a working example online, of a worker thread that triggers an event in the main loop using TThread.Queue.

TThread.Queue works like Synchronize, except that it does not wait for the main thread, so you need to keep the data available until the queued method has actually run. However, for your purpose this wouldn't work, because the queue mechanism (which is shared by Queue and Synchronize) internally uses a critical section, as you suspected. What you need is a lockless threadsafe queue (with a good implementation of such, maybe I'd adjust the internals of TThread as well...)
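
Since the original question asked for a working TThread.Queue example, here is a minimal console sketch. It rests on several assumptions: TBlock, TSampler and Delivered are hypothetical names, the sample values are stand-ins for ADC readings, and the CheckSynchronize loop replaces the message loop that an LCL application would provide. Each block is a separate object, so its data stays valid until the queued method has run.

```pascal
program ThreadQueueDemo;
{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}  // threading support must be first on Unix
  Classes, SysUtils;

const
  BlockCount = 3;

type
  // Hypothetical container: one filled buffer handed to the main thread.
  TBlock = class
  public
    Samples: array[0..7] of Word;
    procedure Deliver;  // executed in the main thread
  end;

  TSampler = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Delivered: Integer = 0;  // only touched by the main thread

procedure TBlock.Deliver;
begin
  WriteLn('block starting with sample ', Samples[0], ' arrived');
  Inc(Delivered);
  Free;  // the block is no longer needed once it has been processed
end;

procedure TSampler.Execute;
var
  b, i: Integer;
  Block: TBlock;
begin
  for b := 0 to BlockCount - 1 do
  begin
    Block := TBlock.Create;
    for i := 0 to High(Block.Samples) do
      Block.Samples[i] := b * 100 + i;  // stand-in for ADC readings
    // Returns at once; Deliver runs later in the main thread.
    TThread.Queue(Self, @Block.Deliver);
  end;
end;

var
  Sampler: TSampler;
begin
  Sampler := TSampler.Create(False);
  // A console program has no message loop, so it pumps the queue itself.
  while Delivered < BlockCount do
    CheckSynchronize(10);
  Sampler.WaitFor;
  Sampler.Free;
end.
```

Note that this sketch allocates a block per hand-over and that Queue takes a lock briefly while appending, so it shows how the call is used rather than a lock-free solution.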

sash

  • Sr. Member
  • ****
  • Posts: 366
Re: Tthread.Queue : How to use it?
« Reply #6 on: December 02, 2019, 09:41:37 am »
Quote from: jamie
I am going to assume for the moment you are using windows.
... postMessage(MainThreadWindowHandle, ...
Instead of proposing non-portable code, I'd rather suggest RTLEvents.
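
For readers who have not used RTLEvents, a minimal portable sketch follows. The names TProducer, DataReady and Shared are made up for illustration. Here the main thread simply blocks on the event; a GUI main thread could not afford to do that and would more likely leave the waiting to a dedicated consumer thread.

```pascal
program RtlEventDemo;
{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}  // threading support must be first on Unix
  Classes, SysUtils;

type
  TProducer = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  DataReady: PRTLEvent;   // signalled by the producer when Shared is valid
  Shared: LongInt = 0;

procedure TProducer.Execute;
begin
  Sleep(20);                         // pretend to acquire some data
  InterlockedExchange(Shared, 1234); // publish the value
  RTLEventSetEvent(DataReady);       // wake whoever waits on the event
end;

var
  Producer: TProducer;
begin
  DataReady := RTLEventCreate;
  Producer := TProducer.Create(False);
  RTLEventWaitFor(DataReady);        // sleeps until the producer signals
  WriteLn('got ', Shared);
  Producer.WaitFor;
  Producer.Free;
  RTLEventDestroy(DataReady);
end.
```

The event itself carries no data. It only tells the waiting thread that the shared variable is ready to be read.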
Lazarus 2.0.10 FPC 3.2.0 x86_64-linux-gtk2 @ Ubuntu 20.04 XFCE

witenite

  • New Member
  • *
  • Posts: 41
Re: Tthread.Queue : How to use it?
« Reply #7 on: December 02, 2019, 10:20:47 am »

Quote from: PascalDragon
TThread.Queue works like Synchronize, except that it does not wait for the main thread, so you need to keep the data available until the queued method has actually run. However, for your purpose this wouldn't work, because the queue mechanism (which is shared by Queue and Synchronize) internally uses a critical section, as you suspected. What you need is a lockless threadsafe queue (with a good implementation of such, maybe I'd adjust the internals of TThread as well...)
Thanks for confirming what I suspected (as far as TThread.Queue using a critical section in its method). I don't believe I will have too many difficulties getting my buffer to be thread safe once I have a lockless option up and running, as I have literally spent weeks carrying out many tests transferring variables of various types across the thread barrier and monitoring/mitigating variable contention. I wrote a good program to test this, thrashing 2 CPU's at 100% for hours at a time. Tested on both my Intel I7 based machine as well as the ARM based Raspberry Pi 3 board.

I had some interesting results, but the bottom line is that only Boolean values were thread safe. Given enough time, I could see failures of every other data type tested, given enough loop back tests. I was surprised to see that even a Word (being a 16bit unsigned integer) fails on a 64 bit system (and presumably on 32bit as well). This more or less aligns with what I read online that with multiple core machines running threads asynchronously, atomicity doesn't mean much anymore. With that said, I did write some efficient code to detect collisions or variable contention and avoid accepting said erroneous data. This was achieved in the worker thread that was reading the value as written by the main loop. I even had additional detection to detect if my data trap failed, and let erroneous data in. After literally 10's of millions of read/write loop back tests, I confirmed that I had detected and avoided 100% of all instances of variable contention. The code was deliberately written to not have any critical sections or synchronization going on to achieve all this.

Of course all this extensive testing/learning has not revealed to me how to trigger an event in the main loop when the thread has filled the buffer as required (and not incur any critical section as you do with WriteAsyncQueue). To me that would be ideal. Thread fills buffer, makes copy of buffer, triggers main loop event, returns to filling buffer from start. Then main loop services the triggered event when convenient, and processes the copied buffer before the thread whips around and refreshes it once more. Buffer overrun would be easy to detect (and mitigate) too, and the buffer size would be dependent on how long the application takes to periodically service the triggered events, and sample rate of worker thread.

witenite

  • New Member
  • *
  • Posts: 41
Re: Tthread.Queue : How to use it?
« Reply #8 on: December 02, 2019, 10:26:40 am »
Quote from: sash
Instead of proposing non-portable code, I'd rather suggest RTLEvents.
Thanks Sash, being a big Linux fan, I definitely prefer to learn options that are ideally platform/OS agnostic. I have developed software before that relies heavily on Windows API's etc. and while they certainly hold some attraction, code that I can simply cross compile etc. with zero changes is by far the most appealing option.  :D

avra

  • Hero Member
  • *****
  • Posts: 2514
    • Additional info
Re: Tthread.Queue : How to use it?
« Reply #9 on: December 02, 2019, 01:19:44 pm »
Quote from: witenite
WriteAsyncQueue executes the triggered event as a critical section, which means in effect my worker thread is momentarily frozen while the event is processed in the main loop. This means I lose up to about 15ms of valuable data, as the worker thread fails to pick up the data every (approximately) 1.2ms. The data in question is coming from an I2C Analogue to Digital converter.

This is how critical section is used in QueueAsyncCall():
Code: Pascal
  System.EnterCriticalsection(FAsyncCall.CritSec);
  try
    with FAsyncCall.Next do begin
      lItem^.PrevItem := Last;
      if Last<>nil then begin
        assert(Top <> nil, 'TApplication.QueueAsyncCall: Top entry missing (but last is assigned)');
        Last^.NextItem := lItem
      end else begin
        assert(Last = nil, 'TApplication.QueueAsyncCall: Last entry found, while Top not assigned');
        Top := lItem;
      end;
      Last := lItem;
    end;
  finally
    System.LeaveCriticalsection(FAsyncCall.CritSec);
  end;

I really do not see how anything there could block for 15ms, especially since QueueAsyncCall() is called from the worker thread. WriteAsyncQueue() is called from the main GUI thread, and there is no lock at all inside WriteAsyncQueue(). Do you use QueueAsyncCall() exactly as explained here, or in some other way? Is your WriteAsyncQueue() located in the GUI thread?

Your worker thread (by calling dozens of QueueAsyncCalls in a row) should with each call simply create a New() data record, assign data to it, and use QueueAsyncCall() to put a pointer to WriteAsyncQueue() and a pointer to data in the main GUI thread message queue. That serialization uses critical section and is thread safe. Then when OS decides that time came to update your GUI (which could depending on the OS be close to that 15ms you have found empirically), all queued calls to WriteAsyncQueue() should be executed one by one, and data should be Dispose()d.
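
The hand-over described above (New() a record, queue a handler pointer plus a data pointer under a short lock, run the handlers later in the main thread, Dispose() the data) can be modelled in a console program. This is a toy model and not the LCL implementation: QueueCall, DrainCalls, ShowSample and the record types are all made-up names, and a polling loop stands in for the GUI message loop.

```pascal
program AsyncQueueModel;
{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}  // threading support must be first on Unix
  Classes, SysUtils;

type
  PSample = ^TSample;
  TSample = record
    Value: Word;
  end;
  TDataProc = procedure(Data: PSample);
  PCall = ^TCall;
  TCall = record
    Proc: TDataProc;
    Data: PSample;
    Next: PCall;
  end;
  TWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Lock: TRTLCriticalSection;
  Head: PCall = nil;
  Tail: PCall = nil;
  Handled: Integer = 0;  // only touched by the main thread

// Called from the worker: the lock covers only a few pointer assignments.
procedure QueueCall(Proc: TDataProc; Data: PSample);
var
  Item: PCall;
begin
  New(Item);
  Item^.Proc := Proc;
  Item^.Data := Data;
  Item^.Next := nil;
  EnterCriticalSection(Lock);
  try
    if Tail <> nil then Tail^.Next := Item else Head := Item;
    Tail := Item;
  finally
    LeaveCriticalSection(Lock);
  end;
end;

// Called from the main thread: handlers run outside the lock.
procedure DrainCalls;
var
  Item: PCall;
begin
  repeat
    EnterCriticalSection(Lock);
    try
      Item := Head;
      if Item <> nil then
      begin
        Head := Item^.Next;
        if Head = nil then Tail := nil;
      end;
    finally
      LeaveCriticalSection(Lock);
    end;
    if Item = nil then Break;
    Item^.Proc(Item^.Data);
    Dispose(Item);
  until False;
end;

procedure ShowSample(Data: PSample);
begin
  WriteLn('sample ', Data^.Value);
  Dispose(Data);
  Inc(Handled);
end;

procedure TWorker.Execute;
var
  i: Integer;
  S: PSample;
begin
  for i := 1 to 5 do
  begin
    New(S);
    S^.Value := i;
    QueueCall(@ShowSample, S);
  end;
end;

var
  Worker: TWorker;
begin
  InitCriticalSection(Lock);
  Worker := TWorker.Create(False);
  while Handled < 5 do
  begin
    DrainCalls;
    Sleep(1);
  end;
  Worker.WaitFor;
  Worker.Free;
  DoneCriticalSection(Lock);
end.
```

In this model the worker never waits for a handler to finish. It can only be held up for the few instructions during which the other thread owns the lock.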

Anyway, alternative lock free queue implementation can be found here:
https://sites.google.com/site/scalable68/system/app/pages/sitemap/hierarchy
ct2laz - Conversion between Lazarus and CodeTyphon
bithelpers - Bit manipulation for standard types
pasettimino - Siemens S7 PLC lib

Martin_fr

  • Administrator
  • Hero Member
  • *
  • Posts: 9754
  • Debugger - SynEdit - and more
    • wiki
Re: Tthread.Queue : How to use it?
« Reply #10 on: December 02, 2019, 02:18:34 pm »
First of all, I do not know which Linux you use or how you set it up. This does not matter for most of my answer, but it may matter for what you do.

On a normal OS, nothing is realtime, any thread (including your data collector) can be interrupted for any amount of time. This usually happens if other apps run on your system and compete for CPU or RAM (worst of all, if your thread's memory gets swapped to disk).
That also means you have to think on how to manage memory, as again you could run into swapping...

Do you use an Intel-compatible CPU?
They have the "lock" asm instruction prefix. In FPC: InterlockedExchange/InterlockedIncrement/...Decrement/...ExchangeAdd
Those are but *one* asm instruction each. You can't get faster than this, but...

Now before diving into it, you should read up on "memory barriers" (google)
If thread-A does "Foo := 1", then it is possible that thread-B does not see this right away (since each core has its own caches and write buffers, and the compiler may keep the value in a register). So you can't just change variables...

This is where interlocked comes in. It acts as a memory barrier: it ensures the write becomes visible to the other cores (and stops whatever reordering your CPU may otherwise do).
So each interlocked does have a small cost, because the CPU must synchronise with the other cores rather than work purely from its own cache (and may lose other optimizations).
However, any critical section always includes this too, so the interlocked can (depending on usage) be faster.
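
As a small illustration of what the interlocked functions buy you, the sketch below lets two threads bump one shared counter. TCounterThread, Counter and Loops are made-up names. With InterlockedIncrement no increment is lost, whereas a plain Inc(Counter) in the same loop could drop some.

```pascal
program InterlockedDemo;
{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}  // threading support must be first on Unix
  Classes, SysUtils;

const
  Loops = 100000;

type
  TCounterThread = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Counter: LongInt = 0;  // shared by both threads

procedure TCounterThread.Execute;
var
  i: Integer;
begin
  for i := 1 to Loops do
    InterlockedIncrement(Counter);  // atomic read-modify-write
end;

var
  A, B: TCounterThread;
begin
  A := TCounterThread.Create(False);
  B := TCounterThread.Create(False);
  A.WaitFor;
  B.WaitFor;
  WriteLn('counter = ', Counter);   // 2 * Loops
  A.Free;
  B.Free;
end.
```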

Next it depends on how you organize your memory.
If your worker (reader) thread has to allocate new memory whenever needed, then the "allocmem" call to the OS will cost time, and that may again lose a read sample. FPC pre-allocates big chunks from the OS (because the OS call is expensive) and gives you slices of them as you need them (which is quick, and why you normally do not see time loss on getting memory). But if you use more and more memory, you will get the OS call... I have no idea how much time that costs (so it could be fast enough, but I do not know).
I also do not know, how the fpc mem manager implements thread safety, in case you make any allocation/deallocations.

Also interlocked does not wake the main thread. So you have to have your main thread run in a loop, always looking for data. Your main thread will therefore use more CPU time.
Normally your main thread sleeps if it has nothing to do, and Synchronize/Queue/... will signal it to wake up.


Ok that was a lot of info upfront. 

If you have only one thread writing (or a separate queue for each thread), then you can do something like (not tested)

var
  entries: array [0..15] of TEntry;  // 16 entries, must be a power of 2
  writePos: LongInt;  // the next write of the worker thread goes to entries[writePos and 15]
  readPos: LongInt;   // the next read of the main thread comes from entries[readPos and 15]

Only the ONE worker thread is allowed to write.
Only the main thread will read (and must do so by polling, without waiting for a wake-up call).

If (writePos + 1) and 15 = (readPos and 15), the queue is full, and the worker must alloc more mem and keep collecting data. It will then later write a bigger chunk of data, all at once.

If (readPos and 15) = (writePos and 15), the queue is empty.

readPos is ONLY changed by the main thread.
writePos is ONLY changed by the worker thread.

Both threads only ever increment their ...Pos, so the value can be 1299 or anything. An "and" will give the correct index (hence the array size must be a power of 2).
The counters will eventually wrap around, but that is still OK.

worker thread
  r := InterlockedExchangeAdd(readPos, 0); // thread-safe read, as "add 0" is a no-op
  // readPos can change while we are between lines, but that does not matter in this case (you can simulate all possibilities)
  // we can read writePos directly, since the main thread never changes it
  if (writePos + 1) and 15 = (r and 15) then
     ... keep going, queue full (keep the data, and join it with whatever is read next)
  // write to queue
  entries[writePos and 15] := data;
  InterlockedIncrement(writePos);  // thread-safe inc, so the main thread will be able to see it

main thread
  w := InterlockedExchangeAdd(writePos, 0); // thread-safe read, as "add 0" is a no-op
  // writePos can change while we are between lines, but that does not matter in this case (you can simulate all possibilities)
  // we can read readPos directly, since the worker thread never changes it
  if (w and 15) = (readPos and 15) then
     ... keep going, queue empty
  data := entries[readPos and 15];
  InterlockedIncrement(readPos);




You can google "interlocked queue" ...
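
The single-writer, single-reader scheme can also be put into a complete console program for experimenting. What follows is a sketch under stated assumptions: the element type is a plain Word, the worker writes to Entries[WritePos and RingMask] before incrementing WritePos, the snapshot of the other side's counter is taken with InterlockedExchangeAdd(x, 0), and the worker simply spins while the ring is full, where a real sampler would stash the sample instead. All names are illustrative.

```pascal
program SpscRing;
{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}  // threading support must be first on Unix
  Classes, SysUtils;

const
  RingSize    = 16;            // must be a power of two
  RingMask    = RingSize - 1;
  SampleCount = 1000;

type
  TWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Entries: array[0..RingSize - 1] of Word;
  WritePos: LongInt = 0;  // changed ONLY by the worker thread
  ReadPos: LongInt = 0;   // changed ONLY by the main thread

procedure TWorker.Execute;
var
  i: LongInt;
begin
  for i := 0 to SampleCount - 1 do
  begin
    // Ring full: one slot stays unused so that full and empty differ.
    while ((WritePos + 1) and RingMask) =
          (InterlockedExchangeAdd(ReadPos, 0) and RingMask) do
      ThreadSwitch;
    Entries[WritePos and RingMask] := Word(i);
    InterlockedIncrement(WritePos);  // publish the slot to the reader
  end;
end;

var
  Worker: TWorker;
  Received, w: LongInt;
  Sample: Word;
  InOrder: Boolean;
begin
  Received := 0;
  InOrder := True;
  Worker := TWorker.Create(False);
  while Received < SampleCount do
  begin
    w := InterlockedExchangeAdd(WritePos, 0);  // snapshot of the writer
    if (w and RingMask) = (ReadPos and RingMask) then
    begin
      ThreadSwitch;  // ring empty, poll again
      Continue;
    end;
    Sample := Entries[ReadPos and RingMask];
    InterlockedIncrement(ReadPos);   // hand the slot back to the writer
    if Sample <> Received then
      InOrder := False;
    Inc(Received);
  end;
  Worker.WaitFor;
  Worker.Free;
  WriteLn('received ', Received, ' samples, in order: ', InOrder);
end.
```

The main thread polls here, which matches the point made earlier that interlocked operations never wake a sleeping thread.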
« Last Edit: December 02, 2019, 02:58:11 pm by Martin_fr »

witenite

  • New Member
  • *
  • Posts: 41
Re: Tthread.Queue : How to use it?
« Reply #11 on: December 03, 2019, 09:38:17 am »
Quote from: avra
I really do not see how anything there could block for 15ms.

Hi Avra,
I did not pay much attention to exactly what is going on inside the critical section of code in there, sorry. The point has been raised in this forum, that another process (nothing to do with the application) may be robbing the CPU of cycles. I had not considered that. When I run the test application it uses up 2 CPU's, leaving another 2 mostly idle for the rest of the OS, and I therefore thought the OS would leave my application running unimpeded. I suppose it is conceivable that something still happens now and then as Linux decides that servicing another process or routine is more important, and puts my application on pause. This may be happening though how I would prove it is another question. I could look at changing the priority level of my worker thread, to ensure it keeps running no matter what. I have not done this before, and am not sure if it's even possible (though from what I have seen with FPC/Lazarus, there's just about nothing that cannot be done in Pascal  :))

With regard to QueueAsyncCall I made this part of my main loop, called by the worker thread. This does not seem very safe to me (having the worker thread calling a main loop procedure), however it compiles and works just fine. I followed the instructions found here:
https://wiki.lazarus.freepascal.org/Asynchronous_Calls

From the example (very similar to the example in your earlier link too) it shows both QueueAsyncCall and WriteAsyncQueue to be part of the same class or structure (presumably TMemoChannel is the main Form of the application). Am I wrong? as WriteAsyncQueue is the triggered event, and contains visual components that you would expect in the main loop/GUI it seems sensible that these are main loop procedures. Should they both instead be part of my worker thread?

Thanks again for your valued input.
« Last Edit: December 03, 2019, 10:15:02 am by witenite »

witenite

  • New Member
  • *
  • Posts: 41
Re: Tthread.Queue : How to use it?
« Reply #12 on: December 03, 2019, 10:09:56 am »
Quote from: Martin_fr
On a normal OS, nothing is realtime, any thread (including your data collector) can be interrupted for any amount of time...
Truer words were never spoken  :)
I come from an embedded firmware design background, and spent the last 20 years writing code mostly in Assembler, along with some C. I am therefore "cutting my teeth" on FPC as a high level language (I have had a few weeks worth of formal training with Delphi, and that's it!) and fully expect some roadblocks when it comes to non-realtime program execution. I have found that Linux performs a lot better than Windows, and seeing as I design predominantly for industrial applications, this suits me just fine. Linux is closer to being a realtime operating system, though it is still a long way off. This is completely alright though, and I am simply pushing the envelope as part of my learnings with this language, to see exactly what I can reasonably expect to get out of it. I have been impressed with FPC (and Delphi for that matter) to date, and plan to continue using it for the foreseeable future. Being able to cross compile to Raspberry Pi is a very nice feature too, and works well. I'm somewhat off topic here, but think it helps the guys here to better understand my abilities (or lack thereof...  :-[)

Thank you very much for your extensive information. The explanation you gave on using a circular buffer is excellent, and you actually answered some questions for me that I have not been able to find online (IE the readPOS and WritePOS potential variable conflicts when checking to see whether the buffer is full or empty, and the fact that they don't matter is good to know). I think as a matter of exercise, I am going to try this as well.

A quick question on the circular buffer: Do you use an array for the buffer? I think this should be safe (as long as I am not reading/writing the same array element at the same time across threads of course) but would like to confirm, thanks.

« Last Edit: December 03, 2019, 10:13:48 am by witenite »

440bx

  • Hero Member
  • *****
  • Posts: 3921
Re: Tthread.Queue : How to use it?
« Reply #13 on: December 03, 2019, 11:20:30 am »
Quote from: witenite
A quick question on the circular buffer: Do you use an array for the buffer?
A pre-allocated array is the most efficient way, particularly when the reading and writing of elements is time sensitive as in the case you've described.   Basically, in a time sensitive process, you don't want time spent allocating new memory blocks.

Quote from: witenite
I think this should be safe (as long as I am not reading/writing the same array element at the same time across threads of course) but would like to confirm, thanks.
Correctly implemented, it is safe. That means: as long as the variables "ReadPos" and "WritePos" are _always_ written by different threads, it should be fine. IOW, each variable is "owned" (written to) by one and only one thread. The one concern in a lockless implementation is that the data must, on average, be read faster than it is written, and the buffer must be large enough to absorb the bursts in between, so that there is always an "already read" slot in the array available for a new element. A lockless implementation is basically a race between reader and writer in which the winner must be "guaranteed" (hopefully, the "reader" thread).
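
To put rough numbers on "large enough", the figures from the first post can be plugged in: one sample about every 1.2 ms, and stalls of up to about 15 ms. The sketch below assumes that 15 ms really is the longest stall (it was only an observation, not a guarantee) and that the ring keeps one slot unused to tell full from empty. The constant and variable names are made up.

```pascal
program RingSizing;
{$mode objfpc}

uses
  Math;

const
  SamplePeriodMs = 1.2;   // sample interval from the first post
  LongestStallMs = 15.0;  // observed stall, assumed to be the worst case

var
  SlotsNeeded, RingSize: Integer;
begin
  // Samples that arrive while the reader is not servicing the buffer.
  SlotsNeeded := Ceil(LongestStallMs / SamplePeriodMs);
  // Smallest power of two whose usable capacity (size - 1) is sufficient.
  RingSize := 2;
  while RingSize - 1 < SlotsNeeded do
    RingSize := RingSize * 2;
  WriteLn('slots needed: ', SlotsNeeded, ', ring size: ', RingSize);
end.
```

With these inputs 13 slots are needed, so a 16-entry ring just fits. In practice a larger power of two would leave some margin for longer stalls.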

« Last Edit: December 03, 2019, 11:22:48 am by 440bx »
(FPC v3.0.4 and Lazarus 1.8.2) or (FPC v3.2.2 and Lazarus v3.2) on Windows 7 SP1 64bit.

avra

  • Hero Member
  • *****
  • Posts: 2514
    • Additional info
Re: Tthread.Queue : How to use it?
« Reply #14 on: December 04, 2019, 02:08:19 pm »
Quote from: witenite
Linux decides that servicing another process or routine is more important, and puts my application on pause.
While you can not stop Linux from interrupting your code, you can make it behave in a deterministic way:
https://wiki.linuxfoundation.org/realtime/start
https://github.com/emlid/linux-rt-rpi

Quote from: witenite
With regard to QueueAsyncCall I made this part of my main loop, called by the worker thread. This does not seem very safe to me (having the worker thread calling a main loop procedure), however it compiles and works just fine.
QueueAsyncCall simply puts 2 pointers (1 for the method and 1 for the data) into a queue, in a serialized, thread-safe way. Then, just before the time comes for a GUI update (either triggered by the OS or forced with Application.ProcessMessages), your main application thread stops executing your code and calls all methods from the queue. Nothing unsafe there. Only one thread has GUI access, so no problems at all.

Quote from: witenite
I followed the instructions found here:
https://wiki.lazarus.freepascal.org/Asynchronous_Calls

From the example (very similar to the example in your earlier link too) it shows both QueueAsyncCall and WriteAsyncQueue to be part of the same class or structure (presumably TMemoChannel is the main Form of the application). Am I wrong? as WriteAsyncQueue is the triggered event, and contains visual components that you would expect in the main loop/GUI it seems sensible that these are main loop procedures. Should they both instead be part of my worker thread?
Forum example is very similar because record based thread safe GUI update example in the wiki was added by me after that forum discussion. I gave you forum link because unlike wiki it shows some discussion on the topic. It doesn't matter where WriteAsyncQueue is located (although there might be some good and some bad practices). What matters is that WriteAsyncQueue gets called from the main thread at the right time to allow thread safe GUI access. TMemoChannel is not a form. It is a class that I have implemented for MultiLog and it was used to add thread safe logging to TMemo located in some user application form. You can find full implementation here: https://github.com/blikblum/multilog/blob/master/memochannel.pas. Thanks to that I have a 24/7 industrial application running for years now, having several threads logging into the same memo for visualization and into the same archive log file without issues.

 
