Xafari Server uses its own implementation of the Message Queue. Clients add tasks (hereinafter messages) to the Queue and the Server processes messages from the Queue. This post provides an example of how to implement client-server interaction via the Message Queue.

Message Queuing work includes the following stages:

  1. The client adds the message to the Queue, the message is decorated with a specified tag. The message is saved in the database.
  2. Xafari Server retrieves the message from the database, finds a handler with the corresponding tag and starts processing the message.
  3. The handler can save the results of their work to the database. The client can get them for further work.

What required, to use Message Queue

To use Message Queue, execute the following steps:

  1. Add Xafari.MQ and Xafari.Server modules to your module.
  2. Define the data type for the message. Create and send a message by calling Instance.InsertMessage<T> method. Also, specify the tag of the message handler.
  3. Define a message handler class, it must implement the IMessageHandler interface. Then register a message handler class on the server. Necessarily specify a tag, each type of message corresponds to a single unique tag. Tag binds message and handler.

Example

Let's develop XafariMQSample solution to examine the Xafari Message Queue. We will create a controller containing AddMessageToQueue Action, that will add a message to the queue. Message have the Index parameter indicating the number of the call Action. After the message is processed, the MessageLog object will be created and stored in the database.

Message

Xafari.MQ message is anĀ MQMessage Domain Component. The MessageData property stores the message data. It is a non-typed reference property of the XPWeakReferenceStruct type. In this example, the message data type will contain only one property.

To add a message to the queue, use the MQManager.InsertMessage<T>(T messageData, string tag, string description) method.

The first parameter specifies the message data, it will be transferred to an instance of the Xafari Server. Tag parameter specifies a tag to find the required handler. The method returns the message key to retrieve the result from the database.

MQManager class exposes CancelMessageExecution(MQMessage mqMessage) method, to cancel the processing of the message. You can cancel only the message that has not yet been processed, i.e. CurrentStatus is "WaitingForExecution".

MessageData class represents the message data of XafariMQSample application.

1
2
3
4
5
[DomainComponent]
public interface MessageData
{
    int Index { get; set; }
}

To add a message to Message Queue use following code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private int _index = 0;
private const string MQTag = "MQTag";
 
private void OnAddMessageToQueueActionExecute(object sender, SimpleActionExecuteEventArgs e)
{
    using (var mqObjectSpace = MQManager.Instance.CreateObjectSpace())
    {
        var messageData = mqObjectSpace.CreateObject();
        ++_index;
        messageData.Index = _index;
        mqObjectSpace.CommitChanges();
        MQManager.Instance.InsertMessage(messageData, MQTag, "Message "+_index);
    }
}

Note: to work with messages it is necessary to use MQManager ObjectSpace returned by the MQManager.Instance.CreateObjectSpace() method.

Message Handler

To create a message handler you need to implement IMessageHandler interface. Then you need to register the handler by calling RegisterHandler(string tag, Func<XafApplication, object, IMessageHandler> handlerCreator) method.

The first parameter is a handler tag. For one tag, you can register only one handler. It will be called by Xafari Server to create a handler instance.

The second parameter is a delegate returns IMessageHandler implementation. The first parameter of the delegate specifies the Application, i.e. server runs under the specified Application. The second parameter (object type) specifies a message key, handler instance will retrieve a target message from the database using the key.

Handler registration:

1
2
3
private const string MQTag = "MQTag";
 
XafariServerModule.DataObserver.ServerHandlersStrategy.RegisterHandler(MQTag, (app, messageKey) =&gt; new MessageHandler(app,messageKey));

When implementing IMessageHandler you have to take into account some of the nuances:

  1. Raise OnStartedThread event before the core code of handler. Accordingly, raise OnCompleted and OnStoppedThread events after the end of the core code.
  2. To work with messages use MQManager ObjectSpace.
  3. When implement StartHandler() method immediately block the message by setting its status to "Processing" (see LockReportMessage() method in the sample).

The code snippet below demonstrates the handler implementation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class MessageHandler:IMessageHandler
{
    private static readonly Random TimeGenerator=new Random();
    private readonly object _messageKey;
    private CancellationToken _cancellationToken;
    private CancellationTokenSource _source;
 
    public event EventHandler OnCompleted = delegate { };
    public event EventHandler OnStartedThread = delegate { };
    public event EventHandler OnStoppedThread = delegate { };
 
    public MessageHandler(XafApplication application, object messageKey)
    {
        Guard.ArgumentNotNull(messageKey, "messageKey");
 
        _messageKey = messageKey;
        _source = new CancellationTokenSource();
    }
 
    public void StartHandler()
    {
        _cancellationToken = _source.Token;
 
        LockReportMessage();
 
        var handlerTask = Task.Factory.StartNew(ProcessMessage, _cancellationToken,      TaskCreationOptions.LongRunning, TaskScheduler.Current);
        Task.Factory.ContinueWhenAll(new[] { handlerTask }, tasks =&gt; FinishProcessing(),   _cancellationToken);
    }
 
    private void LockReportMessage()
    {
        if (!_cancellationToken.IsCancellationRequested)
        {
            using (var objectSpace = MQManager.Instance.CreateObjectSpace())
            {
                var updatedMessage = objectSpace.GetObjectByKey(_messageKey);
                if (updatedMessage.CurrentStatus != MessageStatus.Processing)
                {
                    updatedMessage.CurrentStatus = MessageStatus.Processing;
                    objectSpace.CommitChanges();
                }
             }
        }
    }
 
    private void ProcessMessage()
    {
        OnStartedThread(Thread.CurrentThread, EventArgs.Empty);
 
        if (!_cancellationToken.IsCancellationRequested)
        {
            int threadSleepTime = 1000*TimeGenerator.Next(10);
            Thread.Sleep(threadSleepTime);
 
            using (var objectSpace = MQManager.Instance.CreateObjectSpace())
            {
                var message = objectSpace.GetObjectByKey(_messageKey);
                var dbObjectSpace = XafariModule.Application.ObjectSpaceProvider.CreateObjectSpace();
                var messageData = (MessageData) message.MessageData.GetTargetObject(objectSpace);
                var log = dbObjectSpace.CreateObject();
                log.Index = messageData.Index;
                log.CreationTime = DateTime.Now;
                log.Name = message.Description;
                log.ThreadSleepTime = threadSleepTime;
                dbObjectSpace.CommitChanges();
 
                message.CurrentStatus = MessageStatus.Finished;
            }
        }
    }
 
    private void FinishProcessing()
    {
        OnCompleted(this, EventArgs.Empty);
        OnStoppedThread(Thread.CurrentThread, EventArgs.Empty);
        Dispose();
    }
}

Load solution: XafariMQSample.

Write US