
Framework.MessageBus






Introduction

The Message Bus is a message distribution and delivery framework built on C# .NET 2.0. It is a highly optimized, fast and lightweight solution for distributing information between the different parts of your solution. It also comes with a few extras, like the ability to automatically match messages to methods based on their input parameters and to execute multiple messages in parallel on separate threads.


The Message Bus also comes with integrated TCP/IP connectivity that allows you to distribute your solution natively across multiple locations. The distribution process is transparent to the overlying elements of the framework, meaning no code changes are required on the communication side to use these abilities; this is achieved through one-time configuration and setup only.

Finally, the Message Bus collects information from each participating component (or client) regarding its type and implemented interfaces. It stores this information and automatically distributes it to other clients in an error-tolerant way (so that no crashes occur if one part has no knowledge of a specific type used by another).

The following diagram shows a typical configuration of 2 message bus instances (one server, one client), connected together with a TCP/IP connection. Each of them has clients attached to it, and all of the clients are able to communicate with each other through the use of messages.

Purpose and Functionality

Messages

The framework can transport any kind of data; it uses the [System.Object] class as the payload type, so data of any type can be transported. The transported payload is stored in a special container class called an Envelope. It holds all information related to where the message is coming from and where it is going, as well as the actual message data itself. To send a message, put it in an Envelope instance and specify the receiver(s), like this:

            Envelope envelope = new Envelope();
            envelope.Message = new MyPayload();
            bus.Send(senderId, receiverId, envelope, null, false);

Note that the first parameter of the Send method must be the Id of the sending client; this allows the receiver of the message to send a response if needed. The last 2 parameters are for advanced usage (related to confirmation timeout and diagnostics) and are passed as null and false in this example.


Here is the full source code for the sample (you can also find it in the Matrix.Framework.MessageBus.UnitTest assembly):

            MessageBus.Core.MessageBus bus = new MessageBus.Core.MessageBus("Bus");
 
            ActiveClient client1 = new ActiveClient("ClientOne");
            ActiveClient client2 = new ActiveClient("ClientTwo");
            client2.EnvelopeReceivedEvent += new MessageBusClient.EnvelopeUpdateDelegate(client2_EnvelopeReceivedEvent);
 
            bus.AddClient(client1);
            bus.AddClient(client2);
 
            Envelope envelope = new Envelope();
            envelope.Message = new MessagePayload();
 
            bus.Send(client1.Id, client2.Id, envelope, null, false);


Execution

In order to utilize the ability of modern hardware to run multiple threads at the same time, the framework incorporates a simple yet powerful model of execution parallelism. This sub-framework (called Execution Strategies) lets you specify which threads process the messages arriving at a given client, when they do so, and how many of them there are. It assumes that a client can receive and process multiple messages in its methods at the same time. It is only active for clients of type ActiveClient or of a type that inherits from it; the default MessageBusClient class does not use these capabilities.


The behavior and runtime control of executing threads on an ActiveClient is done inside the Execution Strategy that is assigned for that client. There are two pre-provided execution strategies available:

• The first, FrameworkThreadPoolExecutionStrategy, is based on the default .NET framework thread pool. It is a very simple, versatile implementation that can be used in many scenarios where fine-grained control over execution is not needed. Note that since the .NET framework thread pool is a single instance per application, all clients that use this strategy share the same threads and the same maximum thread count limit.

• The second, ThreadPoolFastExecutionStrategy, is based on a custom thread pool implementation. It is very fast (in certain conditions faster than the .NET thread pool) and allows much more detailed control over the type and number of executing threads. It also allows a dedicated thread pool to be assigned to a single client, so that no threads are shared.

If you do not specifically assign an execution strategy to a client, it will reuse the default strategy of the Message Bus it belongs to.

Client Types

MessageBusClient is an abstract class, parent for all client types. It stores common features for a client like an Id and ExecutionStrategy.

ActiveClient is the basic client class. It can send and receive messages, and it also specifies the execution strategy that defines how messages are executed once they arrive at the client.

ActiveInvocatorClient extends the active client with the ability to deliver incoming messages directly to matching methods of a client object. To take advantage of this functionality, simply mark the method with the [EnvelopeReceiver] attribute.


Here is an example of working with ActiveClient and ActiveInvocatorClient capabilities:

        /// <summary>
        /// Class shows how to automatically accept messages coming from the Message bus
        /// in corresponding methods. Make sure to mark with [EnvelopeReceiver] attribute.
        /// </summary>
        public class MessageReceiver
        {
            [EnvelopeReceiver]
            public void Receive(int data)
            {// Consume the received integer data.
            }
 
            [EnvelopeReceiver]
            public void Receive(Envelope envelope, string data)
            {// Consume the received string data.
                // NOTE: as shown in this method - you can put the Envelope class as first parameter, 
                // so to be able to access the actual envelope instance if needed.
            }
 
            [EnvelopeReceiver]
            public void Receive(double data)
            {// Consume the received double data.
            }
        }
 
        /// <summary>
        /// Shows how messages are received in clients.
        /// </summary>
        [Test]
        public void SimplestTest2()
        {
            // Create the message bus.
            MessageBus.Core.MessageBus bus = new MessageBus.Core.MessageBus("Bus");
 
            // Create the first client and add it to the bus.
            ActiveClient client1 = new ActiveClient("ClientOne");
            bus.AddClient(client1);
            // Assign message handling for this client - all messages will be received in the client_EnvelopeReceivedEvent method.
            client1.EnvelopeReceivedEvent += new MessageBusClient.EnvelopeUpdateDelegate(client_EnvelopeReceivedEvent);
 
            // Create the second client and add it to the bus.
            ActiveInvocatorClient client2Invocator = new ActiveInvocatorClient("ClientTwo");
            bus.AddClient(client2Invocator);
 
            // Assign a receiver source for the ActiveInvocation client. It will automatically receive
            // messages data inside its methods.
            MessageReceiver receiver = new MessageReceiver();
            client2Invocator.Source = receiver;
 
            // Create the envelope and assign the data.
            Envelope envelope = new Envelope();
            envelope.Message = (int)(121);
 
            // Send the envelope from client 1 to client 2... data will be received in the MessageReceiver::Receive(int) method.
            bus.Send(client1.Id, client2Invocator.Id, envelope, null, false);
        }
 
        void client_EnvelopeReceivedEvent(MessageBusClient stub, Envelope envelope)
        {// Client 1 will receive all messages here.
        }

This example showed how to use the different types of client classes. Please note that you can also access the Envelope instance in the ActiveInvocatorClient usage mode by specifying it as the first parameter of the method, as shown in MessageReceiver::Receive(Envelope envelope, string data).

Connectivity

The integrated TCP/IP functionality works on top of a dedicated Sockets assembly that provides the connectivity. The model of operation is Server-Client, meaning a message bus can be a server, a client or neither (in which case it is not linked to anything). To create a Message Bus in server mode use the ServerMessageBus class, for client mode use the ClientMessageBus class, and for a bus that is neither use the MessageBus class. To instantiate a server message bus you need to specify the TCP port that the server will operate on (or leave it empty for the default). To instantiate a client message bus, you need to specify both the port and the address (can be any valid IP address). See the corresponding class constructors for more details.

The TCP/IP layer also has integrated user / password access control. It is fairly basic, but it can easily be extended to a full-blown detailed access control system by implementing one or two custom classes.

The following example shows how to set up and use the Message Bus in remote communication mode (you can see a full working unit test of this in the MessageBusNetTest class inside the Matrix.Framework.MessageBus.UnitTest project):

        /// <summary>
        /// Setup the server and the proxy at compatible addresses, and add clients to both. Connection should 
        /// be established automatically in 1-2 secs.
        /// </summary>
        public void Init()
        {
            _bus = new ServerMessageBus("Bus1", 15726, null);
            _proxy = new ClientMessageBus(new IPEndPoint(System.Net.IPAddress.Loopback, 15726), string.Empty, null);
 
            _client1 = new ActiveInvocatorClient("c1");
            _client1.EnvelopeReceivedEvent += new MessageBusClient.EnvelopeUpdateDelegate(client_EnvelopeReceivedEvent);
 
            _bus.AddClient(_client1);
 
            _client2 = new ActiveInvocatorClient("c2");
            _client2.EnvelopeReceivedEvent += new MessageBusClient.EnvelopeUpdateDelegate(client_EnvelopeReceivedEvent2);
 
            _proxy.AddClient(_client2);
 
            // Give some time for the TCP to complete connection.
            Thread.Sleep(2000);
        }
 
        /// <summary>
        /// This tests sending and receiving messages on both sides at the same time.
        /// </summary>
        public void CombinedTest()
        {
            bool errored = false;
            Envelope envelope = new Envelope() { Message = 12 };
            if (_proxy.Send(_client2.Id, _client1.Id, envelope, null, true) != OutcomeEnum.Success)
            {
                errored = true;
                Console.WriteLine("Failed to send 1");
            }
 
            envelope = new Envelope() { Message = 88 };
            if (_bus.Send(_client1.Id, _client2.Id, envelope, null, true) != OutcomeEnum.Success)
            {
               errored = true;
               Console.WriteLine("Failed to send 2");
            }
        }


Sister Frameworks, Toolkits & The Matrix Platform (www.matrixplatform.com)

The Message Bus is part of the Matrix Platform, a base for several frameworks that are designed to work together. It is a fully open source (LGPL licensed) solution that combines these solution development frameworks under one roof. Currently the platform provides 4 distinct parts: the Super Pool Framework, the Message Bus, the Diagnostics Toolkit and a Data Storage solution. You can find more information on each of those on the Matrix Platform Web Site (www.matrixplatform.com) as well as in the dedicated Code Project articles.

The following diagram demonstrates the assemblies involved in the Matrix Platform and how they stack together. The Message Bus can be used by referencing the Matrix.Framework.MessageBus assembly (along with all the remaining infrastructure assemblies from the Matrix Platform, marked in blue) or in standalone mode, by referencing only the Matrix.Framework.SuperPool.Standalone assembly.



Important

The Message Bus is a versatile and coherent framework; however, it is entirely based around the concept of messages. Before using it, make sure to explore the Super Pool framework, which is a strong evolution of this concept and allows performing similar operations using interface-based calls instead of formatting messages. The Super Pool is also part of the Matrix Platform and it uses the Message Bus for its operation. You can read more about it here: http://matrixplatform.com/index.php?title=Framework.SuperPool

Assemblies and Source Code

The Message Bus implementation is stored entirely inside the Matrix.Framework.MessageBus assembly; however, it references a few other assemblies that provide needed functionality like TCP/IP connectivity or Diagnostics. It is strongly advisable to download the latest full Matrix Platform solution, since it contains not only all the required projects, but also extras like unit tests and samples. To download it, go to www.matrixplatform.com and enter the Downloads section, or use this link:

http://matrixplatform.com/index.php?title=Downloads

Standalone usage

If you wish to use the Message Bus as a standalone solution with a single assembly, you can use the Matrix.Framework.SuperPool.Standalone assembly. It contains all the elements that are required to run the Message Bus as well as the Super Pool framework. The only functionality missing is the Diagnostic system integration.

You can find this standalone DLL as part of the full Matrix Platform solution (see web site: www.matrixplatform.com and go to Downloads, Source Code).

Technical Details

Overview

The implementation of the toolkit contains 4 main parts:

• MessageBus transports the actual messages to and from clients and manages these operations; it also manages client instances and Ids so that each message can be routed to the proper receiver.

• Envelope stores the transported information, as well as information on the “route” the message has traveled.

• Client classes receive and process the messages.

• Network classes propagate information over TCP/IP and provide the server and client message buses that make up the network.

Class Diagram

The following diagram shows the relations between core classes as well as their properties, events and methods.


Implementation – Message Bus Core

The majority of the Message Bus Core functionality is stored inside the MessageBusBase and MessageBus classes. What follows is the MessageBus.DoSendToClient() implementation. It shows the core of the message bus in action, and at the same time is a simple and easy to understand method:

        protected virtual SendToClientResultEnum DoSendToClient(ClientId senderId, ClientId receiverId,
                                                                Envelope envelope, TimeSpan? requestConfirmTimeout)
        {
            if (receiverId.MessageBus != this)
            {
                return SendToClientResultEnum.ClientNotFound;
            }
 
            // Obtain client instance.
            MessageBusClient client = GetLocalClientByIndex(receiverId.LocalMessageBusIndex);
            if (client == null)
            {
                return SendToClientResultEnum.ClientNotFound;
            }
 
            ISerializer serializer = _serializer;
            if (serializer == null)
            {
                return SendToClientResultEnum.Failure;
            }
 
            // Duplicate the envelope and/or message (if anything) according to the envelope duplication mode.
            envelope = envelope.Duplicate(serializer);
            envelope.History.PushStamp(new EnvelopeStamp(PendingStampId, receiverId, senderId));
            // Deliver envelope to client.
            if (client.Receive(envelope))
            {
                return SendToClientResultEnum.Success;
            }
            else
            {
                return SendToClientResultEnum.Failure;
            }
        }


The method retrieves the client instance, obtains the default serializer and uses it to duplicate the envelope. The new envelope is then marked with a “stamp” indicating where it is coming from and is delivered to the client.

Implementation – Client Classes

As I have already mentioned in the functionality overview section, there are 3 pre-provided types of clients: MessageBusClient, ActiveClient and ActiveInvocatorClient.


Execution Model

The execution model is a very important part of the client functionality. An overview of the model is seen in the first diagram of this article – the red circles “Default Execution Strategy” reflect the working thread pools that process the messages. Each time a message is received, the execution model defines how this message will be executed upon the client. By default, messages coming in from the Message Bus are executed on a thread pool, so the code that processes the actual message on the client side is executing in a separate dedicated thread.

This approach has many advantages, like exception safety, multi-core utilization and a natural inclination toward distributed execution. It also lets the message bus operate separately from the execution of the clients, which greatly reduces the danger of a single client clogging up the entire pipeline.


This behavior is entirely controlled by the execution strategy. By default 2 strategies are provided:

• the FrameworkThreadPoolExecutionStrategy executes messages on the .NET framework thread pool

• the ThreadPoolFastExecutionStrategy executes using a custom thread pool implementation; it has advanced features like thread count control per strategy

If you wish to implement a custom execution strategy, simply inherit from the ExecutionStrategy class and assign your child class to the message bus or message bus client that should use it.

The following diagram gives an idea how the system operates:



Client A sends a message through the message bus. When the message arrives at Client B (which inherits ActiveClient or ActiveInvocatorClient), the execution strategy consumes it and queues it for execution on the thread pool. The thread pool assigns a thread, and this thread enters the custom message processing code.
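The hand-off described above can be illustrated with a minimal, self-contained sketch that uses only the standard .NET thread pool. It is not the framework's implementation: the ExecutionSketch class, the MessageHandler delegate and the DeliverAll helper are hypothetical names introduced for this example only.

```csharp
using System;
using System.Threading;

public static class ExecutionSketch
{
    // Hypothetical stand-in for the client's custom message processing code.
    public delegate void MessageHandler(object message);

    // Queue one message for processing on the shared .NET thread pool.
    // The caller (the "bus") returns immediately; the handler runs on a pool thread.
    public static void Enqueue(object message, MessageHandler handler)
    {
        ThreadPool.QueueUserWorkItem(delegate(object state)
        {
            try
            {
                handler(state);
            }
            catch (Exception ex)
            {
                // A failing handler must not take down the pool thread or the sender.
                Console.Error.WriteLine("Handler failed: " + ex.Message);
            }
        }, message);
    }

    // Deliver several messages in parallel and wait until all were processed.
    public static int DeliverAll(object[] messages)
    {
        int processed = 0;
        using (CountdownEvent done = new CountdownEvent(messages.Length))
        {
            foreach (object message in messages)
            {
                Enqueue(message, delegate(object m)
                {
                    Interlocked.Increment(ref processed);
                    done.Signal();
                });
            }
            done.Wait();
        }
        return processed;
    }
}
```

Because the sender only queues work, a slow handler delays other handlers on the same pool but not the code that sent the message.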


Automatic Message Type Handling


The most interesting technical aspect here is the way the ActiveInvocatorClient class is capable of delivering messages directly to the corresponding methods of its “source” class (as seen in the MessageReceiver example). Reflection is employed to achieve this; however, to improve on the relatively slow speed of purely reflected invocation, a FastInvokeHelper is used. This helper is based on a CodeProject article you can find here (http://www.codeproject.com/KB/cs/FastMethodInvoker.aspx).

All of the information related to handling a message type is stored in an ActiveInvocatorClient.TypeHandler class instance. When a new message arrives for execution at the ActiveInvocatorClient class, the following method dispatches it to the corresponding message type handler, which in turn passes it to the source method:

        protected override void OnPerformExecution(Envelope envelope)
        {
            Dictionary<Type, TypeHandler> typeHandlersHotSwap = _typeHandlersHotSwap;
            object target = _source;
            // Read the message once and check it for null before asking for its type.
            object message = envelope.Message;
            if (target != null && message != null && typeHandlersHotSwap != null)
            {
                TypeHandler handler;
                if (typeHandlersHotSwap.TryGetValue(message.GetType(), out handler))
                {
                    handler.Invoke(target, envelope);
                }
            }
        }
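To make the type-based dispatch more concrete, here is a minimal, self-contained sketch of the general technique using plain reflection. It is an illustration under stated assumptions, not the framework's code: ReceiverAttribute stands in for [EnvelopeReceiver], TypeDispatcher and SampleReceiver are hypothetical, and MethodInfo.Invoke is used where the framework uses its faster FastInvokeHelper.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical stand-in for the framework's [EnvelopeReceiver] attribute.
[AttributeUsage(AttributeTargets.Method)]
public sealed class ReceiverAttribute : Attribute { }

public sealed class TypeDispatcher
{
    private readonly object _source;
    private readonly Dictionary<Type, MethodInfo> _handlers = new Dictionary<Type, MethodInfo>();

    public TypeDispatcher(object source)
    {
        _source = source;
        // Scan the source once and map each marked method's parameter type to the method.
        foreach (MethodInfo method in source.GetType().GetMethods(BindingFlags.Instance | BindingFlags.Public))
        {
            ParameterInfo[] parameters = method.GetParameters();
            if (parameters.Length == 1 && method.IsDefined(typeof(ReceiverAttribute), false))
            {
                _handlers[parameters[0].ParameterType] = method;
            }
        }
    }

    // Returns true when a handler for the exact runtime type of the message was found and invoked.
    public bool Dispatch(object message)
    {
        MethodInfo method;
        if (message == null || !_handlers.TryGetValue(message.GetType(), out method))
        {
            return false;
        }
        method.Invoke(_source, new object[] { message });
        return true;
    }
}

// Hypothetical receiver, modeled on the MessageReceiver example.
public class SampleReceiver
{
    public string Last = "";

    [Receiver]
    public void Receive(int data) { Last = "int:" + data; }

    [Receiver]
    public void Receive(string data) { Last = "string:" + data; }
}
```

The reflection scan happens once, at construction; each dispatch is then a dictionary lookup plus one invocation. Replacing the invocation with a cached delegate is the kind of optimization a fast invoke helper provides.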

Implementation – Network Classes

Note: You can find samples on setting up a network connection in the Connectivity section of this article.

The network server (ServerMessageBus) and client (ClientMessageBus) classes both inherit the MessageBus class and extend it with the ability to communicate through TCP/IP as a server or as a client. The concept behind network connectivity between multiple MessageBuses is that one central MessageBus plays the role of “Master” (aka Server) and all the remaining client Bus instances connect to it. This implements a “star” type architecture, as opposed to, for example, a peer-to-peer model where all participants are equal. The “star” architecture was chosen for its simplicity.


The ServerMessageBus and ClientMessageBus talk to each other using the following messages: AccessMessage, ClientsListMessage, ClientUpdateMessage, CommonResponseMessage, EnvelopeMessage, RequestClientListUpdateMessage and StateUpdateMessage. I shall not describe each message here, since their names are highly descriptive and each message class has a comment describing its purpose, in case you need to know.


Another aspect of network functionality is the access control classes (ServerAccessControl and ClientAccessControl). They provide a very simple authorization mechanism that (when enabled) requires a username and password from the connecting client message bus in order for it to gain access to the server side message bus. This mechanism is easy to extend if needed: both the AccessMessage and the routines in the message bus classes can be further enhanced for more detailed control over log in.


The most advanced feature of the Server-Client integration is the smooth way all the clients of all the message buses connected are able to communicate with each other. The architecture makes it so that a client does not need to have any awareness when communicating to another client, as to whether this other client is local or remote.


Finally, here is a simplified version of the ServerMessageBus._server_ClientMessageReceivedEvent() method. It shows some of the operations that are done upon receiving a message, and gives an idea of how the implementation works. The method handles each individual message type and extracts whatever information is needed to process it. Please note that some diagnostics code was omitted in order to simplify this example:


        void _server_ClientMessageReceivedEvent(SocketMessageServer server, SocketCommunicatorEx client, object message)
        {
            ServerAccessControl accessControl = AccessControl;
            // Check security first.
            if (accessControl != null && message is AccessMessage == false)
            {
                if (accessControl.IsAllowed(ObtainClientAccessControl(client.Id)) == false)
                {
                    return;
                }
            }
 
            if (message is EnvelopeMessage)
            {// Envelope user message.
 
                EnvelopeMessage envelopeMessage = (EnvelopeMessage)message;
 
                // Remove the remote message bus index association.
                envelopeMessage.Sender.LocalMessageBusIndex = ClientId.InvalidMessageBusClientIndex;
 
                foreach (ClientId id in envelopeMessage.Receivers)
                {
                    // Assign the id as local id, if it is, otherwise skip it.
                    id.LocalMessageBusIndex = base.GetClientIndexByGuid(id.Guid);
                    if (id.IsMessageBusIndexValid)
                    {
                        id.MessageBus = this;
                        if (DoSendToClient(envelopeMessage.Sender, id, envelopeMessage.Envelope, null) != SendToClientResultEnum.Success)
                        {// Report error.
                        }
                    }
                }
            }
            else if (message is ClientsListMessage)
            {// Message bus system message.
 
                ClientsListMessage updateMessage = (ClientsListMessage)message;
                for (int i = 0; i < updateMessage.Ids.Count; i++)
                {
                    RegisterClientId(client.Id, updateMessage.Ids[i], updateMessage.Types[i], updateMessage.SourcesTypes[i]);
                }
 
            }
            else if (message is RequestClientListUpdateMessage)
            {
                SendClientsUpdate(client.Id);
            }
            else if (message is ClientUpdateMessage)
            {
                ClientUpdateMessage updateMessage = (ClientUpdateMessage)message;
 
                bool validClient;
                lock (_syncRoot)
                {
                    validClient = _remoteClientNetId.ContainsKey(updateMessage.ClientId);
                }
 
                if (validClient)
                {
                    RaiseClientAddedEvent(updateMessage.ClientId);
                }
            }
            else if (message is AccessMessage)
            {
                ClientAccessControl control = ObtainClientAccessControl(client.Id);
                if (control != null)
                {
                    control.Update(message as AccessMessage);
                }
            }
            else if (message is StateUpdateMessage)
            {
                RaiseCounterPartyUpdateEvent("Client:" + client.Id.ToString(), ((StateUpdateMessage)message).State.ToString());
            }
        }

Other

An interesting implementation detail is that type information is distributed using string descriptors for types, instead of Type class instances. This avoids de-serialization issues when type information is received in assemblies that have no knowledge of the type. The code that matches the string identifiers to the actual types uses cached containers for optimized performance, so the speed of execution does not suffer. The Message Bus also makes extensive use of Hot Swap collections. A dedicated article on this topic is available on CodeProject.
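Both ideas, string descriptors resolved through a cache and hot swapping, can be combined in one short sketch. This is an assumption-laden illustration and not the framework's code: TypeNameCache and Resolve are hypothetical names, and the hot swap here is a plain copy-on-write dictionary, where readers never lock and writers publish a modified copy.

```csharp
using System;
using System.Collections.Generic;

public static class TypeNameCache
{
    private static readonly object _writeLock = new object();

    // Readers take a snapshot of this reference and never lock;
    // writers replace it with a modified copy (the "hot swap").
    private static volatile Dictionary<string, Type> _cache = new Dictionary<string, Type>();

    // Resolve a type from its string descriptor.
    // Returns null (instead of throwing) when the type is not known in this process.
    public static Type Resolve(string typeName)
    {
        Dictionary<string, Type> snapshot = _cache;
        Type type;
        if (snapshot.TryGetValue(typeName, out type))
        {
            return type; // May be a cached null for an unknown type.
        }

        // throwOnError = false: a type that cannot be found yields null.
        type = Type.GetType(typeName, false);

        lock (_writeLock)
        {
            // Copy, modify the copy, then publish it with a single reference assignment.
            Dictionary<string, Type> copy = new Dictionary<string, Type>(_cache);
            copy[typeName] = type;
            _cache = copy;
        }
        return type;
    }
}
```

The published dictionary is never modified after assignment, which is what makes lock-free reads safe. The cost is a full copy per write, so the pattern suits data that is read far more often than it changes.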


Duplication Modes

Duplication modes are set inside the Envelope class, and are specific to each Envelope sent. They specify how the Envelope and the data it transports are to be handled during transport. Here is a short description of each mode (these correspond to the Envelope.DuplicationModeEnum values):

None will duplicate nothing; both the Envelope and the message it transports will be delivered as the same references that were sent (note: this does not apply to remote connections, where passing through a TCP/IP connection mandates serialization of both). This is the default mode and also the fastest, since no additional operations are performed on the data.

DuplicateMessage will duplicate only the message (aka. payload) that is transported inside the Envelope.

DuplicateEnvelope will duplicate only the envelope, leaving the data inside the same exact reference.

DuplicateBoth will duplicate both the Envelope and the message.


Note that sending messages through remote connections mandates serialization, so setting a duplication mode for this usage is redundant.

Duplication for the message sent (aka. payload) will be performed following this guideline:

• If message class implements ICloneable interface, this interface will be used to generate the new instance

• Otherwise, if the message is a “class”, the default serializer assigned for the Envelope will be used to perform the duplication (this is usually slower; the default serializer is usually .NET BinarySerializer so make sure to make your transported classes [Serializable] if you plan to use it)
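The guideline above can be sketched in a few lines. This is a minimal illustration, not the Envelope implementation: DuplicationSketch, DuplicateMessage and the SerializerClone delegate are hypothetical names, and the delegate stands in for whatever serializer round trip is configured.

```csharp
using System;

public static class DuplicationSketch
{
    // Hypothetical fallback: clones a message through a serialize/deserialize round trip.
    public delegate object SerializerClone(object message);

    // Duplicate a payload following the guideline: ICloneable first, serializer otherwise.
    public static object DuplicateMessage(object message, SerializerClone fallback)
    {
        if (message == null)
        {
            return null;
        }

        ICloneable cloneable = message as ICloneable;
        if (cloneable != null)
        {
            // Fast path: the type knows how to copy itself.
            return cloneable.Clone();
        }

        if (message.GetType().IsValueType)
        {
            // Assumption of this sketch: boxed value types are passed through unchanged.
            return message;
        }

        // Slow path: let the serializer produce the copy.
        return fallback(message);
    }
}
```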

Optimization Techniques

The Message Bus supports a fast serialization routine, through the usage of a fast serialization sub-framework (you can read more about it here: http://www.codeproject.com/KB/dotnet/FastSerializer.aspx). If you need extremely fast serialization when transporting your messages, you may want to use the same technique. The process is rather simple, and only requires implementing ISerializable and writing the serialization routines manually. Read the article for more details. The Message Bus uses this routine for its own internal message types, in order to gain maximum speed where needed. You can look at the Envelope class for an example of how this works.


Implementing the ICloneable interface

If you plan to transport custom objects through the message bus, and wish to have them duplicated along the way, it is best to have them implement the ICloneable interface. It only requires one method, so it is rather easy to do. The speed gain when duplicating a locally transported message can be well over 10x compared to a standard custom class. If a class does not implement ICloneable and needs to be duplicated, the Message Bus will try to use the MessageBus.Serializer to clone the item; by default this is the standard .NET Framework BinarySerializer, so make sure to have your classes marked with the [Serializable] attribute.
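As an illustration, a payload class implementing ICloneable might look like the following sketch. PriceUpdate is a hypothetical type made up for this example; only ICloneable and [Serializable] come from the .NET Framework.

```csharp
using System;

// Hypothetical payload type used only for this example.
[Serializable]
public sealed class PriceUpdate : ICloneable
{
    public string Symbol;
    public double[] Levels;

    public PriceUpdate(string symbol, double[] levels)
    {
        Symbol = symbol;
        Levels = levels;
    }

    // Deep copy: strings are immutable and can be shared, but the array must be copied
    // so that the sender and the receiver cannot modify each other's data.
    public object Clone()
    {
        return new PriceUpdate(Symbol, (double[])Levels.Clone());
    }
}
```

Keeping the [Serializable] attribute as well means the same class still works when the message crosses a remote connection, where cloning does not apply.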


Implementing the ISerializable interface

Note - this section is intended for usage with the default BinarySerializer; if custom serializers are used, they may or may not make use of this.


Implementing the ISerializable interface on your classes assures that when serializing a message during transport, the most optimal path will be taken, using your custom code.
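A minimal sketch of a message class with hand-written serialization routines is shown below. Tick is a hypothetical type and the short key names ("s", "p") are arbitrary choices for this example; ISerializable, SerializationInfo and StreamingContext are the standard .NET types.

```csharp
using System;
using System.Runtime.Serialization;

// Hypothetical message type used only for this example.
[Serializable]
public sealed class Tick : ISerializable
{
    public readonly string Symbol;
    public readonly double Price;

    public Tick(string symbol, double price)
    {
        Symbol = symbol;
        Price = price;
    }

    // Deserialization constructor: called by the formatter with the stored values.
    private Tick(SerializationInfo info, StreamingContext context)
    {
        Symbol = info.GetString("s");
        Price = info.GetDouble("p");
    }

    // Serialization routine: write only the fields that need to travel.
    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        info.AddValue("s", Symbol);
        info.AddValue("p", Price);
    }
}
```

The formatter calls these two members instead of reflecting over the fields, so the class controls exactly what is written and read.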


The Matrix Framework includes an optimized set of serialization classes that can be found under the Matrix.Common.Extended.FastSerialization namespace. You can find more information on the topic in the dedicated article. To drastically increase local speed, implement ICloneable on the transported message class; for remote (through TCP/IP) invocation, use the ISerializable technique.

Performance

The framework has been optimized to deliver high performance, using different optimization techniques such as lock-free multi-threaded access (hot swapping).

The actual message performance strongly depends on the type of message transported, the selected execution strategy and the selected duplication mode. A best case scenario can give well over 500 000 messages per second on a typical dual core CPU. This figure should also scale well on future generations of CPUs, since the framework makes full use of multi-threading.

As can be expected, performance when using a TCP/IP connection between two parts of the system (remote connections) is a lot lower, in the range of 5-10K messages per second on a similar machine. This strongly depends on the transported message size, and is also subject to significant further optimization. Where needed, performance can be raised to as much as 100K messages per second through the network channel. To do this, all advanced features like confirmation must be stripped out, and the serialization of messages must be optimized as much as possible.

Additional Examples, Test and Demonstration Projects

The full solution of the Matrix Platform (available for download here: http://matrixplatform.com/index.php?title=Downloads) contains a set of samples showing how to use the Message Bus. These are inside the Matrix.Framework.MessageBus.UnitTest project, and can be executed as a unit test assembly using the NUnit testing framework. The full Matrix Platform solution contains unit tests, performance tests and demonstration projects. Results from the performance tests can be found in the Performance section of this article.

Modified on 7 July 2010 at 10:47.