Recent

Author Topic: TPowerThread  (Read 9337 times)

mercurhyo

  • Full Member
  • ***
  • Posts: 242
TPowerThread
« on: July 06, 2018, 07:27:10 am »
I just put here a template that I'am working on, as a case study around Pascal power
It compiles but it is unfinished yet. I will put a final version later.

Goal =
- Command to Threads with 'orders/answers records' of data pushed on queues
- Exec methods in threads
- sync or queue callbacks to main thread after methods ends
- automaticaly THEN free 'orders/answers' records memory

Code: Pascal  [Select][+][-]
  1. unit PowThread;
  2.  
  3. { MERCVRHYO }
  4.  
  5. {$mode objfpc}{$H+}
  6.  
  7. interface
  8.  
  9. uses
  10.   Classes, SysUtils, contnrs, syncobjs;
  11.  
  12. type
  13.  
  14.   TPowerThread = class;
  15.  
  16.   PThreadCmd = ^TThreadCmd;
  17.  
  18.   TPowCallback = procedure(ACmd: PThreadCmd; Sender: TPowerThread) of object;
  19.  
  20.   TThreadCmd = record
  21.     msg: shortstring;
  22.     param: longint;
  23.     extra: Pointer;
  24.     Done: boolean; // always true when TPowMethod finished
  25.     wait: boolean; // false = queue callback - true = sync callback
  26.     callback: TPowCallBack; // called in main thread, after method finished
  27.   end;
  28.  
  29.   TPowMethod = procedure(ACmd: TThreadCmd) of object;
  30.  
  31.   { TPowCmdQueue }
  32.  
  33.   TPowCmdQueue = class(TQueue)
  34.   private
  35.     FLocker: TCriticalSection;
  36.     FNotBusy: TEvent;
  37.     FEvName: string;
  38.   protected
  39.     procedure ClearAll; virtual;
  40.     procedure PushItem(AItem: Pointer); override;
  41.     function PopItem: Pointer; override;
  42.     function PeekItem: Pointer; override;
  43.   public
  44.     constructor Create;
  45.     destructor Destroy; override;
  46.   end;
  47.  
  48.   { TPowerThread }
  49.  
  50.   TPowerThread = class(TThread)
  51.   private
  52.     type
  53.     PInternalCmd = ^TInternalCmd;
  54.  
  55.     TInternalCmd = record
  56.       method: TPowMethod;
  57.       rec: TThreadCmd;
  58.     end;
  59.   private
  60.   var
  61.     FCmdQ: TPowCmdQueue;
  62.     FCBCmd: TThreadCmd;
  63.     FCallBack: TPowCallback;
  64.   protected
  65.     procedure DoCallBack;
  66.     // redefine DefHandler in derived classes as needed
  67.     // DefHandler is called by PostMsg(nil, aShortString)
  68.     procedure DefHandler(aCmd: TThreadCmd); virtual;
  69.   public
  70.     procedure Execute; override;
  71.     constructor Create(CreateSuspended: boolean;
  72.       const StackSize: SizeUInt = DefaultStackSize);
  73.     destructor Destroy; override;
  74.     // usr procs called from outside the thread
  75.     procedure PostCmd(aMethod: TPowMethod; aCmd: PThreadCmd = nil);
  76.     procedure PostMsg(aMethod: TPowMethod; aMsg: shortstring);
  77.     // add here your TPowMethod and TPowCallback
  78.     // or inherit class(TPowerThread) and add them
  79.     // to public section
  80.     // TPowMethods will be called inside the thread while
  81.     // TPowCallback will be queued or synced inside the main thread
  82.   end;
  83.  
  84. var
  85.   MainThreadID: TThreadID;
  86.  
  87. implementation
  88.  
  89. var
  90.   EvNameFmt: string;
  91.   EvRef: smallint;
  92.  
  93. { TPowCmdQueue }
  94.  
  95. procedure TPowCmdQueue.ClearAll; // in main thread: no need lock
  96. begin
  97.   while not (TQueue(Self).Peek = nil) do
  98.     Dispose(PThreadCmd(TQueue(Self).Pop));
  99. end;
  100.  
  101. procedure TPowCmdQueue.PushItem(AItem: Pointer);
  102. var
  103.   c: TEvent;
  104. begin
  105.   FLocker.Enter;
  106.   c := TEvent.Create(nil, True, True, FEvName);
  107.   if c.WaitFor(10) <> wrSignaled then
  108.   begin
  109.     c.Free;
  110.     FLocker.Leave;
  111.     raise ESyncObjectException.Create('Thread Cmdqueue error');
  112.   end;
  113.   c.Free;
  114.   FNotBusy.ResetEvent;
  115.   inherited PushItem(AItem);
  116.   FNotBusy.SetEvent;
  117.   FLocker.Leave;
  118. end;
  119.  
  120. function TPowCmdQueue.PopItem: Pointer;
  121. var
  122.   c: TEvent;
  123. begin
  124.   FLocker.Enter;
  125.   c := TEvent.Create(nil, True, True, FEvName);
  126.   if c.WaitFor(10) <> wrSignaled then
  127.   begin
  128.     c.Free;
  129.     FLocker.Leave;
  130.     raise ESyncObjectException.Create('Thread Cmdqueue error');
  131.   end;
  132.   c.Free;
  133.   FNotBusy.ResetEvent;
  134.   Result := inherited PopItem;
  135.   FNotBusy.SetEvent;
  136.   FLocker.Leave;
  137. end;
  138.  
  139. function TPowCmdQueue.PeekItem: Pointer;
  140. var
  141.   c: TEvent;
  142. begin
  143.   FLocker.Enter;
  144.   c := TEvent.Create(nil, True, True, FEvName);
  145.   if c.WaitFor(10) <> wrSignaled then
  146.   begin
  147.     c.Free;
  148.     FLocker.Leave;
  149.     raise ESyncObjectException.Create('Thread Cmdqueue error');
  150.   end;
  151.   c.Free;
  152.   FNotBusy.ResetEvent;
  153.   Result := inherited PeekItem;
  154.   FNotBusy.SetEvent;
  155.   FLocker.Leave;
  156. end;
  157.  
  158. constructor TPowCmdQueue.Create;
  159. begin
  160.   FLocker := TCriticalSection.Create;
  161.   Inc(EvRef);
  162.   FEvName := Format(EvNameFmt, [EvRef]);
  163.   FNotBusy := TEvent.Create(nil, True, True, FEvName);
  164.   inherited;
  165. end;
  166.  
  167. destructor TPowCmdQueue.Destroy;
  168. begin
  169.   FNotBusy.Free;
  170.   FLocker.Free;
  171.   inherited Destroy;
  172. end;
  173.  
  174. { TPowerThread }
  175.  
  176. procedure TPowerThread.DoCallBack;
  177. begin
  178.   FCallBack(@FCBCmd,Self);
  179. end;
  180.  
  181. procedure TPowerThread.DefHandler(aCmd: TThreadCmd);
  182. begin
  183.   aCmd.Done := True;
  184. end;
  185.  
  186. procedure TPowerThread.Execute;
  187. var
  188.   cmd: PInternalCmd;
  189. begin
  190.   while not Terminated do
  191.   begin
  192.     if FCmdQ.Count > 0 then
  193.     begin
  194.       cmd := FCmdQ.Pop;
  195.       if not Assigned(cmd^.method) then
  196.         cmd^.method := @DefHandler;
  197.       cmd^.method(cmd^.rec);
  198.       cmd^.rec.Done := True;
  199.       if Assigned(cmd^.rec.callback) then
  200.       begin
  201.         FCBCmd := cmd^.rec;
  202.         FCallBack:=cmd^.rec.callback;
  203.         if cmd^.rec.wait then
  204.           Synchronize(@DoCallBack)
  205.         else
  206.           Queue(@DoCallBack);
  207.       end;
  208.       Dispose(cmd);
  209.     end;
  210.     Sleep(1);
  211.   end;
  212. end;
  213.  
  214. constructor TPowerThread.Create(CreateSuspended: boolean; const StackSize: SizeUInt);
  215. begin
  216.   FCmdQ := TPowCmdQueue.Create;
  217.   inherited Create(CreateSuspended, StackSize);
  218. end;
  219.  
  220. destructor TPowerThread.Destroy;
  221. begin
  222.   FCmdQ.ClearAll;
  223.   FCmdQ.Free;
  224.   inherited Destroy;
  225. end;
  226.  
  227. procedure TPowerThread.PostCmd(aMethod: TPowMethod; aCmd: PThreadCmd);
  228. var
  229.   c: PInternalCmd;
  230. begin
  231.   if aCmd = nil then
  232.     exit;
  233.   new(c);
  234.   FillChar(c, SizeOf(TInternalCmd), #0);
  235.   c^.method := aMethod;
  236.   if Assigned(aCmd) then
  237.     c^.rec := aCmd^;
  238.   FCmdQ.Push(c);
  239. end;
  240.  
  241. procedure TPowerThread.PostMsg(aMethod: TPowMethod; aMsg: shortstring);
  242. var
  243.   c: PInternalCmd;
  244. begin
  245.   new(c);
  246.   FillChar(c, SizeOf(TInternalCmd), #0);
  247.   c^.method := aMethod;
  248.   c^.rec.msg := aMsg;
  249.   FCmdQ.Push(c);
  250. end;
  251.  
  252. initialization
  253.   EvNameFmt := 'EvPowThread_%.5d';
  254.   EvRef := 0;
  255.   MainThreadID := TThread.CurrentThread.ThreadID;
  256. end.
  257.  
« Last Edit: July 06, 2018, 08:27:48 am by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #1 on: July 06, 2018, 07:28:57 am »
I am not happy with WThread package, so I am working on an alternative (I hope crossplatform)
Faster (no variant craps)
shorter If possible
« Last Edit: July 06, 2018, 07:34:03 am by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #2 on: July 06, 2018, 07:34:18 am »
we'll see  :D
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #3 on: July 06, 2018, 07:42:54 am »
1st post modified ->> DoCallBack done
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #4 on: July 07, 2018, 04:00:45 am »
Well, ... after some debugging and little corrections, I can put there a working PowerThread

Code: Pascal  [Select][+][-]
  1. unit PowThread;
  2.  
  3. { MERCVRHYO }
  4. { version 0.3}
  5.  
  6. {$mode objfpc}{$H+}
  7.  
  8. interface
  9.  
  10. uses
  11.   Classes, SysUtils, contnrs, syncobjs;
  12.  
  13. type
  14.  
  15.   TPowerThread = class;
  16.  
  17.   PThreadCmd = ^TThreadCmd;
  18.  
  19.   TPowCallback = procedure(ACmd: PThreadCmd; Sender: TPowerThread) of object;
  20.  
  21.   TThreadCmd = record
  22.     msg: shortstring;
  23.     param: longint;
  24.     extra: Pointer;
  25.     Done: boolean; // always true when TPowMethod finished
  26.     wait: boolean; // false = queue callback - true = sync callback
  27.     callback: TPowCallBack; // called in main thread, after method finished
  28.   end;
  29.  
  30.   TPowMethod = procedure(ACmd: TThreadCmd) of object;
  31.  
  32.   { TPowCmdQueue }
  33.  
  34.   TPowCmdQueue = class(TQueue)
  35.   private
  36.     FLocker: TCriticalSection;
  37.     FNotBusy: TEvent;
  38.     FEvName: string;
  39.   protected
  40.     procedure ClearAll; virtual;
  41.     procedure PushItem(AItem: Pointer); override;
  42.     function PopItem: Pointer; override;
  43.     function PeekItem: Pointer; override;
  44.   public
  45.     constructor Create;
  46.     destructor Destroy; override;
  47.   end;
  48.  
  49.   { TPowerThread }
  50.  
  51.   TPowerThread = class(TThread)
  52.   private
  53.     type
  54.     PInternalCmd = ^TInternalCmd;
  55.  
  56.     TInternalCmd = record
  57.       method: TPowMethod;
  58.       rec: PThreadCmd;
  59.     end;
  60.   private
  61.   var
  62.     FCmdQ, FcbQ: TPowCmdQueue;
  63.     FSyncCmd: PThreadCmd;
  64.   protected
  65.     procedure DoSyncCallBack;
  66.     procedure DoAsyncCallBack;
  67.     // redefine DefHandler in derived classes as needed
  68.     // called when PostCmd(nil,aCmd)
  69.     procedure DefHandler(aCmd: TThreadCmd); virtual;
  70.   public
  71.     procedure Execute; override;
  72.     constructor Create(CreateSuspended: boolean;
  73.       const StackSize: SizeUInt = DefaultStackSize);
  74.     destructor Destroy; override;
  75.     // usr proc called from outside the thread
  76.     procedure PostCmd(aMethod: TPowMethod; var aCmd: TThreadCmd);
  77.     // add here your TPowMethod and TPowCallback
  78.     // or inherit class(TPowerThread) and add them
  79.     // to public section
  80.     // TPowMethods will be called inside the thread while
  81.     // TPowCallback will be queued or synced inside the main thread
  82.   end;
  83.  
  84. var
  85.   MainThreadID: TThreadID;
  86.  
  87. implementation
  88.  
  89. var
  90.   EvNameFmt: string;
  91.   EvRef: smallint;
  92.  
  93. { TPowCmdQueue }
  94.  
  95. procedure TPowCmdQueue.ClearAll; // in main thread: no need lock
  96. begin
  97.   while not (TQueue(Self).Peek = nil) do
  98.     Dispose(PThreadCmd(TQueue(Self).Pop));
  99. end;
  100.  
  101. procedure TPowCmdQueue.PushItem(AItem: Pointer);
  102. var
  103.   c: TEvent;
  104. begin
  105.   FLocker.Enter;
  106.   c := TEvent.Create(nil, True, True, FEvName);
  107.   if c.WaitFor(10) <> wrSignaled then
  108.   begin
  109.     c.Free;
  110.     FLocker.Leave;
  111.     raise ESyncObjectException.Create('Thread Cmdqueue error');
  112.   end;
  113.   c.Free;
  114.   FNotBusy.ResetEvent;
  115.   inherited;
  116.   FNotBusy.SetEvent;
  117.   FLocker.Leave;
  118. end;
  119.  
  120. function TPowCmdQueue.PopItem: Pointer;
  121. var
  122.   c: TEvent;
  123. begin
  124.   FLocker.Enter;
  125.   c := TEvent.Create(nil, True, True, FEvName);
  126.   if c.WaitFor(10) <> wrSignaled then
  127.   begin
  128.     c.Free;
  129.     FLocker.Leave;
  130.     raise ESyncObjectException.Create('Thread Cmdqueue error');
  131.   end;
  132.   c.Free;
  133.   FNotBusy.ResetEvent;
  134.   Result := inherited;
  135.   FNotBusy.SetEvent;
  136.   FLocker.Leave;
  137. end;
  138.  
  139. function TPowCmdQueue.PeekItem: Pointer;
  140. var
  141.   c: TEvent;
  142. begin
  143.   FLocker.Enter;
  144.   c := TEvent.Create(nil, True, True, FEvName);
  145.   if c.WaitFor(10) <> wrSignaled then
  146.   begin
  147.     c.Free;
  148.     FLocker.Leave;
  149.     raise ESyncObjectException.Create('Thread Cmdqueue error');
  150.   end;
  151.   c.Free;
  152.   FNotBusy.ResetEvent;
  153.   Result := inherited;
  154.   FNotBusy.SetEvent;
  155.   FLocker.Leave;
  156. end;
  157.  
  158. constructor TPowCmdQueue.Create;
  159. begin
  160.   FLocker := TCriticalSection.Create;
  161.   Inc(EvRef);
  162.   FEvName := Format(EvNameFmt, [EvRef]);
  163.   FNotBusy := TEvent.Create(nil, True, True, FEvName);
  164.   inherited;
  165. end;
  166.  
  167. destructor TPowCmdQueue.Destroy;
  168. begin
  169.   FNotBusy.Free;
  170.   FLocker.Free;
  171.   inherited;
  172. end;
  173.  
  174. { TPowerThread }
  175.  
  176. procedure TPowerThread.DoSyncCallBack;
  177. begin
  178.   FSyncCmd^.callback(FSyncCmd, Self);
  179. end;
  180.  
  181. procedure TPowerThread.DoAsyncCallBack;
  182. var
  183.   c: TThreadCmd;
  184. begin
  185.   if FcbQ.Count > 0 then
  186.   begin
  187.     c := PThreadCmd(FcbQ.Pop)^;
  188.     c.callback(@c, Self);
  189.   end;
  190. end;
  191.  
  192. procedure TPowerThread.DefHandler(aCmd: TThreadCmd);
  193. begin
  194.   // nothing to do
  195. end;
  196.  
  197. procedure TPowerThread.Execute;
  198. var
  199.   cmd: PInternalCmd;
  200. begin
  201.   while not Terminated do
  202.   begin
  203.     if FCmdQ.Count > 0 then
  204.     begin
  205.       cmd := FCmdQ.Pop;
  206.       if not Assigned(cmd^.method) then
  207.         cmd^.method := @DefHandler;
  208.       cmd^.method(cmd^.rec^);
  209.       cmd^.rec^.Done := True;
  210.       if Assigned(cmd^.rec^.callback) then
  211.       begin
  212.         if cmd^.rec^.wait then
  213.         begin
  214.           New(FSyncCmd);
  215.           FSyncCmd^ := cmd^.rec^;
  216.           Synchronize(@DoSyncCallBack);
  217.           Dispose(FSyncCmd);
  218.         end
  219.         else
  220.           FcbQ.Push(@cmd^.rec);
  221.       end;
  222.       while FcbQ.Count > 0 do
  223.         Queue(@DoAsyncCallBack);
  224.       Dispose(cmd);
  225.     end;
  226.     Sleep(1);
  227.   end;
  228. end;
  229.  
  230. constructor TPowerThread.Create(CreateSuspended: boolean; const StackSize: SizeUInt);
  231. begin
  232.   FCmdQ := TPowCmdQueue.Create;
  233.   FcbQ := TPowCmdQueue.Create;
  234.   inherited;
  235. end;
  236.  
  237. destructor TPowerThread.Destroy;
  238. begin
  239.   FCmdQ.ClearAll;
  240.   FCmdQ.Free;
  241.   FcbQ.Free;
  242.   inherited;
  243. end;
  244.  
  245. procedure TPowerThread.PostCmd(aMethod: TPowMethod; var aCmd: TThreadCmd);
  246. var
  247.   c: PInternalCmd;
  248. begin
  249.   new(c);
  250.   FillChar(c^, SizeOf(TInternalCmd), #0);
  251.   c^.method := aMethod;
  252.   c^.rec:=@aCmd;
  253.   FCmdQ.Push(c);
  254. end;
  255.  
  256. initialization
  257.   EvNameFmt := 'EvPowThread_%.5d';
  258.   EvRef := 0;
  259.   MainThreadID := TThread.CurrentThread.ThreadID;
  260. end.

I started testing as follow, with synapse

Code: Pascal  [Select][+][-]
  1. unit TestClient;
  2.  
  3. {$mode objfpc}{$H+}
  4.  
  5. interface
  6.  
  7. uses
  8.   Classes, SysUtils, blcksock, PowThread;
  9.  
  10. const
  11.   STK_SZ = $10000;
  12.  
  13. type
  14.  
  15.   { TClientThread }
  16.  
  17.   TClientThread = class(TPowerThread)
  18.   private
  19.     FSock: TTCPBlockSocket;
  20.   public
  21.     // this will be general Handler parsing aCmd inside the worker thread
  22.     procedure Handler(aCmd: TThreadCmd);
  23.   end;
  24.  
  25.   { TPlug }
  26.  
  27.   TPlug = class(TComponent)
  28.   private
  29.   var
  30.     FWorker: TClientThread;
  31.   public
  32.     constructor Create(AOwner: TComponent); override;
  33.     destructor Destroy; override;
  34.   end;
  35.  
  36. implementation
  37.  
  38. { TClientThread }
  39.  
  40. procedure TClientThread.Handler(aCmd: TThreadCmd);
  41. begin
  42.   // todo interpretation of aCmd
  43. end;
  44.  
  45. { TPlug }
  46.  
  47. constructor TPlug.Create(AOwner: TComponent);
  48. var
  49.   cmd: TThreadCmd;
  50. begin
  51.   inherited Create(AOwner);
  52.   FWorker := TClientThread.Create(False, STK_SZ);
  53.   //  
  54.   // testing
  55.   //
  56.   Sleep(10);
  57.   FillChar(cmd,SizeOf(TThreadCmd),#0);
  58.   cmd.msg:='testing';
  59. // POST JOB TO DO
  60.   FWorker.PostCmd(@FWorker.Handler,cmd);
  61. // WAITFOR job done
  62.   while not cmd.Done do Sleep(1);
  63. end;
  64.  
  65. destructor TPlug.Destroy;
  66. begin
  67.   FWorker.Free;
  68.   inherited Destroy;
  69. end;
  70.  
  71. end.

I used GDB with breakpoints inside the Worker thread and around the calling, all went good so far
« Last Edit: July 07, 2018, 04:53:21 am by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #5 on: July 07, 2018, 04:03:32 am »
it's a bit different from the 1st post, because I found out I needed another queue to manage desync callbacks. I will rework that
while synchronize pauses the thread, queue() does not. thats problem for data at callbacks. I need to prevent race conditions. I think my programming is going that way, but ok.. need deep checks
« Last Edit: July 07, 2018, 04:09:00 am by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #6 on: July 14, 2018, 03:08:57 am »
yet a better Working version

[todo, replace Sleep(1) in the thread's loop by a TEvent driven in 'Post' method

Code: Pascal  [Select][+][-]
  1. unit PowerThread;
  2.  
  3. { MERCVRHYO }
  4. { version 0.5 }
  5.  
  6. {$mode objfpc}{$H+}
  7.  
  8. interface
  9.  
  10. uses
  11.   Classes, SysUtils, contnrs, syncobjs;
  12.  
  13. type
  14.  
  15.   PThreadParams = ^TThreadParams;
  16.   TThreadParams = array of TVarRec;
  17.   TThreadMethod = procedure(Args: TThreadParams) of object;
  18.   TThreadCallBack = procedure(Data: Pointer) of object;
  19.  
  20.  
  21.   { TPowerThread }
  22.  
  23.   TPowerThread = class(TThread)
  24.   private
  25.     type
  26.     PCmdRec = ^TcmdRec;
  27.  
  28.     TCmdRec = record
  29.       method: TThreadMethod;
  30.       params: TThreadParams;
  31.       Done: PBoolean;
  32.       CallBack: TThreadCallBack;
  33.       Data: Pointer;
  34.     end;
  35.  
  36.     { TLockQueue }
  37.  
  38.     TLockQueue = class(TQueue)
  39.     private
  40.       FLock: TCriticalSection;
  41.       FReady: TEvent;
  42.       FEvName: string;
  43.     protected
  44.       procedure PushItem(AItem: Pointer); override;
  45.       function PopItem: Pointer; override;
  46.       function PeekItem: Pointer; override;
  47.     public
  48.       constructor Create;
  49.       destructor Destroy; override;
  50.     end;
  51.  
  52.   private
  53.   var
  54.     FJobQueue: TLockQueue;
  55.     FSyncCall: TThreadCallBack;
  56.     FData: Pointer;
  57.  
  58.     procedure DoSyncCallBack;
  59.   protected
  60.     procedure Execute; override;
  61.   public
  62.     constructor Create(CreateSuspended: boolean;
  63.       const StackSize: SizeUInt = DefaultStackSize);
  64.     destructor Destroy; override;
  65.     procedure Post(aMethod: TThreadMethod; Args: array of const;
  66.       var Done: boolean; aCallBack: TThreadCallBack = nil; aData: Pointer = nil);
  67.   end;
  68.  
  69.   { TThreadedComponent }
  70.  
  71.   TThreadedComponent = class(TComponent)
  72.   private
  73.     FThread: TPowerThread;
  74.     function GetSuspended: boolean; inline;
  75.     procedure SetSuspended(AValue: boolean); inline;
  76.   public
  77.     constructor Create(AOwner: TComponent; CreateSuspended: boolean;
  78.       const StackSize: SizeUInt = DefaultStackSize); virtual;
  79.     destructor Destroy; override;
  80.     procedure Post(aMethod: TThreadMethod; Args: array of const;
  81.       var Done: boolean; aCallBack: TThreadCallBack = nil; aData: Pointer = nil);
  82.     property Suspended: boolean read GetSuspended write SetSuspended;
  83.   end;
  84.  
  85. implementation
  86.  
  87. var
  88.   EvQueueFmt: PChar = 'EvPTQ_%.5d';
  89.   EvRef: smallint;
  90.  
  91. { TPowerThread.TCmdQueue }
  92.  
  93. procedure TPowerThread.TLockQueue.PushItem(AItem: Pointer);
  94. var
  95.   chkev: TEvent;
  96. begin
  97.   FLock.Enter;
  98.   chkev := TEvent.Create(nil, True, True, FEvName);
  99.   if chkev.WaitFor(10) <> wrSignaled then
  100.   begin
  101.     chkev.Free;
  102.     FLock.Leave;
  103.     raise ESyncObjectException.Create('PowerThread Queue error');
  104.   end;
  105.   FReady.ResetEvent;
  106.   chkev.Free;
  107.   inherited PushItem(AItem);
  108.   FReady.SetEvent;
  109.   FLock.Leave;
  110. end;
  111.  
  112. function TPowerThread.TLockQueue.PopItem: Pointer;
  113. var
  114.   chkev: TEvent;
  115. begin
  116.   FLock.Enter;
  117.   chkev := TEvent.Create(nil, True, True, FEvName);
  118.   if chkev.WaitFor(10) <> wrSignaled then
  119.   begin
  120.     chkev.Free;
  121.     FLock.Leave;
  122.     raise ESyncObjectException.Create('PowerThread Queue error');
  123.   end;
  124.   FReady.ResetEvent;
  125.   chkev.Free;
  126.   Result := inherited PopItem;
  127.   FReady.SetEvent;
  128.   FLock.Leave;
  129. end;
  130.  
  131. function TPowerThread.TLockQueue.PeekItem: Pointer;
  132. var
  133.   chkev: TEvent;
  134. begin
  135.   FLock.Enter;
  136.   chkev := TEvent.Create(nil, True, True, FEvName);
  137.   if chkev.WaitFor(10) <> wrSignaled then
  138.   begin
  139.     chkev.Free;
  140.     FLock.Leave;
  141.     raise ESyncObjectException.Create('PowerThread Queue error');
  142.   end;
  143.   FReady.ResetEvent;
  144.   chkev.Free;
  145.   Result := inherited PeekItem;
  146.   FReady.SetEvent;
  147.   FLock.Leave;
  148. end;
  149.  
  150. constructor TPowerThread.TLockQueue.Create;
  151. begin
  152.   FLock := TCriticalSection.Create;
  153.   Inc(EvRef);
  154.   FEvName := Format(EvQueueFmt, [EvRef]);
  155.   FReady := TEvent.Create(nil, True, True, FEvName);
  156.   inherited Create;
  157. end;
  158.  
  159. destructor TPowerThread.TLockQueue.Destroy;
  160. begin
  161.   FReady.Free;
  162.   FLock.Free;
  163.   inherited Destroy;
  164. end;
  165.  
  166. { TPowerThread }
  167.  
  168. procedure TPowerThread.DoSyncCallBack;
  169. begin
  170.   FSyncCall(FData);
  171. end;
  172.  
  173. procedure TPowerThread.Execute;
  174. var
  175.   c: PCmdRec;
  176. begin
  177.   while not Terminated do
  178.   begin
  179.     c := FJobQueue.Pop;
  180.     if Assigned(c) then
  181.     begin
  182.       c^.method(c^.params);
  183.       c^.Done^ := True;
  184.       if Assigned(c^.CallBack) then
  185.       begin
  186.         FSyncCall := c^.CallBack;
  187.         FData := c^.Data;
  188.         Synchronize(@DoSyncCallBack);
  189.       end;
  190.       SetLength(c^.params,0);
  191.       Dispose(c);
  192.     end;
  193.     Sleep(1); // ThreadSwitch doesn't work on all platforms
  194.   end;
  195. end;
  196.  
  197. constructor TPowerThread.Create(CreateSuspended: boolean; const StackSize: SizeUInt);
  198. begin
  199.   FJobQueue := TLockQueue.Create;
  200.   inherited Create(CreateSuspended, StackSize);
  201. end;
  202.  
  203. destructor TPowerThread.Destroy;
  204. var
  205.   c: PCmdRec;
  206. begin
  207.   while FJobQueue.Count > 0 do
  208.   begin
  209.     c := FJobQueue.Pop;
  210.     SetLength(c^.params, 0); // ;-)
  211.     Dispose(c);
  212.   end;
  213.   inherited Destroy;
  214. end;
  215.  
  216. procedure TPowerThread.Post(aMethod: TThreadMethod; Args: array of const;
  217.   var Done: boolean; aCallBack: TThreadCallBack; aData: Pointer);
  218. var
  219.   c: PCmdRec;
  220.   i: smallint;
  221. begin
  222.   New(c);
  223.   with c^ do
  224.   begin
  225.     method := aMethod;
  226.     SetLength(params, 1 + High(Args));
  227.     for i := 0 to High(Args) do
  228.       params[i] := Args[i];
  229.     CallBack := aCallBack;
  230.     Data := aData;
  231.   end;
  232.   c^.Done := @Done;
  233.   c^.Done^ := False;
  234.   FJobQueue.Push(c);
  235. end;
  236.  
  237. { TThreadedComponent }
  238.  
  239. function TThreadedComponent.GetSuspended: boolean;
  240. begin
  241.   Result := FThread.Suspended;
  242. end;
  243.  
  244. procedure TThreadedComponent.SetSuspended(AValue: boolean);
  245. begin
  246.   FThread.Suspended := AValue;
  247. end;
  248.  
  249. constructor TThreadedComponent.Create(AOwner: TComponent;
  250.   CreateSuspended: boolean; const StackSize: SizeUInt);
  251. begin
  252.   inherited Create(AOwner);
  253.   FThread := TPowerThread.Create(True, StackSize);
  254.   FThread.FreeOnTerminate := True;
  255.   FThread.Suspended := CreateSuspended;
  256. end;
  257.  
  258. destructor TThreadedComponent.Destroy;
  259. begin
  260.   FThread.Terminate;
  261.   inherited Destroy;
  262. end;
  263.  
  264. procedure TThreadedComponent.Post(aMethod: TThreadMethod; Args: array of const;
  265.   var Done: boolean; aCallBack: TThreadCallBack; aData: Pointer);
  266. begin
  267.   FThread.Post(aMethod, Args, Done, aCallBack, aData);
  268. end;
  269.  
  270. end.

usage change

- no more fixed record but variable 'arrays of consts'
- removed desync callbacks, only sychronized at the moment (I read some delphi articles around the difficulty/hell to manage desynchronized callbacks while results may change in returned values if Queued)
- added a TThreadedComponent template to be derived from ;-)
« Last Edit: July 14, 2018, 05:21:13 am by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #7 on: July 14, 2018, 03:37:26 am »
line 209.    SetLength(c^.params, 0); // ;-) thanks Mr. sam708 : :P
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

marcov

  • Administrator
  • Hero Member
  • *
  • Posts: 11383
  • FPC developer.
Re: TPowerThread
« Reply #8 on: July 15, 2018, 02:10:09 pm »
Why do you create/release a tevent for every push?

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #9 on: July 16, 2018, 01:14:51 am »
Why do you create/release a tevent for every push?
easy :
when you 1st create a TEvent, you are the owner (depending platform and murky PSecurityAttributes)
when you want to read the state it is a good habit to create a checker, and not modify what the owner have set or reset (it is the 'safe' way). I know I can use the original FReady in that particular case directly. But as it is a case study made to be extended/improved... it is far from optimized (also I do not need PeekItem yet but I wrote it)
« Last Edit: July 16, 2018, 09:11:36 am by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #10 on: July 16, 2018, 01:26:50 am »
so... I do NOT create a TEvent every push but an acccessor to FReady  :D in a safe way, at that point of non optimized template

nothing to do with that post, I say hello here to my son Alexandre P.-C.
« Last Edit: July 16, 2018, 01:33:38 am by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #11 on: July 16, 2018, 05:31:24 pm »
fresh compiled version 0.6.
- removed unnecessary TEvent to please @marcov
- commented PeekItem for now
- replaced 'Sleep(1)' with an Event triggered in Post Method and checked in PowerThread's loop
Code: Pascal  [Select][+][-]
  1. unit PowerThread;
  2.  
  3. { MERCVRHYO }
  4. { version 0.6 }
  5.  
  6. {$mode objfpc}{$H+}
  7.  
  8. interface
  9.  
  10. uses
  11.   Classes, SysUtils, contnrs, syncobjs;
  12.  
  13. type
  14.  
  15.   PThreadParams = ^TThreadParams;
  16.   TThreadParams = array of TVarRec;
  17.   TThreadMethod = procedure(Args: TThreadParams) of object;
  18.   TThreadCallBack = procedure(Data: Pointer) of object;
  19.  
  20.  
  21.   { TPowerThread }
  22.  
  23.   TPowerThread = class(TThread)
  24.   private
  25.     type
  26.     PCmdRec = ^TcmdRec;
  27.  
  28.     TCmdRec = record
  29.       method: TThreadMethod;
  30.       params: TThreadParams;
  31.       Done: PBoolean;
  32.       CallBack: TThreadCallBack;
  33.       Data: Pointer;
  34.     end;
  35.  
  36.     { TLockQueue }
  37.  
  38.     TLockQueue = class(TQueue)
  39.     private
  40.       FLock: TCriticalSection;
  41.       FReady: TEvent;
  42.     protected
  43.       procedure PushItem(AItem: Pointer); override;
  44.       function PopItem: Pointer; override;
  45.       // function PeekItem: Pointer; override; --- no need yet
  46.     public
  47.       constructor Create;
  48.       destructor Destroy; override;
  49.     end;
  50.  
  51.   private
  52.   var
  53.     FWake: TEvent;
  54.     FJobQueue: TLockQueue;
  55.     FSyncCall: TThreadCallBack;
  56.     FData: Pointer;
  57.  
  58.     procedure DoSyncCallBack;
  59.   protected
  60.     procedure Execute; override;
  61.   public
  62.     constructor Create(CreateSuspended: boolean;
  63.       const StackSize: SizeUInt = DefaultStackSize);
  64.     destructor Destroy; override;
  65.     procedure Post(aMethod: TThreadMethod; Args: array of const;
  66.       var Done: boolean; aCallBack: TThreadCallBack = nil; aData: Pointer = nil);
  67.   end;
  68.  
  69.   { TThreadedComponent }
  70.  
  71.   TThreadedComponent = class(TComponent)
  72.   private
  73.     FThread: TPowerThread;
  74.     function GetSuspended: boolean; inline;
  75.     procedure SetSuspended(AValue: boolean); inline;
  76.   public
  77.     constructor Create(AOwner: TComponent; CreateSuspended: boolean;
  78.       const StackSize: SizeUInt = DefaultStackSize); virtual; overload;
  79.     destructor Destroy; override;
  80.     procedure Post(aMethod: TThreadMethod; Args: array of const;
  81.       var Done: boolean; aCallBack: TThreadCallBack = nil; aData: Pointer = nil);
  82.     property Suspended: boolean read GetSuspended write SetSuspended;
  83.   end;
  84.  
  85. implementation
  86.  
  87. var
  88.   EvQueueFmt: PChar = 'EvPTQ_%.5d';
  89.   EvWakeFmt: PChar = 'EvPTW_%.5d';
  90.   EvQRef, EvWRef: smallint;
  91.  
  92. { TPowerThread.TCmdQueue }
  93.  
  94. procedure TPowerThread.TLockQueue.PushItem(AItem: Pointer);
  95. begin
  96.   FLock.Enter;
  97.   if FReady.WaitFor(10) <> wrSignaled then
  98.   begin
  99.     FLock.Leave;
  100.     raise ESyncObjectException.Create('PowerThread Queue error');
  101.   end;
  102.   FReady.ResetEvent;
  103.   inherited PushItem(AItem);
  104.   FReady.SetEvent;
  105.   FLock.Leave;
  106. end;
  107.  
  108. function TPowerThread.TLockQueue.PopItem: Pointer;
  109. begin
  110.   FLock.Enter;
  111.   if FReady.WaitFor(10) <> wrSignaled then
  112.   begin
  113.     FLock.Leave;
  114.     raise ESyncObjectException.Create('PowerThread Queue error');
  115.   end;
  116.   FReady.ResetEvent;
  117.   Result := inherited PopItem;
  118.   FReady.SetEvent;
  119.   FLock.Leave;
  120. end;
  121.  
  122. { *=* no need yet
  123. function TPowerThread.TLockQueue.PeekItem: Pointer;
  124. begin
  125.   FLock.Enter;
  126.   if FReady.WaitFor(10) <> wrSignaled then
  127.   begin
  128.     FLock.Leave;
  129.     raise ESyncObjectException.Create('PowerThread Queue error');
  130.   end;
  131.   FReady.ResetEvent;
  132.   Result := inherited PeekItem;
  133.   FReady.SetEvent;
  134.   FLock.Leave;
  135. end;
  136. *=* }
  137.  
  138. constructor TPowerThread.TLockQueue.Create;
  139. begin
  140.   FLock := TCriticalSection.Create;
  141.   Inc(EvQRef);
  142.   FReady := TEvent.Create(nil, True, True, Format(EvQueueFmt, [EvQRef]));
  143.   inherited Create;
  144. end;
  145.  
  146. destructor TPowerThread.TLockQueue.Destroy;
  147. begin
  148.   FReady.Free;
  149.   FLock.Free;
  150.   inherited Destroy;
  151. end;
  152.  
  153. { TPowerThread }
  154.  
  155. procedure TPowerThread.DoSyncCallBack;
  156. begin
  157.   FSyncCall(FData);
  158. end;
  159.  
  160. procedure TPowerThread.Execute;
  161. var
  162.   c: PCmdRec;
  163.   w: TWaitResult;
  164. begin
  165.   while not Terminated do
  166.   begin
  167.     w := FWake.WaitFor(100);
  168.     if w = wrTimeout then
  169.       Continue;
  170.     if w = wrSignaled then
  171.     begin
  172.       c := FJobQueue.Pop;
  173.       if Assigned(c) then
  174.       begin
  175.         c^.method(c^.params);
  176.         c^.Done^ := True;
  177.         if Assigned(c^.CallBack) then
  178.         begin
  179.           FSyncCall := c^.CallBack;
  180.           FData := c^.Data;
  181.           Synchronize(@DoSyncCallBack);
  182.         end;
  183.         SetLength(c^.params, 0);
  184.         Dispose(c);
  185.       end
  186.       else
  187.         FWake.ResetEvent;
  188.     end
  189.     else
  190.       raise ESyncObjectException.Create('TPowerThread WakeUp error');
  191.   end;
  192. end;
  193.  
  194. constructor TPowerThread.Create(CreateSuspended: boolean; const StackSize: SizeUInt);
  195. begin
  196.   FJobQueue := TLockQueue.Create;
  197.   Inc(EvWRef);
  198.   FWake := TEvent.Create(nil, True, False, Format(EvWakeFmt, [EvWRef]));
  199.   inherited Create(CreateSuspended, StackSize);
  200. end;
  201.  
  202. destructor TPowerThread.Destroy;
  203. var
  204.   c: PCmdRec;
  205. begin
  206.   while FJobQueue.Count > 0 do
  207.   begin
  208.     c := FJobQueue.Pop;
  209.     SetLength(c^.params, 0); // ;-)
  210.     Dispose(c);
  211.   end;
  212.   FWake.Free;
  213.   inherited Destroy;
  214. end;
  215.  
  216. procedure TPowerThread.Post(aMethod: TThreadMethod; Args: array of const;
  217.   var Done: boolean; aCallBack: TThreadCallBack; aData: Pointer);
  218. var
  219.   c: PCmdRec;
  220.   i: smallint;
  221. begin
  222.   New(c);
  223.   with c^ do
  224.   begin
  225.     method := aMethod;
  226.     SetLength(params, 1 + High(Args));
  227.     for i := 0 to High(Args) do
  228.       params[i] := Args[i];
  229.     CallBack := aCallBack;
  230.     Data := aData;
  231.   end;
  232.   c^.Done := @Done;
  233.   c^.Done^ := False;
  234.   FJobQueue.Push(c);
  235.   FWake.SetEvent;
  236. end;
  237.  
  238. { TThreadedComponent }
  239.  
  240. function TThreadedComponent.GetSuspended: boolean;
  241. begin
  242.   Result := FThread.Suspended;
  243. end;
  244.  
  245. procedure TThreadedComponent.SetSuspended(AValue: boolean);
  246. begin
  247.   FThread.Suspended := AValue;
  248. end;
  249.  
  250. constructor TThreadedComponent.Create(AOwner: TComponent;
  251.   CreateSuspended: boolean; const StackSize: SizeUInt);
  252. begin
  253.   inherited Create(AOwner);
  254.   FThread := TPowerThread.Create(True, StackSize);
  255.   FThread.FreeOnTerminate := True;
  256.   FThread.Suspended := CreateSuspended;
  257. end;
  258.  
  259. destructor TThreadedComponent.Destroy;
  260. begin
  261.   FThread.Terminate;
  262.   if FThread.Suspended then
  263.     FThread.Suspended := False;
  264.   FThread.WaitFor;
  265.   inherited Destroy;
  266. end;
  267.  
  268. procedure TThreadedComponent.Post(aMethod: TThreadMethod; Args: array of const;
  269.   var Done: boolean; aCallBack: TThreadCallBack; aData: Pointer);
  270. begin
  271.   FThread.Post(aMethod, Args, Done, aCallBack, aData);
  272. end;
  273.  
  274. end.
  275.  

usage :

1) create a procedure in a class that way:

procedure MyProc(Args: TThreadParams);
your procedures are responsible to parse parameters as 'array of const

2) create a TPowerThread, a boolean 'Done' VARiable

3) to run 'MyProc' in the thread:
MyThread.Post(@MyProc,['hello',Now],Done);

as you see, you pass parameters just like with 'Format' method.
more advanced programmers can use a synchronized callback using a pointer to wanted results
while the callback running in the mainthread, the 'powerthread' is suspended
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #12 on: July 16, 2018, 05:44:15 pm »
not tested, I am going to use my TPowerThread with synapse TTCPBlockSocket

procedure Connect(Args: TThreadParams,Done)
with Args[0] as string url or ip address
and Args[1] as integer port number
something like

procedure TClient.Connect(Args: TThreadParams);
var
  addr, port: string;
begin
  addr := FSock.ResolveName(string(Args[0].VAnsiString));
  port := IntToStr(Args[1].VInteger);
  FSock.Connect(addr, port);
« Last Edit: July 16, 2018, 05:52:09 pm by mercurhyo »
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

mercurhyo

  • Full Member
  • ***
  • Posts: 242
Re: TPowerThread
« Reply #13 on: July 16, 2018, 05:50:31 pm »
This should be a cool way to 'communicate'  ::) upon communications
DEO MERCHVRIO - Linux, Win10pro - Ryzen9XT 24threads + Geforce Rtx 3080SUPRIM
god of financial gain, commerce, eloquence (and thus poetry), messages, communication (including divination), travelers, boundaries, luck, trickery and thieves; he also serves as the guide of souls to the underworld

marcov

  • Administrator
  • Hero Member
  • *
  • Posts: 11383
  • FPC developer.
Re: TPowerThread
« Reply #14 on: July 16, 2018, 06:36:56 pm »
Maybe it is the choice for named events. I have no experience/need for those.

 

TinyPortal © 2005-2018