Author Topic: Utilising Multiple Processors with Single Jobs from a Large Batch  (Read 10495 times)

Gizmo

Hi

A common complaint from users of my program (a data hashing program) is that it doesn't make use of multiple processors. I keep returning to the concept of multi-threading, which always seems a bit tricky when it comes to using multiple CPUs to hash one file. However, one user has made a valid point: if there are, say, 100K files, it could be coded to use multiple processes that each look at one file, as opposed to using multiple processes to look at the same file, and then the next file, and so on. So multiple CPU power is used, but on a process-by-process basis; one per file. So let's say there's 1 file that's enormous: one process can be hashing that while the other 99K smaller files are examined by other processes in batches of, say, 4 or 6, or however many cores there are.

However, having never done it, I'm not really sure where to start, other than my awareness of TProcess which I have used for calling external programs.

Basically, I have a FileSearcher instance that finds all the files in a given folder. For each found file, it calls a function that returns a hash value for the file name it was passed. So I need some help with what steps to take next, to make the existing code into how I describe above. If any of you could give me a pointer?

Code: Pascal
var
  FS : TFileSearcher;
begin
  FS := TFileSearcher.Create;  // create before the try, so Free never sees an uninitialised reference
  try
    FS.OnFileFound := @MyHashingFunction;
    FS.Search(PathToFile, SearchMask, True, False);  // So now, for each file found, MyHashingFunction is called on it
  finally
    FS.Free;
  end;
end;


taazz

« Reply #1 on: May 09, 2017, 02:53:35 pm »
There are 3 main steps here.
1.a) Create a self-contained hash function, meaning a function that takes a filename and returns a hash. The code inside the function must not access data outside the function, or use any class, function, etc. that requires access to external data (thread-safe code).
1.b) Create a TThread descendant (e.g. THashThread) that uses this function, keeping the data access inside the class itself only. (This can be merged with the step above by implementing the hash function directly in the Execute method.)
2) Create a thread safe string list to hold the file names found by your searcher class. Keep in mind that this is to be accessed by both the manager and the main thread which makes it the
3) Create a thread manager class that will manage 2 things
  a) how many threads are executed simultaneously.
  b) monitors the thread safe string list for items and feeds them one by one to the next THashThread class.
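
Step 2 above could be sketched roughly as follows, assuming a plain TStringList guarded by a TCriticalSection from the SyncObjs unit. The class name TThreadSafeFileList is the one used in the manager constructor in this thread; the methods Add, TryGet and Count are hypothetical names chosen only for this illustration.

```pascal
program SafeListDemo;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils, SyncObjs;

type
  // A string list whose every access is guarded by one lock.
  // Add, TryGet and Count are hypothetical method names.
  TThreadSafeFileList = class
  private
    FLock: TCriticalSection;
    FItems: TStringList;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Add(const AFileName: string);
    // Returns False when Index is out of range instead of raising.
    function TryGet(Index: Integer; out AFileName: string): Boolean;
    function Count: Integer;
  end;

constructor TThreadSafeFileList.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FItems := TStringList.Create;
end;

destructor TThreadSafeFileList.Destroy;
begin
  FItems.Free;
  FLock.Free;
  inherited Destroy;
end;

procedure TThreadSafeFileList.Add(const AFileName: string);
begin
  FLock.Acquire;
  try
    FItems.Add(AFileName);
  finally
    FLock.Release;
  end;
end;

function TThreadSafeFileList.TryGet(Index: Integer; out AFileName: string): Boolean;
begin
  FLock.Acquire;
  try
    Result := (Index >= 0) and (Index < FItems.Count);
    if Result then
      AFileName := FItems[Index]
    else
      AFileName := '';
  finally
    FLock.Release;
  end;
end;

function TThreadSafeFileList.Count: Integer;
begin
  FLock.Acquire;
  try
    Result := FItems.Count;
  finally
    FLock.Release;
  end;
end;

var
  List: TThreadSafeFileList;
  FoundName: string;
begin
  List := TThreadSafeFileList.Create;
  try
    List.Add('a.bin');
    List.Add('b.bin');
    if List.TryGet(1, FoundName) then
      WriteLn(FoundName);          // the second name that was added
    WriteLn(List.Count);
  finally
    List.Free;
  end;
end.
```

The searcher (main thread) would only call Add, while the manager would poll Count and TryGet, so neither side touches the underlying TStringList without holding the lock.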
Good judgement is the result of experience … Experience is the result of bad judgement.

OS : Windows 7 64 bit
Laz: Lazarus 1.4.4 FPC 2.6.4 i386-win32-win32/win64

Gizmo

« Reply #2 on: May 10, 2017, 12:44:10 am »
Thanks Taazz... as helpful as ever.

Points 1 and 2 I think I can achieve with some more reading. Have browsed https://www.freepascal.org/docs-html/rtl/classes/tthread.html for example.

Point 3 may be the difficulty. Creating a "thread manager" sounds complicated. Do you know of any existing examples or projects that I could draw some knowledge from?

sky_khan

« Reply #3 on: May 10, 2017, 03:24:53 am »
Let me remind you that if you run this on old hard disks, reading from multiple files at the same time may be slower than reading them one by one, because of the seek time of the disk head. It can still come out ahead if processing each file takes a long time. Maybe you should test it first by running your current program as separate processes on different folders, or something similar.

bylaardt

« Reply #4 on: May 10, 2017, 04:39:25 am »
What taazz said is valid, and you can try using mtprocs too.
http://wiki.freepascal.org/Parallel_procedures

taazz

« Reply #5 on: May 10, 2017, 08:34:10 am »
Thread manager requirements.
1) Does not pause or otherwise stop the main thread from searching.
2) Can create and destroy THashThread instances as needed.
3) Has a property named MaxActiveThreadCount.
4) Has a constructor that looks like this:
Code: Pascal
constructor TThreadManager.Create(aFileList:TThreadSafeFileList);
begin
   inherited Create .....
   FFiles:= aFileList;
end;
Usually it is a TThread descendant itself; its Execute method will look something like this:
Code: Pascal
procedure TThreadManager.Execute;
begin
  FLastAccessed := -1;  // index of the last file handed to a worker
  while not Terminated do begin
    // Start a worker only if an unprocessed file exists and a slot is free.
    if (FFiles.Count > FLastAccessed + 1) and (FActiveThreads < FMaxActiveThreadCount) then begin
      with THashThread.Create(True) do begin  // create suspended
        Filename := FFiles.Strings[FLastAccessed + 1];
        InterLockedIncrement(FLastAccessed);
        OnTerminate := @ManageTheHash;
        FreeOnTerminate := True;
        InterLockedIncrement(FActiveThreads);
        Start;  // Resume is deprecated in current FPC
      end;
    end;
    Sleep(0);  // do not run in a tight loop; give some breathing space to the system.
  end;
end;

procedure TThreadManager.ManageTheHash(Sender: TObject);
begin
  InterLockedDecrement(FActiveThreads);
  DoHashed(THashThread(Sender).Filename, THashThread(Sender).Hash);
end;
a) As you can see, the manager assumes that the list of filenames never loses items. If, for example, the manager has processed 10 items and you then delete one and add a new one, the manager will never process the new item.
b) I think that ManageTheHash will always execute in the main thread, even though it is a method of the manager class, but I'm not sure; you need to test it yourself. If it does, the above pseudocode should be fine; otherwise use the thread's Synchronize call to call DoHashed.
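
For comparison, here is a minimal console sketch of a different way to cap the number of active threads: a fixed pool of workers that each claim the next unprocessed index with InterLockedIncrement. This is a swapped-in design, not the manager described above. THashJob, THashWorker, HashFiles and TFileNames are hypothetical names, the worker count of 4 is arbitrary, and MD5File/MD5Print come from FPC's md5 unit rather than HashLib4Pascal.

```pascal
program HashPoolDemo;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils, md5;

type
  TFileNames = array of string;

  // Shared job state (hypothetical): the inputs, one result slot per input,
  // and a counter that workers bump atomically to claim the next index.
  THashJob = class
  public
    Files: TFileNames;
    Hashes: TFileNames;
    NextIndex: LongInt;
  end;

  THashWorker = class(TThread)
  private
    FJob: THashJob;
  protected
    procedure Execute; override;
  public
    constructor Create(AJob: THashJob);
  end;

constructor THashWorker.Create(AJob: THashJob);
begin
  FJob := AJob;             // assign fields before the thread can start
  inherited Create(False);
end;

procedure THashWorker.Execute;
var
  I: LongInt;
begin
  while not Terminated do
  begin
    // Returns the incremented value, so the first claim is index 0.
    I := InterLockedIncrement(FJob.NextIndex);
    if I > High(FJob.Files) then
      Break;                // nothing left to claim
    // Each worker writes only to its own slot, so no lock is needed here.
    FJob.Hashes[I] := MD5Print(MD5File(FJob.Files[I]));
  end;
end;

function HashFiles(const AFiles: TFileNames; AWorkerCount: Integer): TFileNames;
var
  Job: THashJob;
  Workers: array of THashWorker;
  I: Integer;
begin
  Job := THashJob.Create;
  try
    Job.Files := AFiles;
    SetLength(Job.Hashes, Length(AFiles));
    Job.NextIndex := -1;
    SetLength(Workers, AWorkerCount);
    for I := 0 to High(Workers) do
      Workers[I] := THashWorker.Create(Job);
    for I := 0 to High(Workers) do
    begin
      Workers[I].WaitFor;   // block until this worker has run out of files
      Workers[I].Free;
    end;
    Result := Job.Hashes;
  finally
    Job.Free;
  end;
end;

var
  Names, Digests: TFileNames;
  I: Integer;
begin
  // Demo input: hash every file named on the command line.
  SetLength(Names, ParamCount);
  for I := 1 to ParamCount do
    Names[I - 1] := ParamStr(I);
  Digests := HashFiles(Names, 4);
  for I := 0 to High(Digests) do
    WriteLn(Names[I], ' ', Digests[I]);
end.
```

Because the number of threads is fixed up front, one enormous file simply occupies one worker while the others keep draining the remaining files. The sketch assumes the file list is complete before the workers start, which is a simplification compared with a manager that watches a growing list.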

Gizmo

« Reply #6 on: May 19, 2017, 12:33:54 pm »
Taaz

I think I'm getting there. Over the last few evenings I've started a basic application (to avoid confusing my main program) that just has a button to select a folder of files, and MD5 hashes using the HashLib4Pascal library.

Now, it SEEMS to work, in that for each file found via the FileIterator, a thread is invoked. When pointed at a folder of many variable sized files, multiple files are started and they each gradually finish at various times. CPU usage gets much higher - 4 cores utilise about 80% of total CPU usage. And usage decreases when there's only one large file left.

Trouble is, I seem to have done it without utilising your thread manager, which makes me feel it's wrong or unstable. It just calls a worker thread for every file found in the user's chosen folder.

Relevant code samples from last night are below :

Code: Pascal
uses
... HlpHashFactory,  HlpIHash,  HlpIHashResult;

type
  TFileHashingWorkerThread = class(TThread)
  public
    constructor Create(filename : string);
    procedure Execute; override;
  end;

TForm1 = class(TForm)
    Button2: TButton;
    Memo1: TMemo;
    Memo2: TMemo;
    SelectDirectoryDialog1: TSelectDirectoryDialog;
    procedure Button2Click(Sender: TObject);
    procedure InvokeHashThread(FileIterator: TFileIterator);
  private
    { private declarations }
  public
    MyWorkerThread: TThread;
    FileToBeHashed : string;
    { public declarations }
  end;

constructor TFileHashingWorkerThread.Create(filename : string);
var
  filetobehashed : string;
begin
  inherited create(false);
  filetobehashed := filename;
end;

procedure TForm1.InvokeHashThread(FileIterator: TFileIterator);
begin
  FileToBeHashed := FileIterator.FileName;
  if TThread.IsSingleProcessor then  // If only one CPU, use the old way of doing it. Otherwise, call threads.
  begin
    Form1.Memo1.Lines.Add(FileToBeHashed + ' ' + MD5Print(MD5File(FileToBeHashed, 2097152)));
  end else TFileHashingWorkerThread.Create(FileToBeHashed);
end;

procedure TFileHashingWorkerThread.Execute;
const
   BufSize = 64 * 1024;  // 64kb buffer
var
  GeneratedHash : string;
  FileToBeHashed : string;
  fsFileToBeHashed : TFileStream;
  HashInstanceMD5  : IHash;
  HashInstanceResultMD5 : IHashResult;
  i : integer;
  Buffer: array [0 .. BufSize - 1] of Byte;
  TotalBytesRead, LoopCounter : QWord;

begin
  i := 0;
  FileToBeHashed := Form1.FileToBeHashed;
  fsFileToBeHashed := TFileStream.Create(FileToBeHashed, fmOpenRead);
  Form1.Memo1.Lines.Add('Started ' + fsFileToBeHashed.FileName + ', using ThreadID ' + IntToStr(ThreadID) + ' ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now));

// ...
// all the hashing stuff etc etc
// ...
  HashInstanceResultMD5 := HashInstanceMD5.TransformFinal();
  generatedhash := HashInstanceResultMD5.ToString();

  Form1.Memo1.Lines.Add('Finished ' + fsFileToBeHashed.filename + ' at ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now) + ' ' + GeneratedHash);
end;

procedure TForm1.Button2Click(Sender: TObject);
var
  FilesToBeHashed : TStringList;
  FileList : TFileSearcher;
  SearchMask : string;
begin
  SearchMask := '*';  // *.* is OK on Windows but * works cross platform
  if SelectDirectoryDialog1.Execute then
  try
    FileList := TFileSearcher.Create;
    FileList.FileAttribute := faAnyFile;
    FileList.OnFileFound := @InvokeHashThread;  // So for each found file, a hash thread is invoked
    FileList.Search(SelectDirectoryDialog1.FileName, SearchMask, true, false);
  finally
    FileList.Free;
  end;
end;

Does that look anywhere near correct? Or am I horribly off base and verging on unleashing computer armageddon? I'm conscious that no CPU controls are in place, so how does my program not use too many threads and make the computer unstable? Does the TThread class have an OS manager that prevents instability?
« Last Edit: May 19, 2017, 01:27:36 pm by Gizmo »

sky_khan

« Reply #7 on: May 19, 2017, 02:01:16 pm »
You should never use forms or their components from Thread.Execute. It is an invitation to all kinds of bugs. I recommend putting your thread class in its own unit.
And never assume that your main thread and secondary threads will execute in the order you expect, e.g.
Code: Pascal
constructor TFileHashingWorkerThread.Create(filename : string);
var
  filetobehashed : string;
begin
  inherited create(false); // --> Operating system may decide pausing your main thread and executing your newly created thread here. So filetobehashed will not be assigned in Execute.
  filetobehashed := filename;
end;

You could change this as below: make FFiletobehashed a member variable of the thread and use it in Execute instead of Form1.filetobehashed.
Plus, you can't update forms or components directly from Execute either. You need to use TThread.Synchronize or something similar for this.
Code: Pascal
constructor TFileHashingWorkerThread.Create(filename : string);
begin
  FFiletobehashed := filename;
  inherited create(false);
end;
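
A minimal console sketch of the Synchronize approach mentioned above, under some assumptions: TReportingThread and AddToLog are hypothetical names, a TStringList stands in for Memo1.Lines, and the main program pumps CheckSynchronize by hand because a console program has no LCL message loop. In an LCL application the main loop takes care of that part.

```pascal
program SyncDemo;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils;

type
  // Hypothetical worker that reports progress without touching
  // main-thread data from inside Execute.
  TReportingThread = class(TThread)
  private
    FFileName: string;
    FMessage: string;
    procedure AddToLog;   // runs in the main thread via Synchronize
  protected
    procedure Execute; override;
  public
    constructor Create(const AFileName: string);
  end;

var
  Log: TStringList;       // stands in for Memo1.Lines; main thread only

constructor TReportingThread.Create(const AFileName: string);
begin
  FFileName := AFileName; // member variable, set before the thread starts
  inherited Create(False);
end;

procedure TReportingThread.AddToLog;
begin
  Log.Add(FMessage);
end;

procedure TReportingThread.Execute;
begin
  FMessage := 'Started ' + FFileName;
  Synchronize(@AddToLog); // blocks until the main thread has run AddToLog
  // ... the hashing itself would go here ...
  FMessage := 'Finished ' + FFileName;
  Synchronize(@AddToLog);
end;

var
  Worker: TReportingThread;
begin
  Log := TStringList.Create;
  try
    Worker := TReportingThread.Create('a.bin');
    // Service Synchronize requests until both messages have arrived.
    while Log.Count < 2 do
      CheckSynchronize(10);
    Worker.WaitFor;
    Worker.Free;
    Write(Log.Text);      // the "Started" line, then the "Finished" line
  finally
    Log.Free;
  end;
end.
```

Synchronize makes the worker wait for the main thread each time, which is simple but can slow the worker down if it reports often.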

taazz

« Reply #8 on: May 19, 2017, 03:34:24 pm »
Here are the corrections on your current code.
Code: Pascal
  1. type
  2.   PInfoData = ^TInfoData;
  3.   TInfoData = record
  4.     Message  :string;
  5.     ThreadID :TThreadID;
  6.     When     :TDateTime;
  7.   end;
  8.  
  9.   TFileHashingWorkerThread = class(TThread)
  10.   private
  11.     FFileName :string;
  12.     FEvent    :TDataEvent;
  13.   public
  14.     constructor Create(aFilename : string; const FinishEvent:TDataEvent);
  15.     procedure Execute; override;
  16.   end;
  17.  
  18.   TForm1 = class(TForm)
  19.     Button1 :TButton;
  20.     Memo1 :TMemo;
  21.     SelectDirectoryDialog1 :TSelectDirectoryDialog;
  22.     procedure Button1Click(Sender :TObject);
  23.   private
  24.     { private declarations }
  25.     //MyWorkerThread: TThread;
  26.     FileToBeHashed : string;
  27.   public
  28.     { public declarations }
  29.     procedure InvokeHashThread(FileIterator: TFileIterator);
  30.     procedure HashInfo(Data: PtrInt);
  31.   end;
  32. .....
  33.  
  34. constructor TFileHashingWorkerThread.Create(aFilename : string;const FinishEvent:TDataEvent);
  35. begin
  36.   inherited Create(False);
  37.   FFileName := aFilename;
  38.   FEvent    := FinishEvent;
  39.   FreeOnTerminate := True; //No memory Leaks.
  40. end;
  41.  
  42. procedure TForm1.Button1Click(Sender :TObject);
  43. var
  44.   FilesToBeHashed :TStringList;
  45.   FileList        :TFileSearcher;
  46.   SearchMask      :string;
  47. begin
  48.   SearchMask := '*';  // *.* is OK on Windows but * works cross platform
  49.   if SelectDirectoryDialog1.Execute then
  50.   try
  51.     FileList := TFileSearcher.Create;
  52.     FileList.FileAttribute := faAnyFile;
  53.     FileList.OnFileFound   := @InvokeHashThread;  // So for each found file, a hash thread is invoked
  54.     FileList.Search(SelectDirectoryDialog1.FileName, SearchMask, true, false);
  55.   finally
  56.     FileList.Free;
  57.   end;
  58. end;
  59.  
  60. procedure TForm1.InvokeHashThread(FileIterator: TFileIterator);
  61. begin
  62.   FileToBeHashed := FileIterator.FileName;
  63.   if TThread.IsSingleProcessor then  // If only one CPU, use the old way of doing it. Otherwise, call threads.
  64.   begin
  65.     Form1.Memo1.Lines.Add(FileToBeHashed + ' ' + MD5Print(MD5File(FileToBeHashed, 2097152)));
  66.   end else TFileHashingWorkerThread.Create(FileToBeHashed, @HashInfo);
  67. end;
  68.  
  69. procedure TFileHashingWorkerThread.Execute;
  70. const
  71.    BufSize = 64 * 1024;  // 64kb buffer
  72. var
  73.   GeneratedHash    :string;
  74.   FileToBeHashed   :string;
  75.   fsFileToBeHashed :TFileStream;
  76.   HashInstanceMD5  :IHash;
  77.   HashInstanceResultMD5 :IHashResult;
  78.   i                     :integer;
  79.   Buffer                :array [0 .. BufSize - 1] of Byte;
  80.   TotalBytesRead,
  81.   LoopCounter           :QWord;
  82.   function NewData(msg:string;when:Tdatetime):PInfoData;
  83.   begin
  84.     Result := New(PInfoData);
  85.     Result^.Message := msg;
  86.     Result^.ThreadID := ThreadID;
  87.     Result^.When     := when;
  88.   end;
  89. begin
  90.   i := 0;
  91.   fsFileToBeHashed := TFileStream.Create(FFileName, fmOpenRead);
  92.   //Form1.Memo1.Lines.Add('Started ' + fsFileToBeHashed.FileName + ', using ThreadID ' + IntToStr(ThreadID) + ' ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now));
  93.   Application.QueueAsyncCall(FEvent,NewData('Started ' + fsFileToBeHashed.FileName + ', using ThreadID ' + IntToStr(ThreadID) + ' ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now),Now));
  94. // ...
  95. // all the hashing stuff etc etc
  96. // ...
  97.   HashInstanceResultMD5 := HashInstanceMD5.TransformFinal();
  98.   generatedhash         := HashInstanceResultMD5.ToString();
  99.  
  100.   //Form1.Memo1.Lines.Add('Finished ' + fsFileToBeHashed.filename + ' at ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now) + ' ' + GeneratedHash);
  101.   Application.QueueAsyncCall(FEvent,NewData('Finished ' + fsFileToBeHashed.filename + ' at ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now) + ' ' + GeneratedHash, Now));
  102. end;
  103.  
  104. procedure TForm1.HashInfo(Data :PtrInt);
  105. var
  106.   vStr:string;
  107. begin
  108.   writestr(vStr, PInfoData(Data)^.ThreadID, ' : ', PInfoData(Data)^.When, ' : ', PInfoData(Data)^.Message);
  109.   Memo1.Lines.Add(vstr);
  110.   Free(Data);//no memory leaks
  111. end;
  112.  

As you can see, I eliminated all access to data outside the thread itself. Keep in mind that this is your main goal for now: no thread should access data outside its stack. I'm also demonstrating an asynchronous method to inform the main thread of progress.

As for the thread manager, there are 2 reasons to build one:
1) the processing speed will degrade as the number of threads increases.
2) the disk access speed will decrease as the random reads increase (the same way a fragmented disk behaves).

For now, focus on writing the thread code and eliminating any possible memory leaks. After that, test your implementation for a couple of hours and see how it behaves. Keep in mind that no matter how much testing you do, you will never catch all the problems. After that, write the manager and play with the maximum number of active threads, test speed, disk speed, etc. The internal buffer size can also reduce the number of disk accesses.
Whether you need a manager or not is up to you to decide; you can always send out a release candidate without a manager and gather feedback from your clients.


taazz

« Reply #9 on: May 19, 2017, 03:51:33 pm »
Regarding sky_khan's point above, that with "inherited create(false)" as the first statement the new thread may run before the constructor has assigned its fields: that is the old TThread implementation. In the current implementation the thread is created suspended in the constructor and gets resumed in the AfterConstruction method. Take a look at the TThread constructor yourself.

sky_khan

« Reply #10 on: May 19, 2017, 04:11:06 pm »
@taazz
You're right. My example still works too, but I was not aware of that change. I wish I had checked first. It's a pity I had to post that reply. I violated my own signature, but whatever :)

taazz

« Reply #11 on: May 19, 2017, 04:38:44 pm »
No worries, we all have our blind spots. I know I have been proven wrong a number of times in here; the goal is to have accurate info on the thread.
« Last Edit: May 19, 2017, 04:44:47 pm by taazz »

Gizmo

« Reply #12 on: May 22, 2017, 04:52:38 pm »
Taazz

OK, I've implemented most of your suggestions and it seems to work very well. I tested it across several large files of varying size (500Mb, 200Mb, 4Gb, 8Gb) and they all started at more or less the same second but finished at different times, which is what I was expecting. So that is good news, and thanks for your help and support as usual.

But there are a few areas that stumped me.

Lines 93 and, similarly, 101:
Code: Pascal
Application.QueueAsyncCall(FEvent,NewData('Started ' + fsFileToBeHashed.FileName + ', using ThreadID ' + IntToStr(ThreadID) + ' ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now),Now));

wouldn't compile as they were. From what I can gather, the second parameter passed to QueueAsyncCall has to be a PtrInt (a pointer-sized integer), so the pointer needs a cast. I changed it to the following, which compiled:

Code: Pascal
Application.QueueAsyncCall(FEvent,PtrInt(NewData('Started ' + fsFileToBeHashed.FileName + ', using ThreadID ' + IntToStr(ThreadID) + ' ' + FormatDateTime('dd/mm/yy HH:MM:SS', Now),Now)));

And then there is Free:
Code: Pascal
Line 110 :   Free(Data);//no memory leaks

The compiler wouldn't accept that. It reported that not enough arguments were passed, yet I can see no reference to what else Free expects!? It just seems to be TObject.Free. So I tried "FreeAndNil(Data)", which did compile, but when I run the program I hit SIGSEGV errors halfway through. So it obviously doesn't like that. If I comment out the Free line entirely, the program compiles and seems to work fine as well, but then I guess memory leaks are occurring somewhere.

So, how might I correct this? Free(Data) has insufficient arguments. FreeAndNil(Data) generates runtime errors. And commenting out Free is obviously skirting around an issue that must exist for you to have mentioned it at all, Taazz. As you say, I want to get this small demo project working well and stable before moving on to the next step.


molly

« Reply #13 on: May 22, 2017, 05:01:00 pm »
Regarding Free(Data): probably a typo from taazz, or a mix-up with the system API call free().

New is to be used in combination with dispose.

Gizmo

« Reply #14 on: May 22, 2017, 05:06:07 pm »
Thanks Molly!

Code: Pascal
Dispose(PInfoData(Data));


works great!!
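
The New/Dispose pairing from this exchange can be tried in isolation with a small console sketch. NewData here is a simplified stand-in for the nested function in the thread code, and ConsumeData is a hypothetical name playing the role of the HashInfo handler; no LCL or QueueAsyncCall is involved. The likely reason FreeAndNil(Data) crashed is that it treats its argument as an object reference and calls Free on it, while Data is really a pointer to a record.

```pascal
program DisposeDemo;
{$mode objfpc}{$H+}

type
  PInfoData = ^TInfoData;
  TInfoData = record
    Message: string;
    ThreadID: TThreadID;
    When: TDateTime;
  end;

// Allocates a record on the heap and smuggles the pointer through a
// PtrInt, the parameter type that QueueAsyncCall handlers receive.
function NewData(const AMsg: string): PtrInt;
var
  P: PInfoData;
begin
  New(P);                      // allocates and initialises the string field
  P^.Message := AMsg;
  P^.ThreadID := GetCurrentThreadId;
  P^.When := 0;
  Result := PtrInt(P);
end;

// Reads the payload and releases it again.
function ConsumeData(Data: PtrInt): string;
begin
  Result := PInfoData(Data)^.Message;
  // Dispose needs the typed pointer so that the managed string inside
  // the record is finalised before the memory is released.
  Dispose(PInfoData(Data));
end;

begin
  WriteLn(ConsumeData(NewData('Finished a.bin')));  // prints: Finished a.bin
end.
```

Whichever side receives the PtrInt is responsible for calling Dispose exactly once; skipping it leaks one record per message.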
