Common.Sockets

Introduction

This article presents a small, lightweight, fast, zero-configuration asynchronous TCP/IP socket communication library. The library is built on .NET 2.0 and makes full use of asynchronous socket communication. It also provides some commonly requested features, such as pluggable message serialization and automatic reconnection. The article does not explain in detail how .NET asynchronous sockets operate, since plenty of material on that is already available; instead it presents the details of a “ready to go” solution built on them. A few advanced techniques were used to produce a highly optimized solution, and those are the focus of this presentation.

The Source Code, Related Toolkits and Frameworks and the Matrix Platform

This asynchronous socket system is part of the Matrix Platform library. The source code of the library contains the socket communication system as well as a few of the common classes used in the Matrix Platform; these classes are stored in the Matrix.Common.Core folder of the project. To get the remaining source code of the platform, go to the www.matrixplatform.com web site and see the downloads section; the Matrix Platform is an LGPL open source platform. The Matrix Platform also offers another library for message-based communication: the Message Bus. It is a very versatile solution that provides a higher level of communication with many advanced features, and it is designed to work not only in remote mode (TCP/IP) but also in local mode, so it can be used for inter-component communication inside your solution. The article presenting it is available here (http://matrixplatform.com/index.php?title=Framework.MessageBus).


Functionality and Implementation Overview

Much like any other network connectivity solution, the library offers an implementation of a server class and a client class. The server can communicate with all the clients and manage their state (for example, by disconnecting specific ones). It also fully supports connected/disconnected and data sent/received events. The client can connect to the server, send messages, and so on.


Class Diagram

The following diagram shows the classes of the library, along with their public properties and methods. As you can see, the library has an extremely compact and simple implementation, with only 7 classes:

UML diagram of the classes in the library.

The SocketCommunicator class

The core functionality of the library is hosted inside the SocketCommunicator class. It manages the actual socket connection, performs sending and provides confirmation; a confirmation is generated when the underlying TCP/IP layer has successfully transported a chunk of data (a message). The SendAsync() method is used to send data; each message can be tracked by the Id the method returns. Here is how to use the method:

long messageId = client.SendAsync(myMessage, null);

The messageId will appear in events like the SendAsyncCompletedEvent so it can be used to track the progress of the message.


The SocketCommunicatorEx class

This class provides the “Keep Alive” functionality. It sends a “keep-alive” system message every X seconds to notify the other party that we are still alive. This is useful in cases where the connection dies but the underlying TCP/IP layer does not detect it; the keep-alive mechanism will then detect shortly that the connection has been lost and notify you. The operation of this feature is controlled by 2 timers.

  • The KeepAliveTimerInterval (45sec default) controls the period of the “polling”.
  • The KeepAliveTimeoutInterval (160sec default) controls the maximum time that is allowed to pass with no messages incoming, prior to declaring a connection to be dead.
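
The interplay of the two intervals can be sketched as follows. This is only an illustration and not the library's code: the KeepAliveTracker class and its members are hypothetical names, and only the two default values (45 and 160 seconds) are taken from the description above.

```csharp
using System;

// Hypothetical helper illustrating the two keep-alive intervals; not part of the library.
public class KeepAliveTracker
{
    // Defaults taken from the article text.
    public TimeSpan KeepAliveTimerInterval = TimeSpan.FromSeconds(45);
    public TimeSpan KeepAliveTimeoutInterval = TimeSpan.FromSeconds(160);

    private DateTime _lastSent;
    private DateTime _lastReceived;

    public KeepAliveTracker(DateTime now)
    {
        _lastSent = now;
        _lastReceived = now;
    }

    // Call whenever any message (system or client) arrives.
    public void MessageReceived(DateTime now) { _lastReceived = now; }

    // Call after a keep-alive message has been sent.
    public void KeepAliveSent(DateTime now) { _lastSent = now; }

    // True when the polling period has elapsed since the last keep-alive was sent.
    public bool ShouldSendKeepAlive(DateTime now)
    {
        return now - _lastSent >= KeepAliveTimerInterval;
    }

    // True when nothing has been received for longer than the timeout interval.
    public bool IsConnectionDead(DateTime now)
    {
        return now - _lastReceived > KeepAliveTimeoutInterval;
    }
}
```

With the defaults, roughly three keep-alive messages can go unanswered before the connection is declared dead, which leaves some tolerance for a slow or briefly congested link.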


The SocketClientCommunicator class

This class provides the “AutoReconnect” feature. The feature is off by default, so make sure to set AutoReconnect to true if it is needed. It automatically (re)establishes a connection if the connection has been lost or is not yet established. Since this feature is specific to the client side, it resides in this class, which is not used by the server part. Reconnection monitoring is driven by a timer that polls every 5 seconds by default.
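
A timer-driven reconnect check of this kind might look roughly like the sketch below. The ReconnectMonitor class and the two delegates are hypothetical stand-ins for the library's internals, which are not shown in this article; only the polling idea and the "off by default" behavior come from the text.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a polling reconnect monitor; names are illustrative only.
public class ReconnectMonitor : IDisposable
{
    private readonly Func<bool> _isConnected;
    private readonly Action _beginConnect;
    private Timer _timer;

    // Off by default, mirroring the behavior described in the article.
    public bool AutoReconnect;

    public ReconnectMonitor(Func<bool> isConnected, Action beginConnect)
    {
        _isConnected = isConnected;
        _beginConnect = beginConnect;
    }

    // Starts polling; the article mentions a 5 second period by default.
    public void Start(TimeSpan period)
    {
        _timer = new Timer(delegate { Poll(); }, null, period, period);
    }

    // One polling step; returns true when a connection attempt was started.
    public bool Poll()
    {
        if (!AutoReconnect || _isConnected())
        {
            return false;
        }

        _beginConnect();
        return true;
    }

    public void Dispose()
    {
        Timer timer = _timer;
        _timer = null;
        if (timer != null)
        {
            timer.Dispose();
        }
    }
}
```

Timer callbacks run on thread-pool threads and may overlap if a connection attempt takes longer than the period, so a real implementation would also need to guard against starting two attempts at once.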


The SocketMessageClient class

This is the bottom class for the client side, as shown in the UML diagram. Currently it extends SocketClientCommunicator without adding significant functionality; it is a placeholder for client-side functionality that will come in future versions of the library. See the sample section of this article for an example of using this class.


The SocketMessageServer class

This class encapsulates all the server-side functions; it is instantiated to create a server. The class keeps a list of SocketCommunicatorEx instances, each of which corresponds to a connection to a client, so that each connection is managed by an instance of the SocketCommunicatorEx class. To use the SocketMessageServer class, create an instance and call the Start(IPEndPoint endPoint) method with the local endpoint as parameter. This starts the server on that endpoint (typically a port on the localhost) and waits for incoming connections. If the port is already taken, the server will fail to start. To stop the server, call Stop().

The class also allows the following operations:

  • SendAsync() sends a message asynchronously
  • SendToAll() sends a message asynchronously to all currently connected clients
  • DisconnectClient() drops a connection to a specific client

See the sample section of this article for more details on how to use the class.


The SystemMessage class

This class represents a system message. There are 2 types of messages in the library: system and client. Client messages are not bound by any prerequisite and are handled as “Object” class instances. System messages need to inherit from the SystemMessage class; they are used for Keep Alive and similar system-level communications.


Diagnostics

The library also optionally uses some functionality related to diagnostics. It is part of the Matrix Platform, which uses the diagnostics mechanism intensively to track application state. Diagnostics make it possible to track key events, methods, classes etc. while the application is running. To avoid a mandatory reference to the diagnostics assemblies and keep the diagnostics functionality optional, all the diagnostics code in the library is wrapped in the Matrix_Diagnostics define, like so:

#if Matrix_Diagnostics
// ...diagnostics code here...
#endif

The version of the project attached to this article does not include this define, so the diagnostics source code is inactive and no references to the diagnostics assemblies are made. If you wish to use this functionality, download the full version of the platform from the Matrix Platform website (www.matrixplatform.com).


Serialization

Both the server (SocketMessageServer) and the client (SocketMessageClient) classes require a serializer to convert outgoing messages into bytes and incoming bytes back into messages. By default, a BinarySerializer is provided that uses the .NET Framework serializer. If you plan to use it, make sure all message types are serializable and bear the [Serializable] attribute. It is also possible to use custom serializers, by implementing the ISerializer interface and passing the implementation to the client and server classes. Creating a custom serializer is a good option if very high performance is required, since it can be set up to work optimally with custom message types.
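
As an illustration of what a custom serializer could look like, here is a minimal sketch for plain string messages. The ISerializer declaration below is an assumption: its two methods are inferred from the way the serializer is called in the implementation code later in this article, and the real interface may differ. The StringSerializer class is hypothetical.

```csharp
using System;
using System.IO;
using System.Text;

// Assumed shape of the library's interface, inferred from the calls shown in this article.
public interface ISerializer
{
    bool Serialize(MemoryStream stream, object message);
    object Deserialize(MemoryStream stream);
}

// Illustrative serializer for string messages: 4-byte length prefix followed by a UTF-8 payload.
// BitConverter uses the machine byte order, so both ends are assumed to share it.
public class StringSerializer : ISerializer
{
    public bool Serialize(MemoryStream stream, object message)
    {
        string text = message as string;
        if (text == null)
        {
            return false; // Only strings are supported by this sketch.
        }

        byte[] payload = Encoding.UTF8.GetBytes(text);
        byte[] header = BitConverter.GetBytes(payload.Length);
        stream.Write(header, 0, header.Length);
        stream.Write(payload, 0, payload.Length);
        return true;
    }

    public object Deserialize(MemoryStream stream)
    {
        long start = stream.Position;
        byte[] header = new byte[4];
        if (stream.Read(header, 0, 4) < 4)
        {// Header not complete yet; leave the stream untouched.
            stream.Position = start;
            return null;
        }

        int length = BitConverter.ToInt32(header, 0);
        if (length < 0)
        {
            throw new InvalidDataException("Negative message length.");
        }

        if (stream.Length - stream.Position < length)
        {// Payload not complete yet; rewind and wait for more data.
            stream.Position = start;
            return null;
        }

        byte[] payload = new byte[length];
        stream.Read(payload, 0, length);
        return Encoding.UTF8.GetString(payload);
    }
}
```

Returning null without moving the stream position signals "not enough data yet", which matches how the receive loop shown later distinguishes an incomplete message from a corrupted stream.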


Security Extendibility

The current version of this library does not offer any built-in security features. However, it is very easy to implement the following:

  • Connection control security

If a server needs to control incoming connections based on some criteria (for example, IP filtering or connection frequency), this can be done using the ClientConnectedEvent. The event contains an instance of the SocketCommunicatorEx that can be used to access all the details of the connection. Once the connection is established, if it does not fit the requirements of the server it can be immediately dropped.

  • Connection data encryption

The best place to add this functionality would be in the message serializer. The provided serializers can easily be extended to also perform encryption/decryption, or a brand new serializer can be created to serve this purpose; the ISerializer interface that needs to be implemented is very simple.
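
For the connection control case, the decision itself can be kept in a small helper like the one sketched below. The ConnectionFilter class is hypothetical and not part of the library. How the remote endpoint is exposed by SocketCommunicatorEx and the exact parameters of the ClientConnectedEvent handler are not shown in this article, so wiring the check into the handler and calling DisconnectClient() for rejected clients is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Net;

// Hypothetical allow-list for incoming connections; the library does not provide this class.
public class ConnectionFilter
{
    private readonly List<IPAddress> _allowed = new List<IPAddress>();

    // Adds an address (for example "192.168.0.10") to the allow-list.
    public void Allow(string address)
    {
        _allowed.Add(IPAddress.Parse(address));
    }

    // Returns true only for IP endpoints whose address is on the allow-list.
    public bool IsAllowed(EndPoint remoteEndPoint)
    {
        IPEndPoint ipEndPoint = remoteEndPoint as IPEndPoint;
        return ipEndPoint != null && _allowed.Contains(ipEndPoint.Address);
    }
}
```

Because the check runs only after the connection has been accepted, it limits what a rejected client can do but does not prevent the initial TCP handshake.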


Why do we need yet another .NET TCP/IP socket communication library?

I was surprised to discover that my search on the internet turned up no good .NET socket communication library based on asynchronous sockets; not even one that met all the criteria of being a purpose-built, lightweight, optimized solution with a built-in ability to reconnect. Simplicity of usage was also of the highest importance, as was minimal configuration: the library had to handle as much as it could and be easy to use, without putting the burden on the user. After I spent quite some time trying to fit .NET 3.5 WCF to these criteria, the result was a fairly convoluted solution with poor performance (WCF is SOAP/XML based). There are a fair number of open source communication libraries available on the web; however, very few make use of asynchronous socket capabilities, and this plays a decisive role where performance is concerned.


Usage Sample

Here is how to create a server using the default serializer (this is part of the source code from the Sample Server project):

            SocketMessageServer server = new SocketMessageServer(new BinarySerializer());
 
            // Handling those events is optional.
            server.ClientConnectedEvent += new SocketMessageServer.ServerClientUpdateDelegate(messageServer_ClientConnectedEvent);
            server.ClientDisconnectedEvent += new SocketMessageServer.ServerClientUpdateDelegate(messageServer_ClientDisconnectedEvent);
 
            IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, SocketMessageServer.DefaultPort);
            server.Start(endPoint);

One thing to note is that each server requires a serializer. The serializer converts the messages you send into binary data that can be transported over the TCP/IP connection; it is needed because there is no restriction on the type of data transported. Creating a client is just as easy (this is part of the source code from the Sample Client project):

            SocketMessageClient messageClient = new SocketMessageClient(EndPoint, new BinarySerializer());
 
            // Handling these events is optional.
            messageClient.ConnectedEvent += new SocketCommunicator.HelperUpdateDelegate(Helper_ConnectedEvent);
            messageClient.DisconnectedEvent += new SocketCommunicator.HelperUpdateDelegate(Helper_DisconnectedEvent);
 
            // Enable auto reconnect; this will force a connection attempt initially, 
            // or we can use the ConnectAsync() method.
            messageClient.AutoReconnect = true;
 
            ...
 
            // Once we are connected we can send some data over to the other side (in this case an instance called myMessage)
            messageClient.SendAsync(myMessage, null);


Implementation Details

The most important part of the implementation rests in the SocketCommunicator class.


Sending Data

Since the asynchronous sockets work in a “stateless” way, the AsyncMessageSendInfo class was introduced to keep track of things. It stores “per message” information like Id, the message itself as well as the memory stream that serves as a buffer for the data prior to its transportation. The AsyncMessageSendInfo instance is stored inside the SocketAsyncEventArgs.UserToken field so that it can be later reused when one of the SocketAsyncEventArgs events comes in.

This is a slightly simplified version of the SendAsync method:

        public long SendAsync(object message, TimeSpan? requestConfirmTimeout)
        {
            System.Net.Sockets.Socket socket = _socket;
            if (IsConnected == false || socket == null)
            {
                return InvalidSendIndex;
            }
 
            ISerializer serializer = _serializer;
            if (serializer == null)
            {
                return InvalidSendIndex;
            }
 
            // Event used for confirmed calls
            ManualResetEvent sendCompleteEvent = null;
            if (requestConfirmTimeout.HasValue)
            {
                sendCompleteEvent = new ManualResetEvent(false);
            }
 
            AsyncMessageSendInfo messageSendInfo = new AsyncMessageSendInfo() {
                                                                                  Id = PendingSendId,
                                                                                  Socket = socket,
                                                                                  Message = message,
                                                                                  ConfirmationEvent = sendCompleteEvent,
                                                                              };
 
            SocketAsyncEventArgs e = new SocketAsyncEventArgs();
            e.UserToken = messageSendInfo;
            e.Completed += new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_SendComplete);
 
            messageSendInfo.Stream = new MemoryStream();
            if (serializer.Serialize(messageSendInfo.Stream, message) == false)
            {
                messageSendInfo.Dispose();
                return InvalidSendIndex;
            }
 
            e.SetBuffer(messageSendInfo.Stream.GetBuffer(), 0, (int)messageSendInfo.Stream.Length);
            if (messageSendInfo.Socket.SendAsync(e) == false)
            {
                messageSendInfo.Dispose();
            }
 
            // Reacquire the event, to lessen the chance of [ObjectDisposedException]
            // when the connection is not established and so we get the errors on complete instantly.
            sendCompleteEvent = messageSendInfo.ConfirmationEvent;
            if (sendCompleteEvent != null)
            {
                try
                {
                    if (sendCompleteEvent.WaitOne(requestConfirmTimeout.Value) == false)
                    {
                        return InvalidSendIndex;
                    }
                }
                catch (ObjectDisposedException)
                {
                    // The event was disposed while we were about to wait on it; log the error details here.
                    return InvalidSendIndex;
                }
            }
 
            return messageSendInfo.Id;
        }

Note that, to keep track of the SocketAsyncEventArgs operation, we subscribe to the SocketAsyncEventArgs.Completed event, so that we get notified when the send operation is complete. The AsyncMessageSendInfo also maintains its own ManualResetEvent, which is used when we need to wait for confirmation that the send has completed. The confirmation wait timeout is set by the requestConfirmTimeout parameter of the method. If the operation succeeds, the result is the unique Id of the operation, which was also assigned to the AsyncMessageSendInfo; this makes it possible to track each call individually if needed. The SocketCommunicator.SendAsyncCompleteEvent can be used to track when each send operation completes.
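
The confirmation pattern (start an asynchronous operation, then block on a ManualResetEvent with a timeout) can be isolated in a few lines. The sketch below is a generic illustration with hypothetical names; it uses the thread pool in place of a socket operation and is not the library's code.

```csharp
using System;
using System.Threading;

// Generic illustration of waiting for an asynchronous completion with a timeout.
public static class ConfirmedOperation
{
    // Runs 'operation' on a thread-pool thread and waits up to 'timeout' for it to finish.
    // Returns true if the operation completed in time, false on timeout.
    public static bool RunAndWait(Action operation, TimeSpan timeout)
    {
        ManualResetEvent completed = new ManualResetEvent(false);

        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                operation();
            }
            finally
            {
                // Signal the waiting caller, whether or not it is still waiting.
                completed.Set();
            }
        });

        // The event is deliberately not disposed here: after a timeout the
        // worker may still call Set(), which would throw ObjectDisposedException.
        return completed.WaitOne(timeout);
    }
}
```

The comment about disposal points at the same race the SendAsync method above guards against with its catch block: the side that completes the operation and the side that waits for it do not agree on who owns the event.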


Receiving Data

The receive implementation is a little more complex, since it requires us to track incoming events as well as the size of the incoming data, to make sure we have enough data to compose the next incoming message. Two helper methods assist with operations on the asynchronous event args used by async sockets: AssignAsyncReceiveArgs and ReleaseAsyncReceiveArgs. The code also contains some specifics that only came up during real testing and are hard to arrive at just by reading the documentation.

The receiving logic is implemented mainly in the SocketAsyncEventArgs_Received() method, which is invoked whenever a receive has completed. It is important to note that the async arguments need to be updated after each receive. It is also important to have concurrent access protection in place, since asynchronous operation implies the use of multiple threads.

Here is the main receive method. It processes the incoming event arguments and stores the extracted data in the pending receive stream (_pendingReceiveStream), which holds it until there is enough to construct a message. If the incoming data is corrupted and a message cannot be deserialized properly, an InvalidDataException is thrown and handled by clearing the pending stream, so that reception can start over. Once all complete messages have been extracted, the stream is reset if it was fully consumed; otherwise the leftover bytes are carried over to wait for the next batch of incoming data.

        void SocketAsyncEventArgs_Received(object sender, SocketAsyncEventArgs e)
        {
 
            bool resetReceiveArgs = true;
            try
            {
                if (e != _lastReceiveArgs)
                {// We must make sure that we only handle these one at a time.
                    e.Completed -= new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_Received);
                    e.Dispose();
 
                    resetReceiveArgs = false;
                    return;
                }
 
                if (e.SocketError == SocketError.ConnectionReset)
                {// Connection was reset.
                    RaiseDisconnectedEvent();
                    resetReceiveArgs = false;
                    return;
                }
 
                System.Net.Sockets.Socket socket = _socket;
                if (socket == null)
                {
                    resetReceiveArgs = false;
                    return;
                }
 
                if (e.SocketError != SocketError.Success || socket.Connected == false)
                {
                    resetReceiveArgs = false;
                    return;
                }
 
                if (e.BytesTransferred == 0)
                {
                    resetReceiveArgs = false;
                    return;
                }
 
                lock (_syncRoot)
                {// Start the stream operations.
 
                    long streamStartPosition = _pendingReceiveStream.Position;
 
                    _pendingReceiveStream.Seek(0, SeekOrigin.End);
                    _pendingReceiveStream.Write(e.Buffer, 0, e.BytesTransferred);
 
                    _pendingReceiveStream.Seek(streamStartPosition, SeekOrigin.Begin);
                }
 
                ISerializer serializer = _serializer;
                if (serializer == null)
                {
                    return;
                }
 
                object message = null;
                do
                {
                    lock (_syncRoot)
                    {
                        if (_pendingReceiveStream.Length <= _pendingReceiveStream.Position)
                        {// Already read to the end of stream.
                            break;
                        }
 
                        long startPosition = _pendingReceiveStream.Position;
                        try
                        {
                            message = serializer.Deserialize(_pendingReceiveStream);
                            if (message == null && _pendingReceiveStream.Position != startPosition)
                            {// No message was retrieved, and stream was corrupted.
                                throw new InvalidDataException();
                            }
                        }
                        catch (InvalidDataException ex)
                        {   
                            // The serialization routine has failed, or the stream is corrupt;
                            // clear everything and try to start over (error recovery).
                            message = null;
                            _pendingReceiveStream.SetLength(0);
                        }
                    }
 
                    if (message != null)
                    {
                        if (message is SystemMessage)
                        {// System message received.
                            ProcessSystemMessage(message as SystemMessage);
                        }
                        else
                        {// Custom user message.
                            MessageUpdateDelegate delegateInstance = MessageReceivedEvent;
                            if (delegateInstance != null)
                            {
                                delegateInstance(this, message);
                            }
                        }
                    }
 
 
                } 
                while (message != null);
 
 
                lock (_syncRoot)
                {
                    if (_pendingReceiveStream.Position == _pendingReceiveStream.Length)
                    {// Reset the receive stream.
                        _pendingReceiveStream.SetLength(0);
                    }
                    else
                    {
                        // Swap primary and secondary streams, copying over the remaining data.
                        MemoryStream existingStream = _pendingReceiveStream;
                        MemoryStream newStream = _pendingReceiveStreamSecondary;
 
                        newStream.SetLength(existingStream.Length - existingStream.Position);
                        // Copy the left over data.
                        existingStream.Read(newStream.GetBuffer(), 0, (int)newStream.Length);
                        newStream.Seek(0, SeekOrigin.Begin);
 
                        CommonHelper.Swap<MemoryStream>(ref _pendingReceiveStream, ref _pendingReceiveStreamSecondary);
                        _pendingReceiveStreamSecondary.SetLength(0);
 
                        if (existingStream.Position != existingStream.Length)
                        {
                            throw new SystemException("Data propagation error.");
                        }
                    }
                }
 
            }
            finally
            {
                if (resetReceiveArgs)
                {
                    AssignAsyncReceiveArgs(true);
                }
                else
                {
                    ReleaseAsyncReceiveArgs();
                }
            }
        }
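
The accumulate, extract and carry-over steps of the method above can be shown in isolation. The sketch below is a simplified illustration and not the library's code: the ReceiveBuffer class is hypothetical, it assumes each message is framed as a 4-byte length followed by the payload (the library delegates framing to the serializer), it copies the leftover bytes into a new stream where the library swaps two reusable streams, and it omits locking.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Illustrative accumulator, assuming each message is framed as a 4-byte length plus payload.
public class ReceiveBuffer
{
    private MemoryStream _pending = new MemoryStream();

    // Number of bytes received but not yet consumed as complete messages.
    public long PendingBytes { get { return _pending.Length; } }

    // Appends a received chunk and returns all messages that are now complete.
    public List<byte[]> Append(byte[] chunk, int count)
    {
        // Write at the end of the stream, then restore the read position.
        long readPosition = _pending.Position;
        _pending.Seek(0, SeekOrigin.End);
        _pending.Write(chunk, 0, count);
        _pending.Seek(readPosition, SeekOrigin.Begin);

        List<byte[]> messages = new List<byte[]>();
        byte[] header = new byte[4];
        while (_pending.Length - _pending.Position >= 4)
        {
            long start = _pending.Position;
            _pending.Read(header, 0, 4);
            int length = BitConverter.ToInt32(header, 0);
            if (length < 0)
            {
                throw new InvalidDataException("Negative frame length.");
            }

            if (_pending.Length - _pending.Position < length)
            {// Incomplete message; rewind and wait for more data.
                _pending.Position = start;
                break;
            }

            byte[] payload = new byte[length];
            _pending.Read(payload, 0, length);
            messages.Add(payload);
        }

        // Carry the unread remainder over to a fresh stream.
        int remaining = (int)(_pending.Length - _pending.Position);
        MemoryStream compacted = new MemoryStream();
        compacted.Write(_pending.GetBuffer(), (int)_pending.Position, remaining);
        compacted.Position = 0;
        _pending = compacted;

        return messages;
    }
}
```

A message split across two receives stays in the buffer after the first call and is returned by the second, which is the situation the position bookkeeping in the method above exists to handle.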


Advanced system messaging extendibility

The implementation also makes it possible to add advanced behavior by using system messages. Currently the only system message in use is the base SystemMessage class, which serves for “keep alive” tracking. If there is a need to extend the communication at this level, one can override the provided ProcessSystemMessage method and handle the new messages accordingly. The only requirement for a system message is that it inherits from the SystemMessage class and is binary serializable.
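
An extension of this kind might be structured as in the sketch below. To keep the example compilable on its own, SystemMessage and SocketCommunicatorEx are declared here as empty stand-ins for the library types; the signature and accessibility of ProcessSystemMessage are assumptions based on the call shown in the receive method, and TimeSyncMessage, TimeSyncCommunicator and SimulateReceive are hypothetical names.

```csharp
using System;

// Stand-ins for the library types so that the sketch compiles on its own.
// The real classes have more members; the ProcessSystemMessage signature is assumed.
[Serializable]
public class SystemMessage { }

public class SocketCommunicatorEx
{
    protected virtual void ProcessSystemMessage(SystemMessage message)
    {
        // Base behavior in the library: keep-alive tracking.
    }

    // Stand-in for the receive path, which routes system messages to the method above.
    public void SimulateReceive(SystemMessage message)
    {
        ProcessSystemMessage(message);
    }
}

// Hypothetical custom system message carrying the sender's clock.
[Serializable]
public class TimeSyncMessage : SystemMessage
{
    public DateTime SenderUtcTime;
}

public class TimeSyncCommunicator : SocketCommunicatorEx
{
    // Difference between the local clock and the sender's clock, once known.
    public TimeSpan? LastClockOffset;

    protected override void ProcessSystemMessage(SystemMessage message)
    {
        TimeSyncMessage sync = message as TimeSyncMessage;
        if (sync != null)
        {
            LastClockOffset = DateTime.UtcNow - sync.SenderUtcTime;
            return;
        }

        // Everything else (for example keep-alive) goes to the base implementation.
        base.ProcessSystemMessage(message);
    }
}
```

Passing unrecognized messages on to the base implementation keeps the keep-alive mechanism working alongside the new message type.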


Performance Overview

Performance varies a lot and depends on things like message size, serialization mode, confirmation requirements and computer configuration. In a test environment, with proper configuration and setup, the library was able to transport up to 100 000 messages per second; however, this was achieved with a very small, simple predefined message and with no serialization, confirmation or duplex confirmation. In a live environment, 5 000 to 20 000 messages per second is much more common, depending on hardware, network capacity etc.


Sample & Test projects

The included source code package also contains 2 test projects: Sample Server and Sample Client. They demonstrate the usage of the library in a typical server/client scenario; both the server and the client send random-size messages once the connection is established and report progress in the console window.


Conclusion

A simple, versatile library like this has many applications; despite the abundance of high-level communication APIs that have appeared in the last few years, a simple, straightforward solution is often preferable and cost effective. The library has excellent performance, is very easy to integrate and is a great “drop-in” solution for transporting information between any 2 points of your system.

Modified on 30 June 2010 at 21:47