Dela via


Modifying PeerChat to use Synchronization

We return to our favorite application, the Chat room.  In this example, we want members who enter and leave the mesh to be able to catch up on what was said while they were gone, so all chat messages will be synchronized throughout the mesh.  This will require writing a simple synchronization layer on top of PeerChannel in our PeerChat example.

Note: We are going to start off by NOT enforce ordering of records, but we will discuss that at the end. In a Chat scenario, you would definitely want your synchronized data to be ordered as well.

Let us begin by walking through the 5 steps mentioned in our previous blog entry.

1. Determine the Data Set - Since every member of the mesh will need to view the entire chat log, and since chat logs are not typically too large, we will have every member of the mesh store the entire chat conversation in a local data store called ChatLog.

2.  Determine the Data Structure - We will use a typical Dictionary structure for chats.  Keys will be chat message IDs (GUIDs), and the records will be a ChatMessage object, containing the message ID, source ID (name of person who sent the chat), and the Chat message itself (a string).  Note that this structure itself is not ordered, so we will need to do some additional work later on to satisfy that constraint.  Note that we also include a hopcount field, since all sync messages will only be send to immediate neighbors.

[

MessageContract]
public class ChatMessage
{
[MessageBodyMember]
    public string msgTxt;

[

MessageBodyMember]
    public Guid msgId;

[

MessageBodyMember]
    public string msgSource;

[

PeerHopCount]
    public int hops;

    public ChatMessage() { }

    public ChatMessage (Guid id, string text, string source)
{
msgId = id;
msgTxt = text;
msgSource = source;
        hops = 1; // Immediate neighbors only
    }
}

3. Sync protocol - We will use a relatively simple, 6-step algorithm as follows:

I) MessageIDRequest: Syncing Node requests a list of message IDs from its immediate neighbors
II) MessageIDResponse: Each neighbor responds with a complete list of message IDs contained in its local data store
III) The syncing Node examines the list of message IDs and generates a list of IDs corresponding to messages missing from its local data store.
IV) MessageContentRequest: The syncing sends its neighbors a content request with a list of message IDs.
V) MessageContentResponse: Each neighbor sends back the complete messages corresponding to the message IDs containing in the content request.
VI) Node A receives the responses containing the messages and adds them to its local data store

This method has two main advantages: the syncing node only receives messages it knows are missing from its local data store, and it can be easily modified such that no two neighbors are required to send back the same messages, reducing duplication.  For example, if the syncing node is joining for the first time, and finds out that it is missing messages 1-9, it can request each of its three neighbors to send the content of 3 messages, thereby distributing the sync load and minimized excessive bandwidth use.  (This last optimization will be left out of this example for simplicity.)

 4. New Message Contract: Below is a interface based off the PeerChat sample in the .NET 3.0 SDK.  After the standard Join, Chat, Leave operations, the sync messages definitions follow.

[

ServiceContract(Namespace = "https://Microsoft.ServiceModel.Samples", CallbackContract = typeof(IChat))]
public interface IChat
{
[OperationContract(IsOneWay = true)] // Sent when a node joins the mesh
    void Join(string member);

    [OperationContract(IsOneWay = true)] // Send to indicate that the node is leaving the mesh
    void Leave(string member);

    [OperationContract(IsOneWay = true)] // A chat message
    void Chat(ChatMessage msg);

    //**** SYNC OPERATIONS
    //******************************

    [OperationContract(IsOneWay = true)] // Requests a list of message IDs
    void MessageIDRequest();

    [OperationContract(IsOneWay = true)] // Responds with a list of message IDs
    void MessageIDReply(List<Guid> messageIDs);

    [OperationContract(IsOneWay = true)] // Requests the messages corresponding to a list of IDs
    void MessageContentRequest(List<Guid> messageIDs);

    [OperationContract(IsOneWay = true)] // Responds with a list of messages
    void MessageContentReply(List<ChatMessage> messages);
}

5. When to run synchronization: In this simple example, we will assume that no messages are dropped when a node is fully connected to the mesh. However, we will still run synchronization every time a node joins the mesh.

Next, let's go over the implementation of the sync functions.  We are introducing a new enumeraion, SyncStatus, to indicate the state of the syncing node.  This allows the node to ignore or process the appropriate messages in the process.  We add this enumeration as a local member variable named synching, to our node implementation.

public enum SyncStatus
{
NotSynching,
IDRequest,
ContentRequest
}

The following functions are being added to ChatApp, our implementation of IChat.  Remember also that the following functions will be executed when the node RECEIVES the given message.

public void MessageIDRequest()
{
    if (this.synching == SyncStatus.NotSynching)
{
        List<Guid> requestIDs = new List<Guid>(this.chatLog.Keys);
        this.participant.MessageIDReply(requestIDs);
}
}

Here, we first ensure that we are not current syncing (and therefore have a complete data set).  The non-syncing node then create a list of message IDs created from its local data store, chatLog (a Dictionary structure), and sends it back to the mesh.

public void MessageIDReply(List<Guid> messageIDs)
{
    if (this.synching == SyncStatus.IDRequest)
{
        this.synching = SyncStatus.ContentRequest;
        List<Guid> responseIDs = new List<Guid>();
        foreach (Guid id in messageIDs)
{
            if (!this.chatLog.ContainsKey(id))
{
                responseIDs.Add(id);
}
}
        if (responseIDs.Count != 0)
        {
            this.participant.MessageContentRequest(responseIDs);
}
else
        {
this.synching = SyncStatus.NotSynching;
        }
    }
}

In this function, the syncing node receives a list of message IDs from a neighbor.  It checks its local data store against each message in the list and generates a list of messages missing from its local data store.  If no messages are missing, syncing is done.  Otherwise, it sends a list of message IDs to its neighbors, requesting the content of those messages.  If the node knows the member IDs of its neighbors, it could split up the list between them and send each a unique content request.  As a computer science professor would say, "that is left as an excercise for the interested student."

public void MessageContentRequest(List<Guid> messageIDs)
{

    if (this.synching == SyncStatus.NotSynching)
    {
        List<ChatMessage> responseContent = new List<ChatMessage>();
        foreach (Guid id in messageIDs)
{
            responseContent.Add(this.chatLog[id]);
}
        if (responseContent.Count != 0)
        {
            this.participant.MessageContentReply(responseContent);
}
}
}

Upon receiving the message content request, a non-syncing node creates a list of chat messages corresponding to the message IDs in the list contained in the MessageContentRequest, and sends back the messages to the syncing node.

public

void MessageContentReply(List<ChatMessage> messages)
{
if (this.synching == SyncStatus.ContentRequest)
{
foreach (ChatMessage msg in messages)
{
this.chatLog.Add(msg.msgId, msg);
}
this.synching = SyncStatus.NotSynching;
}
}

Finally, once the syncing node receives the messages, it adds the messages to its local data store.

Now, all we have left to do with our chat sample is add the following local class varibles to ChatApp:

public

Dictionary<Guid, ChatMessage> chatLog;
public SyncStatus synching;

We will also have a new constructor:

public

ChatApp(string member)
{
this.member = member;
this.chatLog = new Dictionary<Guid, ChatMessage>();
this.synching = SyncStatus.NotSynching;
}

We also need to log each chat message now. The Chat function in ChatApp will need to the following line of code:

this

.chatLog.Add(msg.msgId, msg);

And finally, the node will need to perform the following upon entering the mesh (after calling Join):

instance.synching =

SyncStatus.IDRequest;
instance.participant.MessageIDRequest();

At this point, we now have a chat app that contains a synchronized chat log! Displaying the contents of the chat log can be accomplished by enumerating and printing out to the console the content of the ChatLog variable. For example, if "instance" is the name of our ChatApp instance:

ICollection<ChatMessage> messages = instance.chatLog.Values;
foreach (ChatMessage message in messages)
{
Console.WriteLine("=> [" + message.msgSource + "] " + message.msgTxt);
}

And there you have it! With relatively minor changes, our chat application now contains some basic synchronization, useful for those who come late to the chat room and want to catch up on what they missed.

Extra Credit: Ordering Synchronized Messages

In many applications such as chat, it is vital that the order in which messages are created is maintained. Our example neglects this need in exchange for simplicity. However, a real production chat application would definitely need to address this issue. There are a few complications with this, since messages may arrive in different orders to different nodes, so syncing to different nodes will result in different results. For example, if two nodes on either end of a large mesh send a message, nodes in between may differ as to which message arrives first. Here are a couple potential options that we will leave up to you to implement:

- Add timestamps member to each message and attach the current system time to each message before sending. When displaying a message log, the messages can be sorted by timestamp. The difficulty of this approach is that all machines may not be time synchronized, and so inconsistencies may still result.

- Assign a single member of the mesh to determine the definitive ordering of messages. Naturally, this will make things especially difficult if that member goes down, but has the advantage of creating a single, unified ordering across the entire mesh.

- Add to each message the ID of the message that immediately preceeded it. The data store can then link messages together in a hierarchal tree in case two messages arrive with the same previous ID attached. This has the advantage of maintaining order while keeping track of ambiguities, but nevertheless does not resolve the problem.

Best of luck in implmenting your synchronization! -Jonathan

Comments

  • Anonymous
    August 30, 2007
    This is very interesting! Just a quick question: The underlying p2p framework already provides support for syncing data in a group, right? There is this whole data store, record sync stuff in the win32 API. Any plans to provide managed code support for that?

  • Anonymous
    September 06, 2007
    Hi, the underlying p2p framework (ie Peer channel) does not support synch-support by default. The Win32 apis that you refer to and our Grouping/Graphing algorithms are from our team but totally separate platforms. Not sure there is any immediate plan for providing managed support for those APIs though you might find a better answer if you ask this question on our p2p team blog - http://blogs.msdn.com/p2p Hope that helps!