Let’s write a Synchronization Framework in C#

OK, in the last post we defined the conceptual parts of a synchronization framework; now let’s write the code that represents those parts.

Delta

https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/IDelta.cs

 

using System;

/// <summary>
/// Represents a transaction made to the database
/// </summary>
public interface IDelta
{
    /// <summary>
    /// The date when the operation happened
    /// </summary>
    double Epoch { get; set; }
    /// <summary>
    /// Who created the delta
    /// </summary>
    string Identity { get; set; }
    /// <summary>
    /// The unique, sortable identifier of the delta
    /// </summary>
    Guid Index { get; }
    /// <summary>
    /// The database transaction(s) that this delta represents
    /// </summary>
    byte[] Operation { get; set; }
}

 

Epoch: The date when the operation happened

Identity: Who created the delta

Index: A sortable GUID that uniquely identifies the delta

Operation: The database transaction(s) that this delta represents
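The post describes Index only as a sortable GUID and does not show how such a GUID is produced. One possible approach, sketched below under that assumption, is to pack the UTC tick count into the leading fields of the GUID, because Guid.CompareTo compares those fields first and as unsigned values. SortableGuid and Delta are hypothetical names, and storing Epoch as an OLE Automation date (DateTime.ToOADate) is also an assumption, not something the post specifies.

```csharp
using System;
using System.Security.Cryptography;

// Same shape as the IDelta interface above, repeated so this block compiles on its own.
public interface IDelta
{
    double Epoch { get; set; }
    string Identity { get; set; }
    Guid Index { get; }
    byte[] Operation { get; set; }
}

// Hypothetical helper: builds a GUID whose leading fields hold the UTC tick count,
// so Guid.CompareTo orders these GUIDs by creation time.
public static class SortableGuid
{
    public static Guid NewGuid(DateTime utcTime)
    {
        ulong ticks = (ulong)utcTime.Ticks;

        // Random tail keeps GUIDs created in the same tick distinct (but not ordered).
        byte[] tail = new byte[8];
        RandomNumberGenerator.Fill(tail);

        // Guid.CompareTo compares the (a, b, c) fields first, as unsigned values,
        // so splitting the 64-bit tick count across them preserves its order.
        return new Guid(
            (uint)(ticks >> 32),
            (ushort)(ticks >> 16),
            (ushort)ticks,
            tail[0], tail[1], tail[2], tail[3], tail[4], tail[5], tail[6], tail[7]);
    }
}

// Hypothetical concrete delta.
public sealed class Delta : IDelta
{
    public Delta(string identity, byte[] operation)
    {
        DateTime now = DateTime.UtcNow;
        Index = SortableGuid.NewGuid(now);
        // Assumption: Epoch stores the creation time as an OLE Automation date.
        Epoch = now.ToOADate();
        Identity = identity;
        Operation = operation;
    }

    public double Epoch { get; set; }
    public string Identity { get; set; }
    public Guid Index { get; }
    public byte[] Operation { get; set; }
}
```

GUIDs created within the same tick fall back to their random tail, so they are unique but not ordered among themselves. The ordering also only holds for Guid.CompareTo in .NET; SQL Server, for example, sorts uniqueidentifier values by a different byte order.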

 

Delta Processor

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaProcessor.cs

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IDeltaProcessor
{
    /// <summary>
    /// Extracts the content of an IEnumerable of deltas and processes it on the current data object
    /// </summary>
    /// <param name="deltas">An IEnumerable of deltas</param>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>An empty task</returns>
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

As you can see, the delta processor is really simple: it contains only one method, which is in charge of extracting the content of a group of deltas and processing those differences on the current data object.
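To illustrate what “processing” can mean, here is a minimal sketch of an IDeltaProcessor whose data object is an in-memory dictionary rather than a database. The Operation format (UTF-8 text of the form key=value) and the SimpleDelta and DictionaryDeltaProcessor classes are hypothetical; a real processor would decode whatever the delta store recorded, such as SQL statements. The interfaces are repeated so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public interface IDelta
{
    double Epoch { get; set; }
    string Identity { get; set; }
    Guid Index { get; }
    byte[] Operation { get; set; }
}

public interface IDeltaProcessor
{
    Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
}

// Hypothetical minimal delta used by this sketch.
public sealed class SimpleDelta : IDelta
{
    public SimpleDelta(Guid index, string identity, byte[] operation)
    {
        Index = index;
        Identity = identity;
        Operation = operation;
    }

    public double Epoch { get; set; }
    public string Identity { get; set; }
    public Guid Index { get; }
    public byte[] Operation { get; set; }
}

// Hypothetical processor whose data object is a key/value dictionary.
// Assumption: each Operation is the UTF-8 text "key=value" (a made-up format).
public sealed class DictionaryDeltaProcessor : IDeltaProcessor
{
    public Dictionary<string, string> Data { get; } = new Dictionary<string, string>();

    public Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        // Replay in index order so every node applies the same changes in the same sequence.
        foreach (IDelta delta in deltas.OrderBy(d => d.Index))
        {
            cancellationToken.ThrowIfCancellationRequested();
            string text = Encoding.UTF8.GetString(delta.Operation);
            int separator = text.IndexOf('=');
            Data[text.Substring(0, separator)] = text.Substring(separator + 1);
        }
        return Task.CompletedTask;
    }
}
```

Sorting by Index before applying is the design choice that matters here: two nodes that receive the same batch of deltas in a different order still end up with the same data.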

 

Delta Store

https://github.com/egarim/SyncFramework/blob/main/src/src/BIT.Data.Sync/IDeltaStore.cs

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IDeltaStore
{
    string Identity { get; }
    void SetIdentity(string Identity);
    /// <summary>
    /// Saves an IEnumerable of deltas in the current store
    /// </summary>
    /// <param name="deltas">The deltas to be saved</param>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>An empty task</returns>
    Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
    /// <summary>
    /// Gets the deltas generated by other nodes with indices greater than the start index
    /// </summary>
    /// <param name="startindex">The start index</param>
    /// <param name="myIdentity">The identity of the current node</param>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>An IEnumerable with deltas generated by other nodes</returns>
    Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken);
    /// <summary>
    /// Gets all deltas in the store with an index greater than the start index
    /// </summary>
    /// <param name="startindex">The start index</param>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>An IEnumerable of deltas</returns>
    Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken);
    /// <summary>
    /// Gets the count of deltas with indices greater than the start index
    /// </summary>
    /// <param name="startindex">The start index</param>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>The count</returns>
    Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken);
    /// <summary>
    /// Gets the index of the last delta processed by this data object
    /// </summary>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>The index of the last delta processed by this data object</returns>
    Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken);
    /// <summary>
    /// Sets the index of the last delta processed by this data object
    /// </summary>
    /// <param name="Index">The index to be saved</param>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>An empty task</returns>
    Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken);
    /// <summary>
    /// Gets the index of the last delta pushed to the server node
    /// </summary>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>The index of the last delta pushed to the server node</returns>
    Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken);
    /// <summary>
    /// Sets the index of the last delta pushed to the server node
    /// </summary>
    /// <param name="Index">The index to be saved</param>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>An empty task</returns>
    Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken);
    /// <summary>
    /// Deletes all deltas in the store
    /// </summary>
    /// <param name="cancellationToken">A cancellation token</param>
    /// <returns>An empty task</returns>
    Task PurgeDeltasAsync(CancellationToken cancellationToken);
}

SaveDeltasAsync: Saves an IEnumerable<IDelta> of deltas in the current store

GetDeltasFromOtherNodes: Gets an IEnumerable<IDelta> of deltas generated by other nodes with indices greater than the start index

GetDeltasAsync: Gets all deltas in the store with an index greater than the start index

GetDeltaCountAsync: Gets the count of deltas with indices greater than the start index

GetLastProcessedDeltaAsync: Gets the index of the last delta processed by this data object

SetLastProcessedDeltaAsync: Sets the index of the last delta processed by this data object

GetLastPushedDeltaAsync: Gets the index of the last delta pushed to the server node

SetLastPushedDeltaAsync: Sets the index of the last delta pushed to the server node

PurgeDeltasAsync: Deletes all deltas in the store
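As a rough illustration of the contract, below is a hypothetical in-memory implementation (MemoryDeltaStore, with a throwaway MemoryDelta class). It is a sketch that assumes “greater than the start index” means Guid.CompareTo(startindex) > 0 and that deltas come back in index order; it is not thread-safe and keeps nothing on disk. The interfaces are repeated without their comments so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public interface IDelta
{
    double Epoch { get; set; }
    string Identity { get; set; }
    Guid Index { get; }
    byte[] Operation { get; set; }
}

public interface IDeltaStore
{
    string Identity { get; }
    void SetIdentity(string Identity);
    Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken);
    Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken);
    Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken);
    Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken);
    Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken);
    Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken);
    Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken);
    Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken);
    Task PurgeDeltasAsync(CancellationToken cancellationToken);
}

// Hypothetical minimal delta for this sketch.
public sealed class MemoryDelta : IDelta
{
    public MemoryDelta(Guid index, string identity) { Index = index; Identity = identity; }
    public double Epoch { get; set; }
    public string Identity { get; set; }
    public Guid Index { get; }
    public byte[] Operation { get; set; } = Array.Empty<byte>();
}

// Hypothetical in-memory store: a list of deltas plus the two bookmarks.
// Not thread-safe and not persistent.
public sealed class MemoryDeltaStore : IDeltaStore
{
    private readonly List<IDelta> deltas = new List<IDelta>();
    private Guid lastProcessed = Guid.Empty;
    private Guid lastPushed = Guid.Empty;

    public string Identity { get; private set; } = string.Empty;
    public void SetIdentity(string Identity) => this.Identity = Identity;

    public Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        this.deltas.AddRange(deltas);
        return Task.CompletedTask;
    }

    // "Greater than the start index" relies on Guid.CompareTo, hence the need for sortable GUIDs.
    private IEnumerable<IDelta> After(Guid startindex) =>
        deltas.Where(d => d.Index.CompareTo(startindex) > 0).OrderBy(d => d.Index);

    public Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken) =>
        Task.FromResult<IEnumerable<IDelta>>(After(startindex).ToList());

    public Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string myIdentity, CancellationToken cancellationToken) =>
        Task.FromResult<IEnumerable<IDelta>>(After(startindex).Where(d => d.Identity != myIdentity).ToList());

    public Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken) =>
        Task.FromResult(After(startindex).Count());

    public Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken) => Task.FromResult(lastProcessed);

    public Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken)
    {
        lastProcessed = Index;
        return Task.CompletedTask;
    }

    public Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken) => Task.FromResult(lastPushed);

    public Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken)
    {
        lastPushed = Index;
        return Task.CompletedTask;
    }

    public Task PurgeDeltasAsync(CancellationToken cancellationToken)
    {
        deltas.Clear();
        return Task.CompletedTask;
    }
}
```

Passing Guid.Empty as the start index returns every delta, because no other GUID compares lower than it.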

That’s all for this post. In the next post, we will define the base classes that implement the interfaces described above.

Parts of a Synchronization Framework

In the last post, we talked about what deltas are and how we can use them to synchronize data structures.

So, in this post, I will describe the parts needed to implement delta-based synchronization. Let’s start:

  • Data Object: any database, object, graph, or file system that we are tracking for synchronization
  • Delta: a delta is a data difference between the original and the current state of a data object
  • Node: a point in the synchronization network. There are two types of nodes:
    1. Client node: a node that is able to produce and process deltas
    2. Server node: a node that is used only to exchange deltas; it can optionally process deltas too
  • Delta Store: storage where you can save deltas so you can later exchange them with other nodes
  • Delta Processor: a utility that helps you include the deltas created in other nodes in your current copy of the data object

Now let’s apply these concepts to synchronize a database.

OK, so each database that we want to synchronize will be a client node. A client node should be able to produce and store deltas and to process deltas produced by other nodes, so our database node should look like the following diagram.

The server node can be as simple as a delta store exposed through an HTTP API, as you can see in the basic server node diagram, or it can use several delta stores and delta processors, as shown in the complex server node diagram.
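To make the roles of these parts concrete, here is a deliberately simplified, synchronous sketch of two client nodes exchanging deltas through a basic server node. Several assumptions keep it short: the data object is a list of strings, the server’s HTTP API is replaced by direct method calls, and a shared counter stands in for a sortable GUID index. ClientNode, ServerNode, Insert, Push, and Pull are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Linq;

// Simplified delta: a long counter stands in for the sortable index,
// and Operation is the inserted row instead of a serialized transaction.
public sealed record Delta(long Index, string Identity, string Operation);

// Basic server node: nothing but a delta store that client nodes push to and pull from.
public sealed class ServerNode
{
    public List<Delta> Store { get; } = new List<Delta>();
}

// Client node: a data object, a local delta store, and a tiny delta processor.
public sealed class ClientNode
{
    private static long sequence;                      // hypothetical shared index source
    private readonly List<Delta> localStore = new List<Delta>();
    private long lastPushed;                           // highest local index already sent
    private long lastProcessed;                        // highest foreign index already applied

    public ClientNode(string identity) => Identity = identity;

    public string Identity { get; }
    public List<string> Rows { get; } = new List<string>();   // the data object

    // Produce a delta: change the data object and record the change locally.
    public void Insert(string row)
    {
        Rows.Add(row);
        localStore.Add(new Delta(++sequence, Identity, row));
    }

    // Send the local deltas the server has not received yet.
    public void Push(ServerNode server)
    {
        List<Delta> pending = localStore.Where(d => d.Index > lastPushed).ToList();
        server.Store.AddRange(pending);
        if (pending.Count > 0) lastPushed = pending.Max(d => d.Index);
    }

    // Fetch deltas created by other nodes and process them in index order.
    public void Pull(ServerNode server)
    {
        List<Delta> incoming = server.Store
            .Where(d => d.Index > lastProcessed && d.Identity != Identity)
            .OrderBy(d => d.Index)
            .ToList();
        foreach (Delta delta in incoming)
            Rows.Add(delta.Operation);                 // "process" = replay the insert
        if (incoming.Count > 0) lastProcessed = incoming.Max(d => d.Index);
    }
}
```

Each client keeps two bookmarks, the last index it pushed and the last foreign index it processed, so every sync only moves deltas the other side has not seen. The pull bookmark assumes deltas reach the server in index order; a delta that arrives late with a lower index would be skipped.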

                                         

 

And with those diagrams, I finish this post. In the next post, we will implement these concepts in C# code.

Data synchronization in a few words

Synchronizing data is one of the most challenging tasks out there, especially if you are working with line-of-business (LOB) applications.

There are many ways to synchronize data, the most common technique is to compare records by modification date and then merge the data to create a master record.

The main problem here is that you need a way to compare each type of record, so this approach gets cumbersome as soon as the number of types in your application starts to grow.

You also need a log of what gets created and what gets deleted so you can apply the same changes on the other nodes.

Delta-based synchronization

Delta-based synchronization is the problem of identifying “what changed” between each execution of a sync process.

A directed delta, also called a change, is a sequence of (elementary) change operations which, when applied to one version V1, yields another version V2 (note the correspondence to transaction logs in databases).
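The definition can be illustrated with a small sketch, assuming a version of the data is a key/value dictionary and the only elementary operations are upsert and delete. Change, Upsert, Delete, and DirectedDelta.Apply are hypothetical names. Applying the delta to V1 yields V2 without modifying V1, so the same delta can be applied to any other copy of V1 to reach the same V2, much like replaying a transaction log.

```csharp
using System.Collections.Generic;

// Hypothetical elementary change operations on a key/value version of the data.
public abstract record Change;
public sealed record Upsert(string Key, string Value) : Change;
public sealed record Delete(string Key) : Change;

public static class DirectedDelta
{
    // Applies the changes, in order, to a copy of v1 and returns the resulting v2.
    // v1 itself is left untouched, so the same delta can be replayed on any copy of v1.
    public static Dictionary<string, string> Apply(IDictionary<string, string> v1, IEnumerable<Change> delta)
    {
        var v2 = new Dictionary<string, string>(v1);
        foreach (Change change in delta)
        {
            switch (change)
            {
                case Upsert upsert:
                    v2[upsert.Key] = upsert.Value;
                    break;
                case Delete delete:
                    v2.Remove(delete.Key);
                    break;
            }
        }
        return v2;
    }
}
```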

In delta synchronization, there are two main tasks:

  • Record the deltas (the small differences in data between nodes)
  • Transmit these differences to the other nodes so they can process them

Implementing Delta-based synchronization for relational databases

The schema above represents a blog’s database. It’s a really simple schema, so it’s easy to understand. Here is the scenario:

We have the main database, which we will call “Master”, and two other databases named Client A and Client B.

Now let’s insert data into the master.

Each DML statement should be converted into a delta (a data difference).

Δ1

Δ2

Δ3

Copy deltas Δ1, Δ2, and Δ3 to the clients.

So, after processing the deltas on each client, the data in all databases should look like the picture below
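Since the Δ1 to Δ3 pictures are not reproduced here, the sketch below only shows the flow under stated assumptions: every DML statement run on the master is stored as a delta whose Operation holds the statement text as UTF-8 bytes, and each client replays the copied deltas in index order. DmlDelta and DatabaseNode are hypothetical, and so is the runStatement callback; in a real node it would send the statement to the database (for example through an ADO.NET DbCommand), while a test can simply collect the statements in a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical delta: a per-node sequence number stands in for the sortable index,
// and Operation holds the DML statement as UTF-8 bytes (like IDelta.Operation).
public sealed record DmlDelta(long Index, string Identity, byte[] Operation);

public sealed class DatabaseNode
{
    // Assumption: in a real node this callback would execute the SQL against the database.
    private readonly Action<string> runStatement;

    public DatabaseNode(string identity, Action<string> runStatement)
    {
        Identity = identity;
        this.runStatement = runStatement;
    }

    public string Identity { get; }
    public List<DmlDelta> Deltas { get; } = new List<DmlDelta>();

    // Run a DML statement locally and convert it into a delta.
    public void Execute(string sql)
    {
        runStatement(sql);
        Deltas.Add(new DmlDelta(Deltas.Count + 1, Identity, Encoding.UTF8.GetBytes(sql)));
    }

    // Replay deltas copied from another node, in index order, without re-recording them.
    public void Process(IEnumerable<DmlDelta> deltas)
    {
        foreach (DmlDelta delta in deltas.OrderBy(d => d.Index))
            runStatement(Encoding.UTF8.GetString(delta.Operation));
    }
}
```

Because the master records statements instead of comparing rows, nothing in this flow depends on how many tables or record types the schema has.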

 

 

So that’s it for this post. In the next post, we will examine each part that is necessary for delta-based synchronization.