So far, all our test exists inside the same process, so they communicate through variables, in real-life scenarios nodes won’t exist in the same process and most of the time not even in the same location.

The easiest and most standard way to implement client-server communication in the dotnet world is a rest API, so let’s define an API client and a server service, lets start with the API client

ISyncFrameworkClient

https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/ISyncFrameworkClient.cs

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public interface ISyncFrameworkClient
{
Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken);
Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken);
}
public interface ISyncFrameworkClient { Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken); Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken); }
public interface ISyncFrameworkClient
{
    Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken);
    Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken);
}

As you can see in the code above, this interface is really simple because the client only implements 2 operations

  • Fetch: downloads the deltas from the server
  • Push: upload the deltas from our delta store to the server

Now we should create an implementation for this interface, see the code below

https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/SyncFrameworkHttpClient.cs

 

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public class SyncFrameworkHttpClient : ISyncFrameworkClient
{
HttpClient _httpClient;
public string DeltaStoreId { get; }
public SyncFrameworkHttpClient(HttpClient httpClient,string NodeId)
{
this.DeltaStoreId = NodeId;
_httpClient = httpClient;
_httpClient.DefaultRequestHeaders.Add("NodeId", NodeId);
this.DeltaStoreId = NodeId;
}
public SyncFrameworkHttpClient(string BaseAddress, string DeltaStoreId):this(new HttpClient() { BaseAddress=new Uri(BaseAddress)},DeltaStoreId)
{
}
public virtual async Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken = default)
{
try
{
List<Delta> toserialzie = new List<Delta>();
foreach (IDelta delta in Deltas)
{
toserialzie.Add(new Delta(delta));
}
cancellationToken.ThrowIfCancellationRequested();
DataContractJsonSerializer js = new DataContractJsonSerializer(typeof(List<Delta>));
MemoryStream msObj = new MemoryStream();
js.WriteObject(msObj, toserialzie);
msObj.Position = 0;
StreamReader sr = new StreamReader(msObj);
string jsonDeltas = sr.ReadToEnd();
var data = new StringContent(jsonDeltas, Encoding.UTF8, "application/json");
await _httpClient.PostAsync("/Sync/Push", data, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
var message = ex.Message;
throw;
}
}
public virtual async Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken = default)
{
var QueryParams = new Dictionary<string, string>();
QueryParams.Add(nameof(startindex), startindex.ToString());
QueryParams.Add(nameof(identity), identity);
cancellationToken.ThrowIfCancellationRequested();
var query = HttpUtility.ParseQueryString("");
foreach (KeyValuePair<string, string> CurrentParam in QueryParams)
{
query[CurrentParam.Key] = CurrentParam.Value;
}
var reponse = await _httpClient.GetStringAsync($"/Sync/Fetch?{query.ToString()}").ConfigureAwait(false);
using (var ms = new MemoryStream(Encoding.Unicode.GetBytes(reponse)))
{
DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(List<Delta>));
List<Delta> Deltas = (List<Delta>)deserializer.ReadObject(ms);
return Deltas;
}
//List<Delta> Deltas = JsonConvert.DeserializeObject<List<Delta>>(reponse);
return null;
}
}
public class SyncFrameworkHttpClient : ISyncFrameworkClient { HttpClient _httpClient; public string DeltaStoreId { get; } public SyncFrameworkHttpClient(HttpClient httpClient,string NodeId) { this.DeltaStoreId = NodeId; _httpClient = httpClient; _httpClient.DefaultRequestHeaders.Add("NodeId", NodeId); this.DeltaStoreId = NodeId; } public SyncFrameworkHttpClient(string BaseAddress, string DeltaStoreId):this(new HttpClient() { BaseAddress=new Uri(BaseAddress)},DeltaStoreId) { } public virtual async Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken = default) { try { List<Delta> toserialzie = new List<Delta>(); foreach (IDelta delta in Deltas) { toserialzie.Add(new Delta(delta)); } cancellationToken.ThrowIfCancellationRequested(); DataContractJsonSerializer js = new DataContractJsonSerializer(typeof(List<Delta>)); MemoryStream msObj = new MemoryStream(); js.WriteObject(msObj, toserialzie); msObj.Position = 0; StreamReader sr = new StreamReader(msObj); string jsonDeltas = sr.ReadToEnd(); var data = new StringContent(jsonDeltas, Encoding.UTF8, "application/json"); await _httpClient.PostAsync("/Sync/Push", data, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { var message = ex.Message; throw; } } public virtual async Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken = default) { var QueryParams = new Dictionary<string, string>(); QueryParams.Add(nameof(startindex), startindex.ToString()); QueryParams.Add(nameof(identity), identity); cancellationToken.ThrowIfCancellationRequested(); var query = HttpUtility.ParseQueryString(""); foreach (KeyValuePair<string, string> CurrentParam in QueryParams) { query[CurrentParam.Key] = CurrentParam.Value; } var reponse = await _httpClient.GetStringAsync($"/Sync/Fetch?{query.ToString()}").ConfigureAwait(false); using (var ms = new MemoryStream(Encoding.Unicode.GetBytes(reponse))) { DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(List<Delta>)); List<Delta> Deltas = (List<Delta>)deserializer.ReadObject(ms); return Deltas; } //List<Delta> Deltas = JsonConvert.DeserializeObject<List<Delta>>(reponse); return null; } }
public class SyncFrameworkHttpClient : ISyncFrameworkClient
{
    HttpClient _httpClient;
    public string DeltaStoreId { get; }
    public SyncFrameworkHttpClient(HttpClient httpClient,string NodeId)
    {
        this.DeltaStoreId = NodeId;
        _httpClient = httpClient;
        _httpClient.DefaultRequestHeaders.Add("NodeId", NodeId);
      
        this.DeltaStoreId = NodeId;
    }
    public SyncFrameworkHttpClient(string BaseAddress, string DeltaStoreId):this(new HttpClient() { BaseAddress=new Uri(BaseAddress)},DeltaStoreId)
    {
       
    }
    public virtual async Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken  = default)
    {
       
        try
        {
            List<Delta> toserialzie = new List<Delta>();
            foreach (IDelta delta in Deltas)
            {
                toserialzie.Add(new Delta(delta));
            }
            cancellationToken.ThrowIfCancellationRequested();

            DataContractJsonSerializer js = new DataContractJsonSerializer(typeof(List<Delta>));
            MemoryStream msObj = new MemoryStream();
            js.WriteObject(msObj, toserialzie);
            msObj.Position = 0;
            StreamReader sr = new StreamReader(msObj);
            string jsonDeltas = sr.ReadToEnd();

            var data = new StringContent(jsonDeltas, Encoding.UTF8, "application/json");
            await _httpClient.PostAsync("/Sync/Push", data, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            var message = ex.Message;
            throw;
        }
      
    }
    public virtual async Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken = default)
    {
        var QueryParams = new Dictionary<string, string>();
        QueryParams.Add(nameof(startindex), startindex.ToString());
        QueryParams.Add(nameof(identity), identity);
        cancellationToken.ThrowIfCancellationRequested();
        var query = HttpUtility.ParseQueryString("");
        foreach (KeyValuePair<string, string> CurrentParam in QueryParams)
        {
            query[CurrentParam.Key] = CurrentParam.Value;
        }
        var reponse = await _httpClient.GetStringAsync($"/Sync/Fetch?{query.ToString()}").ConfigureAwait(false);

        using (var ms = new MemoryStream(Encoding.Unicode.GetBytes(reponse)))
        {

            DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(List<Delta>));
            List<Delta> Deltas = (List<Delta>)deserializer.ReadObject(ms);

            return Deltas;
        }

        //List<Delta> Deltas = JsonConvert.DeserializeObject<List<Delta>>(reponse);
        return null;

    }

}

 

it’s an implementation of the ISyncFrameworkClient interface using HTTP communication

  • Fetch: uses an HTTP get request
  • Push: uses an HTTP post request

Also, the “nodeid” header is added to the request, you will understand why when we implement the server part.

Now that we have defined the contract for the client and also provided the base implementation using an HTTP client, its time to define what a client node is, please take a look at the code below

ISyncClientNodeExtensions

https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/ISyncClientNodeExtensions.cs

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public static class ISyncClientNodeExtensions
{
public static async Task<List<Delta>> FetchAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var LastDetalIndex = await instance.DeltaStore.GetLastProcessedDeltaAsync(cancellationToken).ConfigureAwait(false);
return await instance.SyncFrameworkClient.FetchAsync(LastDetalIndex, instance.Identity, cancellationToken).ConfigureAwait(false);
}
public static async Task PullAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var Deltas = await instance.FetchAsync(cancellationToken).ConfigureAwait(false);
if (Deltas.Any())
{
await instance.DeltaProcessor.ProcessDeltasAsync(Deltas, cancellationToken).ConfigureAwait(false);
Guid index = Deltas.Max(d => d.Index);
await instance.DeltaStore.SetLastProcessedDeltaAsync(index, cancellationToken).ConfigureAwait(false);
}
}
public static async Task PushAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var LastPushedDelta = await instance.DeltaStore.GetLastPushedDeltaAsync(cancellationToken).ConfigureAwait(false);
var Deltas = await instance.DeltaStore.GetDeltasAsync(LastPushedDelta,cancellationToken).ConfigureAwait(false);
if (Deltas.Any())
{
var Max = Deltas.Max(d => d.Index);
await instance.SyncFrameworkClient.PushAsync(Deltas, cancellationToken).ConfigureAwait(false);
await instance.DeltaStore.SetLastPushedDeltaAsync(Max,cancellationToken).ConfigureAwait(false);
}
}
}
public static class ISyncClientNodeExtensions { public static async Task<List<Delta>> FetchAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); var LastDetalIndex = await instance.DeltaStore.GetLastProcessedDeltaAsync(cancellationToken).ConfigureAwait(false); return await instance.SyncFrameworkClient.FetchAsync(LastDetalIndex, instance.Identity, cancellationToken).ConfigureAwait(false); } public static async Task PullAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); var Deltas = await instance.FetchAsync(cancellationToken).ConfigureAwait(false); if (Deltas.Any()) { await instance.DeltaProcessor.ProcessDeltasAsync(Deltas, cancellationToken).ConfigureAwait(false); Guid index = Deltas.Max(d => d.Index); await instance.DeltaStore.SetLastProcessedDeltaAsync(index, cancellationToken).ConfigureAwait(false); } } public static async Task PushAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); var LastPushedDelta = await instance.DeltaStore.GetLastPushedDeltaAsync(cancellationToken).ConfigureAwait(false); var Deltas = await instance.DeltaStore.GetDeltasAsync(LastPushedDelta,cancellationToken).ConfigureAwait(false); if (Deltas.Any()) { var Max = Deltas.Max(d => d.Index); await instance.SyncFrameworkClient.PushAsync(Deltas, cancellationToken).ConfigureAwait(false); await instance.DeltaStore.SetLastPushedDeltaAsync(Max,cancellationToken).ConfigureAwait(false); } } }
    public static class ISyncClientNodeExtensions
    {

        public static async Task<List<Delta>> FetchAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var LastDetalIndex = await instance.DeltaStore.GetLastProcessedDeltaAsync(cancellationToken).ConfigureAwait(false);
            return await instance.SyncFrameworkClient.FetchAsync(LastDetalIndex, instance.Identity, cancellationToken).ConfigureAwait(false);
        }
        public static async Task PullAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var Deltas = await instance.FetchAsync(cancellationToken).ConfigureAwait(false);
            if (Deltas.Any())
            {
                await instance.DeltaProcessor.ProcessDeltasAsync(Deltas, cancellationToken).ConfigureAwait(false);
                Guid index = Deltas.Max(d => d.Index);
                await instance.DeltaStore.SetLastProcessedDeltaAsync(index, cancellationToken).ConfigureAwait(false);
            }

        }
        public static async Task PushAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var LastPushedDelta = await instance.DeltaStore.GetLastPushedDeltaAsync(cancellationToken).ConfigureAwait(false);
            var Deltas = await instance.DeltaStore.GetDeltasAsync(LastPushedDelta,cancellationToken).ConfigureAwait(false);
            if (Deltas.Any())
            {
                var Max = Deltas.Max(d => d.Index);
                await instance.SyncFrameworkClient.PushAsync(Deltas, cancellationToken).ConfigureAwait(false);
                await instance.DeltaStore.SetLastPushedDeltaAsync(Max,cancellationToken).ConfigureAwait(false);
            }



        }
    }

so, this is how the SyncClientNode is structured

Let’s move to the server-side now, here the idea is to be able to host multiple delta store and delta processors and also to be able to introduce custom logic either saving the deltas into the delta store or processing the deltas into a data object

ISyncServerNode

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/ISyncServerNode.cs

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public interface ISyncServerNode
{
string NodeId { get; set; }
Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, string identity, CancellationToken cancellationToken);
Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}
public interface ISyncServerNode { string NodeId { get; set; } Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken); Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, string identity, CancellationToken cancellationToken); Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken); }
public interface ISyncServerNode
{
    string NodeId { get; set; }
    Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
    Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, string identity, CancellationToken cancellationToken);
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

Now we need to define a server, so here is the interface for the SyncServer

ISyncServer

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/ISyncServer.cs

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public interface ISyncServer
{
IEnumerable<ISyncServerNode> Nodes { get; }
Task<IEnumerable<IDelta>> GetDeltasAsync(string name, Guid startindex, string identity, CancellationToken cancellationToken);
Task ProcessDeltasAsync(string Name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
Task SaveDeltasAsync(string name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}
public interface ISyncServer { IEnumerable<ISyncServerNode> Nodes { get; } Task<IEnumerable<IDelta>> GetDeltasAsync(string name, Guid startindex, string identity, CancellationToken cancellationToken); Task ProcessDeltasAsync(string Name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken); Task SaveDeltasAsync(string name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken); }
public interface ISyncServer
{
    IEnumerable<ISyncServerNode> Nodes { get; }
    Task<IEnumerable<IDelta>> GetDeltasAsync(string name, Guid startindex, string identity, CancellationToken cancellationToken);
    Task ProcessDeltasAsync(string Name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
    Task SaveDeltasAsync(string name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

As you can see, the members are almost the same as the sync node, this design allows us to have more than one node on the server-side

Here is the implementation of the SyncServer

SyncServer

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/SyncServer.cs

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public class SyncServer: ISyncServer
{
IEnumerable<ISyncServerNode> _Nodes;
public SyncServer(params ISyncServerNode[] Nodes)
{
this._Nodes = Nodes;
}
public IEnumerable<ISyncServerNode> Nodes => _Nodes;
public async Task<IEnumerable<IDelta>> GetDeltasAsync(string NodeId, Guid startindex, string identity, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ISyncServerNode Node = GetNode(NodeId);
if (Node != null)
{
return await Node.GetDeltasAsync(startindex, identity, cancellationToken).ConfigureAwait(false);
}
IEnumerable<IDelta> result = new List<IDelta>();
return result;
}
public Task ProcessDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ISyncServerNode Node = GetNode(NodeId);
if (Node != null)
{
return Node.ProcessDeltasAsync(deltas, cancellationToken);
}
return null;
}
private ISyncServerNode GetNode(string NodeId)
{
return Nodes.FirstOrDefault(node => node.NodeId == NodeId);
}
public Task SaveDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ISyncServerNode Node = GetNode(NodeId);
if (Node != null)
{
return Node.SaveDeltasAsync(deltas, cancellationToken);
}
return Task.CompletedTask;
}
}
public class SyncServer: ISyncServer { IEnumerable<ISyncServerNode> _Nodes; public SyncServer(params ISyncServerNode[] Nodes) { this._Nodes = Nodes; } public IEnumerable<ISyncServerNode> Nodes => _Nodes; public async Task<IEnumerable<IDelta>> GetDeltasAsync(string NodeId, Guid startindex, string identity, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); ISyncServerNode Node = GetNode(NodeId); if (Node != null) { return await Node.GetDeltasAsync(startindex, identity, cancellationToken).ConfigureAwait(false); } IEnumerable<IDelta> result = new List<IDelta>(); return result; } public Task ProcessDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); ISyncServerNode Node = GetNode(NodeId); if (Node != null) { return Node.ProcessDeltasAsync(deltas, cancellationToken); } return null; } private ISyncServerNode GetNode(string NodeId) { return Nodes.FirstOrDefault(node => node.NodeId == NodeId); } public Task SaveDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); ISyncServerNode Node = GetNode(NodeId); if (Node != null) { return Node.SaveDeltasAsync(deltas, cancellationToken); } return Task.CompletedTask; } }
public class SyncServer: ISyncServer
{


    IEnumerable<ISyncServerNode> _Nodes;
   
    public SyncServer(params ISyncServerNode[] Nodes)
    {
        this._Nodes = Nodes;
    }

    public IEnumerable<ISyncServerNode> Nodes => _Nodes;

   

    public async Task<IEnumerable<IDelta>> GetDeltasAsync(string NodeId, Guid startindex, string identity, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        ISyncServerNode Node = GetNode(NodeId);
        if (Node != null)
        {
            return await Node.GetDeltasAsync(startindex, identity, cancellationToken).ConfigureAwait(false);
        }

        IEnumerable<IDelta> result = new List<IDelta>();
        return result;
    }
    public Task ProcessDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        ISyncServerNode Node = GetNode(NodeId);
        if (Node != null)
        {
            return Node.ProcessDeltasAsync(deltas, cancellationToken);
        }
        return null;

    }

    private ISyncServerNode GetNode(string NodeId)
    {
        return Nodes.FirstOrDefault(node => node.NodeId == NodeId);
    }

    public Task SaveDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        ISyncServerNode Node = GetNode(NodeId);

        if (Node != null)
        {
            return Node.SaveDeltasAsync(deltas, cancellationToken);
        }
        return Task.CompletedTask;
    }
}

the following pictures show the 2 possible server implementations

 

and that’s it for this post, in the next post I will show the test cases for this implementation