by Joche Ojeda | Oct 17, 2021 | Data Synchronization
So far, all our tests run inside the same process, so the nodes communicate through variables. In real-life scenarios, nodes won’t live in the same process, and most of the time not even in the same location.
The easiest and most standard way to implement client-server communication in the .NET world is a REST API, so let’s define an API client and a server service. Let’s start with the API client.
ISyncFrameworkClient
https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/ISyncFrameworkClient.cs
public interface ISyncFrameworkClient
{
Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken);
Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken);
}
As you can see in the code above, this interface is really simple because the client only implements two operations:
- Fetch: downloads the deltas from the server
- Push: uploads the deltas from our delta store to the server
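Because the contract has only these two operations, a test double that skips HTTP entirely is easy to write. The sketch below is hypothetical: InMemorySyncClient and the simplified SketchDelta type are illustrative names, not part of SyncFramework, and it assumes the server answers a fetch with the deltas whose index is greater than startindex and that were created by a different identity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simplified stand-in for the framework's Delta type (hypothetical, for illustration only)
public class SketchDelta
{
    public Guid Index { get; set; }
    public string Identity { get; set; }
    public byte[] Operation { get; set; }
}

// Hypothetical test double for an ISyncFrameworkClient-like contract; no HTTP involved
public class InMemorySyncClient
{
    // Shared list that plays the role of the server's delta store
    private readonly List<SketchDelta> _serverDeltas;

    public InMemorySyncClient(List<SketchDelta> serverDeltas) => _serverDeltas = serverDeltas;

    // Push: copy the local deltas to the "server"
    public Task PushAsync(IEnumerable<SketchDelta> deltas, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        _serverDeltas.AddRange(deltas);
        return Task.CompletedTask;
    }

    // Fetch: newer deltas (by index) produced by other identities (assumed server behavior)
    public Task<List<SketchDelta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var result = _serverDeltas
            .Where(d => d.Index.CompareTo(startindex) > 0 && d.Identity != identity)
            .ToList();
        return Task.FromResult(result);
    }
}
```

Two clients sharing the same list behave like two nodes talking to one server: whatever node A pushes, node B can fetch, while A never receives its own deltas back.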
Now we need to create an implementation of this interface; see the code below.
https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/SyncFrameworkHttpClient.cs
public class SyncFrameworkHttpClient : ISyncFrameworkClient
{
HttpClient _httpClient;
public string DeltaStoreId { get; }
public SyncFrameworkHttpClient(HttpClient httpClient, string NodeId)
{
_httpClient = httpClient;
// Every request carries the NodeId header; the server side uses the node id to pick the target node
_httpClient.DefaultRequestHeaders.Add("NodeId", NodeId);
this.DeltaStoreId = NodeId;
}
public SyncFrameworkHttpClient(string BaseAddress, string DeltaStoreId):this(new HttpClient() { BaseAddress=new Uri(BaseAddress)},DeltaStoreId)
{
}
public virtual async Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken = default)
{
// Copy the deltas into the concrete Delta type so DataContractJsonSerializer can serialize them
List<Delta> toSerialize = new List<Delta>();
foreach (IDelta delta in Deltas)
{
toSerialize.Add(new Delta(delta));
}
cancellationToken.ThrowIfCancellationRequested();
DataContractJsonSerializer js = new DataContractJsonSerializer(typeof(List<Delta>));
string jsonDeltas;
using (MemoryStream msObj = new MemoryStream())
{
js.WriteObject(msObj, toSerialize);
// WriteObject writes UTF-8, so decode the buffer as UTF-8
jsonDeltas = Encoding.UTF8.GetString(msObj.ToArray());
}
var data = new StringContent(jsonDeltas, Encoding.UTF8, "application/json");
var response = await _httpClient.PostAsync("/Sync/Push", data, cancellationToken).ConfigureAwait(false);
// Throw on a non-success status so the caller does not mark these deltas as pushed
response.EnsureSuccessStatusCode();
}
public virtual async Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
// Build the query string ?startindex=...&identity=... (values are URL-encoded by ToString)
var query = HttpUtility.ParseQueryString(string.Empty);
query[nameof(startindex)] = startindex.ToString();
query[nameof(identity)] = identity;
var response = await _httpClient.GetAsync($"/Sync/Fetch?{query}", cancellationToken).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
string json = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
using (var ms = new MemoryStream(Encoding.UTF8.GetBytes(json)))
{
DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(List<Delta>));
return (List<Delta>)deserializer.ReadObject(ms);
}
}
}
It’s an implementation of the ISyncFrameworkClient interface that uses HTTP communication:
- Fetch: uses an HTTP GET request
- Push: uses an HTTP POST request
Also, a “NodeId” header is added to every request; you will understand why when we implement the server part.
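To see what actually travels over the wire, the JSON round trip that PushAsync and FetchAsync perform can be reproduced without a server. This is a minimal sketch: WireDelta and DeltaJson are hypothetical stand-ins for the framework's Delta class and serialization code, and the sketch relies on DataContractJsonSerializer serializing the public read/write properties of a class that has no [DataContract] attribute.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Json;
using System.Text;

// Hypothetical stand-in for the framework's Delta class (only the fields used in this sketch)
public class WireDelta
{
    public string Identity { get; set; }
    public Guid Index { get; set; }
    public byte[] Operation { get; set; }
}

// Hypothetical helper that mirrors the client's serialization steps
public static class DeltaJson
{
    // What the client does before POSTing to /Sync/Push
    public static string Serialize(List<WireDelta> deltas)
    {
        var serializer = new DataContractJsonSerializer(typeof(List<WireDelta>));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, deltas);
            // WriteObject writes UTF-8 bytes
            return Encoding.UTF8.GetString(stream.ToArray());
        }
    }

    // What the client does with the body returned by GET /Sync/Fetch
    public static List<WireDelta> Deserialize(string json)
    {
        var serializer = new DataContractJsonSerializer(typeof(List<WireDelta>));
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        {
            return (List<WireDelta>)serializer.ReadObject(stream);
        }
    }
}
```

Because both sides use the same serializer and the same concrete type, a list of deltas survives the trip unchanged, including the Guid index and the raw Operation bytes.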
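Now that we have defined the contract for the client and provided a base implementation using an HTTP client, it’s time to define what a client node is. A client node exposes a DeltaStore, a DeltaProcessor, a SyncFrameworkClient and an Identity, and the extension methods below build its fetch, pull and push operations on top of those members: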
ISyncClientNodeExtensions
https://github.com/egarim/SyncFramework/tree/main/src/BIT.Data.Sync/Client/ISyncClientNodeExtensions.cs
public static class ISyncClientNodeExtensions
{
public static async Task<List<Delta>> FetchAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var LastDeltaIndex = await instance.DeltaStore.GetLastProcessedDeltaAsync(cancellationToken).ConfigureAwait(false);
return await instance.SyncFrameworkClient.FetchAsync(LastDeltaIndex, instance.Identity, cancellationToken).ConfigureAwait(false);
}
public static async Task PullAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var Deltas = await instance.FetchAsync(cancellationToken).ConfigureAwait(false);
if (Deltas.Any())
{
await instance.DeltaProcessor.ProcessDeltasAsync(Deltas, cancellationToken).ConfigureAwait(false);
Guid index = Deltas.Max(d => d.Index);
await instance.DeltaStore.SetLastProcessedDeltaAsync(index, cancellationToken).ConfigureAwait(false);
}
}
public static async Task PushAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var LastPushedDelta = await instance.DeltaStore.GetLastPushedDeltaAsync(cancellationToken).ConfigureAwait(false);
var Deltas = await instance.DeltaStore.GetDeltasAsync(LastPushedDelta,cancellationToken).ConfigureAwait(false);
if (Deltas.Any())
{
var Max = Deltas.Max(d => d.Index);
await instance.SyncFrameworkClient.PushAsync(Deltas, cancellationToken).ConfigureAwait(false);
await instance.DeltaStore.SetLastPushedDeltaAsync(Max,cancellationToken).ConfigureAwait(false);
}
}
}
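So, this is how the SyncClientNode is structured: FetchAsync asks the server for the deltas after the last processed index, PullAsync processes them and moves that index forward, and PushAsync uploads the local deltas created after the last pushed index and then moves that index forward.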
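The two markers are what keep nodes from sending or applying the same delta twice. The sketch below reproduces that bookkeeping synchronously; SketchNode and SimpleDelta are hypothetical names, a plain list plays the server, and GUIDs are built from a counter (an assumption standing in for sortable GUIDs) so that their order is predictable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simplified delta; the payload is just a string added to the node's data
public record SimpleDelta(Guid Index, string Identity, string Operation);

// Hypothetical client node that mirrors the bookkeeping of PushAsync and PullAsync
public class SketchNode
{
    private static int _counter;   // shared counter, so indices grow in creation order

    public string Identity { get; }
    public List<SimpleDelta> DeltaStore { get; } = new List<SimpleDelta>();
    public List<string> Data { get; } = new List<string>();
    public Guid LastPushedDelta { get; private set; } = Guid.Empty;
    public Guid LastProcessedDelta { get; private set; } = Guid.Empty;

    public SketchNode(string identity) => Identity = identity;

    // Stand-in for a sortable GUID: the counter goes into the first field Guid.CompareTo checks
    private static Guid NextIndex() => new Guid(++_counter, 0, 0, new byte[8]);

    public void Add(string value)
    {
        Data.Add(value);
        DeltaStore.Add(new SimpleDelta(NextIndex(), Identity, value));
    }

    // Like PushAsync: send deltas after the last pushed index, then move that index
    public void Push(List<SimpleDelta> server)
    {
        var pending = DeltaStore.Where(d => d.Index.CompareTo(LastPushedDelta) > 0).ToList();
        if (pending.Any())
        {
            server.AddRange(pending);
            LastPushedDelta = pending.Max(d => d.Index);
        }
    }

    // Like PullAsync: apply other nodes' deltas after the last processed index, then move that index
    public void Pull(List<SimpleDelta> server)
    {
        var incoming = server
            .Where(d => d.Index.CompareTo(LastProcessedDelta) > 0 && d.Identity != Identity)
            .ToList();
        if (incoming.Any())
        {
            foreach (var delta in incoming)
            {
                Data.Add(delta.Operation);
            }
            LastProcessedDelta = incoming.Max(d => d.Index);
        }
    }
}
```

Calling Push or Pull a second time with nothing new is a no-op, because each marker already points at the newest delta that was handled.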
Let’s move to the server side now. Here the idea is to be able to host multiple delta stores and delta processors, and also to be able to introduce custom logic either when saving the deltas into the delta store or when processing the deltas into a data object.
ISyncServerNode
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/ISyncServerNode.cs
public interface ISyncServerNode
{
string NodeId { get; set; }
Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, string identity, CancellationToken cancellationToken);
Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}
Now we need to define a server; here is the interface for the SyncServer:
ISyncServer
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/ISyncServer.cs
public interface ISyncServer
{
IEnumerable<ISyncServerNode> Nodes { get; }
Task<IEnumerable<IDelta>> GetDeltasAsync(string name, Guid startindex, string identity, CancellationToken cancellationToken);
Task ProcessDeltasAsync(string Name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
Task SaveDeltasAsync(string name, IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}
As you can see, the members are almost the same as those of ISyncServerNode; the difference is that each method also receives the name (NodeId) of the target node. This design allows us to host more than one node on the server side.
Here is the implementation of the SyncServer
SyncServer
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Server/SyncServer.cs
public class SyncServer: ISyncServer
{
IEnumerable<ISyncServerNode> _Nodes;
public SyncServer(params ISyncServerNode[] Nodes)
{
this._Nodes = Nodes;
}
public IEnumerable<ISyncServerNode> Nodes => _Nodes;
public async Task<IEnumerable<IDelta>> GetDeltasAsync(string NodeId, Guid startindex, string identity, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ISyncServerNode Node = GetNode(NodeId);
if (Node != null)
{
return await Node.GetDeltasAsync(startindex, identity, cancellationToken).ConfigureAwait(false);
}
IEnumerable<IDelta> result = new List<IDelta>();
return result;
}
public Task ProcessDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ISyncServerNode Node = GetNode(NodeId);
if (Node != null)
{
return Node.ProcessDeltasAsync(deltas, cancellationToken);
}
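// Unknown node: return a completed task (as SaveDeltasAsync does) so callers can always await the result
return Task.CompletedTask;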
}
private ISyncServerNode GetNode(string NodeId)
{
return Nodes.FirstOrDefault(node => node.NodeId == NodeId);
}
public Task SaveDeltasAsync(string NodeId, IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ISyncServerNode Node = GetNode(NodeId);
if (Node != null)
{
return Node.SaveDeltasAsync(deltas, cancellationToken);
}
return Task.CompletedTask;
}
}
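SyncServer locates the target node with FirstOrDefault over Nodes, which is a linear search. If a server hosts many nodes, the same routing could use a dictionary keyed by NodeId. The sketch below is a hypothetical variation (RoutingServer and MemoryServerNode are illustrative names, and deltas are plain strings) that, like SyncServer.SaveDeltasAsync, treats an unknown NodeId as a no-op.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal server node that only stores deltas (payloads are plain strings here)
public class MemoryServerNode
{
    public string NodeId { get; }
    public List<string> Deltas { get; } = new List<string>();
    public MemoryServerNode(string nodeId) => NodeId = nodeId;

    public Task SaveDeltasAsync(IEnumerable<string> deltas, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        Deltas.AddRange(deltas);
        return Task.CompletedTask;
    }
}

// Hypothetical server that routes each call to the node named in the request
public class RoutingServer
{
    private readonly Dictionary<string, MemoryServerNode> _nodes;

    public RoutingServer(params MemoryServerNode[] nodes) =>
        _nodes = nodes.ToDictionary(n => n.NodeId, StringComparer.Ordinal);

    public Task SaveDeltasAsync(string nodeId, IEnumerable<string> deltas, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        // Unknown node ids are ignored instead of failing the request
        return _nodes.TryGetValue(nodeId, out var node)
            ? node.SaveDeltasAsync(deltas, cancellationToken)
            : Task.CompletedTask;
    }
}
```

With the HTTP client shown earlier, the nodeId argument would come from the NodeId header of the incoming request.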
The following pictures show the two possible server implementations.
And that’s it for this post. In the next post, I will show the test cases for this implementation.
by Joche Ojeda | Oct 12, 2021 | Data Synchronization
Well, it’s time to create our first implementation. First, we need a place to store the deltas generated while tracking changes in a data object.
To keep the implementation simple, we will create a delta store that saves the deltas in memory. This delta store can also be used for testing purposes.
MemoryDeltaStore
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/MemoryDeltaStore.cs
public class MemoryDeltaStore : BIT.Data.Sync.DeltaStoreBase
{
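// Initialized here so every constructor, including the settings-based one, starts with an empty store
IList<IDelta> Deltas = new List<IDelta>();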
public MemoryDeltaStore(IEnumerable<IDelta> Deltas)
{
this.Deltas = new List<IDelta>(Deltas);
}
protected MemoryDeltaStore()
{
}
//TODO fix the use of MemoryDb
public MemoryDeltaStore(DeltaStoreSettings deltaStoreSettings) : base(deltaStoreSettings)
{
}
public async override Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (IDelta delta in deltas)
{
cancellationToken.ThrowIfCancellationRequested();
Deltas.Add(new Delta(delta));
}
}
public override Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
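var result = Deltas.Where(d => d.Index.CompareTo(startindex) > 0 && string.Compare(d.Identity, identity, StringComparison.Ordinal) != 0).ToList();
// Return a snapshot, as GetDeltasAsync does, instead of a query that re-reads the live list
return Task.FromResult<IEnumerable<IDelta>>(result);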
}
public override Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return Task.FromResult(Deltas.Where(d => d.Index.CompareTo(startindex) > 0).ToList().Cast<IDelta>());
}
Guid LastProcessedDelta;
public override async Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default)
{
return LastProcessedDelta;
}
public override async Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
LastProcessedDelta = Index;
}
Guid LastPushedDelta;
public async override Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken)
{
return LastPushedDelta;
}
public async override Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
LastPushedDelta = Index;
}
public async override Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return Deltas.Count(d => d.Index.CompareTo(startindex) > 0);
}
public async override Task PurgeDeltasAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
Deltas.Clear();
}
}
Now that we have a delta store in place, we need a data object: something we can use to generate data and track how that data changes. So, again for test purposes, I have implemented a small in-memory database.
SimpleDatabase
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabase.cs
public class SimpleDatabase
{
public IDeltaProcessor DeltaProcessor { get; set; }
public string Identity { get; set; }
public IDeltaStore DeltaStore { get; set; }
public SimpleDatabase(IDeltaStore deltaStore, string identity, List<SimpleDatabaseRecord> Data)
{
Identity = identity;
DeltaStore = deltaStore;
this.Data= Data;
}
List<SimpleDatabaseRecord> Data;
// async Task instead of async void, so callers can await the update and observe its exceptions
public async Task Update(SimpleDatabaseRecord Instance)
{
var ObjectToUpdate = Data.FirstOrDefault(x => x.Key == Instance.Key);
if (ObjectToUpdate != null)
{
var Index = Data.IndexOf(ObjectToUpdate);
Data[Index] = Instance;
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Update, Instance);
await SaveDelta(item);
}
}
private async Task SaveDelta(SimpleDatabaseModification item)
{
var Delta = DeltaStore.CreateDelta(Identity,item);
await DeltaStore.SaveDeltasAsync(new List<IDelta>() { Delta }, default);
}
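public async Task Delete(SimpleDatabaseRecord Instance)
{
var ObjectToDelete = Data.FirstOrDefault(x => x.Key == Instance.Key);
if (ObjectToDelete != null)
{
Data.Remove(ObjectToDelete);
// Record the deletion as a delta too, so other nodes can replay it
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Delete, Instance);
await SaveDelta(item);
}
}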
public async Task Add(SimpleDatabaseRecord Instance)
{
Data.Add(Instance);
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Add, Instance);
await SaveDelta(item);
}
}
In the class above, I have implemented methods to add, delete and update a record. Inside each method, I create an instance of an object called SimpleDatabaseModification, which keeps track of which operation is happening and holds a copy of the instance being handled at the moment; that is what we are going to save as a delta.
SimpleDatabaseModification
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabaseModification.cs
public class SimpleDatabaseModification
{
public OperationType Operation { get; set; }
public SimpleDatabaseModification(OperationType operation, SimpleDatabaseRecord record)
{
Operation = operation;
Record = record;
}
public SimpleDatabaseRecord Record { get; set; }
}
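The post does not show how DeltaStore.CreateDelta turns a SimpleDatabaseModification into the byte[] stored in IDelta.Operation. As an assumption for illustration only, the sketch below uses System.Text.Json for that step. The SimpleDatabaseRecord shape (a Guid Key and a Text property) and the OperationSerializer helper are hypothetical, and the extra parameterless constructor gives the deserializer a way to create the object.

```csharp
using System;
using System.Text.Json;

public enum OperationType { Add, Delete, Update }

// Hypothetical record shape; the real SimpleDatabaseRecord is not shown in this post
public class SimpleDatabaseRecord
{
    public Guid Key { get; set; }
    public string Text { get; set; }
}

public class SimpleDatabaseModification
{
    public OperationType Operation { get; set; }
    public SimpleDatabaseRecord Record { get; set; }

    // Parameterless constructor used by System.Text.Json when deserializing
    public SimpleDatabaseModification() { }

    public SimpleDatabaseModification(OperationType operation, SimpleDatabaseRecord record)
    {
        Operation = operation;
        Record = record;
    }
}

// Hypothetical helper: one possible way to fill and read IDelta.Operation
public static class OperationSerializer
{
    // Modification -> UTF-8 JSON bytes for the delta's Operation property
    public static byte[] ToOperation(SimpleDatabaseModification modification) =>
        JsonSerializer.SerializeToUtf8Bytes(modification);

    // Operation bytes -> modification, on the node that processes the delta
    public static SimpleDatabaseModification FromOperation(byte[] operation) =>
        JsonSerializer.Deserialize<SimpleDatabaseModification>(operation);
}
```

Whatever format is chosen, the only requirement is symmetry: the delta processor must be able to read back exactly what the data object wrote.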
Since the SimpleDatabase stores the records in a list, the next step is to create a processor that extracts the information from each delta and uses it to recreate that list. Here is the delta processor:
SimpleDatabaseDeltaProcessor
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabaseDeltaProcessor.cs
public class SimpleDatabaseDeltaProcessor :DeltaProcessorBase
{
List<SimpleDatabaseRecord> _CurrentText;
public SimpleDatabaseDeltaProcessor(DeltaStoreSettings deltaStoreSettings, List<SimpleDatabaseRecord> CurrentData) : base(deltaStoreSettings)
{
_CurrentText= CurrentData;
}
public override Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (IDelta delta in deltas)
{
cancellationToken.ThrowIfCancellationRequested();
var Modification= this.GetDeltaOperations<SimpleDatabaseModification>(delta);
switch (Modification.Operation)
{
case OperationType.Add:
this._CurrentText.Add(Modification.Record);
break;
case OperationType.Delete:
var ObjectToDelete= this._CurrentText.FirstOrDefault(x=>x.Key==Modification.Record.Key);
this._CurrentText.Remove(ObjectToDelete);
break;
case OperationType.Update:
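var ObjectToUpdate = this._CurrentText.FirstOrDefault(x => x.Key == Modification.Record.Key);
// Only replace records that exist locally; IndexOf would return -1 otherwise
if (ObjectToUpdate != null)
{
var Index = this._CurrentText.IndexOf(ObjectToUpdate);
this._CurrentText[Index] = Modification.Record;
}
break;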
}
}
return Task.CompletedTask;
}
}
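Replaying deltas only reproduces the original list if they are applied in the order they were created. The sketch below isolates that replay step with hypothetical Row and Change types (not the framework's classes) and, as an assumption, ignores updates and deletes for rows that are missing locally rather than throwing.

```csharp
using System;
using System.Collections.Generic;

public enum Op { Add, Delete, Update }

// Hypothetical record and modification types for this sketch
public record Row(int Key, string Text);
public record Change(Op Operation, Row Record);

public static class Replayer
{
    // Applies the changes in order, the same way SimpleDatabaseDeltaProcessor walks its deltas
    public static void Apply(List<Row> data, IEnumerable<Change> changes)
    {
        foreach (var change in changes)
        {
            int index = data.FindIndex(r => r.Key == change.Record.Key);
            switch (change.Operation)
            {
                case Op.Add:
                    data.Add(change.Record);
                    break;
                case Op.Delete:
                    if (index >= 0) data.RemoveAt(index);
                    break;
                case Op.Update:
                    // Ignore updates for rows this node does not have instead of throwing
                    if (index >= 0) data[index] = change.Record;
                    break;
            }
        }
    }
}
```

Swapping the order of, for example, an Update and the Add it refers to would leave the replica without the edit, which is why the deltas carry an ordered index.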
Well, that’s it for this post. In the next post, we will create some test scenarios for our implementations.
by Joche Ojeda | Oct 12, 2021 | Data Synchronization
Now that we have defined the base contracts necessary for synchronization, we can define some base classes that implement them. The main idea behind these base classes is to make it possible, later on, to inject configuration with .NET dependency injection.
Let’s start with the delta implementation
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Delta.cs
/// <summary>
/// An implementation of the IDelta interface; this class is primarily used for serialization and transport purposes
/// </summary>
public class Delta : IDelta
{
public Delta()
{
}
public static Guid GenerateComb()
{
return Provider.PostgreSql.Create();
}
public Delta(string identity, byte[] operation, bool processed = false)
{
Identity = identity;
Operation = operation;
Processed = processed;
}
public Delta(IDelta Delta)
{
Identity = Delta.Identity;
Index = Delta.Index;
Operation = Delta.Operation;
// Epoch is part of IDelta, so copy it as well
Epoch = Delta.Epoch;
}
public Delta(string identity, Guid index, byte[] operation, bool processed = false)
{
Identity = identity;
Index = index;
Operation = operation;
Processed = processed;
}
public virtual DateTime Date { get; set; }
public virtual string Identity { get; set; }
public virtual Guid Index { get; set; }
public virtual byte[] Operation { get; set; }
public virtual bool Processed { get; set; }
public virtual double Epoch { get; set; }
}
Now, the delta store:
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/DeltaStoreBase.cs
public abstract class DeltaStoreBase : IDeltaStore
{
protected DeltaStoreBase()
{
}
protected DeltaStoreSettings _deltaStoreSettings;
public string Identity { get; private set; }
public DeltaStoreBase(DeltaStoreSettings deltaStoreSettings)
{
this._deltaStoreSettings = deltaStoreSettings;
Setup();
}
protected virtual void Setup()
{
}
public abstract Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default);
public abstract Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default);
//public abstract Task<IEnumerable<IDelta>> GetDeltasToSendAsync(Guid startindex, CancellationToken cancellationToken = default);
public abstract Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default);
public abstract Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default);
public abstract Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default);
public void SetIdentity(string Identity)
{
this.Identity = Identity;
}
public abstract Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken = default);
public abstract Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default);
public abstract Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken=default);
public abstract Task PurgeDeltasAsync(CancellationToken cancellationToken);
}
And finally, the delta processor:
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/DeltaProcessorBase.cs
public abstract class DeltaProcessorBase : IDeltaProcessor
{
protected DeltaStoreSettings _deltaStoreSettings;
public DeltaProcessorBase(DeltaStoreSettings deltaStoreSettings)
{
_deltaStoreSettings = deltaStoreSettings;
}
public abstract Task ProcessDeltasAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken);
}
That’s it for this post. See you in the next post, “Planning the first implementation”.
by Joche Ojeda | Oct 11, 2021 | Data Synchronization
OK, in the last post we defined the conceptual parts of a synchronization framework; now let’s create the code that represents those parts.
Delta
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/IDelta.cs
/// <summary>
/// Represents a transaction made to the database
/// </summary>
public interface IDelta
{
double Epoch { get; set; }
/// <summary>
/// Who created the delta
/// </summary>
string Identity { get; set; }
/// <summary>
/// The unique identifier of the delta
/// </summary>
Guid Index { get; }
/// <summary>
/// The database transaction(s) that represents this delta
/// </summary>
byte[] Operation { get; set; }
}
Epoch: The date when the operation happened
Identity: Who created the delta
Index: A sortable GUID
Operation: The database transaction(s) that represents this delta
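Index is described as a sortable GUID; the Delta class generates it with Provider.PostgreSql.Create() in GenerateComb. The general idea behind such COMB GUIDs is to put a timestamp in the part of the GUID that is compared first, so that ordering by index follows creation time. Below is a hypothetical illustration of that idea for .NET's Guid.CompareTo, which compares the first 32-bit field before the others; SortableGuid is an illustrative name, and this is not how the library used by the framework lays out its bytes.

```csharp
using System;
using System.Security.Cryptography;

public static class SortableGuid
{
    // Hypothetical COMB-style generator: the upper bits of a millisecond timestamp go into the
    // first 32-bit field and the lower 16 bits into the second, the fields Guid.CompareTo checks first.
    public static Guid Create(DateTimeOffset timestamp)
    {
        long ms = timestamp.ToUnixTimeMilliseconds();
        int a = (int)(ms >> 16);
        short b = unchecked((short)(ms & 0xFFFF));

        // The remaining 10 bytes are random so two GUIDs created in the same millisecond still differ
        byte[] random = new byte[10];
        RandomNumberGenerator.Fill(random);
        short c = BitConverter.ToInt16(random, 0);
        byte[] d = new byte[8];
        Array.Copy(random, 2, d, 0, 8);

        return new Guid(a, b, c, d);
    }
}
```

Ordering by such an index is only as reliable as the clocks of the nodes that create the deltas, so this approach assumes reasonably synchronized clocks.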
Delta Processor
https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaProcessor.cs
public interface IDeltaProcessor
{
/// <summary>
/// Extracts the content of an IEnumerable of deltas and processes it on the current data object
/// </summary>
/// <param name="deltas">an IEnumerable of deltas</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An empty task</returns>
Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}
As you can see, the delta processor is really simple: it contains only one method, which is in charge of extracting the content of a group of deltas and applying those differences to the current data object.
Delta Store
https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaStore.cs
public interface IDeltaStore
{
string Identity { get; }
void SetIdentity(string Identity);
/// <summary>
/// Saves the IEnumerable<IDelta> of deltas in the current store
/// </summary>
/// <param name="deltas">The IEnumerable<IDelta> to be saved</param>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>An empty task</returns>
Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
/// <summary>
/// Gets an IEnumerable<IDelta> of deltas generated by other nodes with indices greater than the start index
/// </summary>
/// <param name="startindex">The start index</param>
/// <param name="myIdentity">The identity of the current node </param>
/// <param name="cancellationToken">a Cancellation token</param>
/// <returns>An IEnumerable with deltas generated by other nodes</returns>
Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken);
/// <summary>
/// Get all deltas in the store with an index greater than the start index
/// </summary>
/// <param name="startindex">The start index</param>
/// <param name="cancellationToken">a cancellation token</param>
/// <returns>An IEnumerable of deltas</returns>
Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken);
/// <summary>
/// Gets the count of deltas with indices greater than the start index
/// </summary>
/// <param name="startindex">The start index</param>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>The count</returns>
Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken);
/// <summary>
/// Gets the index of the last delta processed by this data object
/// </summary>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>The index of the last delta processed by this data object</returns>
Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken);
/// <summary>
/// Sets the index of the last delta processed by this data object
/// </summary>
/// <param name="Index">The index to be saved</param>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>An empty task</returns>
Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken);
/// <summary>
/// Gets the index of the last delta pushed to the server node
/// </summary>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>the index of the last delta pushed to the server node</returns>
Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken);
/// <summary>
/// Sets the index of the last delta pushed to the server node
/// </summary>
/// <param name="Index">The index to be saved</param>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>An empty task</returns>
Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken);
/// <summary>
/// Delete all deltas in the store
/// </summary>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>An empty task</returns>
Task PurgeDeltasAsync(CancellationToken cancellationToken);
}
SaveDeltasAsync: Saves the IEnumerable<IDelta> of deltas in the current store
GetDeltasFromOtherNodes: Gets an IEnumerable<IDelta> of deltas generated by other nodes with indices greater than the start index
GetDeltasAsync: Gets all deltas in the store with an index greater than the start index
GetDeltaCountAsync: Gets the count of deltas with indices greater than the start index
GetLastProcessedDeltaAsync: Gets the index of the last delta processed by this data object
SetLastProcessedDeltaAsync: Sets the index of the last delta processed by this data object
GetLastPushedDeltaAsync: Gets the index of the last delta pushed to the server node
SetLastPushedDeltaAsync: Sets the index of the last delta pushed to the server node
PurgeDeltasAsync: Deletes all deltas in the store
That’s all for this post. In the next post, we will define the base classes that implement the interfaces described above.
by Joche Ojeda | Oct 10, 2021 | Data Synchronization
In the last post, we talked about what deltas are and how we can use them to synchronize data structures.
So, in this post, I will describe the parts needed to implement delta-based synchronization. Let’s start:
- Data Object: any database, object, graph, or file system that we are tracking for synchronization
- Delta: a delta is a data difference between the original and the current state of a data object
- Node: a point in the synchronization network. There are two types of nodes:
  - Client node: a node that is able to produce and process deltas
  - Server node: a node that is used only to exchange deltas; it can optionally process deltas too
- Delta Store: storage where you can save deltas so you can later exchange them with other nodes
- Delta Processor: a utility that helps you include the deltas created in other nodes in your current copy of the data object
Now let’s apply this concept to synchronize a database.
OK, so each database that we want to synchronize will be a client node, and a client node should be able to produce and store deltas and to process deltas produced by other nodes. So our database node should look like the following diagram.
The server node can be as simple as a delta store exposed through an HTTP API, as you can see in the basic server node diagram, or it can use several delta stores and delta processors, as shown in the complex server node diagram.
And with those diagrams, I finish this post. In the next post, we will implement those concepts in C# code.
by Joche Ojeda | Oct 10, 2021 | Data Synchronization
Synchronizing data is one of the most challenging tasks out there, especially if you are working with LOB (line-of-business) applications.
There are many ways to synchronize data; the most common technique is to compare records by modification date and then merge the data to create a master record.
The main problem here is that you need a way to compare each type of record, so it gets cumbersome as soon as the number of types in your application begins to grow.
Also, you need a log of what gets created and what gets deleted so you can apply the same changes on the other nodes.
Delta-based synchronization
Delta-based synchronization is the problem of identifying “what changed” between two consecutive executions of a sync process.
A directed delta, also called a change, is a sequence of (elementary) change operations which, when applied to one version V1, yields another version V2 (note the correspondence to transaction logs in databases).
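Written as a formula, with o_1, ..., o_n as illustrative names for the elementary change operations (not notation from the original text), a directed delta turns V1 into V2 by applying those operations one after the other:

$$
V_2 = \Delta(V_1) = (o_n \circ \cdots \circ o_2 \circ o_1)(V_1)
$$

Composition is read right to left, so o_1 is applied first; applying the same operations in a different order can yield a different version, which is why the order of deltas matters during synchronization.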
In delta synchronization, there are two main tasks:
- Record the deltas (the small differences in data between nodes)
- Transmit these differences to the other nodes so they can process them
Implementing Delta-based synchronization for relational databases
The schema above represents a blog’s database. It’s a really simple schema, so it’s easy to understand. Now, this is the scenario:
We have the main database, which we will call “Master”, and two other databases named Client A and Client B.
Now let’s insert data into the master.
Each DML statement should be converted into a delta (that is, a data difference):
Δ1
Δ2
Δ3
Copy deltas Δ1, Δ2 and Δ3 to the clients.
So, after processing the deltas on each client, the data in all databases should look like the picture below
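The scenario can be sketched in a few lines. Everything here is hypothetical: a single Blog table reduced to an id and a title, and deltas that are simply the inserted rows numbered in the order they happened, rather than the framework's byte[] operations. The master records one delta per insert (the first task), and each client processes a copy of those deltas (the second task).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical delta for this sketch: one inserted row of a "Blog" table
public record InsertDelta(int Sequence, int Id, string Title);

// Hypothetical database with a single table and its own delta log
public class BlogDatabase
{
    public string Name { get; }
    public SortedDictionary<int, string> Blogs { get; } = new SortedDictionary<int, string>();
    public List<InsertDelta> Deltas { get; } = new List<InsertDelta>();

    public BlogDatabase(string name) => Name = name;

    // Task 1: every insert is also recorded as a delta
    public void Insert(int id, string title)
    {
        Blogs.Add(id, title);
        Deltas.Add(new InsertDelta(Deltas.Count + 1, id, title));
    }

    // Task 2 (receiving side): replay deltas produced elsewhere, in order.
    // The indexer is used instead of Add, so replaying the same delta twice is harmless.
    public void Process(IEnumerable<InsertDelta> deltas)
    {
        foreach (var delta in deltas.OrderBy(d => d.Sequence))
        {
            Blogs[delta.Id] = delta.Title;
        }
    }
}
```

After the master inserts three rows (three deltas, like Δ1 to Δ3 above) and both clients process them, all three databases hold the same rows.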
So that’s it for this post. In the next post, we will examine each part that is necessary to do delta-based synchronization.