Recent

Author Topic: [Synapse]Multythreaded TCP Server example.  (Read 49505 times)

CynicRus

  • New Member
  • *
  • Posts: 49
[Synapse]Multythreaded TCP Server example.
« on: May 29, 2013, 11:18:31 am »
Hey guys. I have a lot of time looking for a suitable example for this, but could not find it. So I had to write its own implementation. One thread listens to a socket, and if it recieved packet he make processing thread. Tested by telnet on Win and *nix. I'll just leave this here in case someone else is looking for it.

Code: [Select]
program Srv;

{$mode objfpc}

uses
  {$IFDEF UNIX}
 cthreads,
 {$ENDIF}
  Classes, Sysutils,  syncobjs, blcksock,synsock;

type

 TThreadManager = class;

 { TManagedThread }

 TManagedThread = class(TThread)
 public
   constructor Create(waiting : Boolean);
   function    isDone()     : Boolean;
   function    isErroneus() : Boolean;

 protected
   done_,
   erroneous_ : Boolean;
end;

  { TTCPThread }

TTCPThread = class(TManagedThread)
    private
     fSock: TTCPBlockSocket;
     fIP: string;
     FPort: integer;
     FNumber: integer;
     procedure SetSocket(aSock: TSocket);
    protected
     procedure Execute; override;
    public
     constructor Create();
     destructor Destroy; override;
     procedure ProcessingData(procSock: TSocket;Data: string);
     Property Number: integer read Fnumber Write FNumber;
end;

 { TListenerThread }

 TListenerThread = class(TThread)
  private
    ListenerSocket: TTCPBlockSocket;
    FThreadManager: TThreadManager;
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override;
end;

 { TThreadManager }

 TThreadManager = class(TObject)
private
FItemList: TThreadList;
FAbort: Boolean;
FThreadList: TList;
FMaxThreadCount: Integer;
procedure SetMaxThreadCount(Count: Integer);
public
constructor Create(MaxThreads: integer);
destructor Destroy; override;
procedure AddItem(Item: TTCPThread);
function GetSuspendThread(aSock: TSocket): TTCPThread;
                procedure clearFinishedThreads;
function GetActiveThreadCount: Integer;
property MaxThreadCount: Integer read FMaxThreadCount write SetMaxThreadCount;
end;

{ TThreadManager }

procedure TThreadManager.SetMaxThreadCount(Count: Integer);
begin
  FMaxThreadCount := Count;
end;

constructor TThreadManager.Create(MaxThreads: integer);
begin
  inherited Create;
FItemList := TThreadList.Create;
FThreadList := TList.Create;
        FMaxThreadCount := MaxThreads;
end;

destructor TThreadManager.Destroy;
var
i: Integer;
begin
    FThreadList.Pack;
for i := FThreadList.Count - 1 downto 0 do begin
    TTCPThread(FThreadList[i]).Free;
end;
    FThreadList.Capacity := FThreadList.Count;
FThreadList.Free;
    FItemList.Clear;
FItemList.Free;
inherited;
end;

procedure TThreadManager.AddItem(Item: TTCPThread);
begin
  FItemList.Add(Pointer(Item));
end;

function TThreadManager.GetSuspendThread(aSock: TSocket): TTCPThread;
var
i: Integer;
TCPThread: TTCPThread;
begin
Result := nil;
if GetActiveThreadCount >= FMaxThreadCount then Exit;
for i := 0 to FThreadList.Count - 1 do begin
if TTCPThread(FThreadList[i]).Suspended then
                 begin
TCPThread := TTCPThread(FThreadList[i]);
                        TCPThread.SetSocket(aSock);
                        TCPThread.Resume;
Break;
end;
end;
if (Result = nil) and (FMaxThreadCount > FThreadList.Count) then begin
TCPThread := TTCPThread.Create;
TCPThread.FreeOnTerminate := False;
                TCPThread.SetSocket(aSock);
TCPThread.Number := FThreadList.Count;
FThreadList.Add(TCPThread);
Result := TCPThread;
end;
end;

procedure TThreadManager.clearFinishedThreads;
var
i: Integer;
begin
for i := 0 to FThreadList.Count - 1 do
         begin
           if (TTCPThread(FThreadList[i]) <> nil) and TTCPThread(FThreadList[i]).isDone() then
               begin
                 TTCPThread(FThreadList[i]).WaitFor;
                 TTCPThread(FThreadList[i]).Free;
         end;

end;
end;

function TThreadManager.GetActiveThreadCount: Integer;
var
i: Integer;
begin
Result := 0;
for i := 0 to FThreadList.Count - 1 do begin
if not TTCPThread(FThreadList[i]).Suspended then
Inc(Result);
end;
end;

{ TManagedThread }

constructor TManagedThread.Create(waiting : Boolean);
begin
 inherited Create(waiting);
 done_ := false;
 erroneous_ := false;
end;

function  TManagedThread.isDone()     : Boolean;
begin
 Result := done_;
end;


function  TManagedThread.isErroneus() : Boolean;
begin
 Result := erroneous_;
end;

{ TListenerThread }

procedure TListenerThread.Execute;
var
ClientSock: TSocket;
ClientThread: TTCPThread;
begin
   with ListenerSocket do
     begin
       CreateSocket;
        if LastError = 0 then
           WriteLn('Socket successfully initialized')
          else
           WriteLn('An error occurred while initializing the socket: '+GetErrorDescEx);
   Family := SF_IP4;
   setLinger(true,10000);
   bind('0.0.0.0', '5050');
    if LastError = 0 then
      WriteLn('Bind on 5050')
     else
      WriteLn('Bind error: '+GetErrorDescEx);
      listen;
      repeat
        if CanRead(100) then
         begin
           ClientSock := Accept;
            if LastError = 0
             then
              begin
              //TTCPThread.Create()
             ClientThread:=FThreadManager.GetSuspendThread(ClientSock);
              WriteLn('We have '+ IntToStr(FThreadManager.GetActiveThreadCount)+#32+'client threads!');
              end
             else
              WriteLn('TCP thread creation error: '+GetErrorDescEx);
         end;
        FThreadManager.clearFinishedThreads;
      sleep(10);
     until false;
    end;
end;

constructor TListenerThread.Create;
begin
FreeOnTerminate := True;
ListenerSocket := TTCPBlockSocket.Create;
FThreadManager:=TThreadManager.Create(20000);
if ListenerSocket.LastError = 0
  then
     WriteLn('Listener has been created')
  else
      WriteLn('Listener creation error: '+ListenerSocket.GetErrorDescEx);
inherited Create(False);
end;

destructor TListenerThread.Destroy;
begin
 ListenerSocket.Free;
   if
     ListenerSocket.LastError = 0
       then
           WriteLn('Listener has been deleted')
          else
            WriteLn('Listener deleting error: '+ListenerSocket.GetErrorDescEx);
  inherited;
end;

{ TTCPThread }

procedure TTCPThread.SetSocket(aSock: TSocket);
begin
   fSock.Socket := aSock;
   fSock.GetSins;
end;

procedure TTCPThread.Execute;
var
  s: ansistring;
begin
  fIp:=fSock.GetRemoteSinIP;
  fPort:=fSock.GetRemoteSinPort;
  WriteLn(format('Accepted connection from %s:%d',[fIp,fPort]));
  while not isDone  do
   begin
    if fSock.WaitingData > 0 then
     begin
      s:=fSock.RecvPacket(2000);
      if fSock.LastError <> 0 then
       WriteLn(fSock.GetErrorDescEx);
       ProcessingData(fSock.Socket,S);
      end;
    sleep(10);
   end;
end;

constructor TTCPThread.Create();
begin
 FreeOnTerminate := True;
 fSock := TTCPBlockSocket.Create;
 inherited Create(false);
end;

destructor TTCPThread.Destroy;
begin
  WriteLn(format('Disconnect from %s:%d',[fIp,fPort]));
  fSock.Free;
  inherited;
end;

procedure TTCPThread.ProcessingData(procSock: TSocket; Data: string);
begin
  if data <> '' then
   WriteLn(data+#32+'we get it from '+IntToStr(number)+' thread');
end;
 var
   Server: TListenerThread;
begin
   Server:=TListenerThread.Create;
   ReadLn;
end.

User137

  • Hero Member
  • *****
  • Posts: 1791
    • Nxpascal home
Re: [Synapse]Multythreaded TCP Server example.
« Reply #1 on: May 29, 2013, 12:23:46 pm »
Thanks for posting that. I have previously made 1 similar for my game engine nxPascal
https://code.google.com/p/nxpascal/source/browse/trunk/src/nxNetwork.pas
It gives you TTCPServer, TUDPServer and TClient, and the unit compiles on FPC and Delphi. I have also tested them all on both, with small demo that comes with the engine. It supports xor-key-masking of all traffic, and first-time quick authentication when joining network. Another test i made was a small game i tested with my US friend to here on EU. There was occasional "warping", in the player movement, and i'm not completely sure why. Is it a bug in the unit or i should handle that some other way... I should make a file-transfer demo to be more confident it works, because that's 1 ultimate test for big data stream. If not, i might look for tips in your code  :)

jwdietrich

  • Hero Member
  • *****
  • Posts: 1232
    • formatio reticularis
Re: [Synapse]Multythreaded TCP Server example.
« Reply #2 on: May 29, 2013, 01:24:59 pm »
Great! Your code is a very valuable resource.

Would you mind to post it to the Lazarus and Free Pascal Wiki, e.g. at http://wiki.lazarus.freepascal.org/Synapse?
function GetRandomNumber: integer; // xkcd.com
begin
  GetRandomNumber := 4; // chosen by fair dice roll. Guaranteed to be random.
end;

http://www.formatio-reticularis.de

Lazarus 2.2.6 | FPC 3.2.2 | PPC, Intel, ARM | macOS, Windows, Linux

User137

  • Hero Member
  • *****
  • Posts: 1791
    • Nxpascal home
Re: [Synapse]Multythreaded TCP Server example.
« Reply #3 on: May 29, 2013, 04:23:39 pm »
Also 1 requirement for the socket class is ability to send and receive single or broadcast packets of strings, or binary data, mainly binary (fixed size records).

Code: [Select]
WriteLn(data+#32+'we get it from '+IntToStr(number)+' thread');Isn't #32 just space-character?  :P  It could be added like ' we get...'.

snorkel

  • Hero Member
  • *****
  • Posts: 817
Re: [Synapse]Multythreaded TCP Server example.
« Reply #4 on: May 29, 2013, 04:56:58 pm »
The echo server example included with Synapse is a excellent multi threaded server example.
I have used that as a base to build all sorts of servers including chat servers that broadcast binary data to all connected
clients.
***Snorkel***
If I forget, I always use the latest stable 32bit version of Lazarus and FPC. At the time of this signature that is Laz 3.0RC2 and FPC 3.2.2
OS: Windows 10 64 bit

CynicRus

  • New Member
  • *
  • Posts: 49
Re: [Synapse]Multythreaded TCP Server example.
« Reply #5 on: May 30, 2013, 01:05:18 pm »
Great! Your code is a very valuable resource.

Would you mind to post it to the Lazarus and Free Pascal Wiki, e.g. at http://wiki.lazarus.freepascal.org/Synapse?

Thank you for your kind words. I would have done it, but I'm afraid my knowledge about Wiki is not enough for that.:(

merdjpatts

  • Newbie
  • Posts: 1
Re: [Synapse]Multythreaded TCP Server example.
« Reply #6 on: March 22, 2014, 02:28:07 pm »
Hello User137,

i downloaded and try your binary sample regarding the network thing and it is good. would you mind to help me please on how to send/receive file?

your kind help is highly appreciated.

God bless and thanks a lot
merdj

Richard_1024

  • Newbie
  • Posts: 3
Re: [Synapse]Multythreaded TCP Server example.
« Reply #7 on: July 07, 2014, 11:51:18 pm »
Hello CynicRus,

I'm trying to learn from your code. It is really good. It works great. But I need some help, so maybe you can give me some advise/sample?. I have several clients connecting, after connecting they send data (just an asci string). After sending data clients dis-connect and after a few minutes they connect again and send data again. How can I detect that a client is disconnected and remove Thread/Socket from server. Now everytime a client connect and disconnects an extra thread is generated. So my question is: how do I cleanup threads that are no longer in use?

Thank you for your help

Richard..




Hey guys. I have a lot of time looking for a suitable example for this, but could not find it. So I had to write its own implementation. One thread listens to a socket, and if it recieved packet he make processing thread. Tested by telnet on Win and *nix. I'll just leave this here in case someone else is looking for it.

Code: [Select]
program Srv;

{$mode objfpc}

uses
  {$IFDEF UNIX}
 cthreads,
 {$ENDIF}
  Classes, Sysutils,  syncobjs, blcksock,synsock;

type

 TThreadManager = class;

 { TManagedThread }

 TManagedThread = class(TThread)
 public
   constructor Create(waiting : Boolean);
   function    isDone()     : Boolean;
   function    isErroneus() : Boolean;

 protected
   done_,
   erroneous_ : Boolean;
end;

  { TTCPThread }

TTCPThread = class(TManagedThread)
    private
     fSock: TTCPBlockSocket;
     fIP: string;
     FPort: integer;
     FNumber: integer;
     procedure SetSocket(aSock: TSocket);
    protected
     procedure Execute; override;
    public
     constructor Create();
     destructor Destroy; override;
     procedure ProcessingData(procSock: TSocket;Data: string);
     Property Number: integer read Fnumber Write FNumber;
end;

 { TListenerThread }

 TListenerThread = class(TThread)
  private
    ListenerSocket: TTCPBlockSocket;
    FThreadManager: TThreadManager;
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override;
end;

 { TThreadManager }

 TThreadManager = class(TObject)
private
FItemList: TThreadList;
FAbort: Boolean;
FThreadList: TList;
FMaxThreadCount: Integer;
procedure SetMaxThreadCount(Count: Integer);
public
constructor Create(MaxThreads: integer);
destructor Destroy; override;
procedure AddItem(Item: TTCPThread);
function GetSuspendThread(aSock: TSocket): TTCPThread;
                procedure clearFinishedThreads;
function GetActiveThreadCount: Integer;
property MaxThreadCount: Integer read FMaxThreadCount write SetMaxThreadCount;
end;

{ TThreadManager }

procedure TThreadManager.SetMaxThreadCount(Count: Integer);
begin
  FMaxThreadCount := Count;
end;

constructor TThreadManager.Create(MaxThreads: integer);
begin
  inherited Create;
FItemList := TThreadList.Create;
FThreadList := TList.Create;
        FMaxThreadCount := MaxThreads;
end;

destructor TThreadManager.Destroy;
var
i: Integer;
begin
    FThreadList.Pack;
for i := FThreadList.Count - 1 downto 0 do begin
    TTCPThread(FThreadList[i]).Free;
end;
    FThreadList.Capacity := FThreadList.Count;
FThreadList.Free;
    FItemList.Clear;
FItemList.Free;
inherited;
end;

procedure TThreadManager.AddItem(Item: TTCPThread);
begin
  FItemList.Add(Pointer(Item));
end;

function TThreadManager.GetSuspendThread(aSock: TSocket): TTCPThread;
var
i: Integer;
TCPThread: TTCPThread;
begin
Result := nil;
if GetActiveThreadCount >= FMaxThreadCount then Exit;
for i := 0 to FThreadList.Count - 1 do begin
if TTCPThread(FThreadList[i]).Suspended then
                 begin
TCPThread := TTCPThread(FThreadList[i]);
                        TCPThread.SetSocket(aSock);
                        TCPThread.Resume;
Break;
end;
end;
if (Result = nil) and (FMaxThreadCount > FThreadList.Count) then begin
TCPThread := TTCPThread.Create;
TCPThread.FreeOnTerminate := False;
                TCPThread.SetSocket(aSock);
TCPThread.Number := FThreadList.Count;
FThreadList.Add(TCPThread);
Result := TCPThread;
end;
end;

procedure TThreadManager.clearFinishedThreads;
var
i: Integer;
begin
for i := 0 to FThreadList.Count - 1 do
         begin
           if (TTCPThread(FThreadList[i]) <> nil) and TTCPThread(FThreadList[i]).isDone() then
               begin
                 TTCPThread(FThreadList[i]).WaitFor;
                 TTCPThread(FThreadList[i]).Free;
         end;

end;
end;

function TThreadManager.GetActiveThreadCount: Integer;
var
i: Integer;
begin
Result := 0;
for i := 0 to FThreadList.Count - 1 do begin
if not TTCPThread(FThreadList[i]).Suspended then
Inc(Result);
end;
end;

{ TManagedThread }

constructor TManagedThread.Create(waiting : Boolean);
begin
 inherited Create(waiting);
 done_ := false;
 erroneous_ := false;
end;

function  TManagedThread.isDone()     : Boolean;
begin
 Result := done_;
end;


function  TManagedThread.isErroneus() : Boolean;
begin
 Result := erroneous_;
end;

{ TListenerThread }

procedure TListenerThread.Execute;
var
ClientSock: TSocket;
ClientThread: TTCPThread;
begin
   with ListenerSocket do
     begin
       CreateSocket;
        if LastError = 0 then
           WriteLn('Socket successfully initialized')
          else
           WriteLn('An error occurred while initializing the socket: '+GetErrorDescEx);
   Family := SF_IP4;
   setLinger(true,10000);
   bind('0.0.0.0', '5050');
    if LastError = 0 then
      WriteLn('Bind on 5050')
     else
      WriteLn('Bind error: '+GetErrorDescEx);
      listen;
      repeat
        if CanRead(100) then
         begin
           ClientSock := Accept;
            if LastError = 0
             then
              begin
              //TTCPThread.Create()
             ClientThread:=FThreadManager.GetSuspendThread(ClientSock);
              WriteLn('We have '+ IntToStr(FThreadManager.GetActiveThreadCount)+#32+'client threads!');
              end
             else
              WriteLn('TCP thread creation error: '+GetErrorDescEx);
         end;
        FThreadManager.clearFinishedThreads;
      sleep(10);
     until false;
    end;
end;

constructor TListenerThread.Create;
begin
FreeOnTerminate := True;
ListenerSocket := TTCPBlockSocket.Create;
FThreadManager:=TThreadManager.Create(20000);
if ListenerSocket.LastError = 0
  then
     WriteLn('Listener has been created')
  else
      WriteLn('Listener creation error: '+ListenerSocket.GetErrorDescEx);
inherited Create(False);
end;

destructor TListenerThread.Destroy;
begin
 ListenerSocket.Free;
   if
     ListenerSocket.LastError = 0
       then
           WriteLn('Listener has been deleted')
          else
            WriteLn('Listener deleting error: '+ListenerSocket.GetErrorDescEx);
  inherited;
end;

{ TTCPThread }

procedure TTCPThread.SetSocket(aSock: TSocket);
begin
   fSock.Socket := aSock;
   fSock.GetSins;
end;

procedure TTCPThread.Execute;
var
  s: ansistring;
begin
  fIp:=fSock.GetRemoteSinIP;
  fPort:=fSock.GetRemoteSinPort;
  WriteLn(format('Accepted connection from %s:%d',[fIp,fPort]));
  while not isDone  do
   begin
    if fSock.WaitingData > 0 then
     begin
      s:=fSock.RecvPacket(2000);
      if fSock.LastError <> 0 then
       WriteLn(fSock.GetErrorDescEx);
       ProcessingData(fSock.Socket,S);
      end;
    sleep(10);
   end;
end;

constructor TTCPThread.Create();
begin
 FreeOnTerminate := True;
 fSock := TTCPBlockSocket.Create;
 inherited Create(false);
end;

destructor TTCPThread.Destroy;
begin
  WriteLn(format('Disconnect from %s:%d',[fIp,fPort]));
  fSock.Free;
  inherited;
end;

procedure TTCPThread.ProcessingData(procSock: TSocket; Data: string);
begin
  if data <> '' then
   WriteLn(data+#32+'we get it from '+IntToStr(number)+' thread');
end;
 var
   Server: TListenerThread;
begin
   Server:=TListenerThread.Create;
   ReadLn;
end.

Leledumbo

  • Hero Member
  • *****
  • Posts: 8757
  • Programming + Glam Metal + Tae Kwon Do = Me
Re: [Synapse]Multythreaded TCP Server example.
« Reply #8 on: July 08, 2014, 01:43:03 am »
So my question is: how do I cleanup threads that are no longer in use?
Simply set FreeOnTerminate to true

Richard_1024

  • Newbie
  • Posts: 3
Re: [Synapse]Multythreaded TCP Server example.
« Reply #9 on: July 09, 2014, 08:55:58 pm »
Thanks for your reply. I changed TCPThread.FreeOnTerminate to "True". But nothing changed. Everytime a client connects (and disconnects) the thread counter is "adding" 1 thread. What I would like to achieve is that after 'processing data' the then active thread for that client is destroyed and used again. I have about 20 remote clients that send data (simple asci string/xml file) every 10/20 seconds. As far as I can see the threads never gets destroyed so after awhile the server no longer accepts new connections. Do I need to explicit call Destroy or Free after 'processing data'? Sorry for these questions I'm a newby trying to learn. Hope you can help me with this one.


User137

  • Hero Member
  • *****
  • Posts: 1791
    • Nxpascal home
Re: [Synapse]Multythreaded TCP Server example.
« Reply #10 on: July 10, 2014, 02:35:21 am »
FreeOnTerminate is responsible of freeing the thread after Execute method is finished running. If that is not happening you have bug in the code.

Leledumbo

  • Hero Member
  • *****
  • Posts: 8757
  • Programming + Glam Metal + Tae Kwon Do = Me
Re: [Synapse]Multythreaded TCP Server example.
« Reply #11 on: July 10, 2014, 10:06:28 am »
Where's the thread counter? Who's responsible of increasing/decreasing it?

Richard_1024

  • Newbie
  • Posts: 3
Re: [Synapse]Multythreaded TCP Server example.
« Reply #12 on: July 10, 2014, 02:48:51 pm »
I'm using the code as shown on top of this thread (by CynicRus)..

Leledumbo

  • Hero Member
  • *****
  • Posts: 8757
  • Programming + Glam Metal + Tae Kwon Do = Me
Re: [Synapse]Multythreaded TCP Server example.
« Reply #13 on: July 10, 2014, 05:31:13 pm »
I'm using the code as shown on top of this thread (by CynicRus)..
Not clear enough to me, how do you conclude that the thread counter is increasing despite to indicate that the thread is still alive after execute? Because I don't see any Dec against an ordinal var, nor a Remove against FThreadList (which seems to be used to get the number of threads). Note that dead thread won't get removed automatically from the list.

xardomain

  • New Member
  • *
  • Posts: 31
Re: [Synapse]Multythreaded TCP Server example.
« Reply #14 on: February 17, 2016, 05:52:05 pm »
Hello,
I have implemented a version of this Multithreaded Server, and I am experimenting a problem in this procedure:

procedure TThreadManager.clearFinishedThreads;
var
   i: Integer;
begin
   for i := 0 to FThreadList.Count - 1 do
         begin
           if (TTCPThread(FThreadList) <> nil) then
           begin
             if (TTCPThread(FThreadList.items).isDone = TRUE) then
             begin
                 TTCPThread(FThreadList).WaitFor;
                 TTCPThread(FThreadList).Free;
             end
           end
         end
end;

It does seem that it is unable to properly find the Thread (finished) that needs to be freed. It loops on each pointer in the FthreadList but is unable to find the one that is done and thus it never terminates it.  As far I could understand, this doesn't trigger:

if (TTCPThread(FThreadList.items).isDone = TRUE)

I am unable to find out if this is supposed to work, but putting a breakpoint here never gets true for any thread.

That's the reason why the counter in the example never decrements, because the counter is a related to the number of items of the FThreadList (that never gets the relevant element removed).

Could you kindly tell me what is wrong?
Thanks in advance.

Giuseppe Marullo

 

TinyPortal © 2005-2018