
Author Topic: [Solved] Correct use of multiple threads  (Read 1609 times)

lainz

[Solved] Correct use of multiple threads
« on: August 17, 2022, 05:22:59 pm »
Hi, I need to start, say, 50 threads to upload data to an API.

For example, create an array[0..49] of TMyUploadThread.
For example, create an array[0..49] of TMyData.

Assign the data to each thread, then start the thread.

Some things to take into consideration:
- Maybe not all threads are started: say we upload just 9 of 50 this time, and next time it could be 200. It is not predictable.
- All used threads must be stopped and freed.
- Threads must be reused / recreated for the next data.

Here is some of our current code. It works, but it has a flaw: Execute shouldn't be called directly; Start should be used instead. Otherwise I think we're just running it on the main thread. The problem is that without Execute, it finishes everything without uploading anything  :'(

How can we fix this old code to work properly with threads?

Note: in this demo code the filling of the data is not complete; I've omitted it. The important thing is the thread handling.

The thread itself works fine.

Some code:

Code: Pascal
procedure TPostThread.Execute;
begin
  try
    FAFinished:=false;
    if FMemTable<>nil then AgregarRegistro;
  finally
    FAFinished:=true;
  end;
end;

.....

var
  jObject : jsontools.TJsonNode;
  sErrMsg, sQuery, sCampoClave, sID:String;
  i,j, iRegistros{, iCount}:integer;
  MemTableQuery: TBufDataset;
  Threads: array[1..ciPostThreadCount] of TPostThread;
  ThreadsRecords: array[1..ciPostThreadCount] of TBufDataset;
  AllFinished: boolean;
begin
  // whether records were loaded
  result:=false;

  // here code that loads data....

  jObject:= jsontools.TJsonNode.Create();
  MemTableQuery:= TBufDataset.Create(nil);
  try
    if not MemTableQuery.isEmpty then
      MemTableQuery.First;
    //iCount := 0;
    for j:=Low(Threads) to High(Threads) do
    begin
      Threads[j]:=nil;
      ThreadsRecords[j]:=nil;
    end;
    while (not MemTableQuery.EOF) and (gTerminando=false) do
    begin
      for j:=Low(Threads) to High(Threads) do
      begin
        if not MemTableQuery.EOF then
        begin
          ThreadsRecords[j]:=TBufDataset.Create(nil);
          CopyFromDatasetRecord(ThreadsRecords[j],MemTableQuery);
          if not Assigned(Threads[j]) then
            Threads[j]:=TPostThread.Create(true);
          Threads[j].MemTable:=ThreadsRecords[j];
          Threads[j].Tabla:=sTabla;
          Threads[j].CampoClave:=sCampoClave;
          Threads[j].Execute;
          MemTableQuery.Next;
          //sleep(50);
        end;
      end;
      // wait till all threads finished
      repeat
        AllFinished:=true;
        for j:=Low(Threads) to High(Threads) do
          if Threads[j]<>nil then
            if not Threads[j].AFinished then
              AllFinished:=false;
      until AllFinished;
      // free the threads
      for j:=Low(Threads) to High(Threads) do
      begin
        if Threads[j]<>nil then Threads[j].Terminate;
        if ThreadsRecords[j]<>nil then FreeAndNil(ThreadsRecords[j]);
      end;
    end;
    for j:=Low(Threads) to High(Threads) do
    begin
      if Threads[j]<>nil then FreeAndNil(Threads[j]);
    end;
    Result:=True;
  finally
    jObject.Free;
    MemTableQuery.Free;
  end;
end;
« Last Edit: August 19, 2022, 04:34:09 pm by lainz »

dje

Re: Correct use of multiple threads
« Reply #1 on: August 17, 2022, 06:15:13 pm »
Eeek! That looks like trouble.

First up, shouldn't this line be while not MemTableQuery.EOF?
Code: Pascal
if not MemTableQuery.EOF then

ThreadsRecords should not be a global array. Since each thread has its own TBufDataset, and only that thread should ever access its dataset, the object should be a strict private field inside the TThread class. The TBufDataset should be created by the thread and freed when the thread finishes, usually in the constructor and destructor.

It's unclear how you are transferring thread "seeding" data to the TBufDatasets, but I would look into an intermediate data structure.

So, pull record data from MemTableQuery and place it into a new record, then pass that record to the thread constructor, which creates a new TBufDataset and works in the background doing whatever it wants, without access to any _other_ data than what you have provided.

Your list of threads should be stored in a TThreadList, which is simply a thread-safe TList that prevents multiple threads from accessing the list at the same time. If you look in the following code:
fpcsrc/3.2.0/rtl/objpas/classes/classes.inc

You will find a private TThread class called TExternalThread. This gives a perfect example of how to use a TThreadList to safely manage multiple TThreads.

Code: Pascal
constructor TExternalThread.Create;
begin
  FExternalThread := True;
  { the parameter is unimportant if FExternalThread is True }
  inherited Create(False);
  with ExternalThreads.LockList do
    try
      Add(Self);
    finally
      ExternalThreads.UnlockList;
    end;
end;

destructor TExternalThread.Destroy;
begin
  inherited;
  if not ExternalThreadsCleanup then
    with ExternalThreads.LockList do
      try
        Extract(Self);
      finally
        ExternalThreads.UnlockList;
      end;
end;

Note how the thread adds and removes itself from a global TThreadList. Only then can you safely count how many threads are running, and iterate the threads to signal them to terminate.

Anyway, hope that helps.

EDIT: One last thought. Think about threads as external programs. Don't "cheat" and allow thread code access to your entire app. Isolate the thread code in its own unit, and only pass data to it that it needs. And lock those threads up in a TThreadList. Otherwise, it gets messy!
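
A minimal sketch of the "pass the data to the constructor" idea, assuming a plain record is enough to describe one upload. TUploadItem, TUploadThread and the fake "upload" in Execute are hypothetical names invented for illustration; they are not taken from the original project.

```pascal
program OwnedDataDemo;
{$mode objfpc}{$H+}

uses
  {$ifdef unix}cthreads,{$endif}
  Classes, SysUtils;

type
  // Hypothetical record holding everything one upload needs.
  TUploadItem = record
    Id: Integer;
    Payload: string;
  end;

  TUploadThread = class(TThread)
  strict private
    FItem: TUploadItem;    // private copy, only this thread touches it
    FResultText: string;
  protected
    procedure Execute; override;
  public
    constructor Create(const AItem: TUploadItem);
    property ResultText: string read FResultText;
  end;

constructor TUploadThread.Create(const AItem: TUploadItem);
begin
  inherited Create(True);  // created suspended; the caller calls Start
  FItem := AItem;          // copy the data before the thread runs
end;

procedure TUploadThread.Execute;
begin
  // Placeholder for the real upload; it only uses the thread's own copy.
  FResultText := Format('uploaded #%d (%d bytes)',
    [FItem.Id, Length(FItem.Payload)]);
end;

var
  Item: TUploadItem;
  T: TUploadThread;
begin
  Item.Id := 1;
  Item.Payload := 'hello';
  T := TUploadThread.Create(Item);
  try
    T.Start;
    T.WaitFor;             // after this, reading ResultText is safe
    WriteLn(T.ResultText);
  finally
    T.Free;
  end;
end.
```

Because the thread never sees anything except its own copy of the record, no locking is needed around the data itself.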
« Last Edit: August 17, 2022, 06:49:03 pm by derek.john.evans »

lainz

Re: Correct use of multiple threads
« Reply #2 on: August 18, 2022, 05:44:19 pm »
Is there a way to share objects, such as a single TBufDataset holding a copy of one record, between the starter thread and the uploader thread?

The starter thread grabs all the data from the database in a single pass. Then it assigns a copy of a different single record to each upload thread.

That copy should be used only within the upload thread.

Are there any problems with that?

alpine

Re: Correct use of multiple threads
« Reply #3 on: August 18, 2022, 06:52:20 pm »
Are those:
Code: Pascal
procedure TPostThread.Execute;
begin
  try
    FAFinished:=false;
    if FMemTable<>nil then AgregarRegistro;
  finally
    FAFinished:=true;
  end;
end;
the only references to FAFinished? Is there a constructor for TPostThread?

What is the scope of
Code: Pascal
Threads: array[1..ciPostThreadCount] of TPostThread;
Because of the conditional creation with:
Code: Pascal
if not Assigned(Threads[j]) then
  Threads[j]:=TPostThread.Create(true);
an existing (already completed) thread is skipped while its AFinished is still True, i.e. it is not created anew and its fields are not zero-filled.

Hypothesis: Calling every Threads[j].Execute; serially in the current thread does the job and ends with all AFinished = True. Replacing it with Threads[j].Start; delays the
Code: Pascal
try
    FAFinished:=false;
until Execute actually runs, so:
Code: Pascal
// wait till all threads finished
repeat
  AllFinished:=true;
  for j:=Low(Threads) to High(Threads) do
    if Threads[j]<>nil then
      if not Threads[j].AFinished then
        AllFinished:=false;
until AllFinished;
may exit right on the first pass.

It must be something with the TPostThread lifecycle...
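
One way to sidestep the stale-flag race described in the hypothesis is to drop the polling flag and let the RTL do the waiting. The following is only a sketch under that assumption: TWorker is a hypothetical stand-in for TPostThread, its Execute merely squares a number, and WorkerCount is an arbitrary batch size.

```pascal
program StartWaitDemo;
{$mode objfpc}{$H+}

uses
  {$ifdef unix}cthreads,{$endif}
  Classes;

const
  WorkerCount = 4; // hypothetical batch size

type
  // Hypothetical stand-in for TPostThread.
  TWorker = class(TThread)
  private
    FInput, FOutput: Integer;
  protected
    procedure Execute; override;
  public
    constructor Create(AInput: Integer);
    property Output: Integer read FOutput;
  end;

constructor TWorker.Create(AInput: Integer);
begin
  inherited Create(True); // suspended: nothing runs until Start
  FInput := AInput;
end;

procedure TWorker.Execute;
begin
  FOutput := FInput * FInput; // runs in the worker thread
end;

var
  Workers: array[1..WorkerCount] of TWorker;
  i: Integer;
begin
  for i := Low(Workers) to High(Workers) do
  begin
    Workers[i] := TWorker.Create(i);
    Workers[i].Start;   // Start, not Execute: Execute then runs on a new thread
  end;
  for i := Low(Workers) to High(Workers) do
  begin
    Workers[i].WaitFor; // blocks until this worker's Execute has returned
    WriteLn(i, ' -> ', Workers[i].Output);
    Workers[i].Free;    // a finished TThread is not restarted; create a new one per batch
  end;
end.
```

Note that WaitFor blocks the calling thread, so in a GUI application this loop would freeze the form while the uploads run; that trade-off is outside what this sketch addresses.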
"I'm sorry Dave, I'm afraid I can't do that."
—HAL 9000

loaded

Re: Correct use of multiple threads
« Reply #4 on: August 18, 2022, 07:06:51 pm »
I had a similar job once. Here is the code I used for a situation where the thread needs to be terminated regardless of what state it is in;
Code: Pascal
type
  TSendThread = class(TThread)
  private
    FPngstream: tmemorystream;
    ...
  protected
    procedure Execute; override;
  public
    constructor Create(....);
  end;

....
FThread:array of TSendThread;

try
  for ix:=low(FThread) to High(FThread) do
  begin
    TerminateThread(FThread[ix].Handle,0);
    KillThread(FThread[ix].ThreadID);
    Application.ProcessMessages;
  end;
except
end;
  26.  

jamie

Re: Correct use of multiple threads
« Reply #5 on: August 18, 2022, 11:07:13 pm »
For shared objects, isn't using a TCriticalSection variable the correct way of controlling access?
The only true wisdom is knowing you know nothing

lainz

Re: Correct use of multiple threads
« Reply #6 on: August 18, 2022, 11:19:12 pm »
For shared objects, isn't using a TCriticalSection variable the correct way of controlling access?

I don't know; that's why I'm asking.

The question is: if I share an object with a thread, will there be any problem if only that thread uses the data?

And where should the data be freed?

jamie

Re: Correct use of multiple threads
« Reply #7 on: August 18, 2022, 11:31:29 pm »
You use a common path to access that data, so that this path has to pass through the critical section test.

This will block other threads from accessing anything between those two points.

First you create the critical section variable somewhere in code where it is accessible from all threads.

Then you call EnterCriticalSection, run your code, and then call LeaveCriticalSection.

When a thread calls EnterCriticalSection, its execution blocks until the ref count is 0, i.e. until no other thread is inside the critical section.
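
The steps above can be sketched with the Free Pascal RTL critical-section procedures. This is only an illustration: the shared Counter, TCounterThread and the two constants are made-up names, and the shared resource in a real application would be whatever object the threads have in common.

```pascal
program CritSecDemo;
{$mode objfpc}{$H+}

uses
  {$ifdef unix}cthreads,{$endif}
  Classes;

const
  ThreadCount = 4;             // hypothetical values for the demo
  IncrementsPerThread = 10000;

var
  Lock: TRTLCriticalSection;   // accessible from all threads
  Counter: Integer = 0;        // the shared resource

type
  TCounterThread = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TCounterThread.Execute;
var
  n: Integer;
begin
  for n := 1 to IncrementsPerThread do
  begin
    EnterCriticalSection(Lock);   // blocks while another thread is inside
    try
      Inc(Counter);               // only one thread at a time runs this
    finally
      LeaveCriticalSection(Lock); // always release, even on exceptions
    end;
  end;
end;

var
  Threads: array[1..ThreadCount] of TCounterThread;
  i: Integer;
begin
  InitCriticalSection(Lock);
  try
    for i := Low(Threads) to High(Threads) do
      Threads[i] := TCounterThread.Create(False);
    for i := Low(Threads) to High(Threads) do
    begin
      Threads[i].WaitFor;
      Threads[i].Free;
    end;
    WriteLn(Counter); // prints 40000: no increment is lost
  finally
    DoneCriticalSection(Lock);
  end;
end.
```

The TCriticalSection class from the SyncObjs unit wraps the same mechanism in an object with Enter and Leave methods, if an object-oriented style is preferred.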

paweld

Re: Correct use of multiple threads
« Reply #8 on: August 18, 2022, 11:32:45 pm »
Each thread should have its own copy of the dataset, especially since it will need a few records at most. Sample in the attachment.
Best regards / Pozdrawiam
paweld

lainz

Re: Correct use of multiple threads
« Reply #9 on: August 19, 2022, 12:25:22 am »
Hi paweld, awesome demo thanks =)

I'm asking for more help, if it is not too much to ask. Paweld's code:

Code: Pascal
procedure TForm1.Button1Click(Sender: TObject);
var
  i, j: Integer;
  recarr: array of Integer;
begin
  SpinEdit1.Enabled := False;
  Button1.Enabled := False;
  Button2.Enabled := True;
  Application.ProcessMessages;
  SetLength(tharr, SpinEdit1.Value);
  for i := Low(tharr) to High(tharr) do
  begin
    SetLength(recarr, Random(10) + 1);
    for j := Low(recarr) to High(recarr) do
      recarr[j] := Random(MasterData.RecordCount) + 1;
    tharr[i] := TPostThread.Create(i, recarr, MasterData);
    SetLength(recarr, 0);
  end;
end;

Instead of random picks, I want to distribute the records evenly across the threads without duplicating any. The order is not important; the point is not to duplicate or miss any record.

Also, if the number of records is less than the number of threads, the number of threads should simply be limited to the number of records.

Any ideas on how to change that piece of code to do that?

Gustavo 'Gus' Carreno

Re: Correct use of multiple threads
« Reply #10 on: August 19, 2022, 01:00:34 am »
Hey lainz,

What you're asking for seems to be the perfect use case for a thread pool.

In a thread pool you can have 1 to a GAZILLION threads.

You tell the pool manager how many of them you want running at once, let's say CPU threads - 1.

Then the pool manager picks the first batch of N threads and runs them all; as soon as one finishes, it starts the next, and so on until all the threads have run.

Sorry if I'm coming across as patronising in this explanation, it's not my intention.

And with this you don't even have to worry about critical sections, because when you add a thread to the pool, you should also include the data in some way, like a self-contained data class or a record.
Unless you need to write state back to a database. Well, even then, you have an ID and database connections are usually thread safe.

Sorry, no code: I've never had a real need to use thread pools, so I don't have any links or code for it.
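
For readers who want something concrete, here is a minimal sketch of a related design: a fixed set of worker threads that pull job numbers from a shared queue until it is empty. It is a simplification of the pool manager described above, not a full thread pool, and every name in it (TPoolWorker, TakeJob, WorkerCount, JobCount, Done) is hypothetical. The one piece of shared state, the "next job" index, is guarded by a critical section.

```pascal
program PoolDemo;
{$mode objfpc}{$H+}

uses
  {$ifdef unix}cthreads,{$endif}
  Classes;

const
  WorkerCount = 3;  // hypothetical pool size
  JobCount = 10;    // hypothetical number of records to upload

var
  QueueLock: TRTLCriticalSection;
  NextJob: Integer = 0;                     // next unclaimed job index
  Done: array[0..JobCount - 1] of Boolean;  // each slot written by one worker only

// Hands out every job index exactly once; returns False when none are left.
function TakeJob(out Job: Integer): Boolean;
begin
  Job := -1;
  EnterCriticalSection(QueueLock);
  try
    Result := NextJob < JobCount;
    if Result then
    begin
      Job := NextJob;
      Inc(NextJob);
    end;
  finally
    LeaveCriticalSection(QueueLock);
  end;
end;

type
  TPoolWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TPoolWorker.Execute;
var
  Job: Integer;
begin
  // Keep taking jobs until the queue is empty or Terminate was requested.
  while (not Terminated) and TakeJob(Job) do
    Done[Job] := True; // placeholder for the real upload of record Job
end;

var
  Pool: array[1..WorkerCount] of TPoolWorker;
  i, Completed: Integer;
begin
  InitCriticalSection(QueueLock);
  try
    for i := Low(Pool) to High(Pool) do
      Pool[i] := TPoolWorker.Create(False);
    for i := Low(Pool) to High(Pool) do
    begin
      Pool[i].WaitFor;
      Pool[i].Free;
    end;
    Completed := 0;
    for i := Low(Done) to High(Done) do
      if Done[i] then Inc(Completed);
    WriteLn('completed ', Completed, ' of ', JobCount);
  finally
    DoneCriticalSection(QueueLock);
  end;
end.
```

With this shape the number of threads stays fixed no matter whether 9 or 200 records arrive; only JobCount changes.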

Cheers,
Gus
Lazarus 3.99(main) FPC 3.3.1(main) Ubuntu 23.10 64b Dark Theme
Lazarus 3.0.0(stable) FPC 3.2.2(stable) Ubuntu 23.10 64b Dark Theme
http://github.com/gcarreno

paweld

Re: Correct use of multiple threads
« Reply #11 on: August 19, 2022, 01:21:54 am »
Roughly something like this:
Code: Pascal
procedure TForm1.Button1Click(Sender: TObject);
var
  i, j, k, l: Integer;
  recarr: array of Integer;
begin
  SpinEdit1.Enabled := False;
  Button1.Enabled := False;
  Button2.Enabled := True;
  Application.ProcessMessages;
  // never create more threads than there are records
  if MasterData.RecordCount < SpinEdit1.Value then
    SetLength(tharr, MasterData.RecordCount)
  else
    SetLength(tharr, SpinEdit1.Value);
  k := Length(tharr);           // number of threads
  l := MasterData.RecordCount;  // number of records
  for i := Low(tharr) to High(tharr) do
  begin
    // the first (l mod k) threads take one extra record
    if l mod k > i then
      SetLength(recarr, (l div k) + 1)
    else
      SetLength(recarr, l div k);
    // thread i gets records i, i + k, i + 2k, ...
    // note: these are 0-based; the random version above used 1-based values (+ 1),
    // so add 1 here if TPostThread expects record numbers starting at 1
    for j := Low(recarr) to High(recarr) do
      recarr[j] := i + (j * k);
    tharr[i] := TPostThread.Create(i, recarr, MasterData);
    SetLength(recarr, 0);
  end;
end;
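
The same round-robin distribution can be checked in isolation, without a form or dataset. The sketch below pulls the arithmetic into a stand-alone function; RecordsFor and TIntArray are names invented here, and the indices are 0-based as in the code above.

```pascal
program DistributeDemo;
{$mode objfpc}{$H+}

type
  TIntArray = array of Integer;

// Returns the 0-based record indices dealt to worker WorkerIndex when
// RecordCount records are handed out round-robin to WorkerCount workers.
function RecordsFor(WorkerIndex, WorkerCount, RecordCount: Integer): TIntArray;
var
  n, j: Integer;
begin
  Result := nil;
  if (WorkerCount <= 0) or (WorkerIndex < 0) or (WorkerIndex >= WorkerCount) then
    Exit;
  n := RecordCount div WorkerCount;
  // the first (RecordCount mod WorkerCount) workers take one extra record
  if RecordCount mod WorkerCount > WorkerIndex then
    Inc(n);
  SetLength(Result, n);
  for j := 0 to n - 1 do
    Result[j] := WorkerIndex + j * WorkerCount;
end;

var
  w, j: Integer;
  r: TIntArray;
begin
  // 7 records over 3 workers
  for w := 0 to 2 do
  begin
    r := RecordsFor(w, 3, 7);
    Write('worker ', w, ':');
    for j := 0 to High(r) do
      Write(' ', r[j]);
    WriteLn;
  end;
end.
```

For 7 records and 3 workers this prints `worker 0: 0 3 6`, `worker 1: 1 4` and `worker 2: 2 5`, so every index from 0 to 6 appears exactly once.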
« Last Edit: August 19, 2022, 01:27:27 am by paweld »
Best regards / Pozdrawiam
paweld

lainz

Re: Correct use of multiple threads
« Reply #12 on: August 19, 2022, 01:41:59 am »
Thanks again.

And finally, a question about the usage of critical sections.
Should they be used only when more than one thread is involved, or also, for example, when timers are involved?

HeavyUser

Re: Correct use of multiple threads
« Reply #13 on: August 19, 2022, 02:00:50 am »
Thanks again.

And finally a question of the usage of criticalsection.
That should be used only when more than a thread is used or as well for example when timers are involved?
It depends on the timers. Standard Windows timers send a message to the application message loop, which ends up being the serializer, so you are safe without a critical section.

alpine

Re: Correct use of multiple threads
« Reply #14 on: August 19, 2022, 08:34:01 am »
Thanks again.

And finally a question of the usage of criticalsection.
That should be used only when more than a thread is used or as well for example when timers are involved?
In general, critical sections (or other means of mutual exclusion, e.g. mutexes) should be used whenever the shared resource may change while another thread is using it. If all threads only read the resource, and the reads have no side effects (e.g. ds.Next or the like), everything should be fine.

BTW, why were threads spawned for posting? I can't see any differentiation between them. I mean, for the server it is normal, but for posting a single dataset? What is the benefit?

 
