Last week, I decided to create a playground for the SyncFramework to demonstrate how synchronization works. The sync framework itself is not designed in a client-server architecture, but as a set of APIs that you can use to synchronize data.
Synchronization scenarios usually involve a client-server architecture, but when I created the SyncFramework, I decided that network communication was something outside the scope and not directly related to data synchronization. So, instead of embedding the client-server concept in the SyncFramework, I decided to create a set of extensions to handle these scenarios. If you want to take a look at the network extensions, you can see them here.
Now, let’s return to the playground. The main requirement for me, besides showing how the synchronization process works, was not having to maintain an infrastructure for it. You know, a Sync Server and a few databases that I would have to constantly delete. So, I decided to use Blazor WebAssembly and SQLite databases running in the browser. If you want to know more about how SQLite databases can run in the browser, take a look at this article.
Now, there’s still a problem. How do I run a server in the browser? I know it’s somehow possible, but I did not have the time to do the research. So, I decided to create my own HttpClientHandler.
How the HttpClientHandler works
HttpClientHandler offers a number of properties and methods for controlling HTTP requests and responses. It is the default mechanism HttpClient uses to send requests and receive responses.
The HttpClientHandler manages aspects like the maximum number of redirects, redirection policies, handling cookies, and automated decompression of HTTP traffic. It can be set up and supplied to HttpClient to regulate the HTTP requests made by HttpClient.
A custom handler is also helpful in testing situations where you need to imitate or mock HTTP requests and responses. The SendAsync method of HttpMessageHandler, from which HttpClientHandler derives, can be overridden in a new class to return any response your test requires.
Here is a basic example:
public class TestHandler : HttpMessageHandler
{
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
// You can check the request details and return different responses based on that.
// For simplicity, we're always returning the same response here.
var responseMessage = new HttpResponseMessage(HttpStatusCode.OK)
{
Content = new StringContent("Test response.")
};
return await Task.FromResult(responseMessage);
}
}
And here’s how you’d use this handler in a test:
[Test]
public async Task TestHttpClient()
{
var handler = new TestHandler();
var client = new HttpClient(handler);
var response = await client.GetAsync("http://example.com");
var responseContent = await response.Content.ReadAsStringAsync();
Assert.AreEqual("Test response.", responseContent);
}
The TestHandler in this example always returns an HTTP 200 response with the body “Test response.” In a real test, you might put more sophisticated logic in SendAsync to return different responses depending on the details of the request. That way, you can test how your code handles different responses without actually sending HTTP requests.
Going back to our main story
Now that we know we can intercept the HTTP request and handle it locally, we can write an HttpClientHandler that takes the requests from the client nodes and processes them locally. With that, we have all the pieces to make the playground work without a real server. You can take a look at the implementation of the custom handler for the playground here.
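To make the idea concrete, here is a minimal sketch of a handler that dispatches requests to in-process functions instead of the network. The names InProcessServerHandler, Map and InProcessDemo are hypothetical and only illustrate the technique; the playground's actual handler may be structured differently. The /Sync/Push route is the one used by the HTTP client shown later in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: routes HTTP requests to in-process delegates, so no real server is needed.
public class InProcessServerHandler : HttpMessageHandler
{
    // Maps "METHOD /path" to the function that plays the role of the server endpoint.
    private readonly Dictionary<string, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>>> _routes =
        new Dictionary<string, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>>>(StringComparer.OrdinalIgnoreCase);

    public void Map(HttpMethod method, string path, Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> endpoint)
    {
        _routes[$"{method.Method} {path}"] = endpoint;
    }

    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var key = $"{request.Method.Method} {request.RequestUri.AbsolutePath}";
        if (_routes.TryGetValue(key, out var endpoint))
        {
            // The "server" code runs right here, in the same process as the client.
            return endpoint(request, cancellationToken);
        }
        // Unknown route: answer the way a real server would.
        return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound) { RequestMessage = request });
    }
}

public static class InProcessDemo
{
    public static async Task<string> RunAsync()
    {
        var handler = new InProcessServerHandler();
        handler.Map(HttpMethod.Post, "/Sync/Push", async (request, ct) =>
        {
            var body = await request.Content.ReadAsStringAsync();
            return new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new StringContent($"received {body.Length} characters")
            };
        });

        // The base address is never contacted; it only lets relative URLs resolve.
        using var client = new HttpClient(handler) { BaseAddress = new Uri("http://localhost/") };
        var response = await client.PostAsync("/Sync/Push", new StringContent("[]"));
        return await response.Content.ReadAsStringAsync();
    }
}
```

Because the client code only sees an HttpClient, the same client can later be pointed at a real server by removing the custom handler.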
Ok, so far, our synchronization framework is only implemented for an in-memory database that we use for testing purposes.
Now let’s implement a different use case: let’s add synchronization functionality to an Entity Framework Core DbContext.
As I explained before, the key part of synchronizing data using delta encoding is to be able to track the differences that happen to a data object, in this case, a relational database.
These are the tasks that we need to complete to accomplish our goal:
Find out how entity framework converts the changes that happen to the objects to SQL commands
Decide what information we need to track and save as a delta
Create the infrastructure to save deltas (IDeltaStore)
Create the infrastructure to process deltas (IDeltaProcessor)
Implement the synchronization node functionality in an Entity Framework DbContext(ISyncClientNode)
Create a test scenario
1 Find out how entity framework converts the changes that happen to the objects to SQL commands
In our companies (BitFrameworks & Xari) we have been working on data synchronization for a while, but all this work has been done in the XPO realm.
public abstract class ModificationCommandBatch
{
/// <summary>
/// The list of conceptual insert/update/delete <see cref="ModificationCommands" />s in the batch.
/// </summary>
public abstract IReadOnlyList<IReadOnlyModificationCommand> ModificationCommands { get; }
Now let’s take a look at the ModificationCommand class https://github.com/dotnet/efcore/blob/main/src/EFCore.Relational/Update/ModificationCommand.cs. This class provides all the information about the changes that will be converted into SQL commands, which means that if we serialize this object and save it as a delta, we can then send it to another node and replicate the changes…VOILA!!!
So now we know where the changes that we need to keep track of are. Next, let’s try to understand how those changes are converted into SQL commands and then executed against the database.
2 Decide what information we need to track and save as a delta
Entity Framework Core uses dependency injection to handle different database engines. The idea is that there are a lot of small services that can be replaced in order to create a different implementation, for example, SQLite, SqlServer, Postgres, etc.
After a lot of digging, I found that the service in charge of generating the update commands (insert, update and delete) is UpdateSqlGenerator.
This class implements IUpdateSqlGenerator https://github.com/dotnet/efcore/blob/main/src/EFCore.Relational/Update/IUpdateSqlGenerator.cs and, as you can see, the methods that append insert, update and delete operations receive a StringBuilder and a modification command. So this is the service in charge of translating the ModificationCommand into SQL commands. SQL commands are easy to serialize because they are just text, so this is what we are going to serialize and save as a delta.
public interface IUpdateSqlGenerator
{
/// <summary>
/// Generates SQL that will obtain the next value in the given sequence.
/// </summary>
/// <param name="name">The name of the sequence.</param>
/// <param name="schema">The schema that contains the sequence, or <see langword="null" /> to use the default schema.</param>
/// <returns>The SQL.</returns>
string GenerateNextSequenceValueOperation(string name, string? schema);
/// <summary>
/// Generates a SQL fragment that will get the next value from the given sequence and appends it to
/// the full command being built by the given <see cref="StringBuilder" />.
/// </summary>
/// <param name="commandStringBuilder">The builder to which the SQL fragment should be appended.</param>
/// <param name="name">The name of the sequence.</param>
/// <param name="schema">The schema that contains the sequence, or <see langword="null" /> to use the default schema.</param>
void AppendNextSequenceValueOperation(
StringBuilder commandStringBuilder,
string name,
string? schema);
/// <summary>
/// Appends a SQL fragment for the start of a batch to
/// the full command being built by the given <see cref="StringBuilder" />.
/// </summary>
/// <param name="commandStringBuilder">The builder to which the SQL fragment should be appended.</param>
void AppendBatchHeader(StringBuilder commandStringBuilder);
/// <summary>
/// Appends a SQL command for deleting a row to the commands being built.
/// </summary>
/// <param name="commandStringBuilder">The builder to which the SQL should be appended.</param>
/// <param name="command">The command that represents the delete operation.</param>
/// <param name="commandPosition">The ordinal of this command in the batch.</param>
/// <returns>The <see cref="ResultSetMapping" /> for the command.</returns>
ResultSetMapping AppendDeleteOperation(
StringBuilder commandStringBuilder,
IReadOnlyModificationCommand command,
int commandPosition);
/// <summary>
/// Appends a SQL command for inserting a row to the commands being built.
/// </summary>
/// <param name="commandStringBuilder">The builder to which the SQL should be appended.</param>
/// <param name="command">The command that represents the insert operation.</param>
/// <param name="commandPosition">The ordinal of this command in the batch.</param>
/// <returns>The <see cref="ResultSetMapping" /> for the command.</returns>
ResultSetMapping AppendInsertOperation(
StringBuilder commandStringBuilder,
IReadOnlyModificationCommand command,
int commandPosition);
/// <summary>
/// Appends a SQL command for updating a row to the commands being built.
/// </summary>
/// <param name="commandStringBuilder">The builder to which the SQL should be appended.</param>
/// <param name="command">The command that represents the update operation.</param>
/// <param name="commandPosition">The ordinal of this command in the batch.</param>
/// <returns>The <see cref="ResultSetMapping" /> for the command.</returns>
ResultSetMapping AppendUpdateOperation(
StringBuilder commandStringBuilder,
IReadOnlyModificationCommand command,
int commandPosition);
}
3 Create the infrastructure to save deltas (Implementing IDeltaStore)
Now it is time to create a delta store. This is an easy one, since we only need to inherit from our delta store base class and save the information in an Entity Framework DbContext, so here is the implementation
4 Create the infrastructure to process deltas (implementing IDeltaProcessor)
So far, we know what we need to store in the deltas: basically, SQL commands and their parameters. This means that, to process those SQL commands, our delta processor needs to create a database connection and execute them.
public EFDeltaProcessor(DbContext dBContext)
{
_dBContext = dBContext;
}
public EFDeltaProcessor(string connectionstring, string DbEngineAlias, string ProviderInvariantName)
{
this.CurrentDbEngine = DbEngineAlias;
this.connectionString = connectionstring;
try
{
factory = DbProviderFactories.GetFactory(ProviderInvariantName);
}
catch (Exception ex)
{
Debug.WriteLine(ex.Message);
throw new Exception("There was a problem creating the database connection using DbProviderFactories.GetFactory. Please make sure the DbProviderFactory for your database is registered https://docs.microsoft.com/en-us/dotnet/api/system.data.common.dbproviderfactories.registerfactory?view=net-5.0", ex);
}
//TODO check provider registration later
//DbProviderFactories.RegisterFactory("Microsoft.Data.SqlClient", SqlClientFactory.Instance);
}
There are a few things to notice in that class. First, it has two constructors because we need two different ways to create the connection to the database: one using the Entity Framework DbContext and one using an ADO.NET DbProviderFactory.
All the magic happens in the ProcessDeltas method. This method is in charge of extracting the content of the deltas, transforming it into SQL commands and parameters, and then executing those commands.
Please notice that the content of each delta is an instance of ModificationCommandData, which is a class that allows us to store multiple SQL commands (for different database engines) and their parameters.
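As a rough illustration of that idea, the sketch below stores one SQL command per database engine together with its parameters and round-trips the result through JSON. All type and member names here (SqlParameterSketch, SqlCommandSketch, CommandDataSketch, FindFor) are hypothetical and are not the framework's actual ModificationCommandData API; the SQL text is made up for the example.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical shapes for illustration only; not the framework's real types.
public class SqlParameterSketch
{
    public string Name { get; set; } = "";
    public string Value { get; set; } = "";
}

public class SqlCommandSketch
{
    // Identifies the database engine this SQL text was generated for.
    public string DbEngineAlias { get; set; } = "";
    public string CommandText { get; set; } = "";
    public List<SqlParameterSketch> Parameters { get; set; } = new List<SqlParameterSketch>();
}

public class CommandDataSketch
{
    // The same logical change, expressed once per target engine.
    public List<SqlCommandSketch> Commands { get; set; } = new List<SqlCommandSketch>();

    // Picks the command generated for the engine of the node processing the delta.
    public SqlCommandSketch FindFor(string dbEngineAlias) =>
        Commands.Find(c => c.DbEngineAlias == dbEngineAlias);
}

public static class CommandDataDemo
{
    public static string RoundTrip()
    {
        var data = new CommandDataSketch();
        data.Commands.Add(new SqlCommandSketch
        {
            DbEngineAlias = "sqlite",
            CommandText = "INSERT INTO \"Blogs\" (\"Name\") VALUES (@p0);",
            Parameters = { new SqlParameterSketch { Name = "@p0", Value = "EF Core" } }
        });
        data.Commands.Add(new SqlCommandSketch
        {
            DbEngineAlias = "sqlserver",
            CommandText = "INSERT INTO [Blogs] ([Name]) VALUES (@p0);",
            Parameters = { new SqlParameterSketch { Name = "@p0", Value = "EF Core" } }
        });

        // Because everything is plain text, the delta content serializes trivially.
        string json = JsonSerializer.Serialize(data);
        var restored = JsonSerializer.Deserialize<CommandDataSketch>(json);
        return restored.FindFor("sqlserver").CommandText;
    }
}
```

Keeping one command per engine is what would let a node backed by one database replay a change that was originally produced on a node backed by a different one.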
5 Implement the synchronization node functionality in an Entity Framework DbContext(ISyncClientNode)
At the moment we are able to produce and process deltas for Entity Framework relational providers, so the next step is to implement the functionality of a synchronization client node by implementing the following interface
I’m not going to show the implementation of the server, since that implementation is generic and uses the same delta store and delta processor that we created at the beginning of this article. For more information, check the following links
So far, all our tests exist inside the same process, so the nodes communicate through variables. In real-life scenarios, nodes won’t exist in the same process and, most of the time, not even in the same location.
The easiest and most standard way to implement client-server communication in the .NET world is a REST API, so let’s define an API client and a server service. Let’s start with the API client
public class SyncFrameworkHttpClient : ISyncFrameworkClient
{
HttpClient _httpClient;
public string DeltaStoreId { get; }
public SyncFrameworkHttpClient(HttpClient httpClient, string NodeId)
{
this.DeltaStoreId = NodeId;
_httpClient = httpClient;
// Every request carries the node id so the server can tell which node it targets.
_httpClient.DefaultRequestHeaders.Add("NodeId", NodeId);
}
public SyncFrameworkHttpClient(string BaseAddress, string DeltaStoreId):this(new HttpClient() { BaseAddress=new Uri(BaseAddress)},DeltaStoreId)
{
}
public virtual async Task PushAsync(IEnumerable<IDelta> Deltas, CancellationToken cancellationToken = default)
{
try
{
// Copy the deltas into the concrete, serializable Delta type.
List<Delta> toSerialize = new List<Delta>();
foreach (IDelta delta in Deltas)
{
toSerialize.Add(new Delta(delta));
}
cancellationToken.ThrowIfCancellationRequested();
DataContractJsonSerializer js = new DataContractJsonSerializer(typeof(List<Delta>));
MemoryStream msObj = new MemoryStream();
js.WriteObject(msObj, toSerialize);
msObj.Position = 0;
StreamReader sr = new StreamReader(msObj);
string jsonDeltas = sr.ReadToEnd();
var data = new StringContent(jsonDeltas, Encoding.UTF8, "application/json");
await _httpClient.PostAsync("/Sync/Push", data, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
var message = ex.Message;
throw;
}
}
public virtual async Task<List<Delta>> FetchAsync(Guid startindex, string identity, CancellationToken cancellationToken = default)
{
var QueryParams = new Dictionary<string, string>();
QueryParams.Add(nameof(startindex), startindex.ToString());
QueryParams.Add(nameof(identity), identity);
cancellationToken.ThrowIfCancellationRequested();
var query = HttpUtility.ParseQueryString("");
foreach (KeyValuePair<string, string> CurrentParam in QueryParams)
{
query[CurrentParam.Key] = CurrentParam.Value;
}
var response = await _httpClient.GetStringAsync($"/Sync/Fetch?{query.ToString()}").ConfigureAwait(false);
// Deserialize the JSON payload returned by the server into a list of deltas.
using (var ms = new MemoryStream(Encoding.Unicode.GetBytes(response)))
{
DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(List<Delta>));
List<Delta> Deltas = (List<Delta>)deserializer.ReadObject(ms);
return Deltas;
}
}
}
It’s an implementation of the ISyncFrameworkClient interface using HTTP communication:
Fetch: uses an HTTP GET request
Push: uses an HTTP POST request
Also, the “NodeId” header is added to every request; you will understand why when we implement the server part.
Now that we have defined the contract for the client and also provided the base implementation using an HTTP client, it’s time to define what a client node is. Please take a look at the code below
public static class ISyncClientNodeExtensions
{
public static async Task<List<Delta>> FetchAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var LastDeltaIndex = await instance.DeltaStore.GetLastProcessedDeltaAsync(cancellationToken).ConfigureAwait(false);
return await instance.SyncFrameworkClient.FetchAsync(LastDeltaIndex, instance.Identity, cancellationToken).ConfigureAwait(false);
}
public static async Task PullAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var Deltas = await instance.FetchAsync(cancellationToken).ConfigureAwait(false);
if (Deltas.Any())
{
await instance.DeltaProcessor.ProcessDeltasAsync(Deltas, cancellationToken).ConfigureAwait(false);
Guid index = Deltas.Max(d => d.Index);
await instance.DeltaStore.SetLastProcessedDeltaAsync(index, cancellationToken).ConfigureAwait(false);
}
}
public static async Task PushAsync(this ISyncClientNode instance, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var LastPushedDelta = await instance.DeltaStore.GetLastPushedDeltaAsync(cancellationToken).ConfigureAwait(false);
var Deltas = await instance.DeltaStore.GetDeltasAsync(LastPushedDelta,cancellationToken).ConfigureAwait(false);
if (Deltas.Any())
{
var Max = Deltas.Max(d => d.Index);
await instance.SyncFrameworkClient.PushAsync(Deltas, cancellationToken).ConfigureAwait(false);
await instance.DeltaStore.SetLastPushedDeltaAsync(Max,cancellationToken).ConfigureAwait(false);
}
}
}
So, this is how the SyncClientNode is structured
Let’s move to the server side now. Here the idea is to be able to host multiple delta stores and delta processors, and also to be able to introduce custom logic either when saving the deltas into the delta store or when processing the deltas into a data object.
Well, it’s time to create our first implementation. First, we need a place to store the deltas generated in the process of tracking changes in a data object.
To keep the implementation simple, we will create a delta store that saves the deltas in memory. This delta store can also be used for testing purposes.
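One way to picture a server that hosts several delta stores is a registry keyed by node id, where the id would come from the “NodeId” header that the HTTP client adds to each request. The sketch below is only an assumption about how such routing could look: ServerNodeSketch and SyncServerSketch are hypothetical names, and deltas are reduced to plain strings to keep the example self-contained.

```csharp
using System.Collections.Generic;

// Hypothetical sketch: one hosted node, with its own (in-memory) delta storage.
public class ServerNodeSketch
{
    public string NodeId { get; }
    public List<string> StoredDeltas { get; } = new List<string>();

    public ServerNodeSketch(string nodeId)
    {
        NodeId = nodeId;
    }
}

// Hypothetical sketch: a server that hosts several nodes and routes by node id.
public class SyncServerSketch
{
    private readonly Dictionary<string, ServerNodeSketch> _nodes = new Dictionary<string, ServerNodeSketch>();

    public void Register(ServerNodeSketch node) => _nodes[node.NodeId] = node;

    // nodeId is assumed to be read from the "NodeId" request header.
    public void Push(string nodeId, IEnumerable<string> deltas)
    {
        if (!_nodes.TryGetValue(nodeId, out var node))
        {
            throw new KeyNotFoundException($"No server node registered for NodeId '{nodeId}'.");
        }
        // Custom logic (validation, filtering, auditing) could be introduced here before saving.
        node.StoredDeltas.AddRange(deltas);
    }

    public int CountFor(string nodeId) => _nodes[nodeId].StoredDeltas.Count;
}
```

With this shape, two client applications that share a server process keep their deltas separate simply by sending different node ids.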
public class MemoryDeltaStore : BIT.Data.Sync.DeltaStoreBase
{
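// Initialized here so the constructors that do not receive deltas still start with an empty list.
IList<IDelta> Deltas = new List<IDelta>();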
public MemoryDeltaStore(IEnumerable<IDelta> Deltas)
{
this.Deltas = new List<IDelta>(Deltas);
}
protected MemoryDeltaStore()
{
}
//TODO fix the use of MemoryDb
public MemoryDeltaStore(DeltaStoreSettings deltaStoreSettings) : base(deltaStoreSettings)
{
}
public async override Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (IDelta delta in deltas)
{
cancellationToken.ThrowIfCancellationRequested();
Deltas.Add(new Delta(delta));
}
}
public override Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var result = Deltas.Where(d => d.Index.CompareTo(startindex) > 0 && string.Compare(d.Identity, identity, StringComparison.Ordinal) != 0);
return Task.FromResult(result.Cast<IDelta>());
}
public override Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return Task.FromResult(Deltas.Where(d => d.Index.CompareTo(startindex) > 0).ToList().Cast<IDelta>());
}
Guid LastProcessedDelta;
public override async Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default)
{
return LastProcessedDelta;
}
public override async Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
LastProcessedDelta = Index;
}
Guid LastPushedDelta;
public async override Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken)
{
return LastPushedDelta;
}
public async override Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
LastPushedDelta = Index;
}
public async override Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return Deltas.Count(d => d.Index.CompareTo(startindex) > 0);
}
public async override Task PurgeDeltasAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
Deltas.Clear();
}
}
Now that we have a delta store in place, we need a data object, something that we can use to generate data and track how the data is changing, so again for test purposes, I have implemented a small in-memory database
public class SimpleDatabase
{
public IDeltaProcessor DeltaProcessor { get; set; }
public string Identity { get; set; }
public IDeltaStore DeltaStore { get; set; }
public SimpleDatabase(IDeltaStore deltaStore, string identity, List<SimpleDatabaseRecord> Data)
{
Identity = identity;
DeltaStore = deltaStore;
this.Data= Data;
}
List<SimpleDatabaseRecord> Data;
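// Returns a Task (instead of async void) so callers can await the delta being saved and observe errors.
public async Task Update(SimpleDatabaseRecord Instance)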
{
var ObjectToUpdate = Data.FirstOrDefault(x => x.Key == Instance.Key);
if (ObjectToUpdate != null)
{
var Index = Data.IndexOf(ObjectToUpdate);
Data[Index] = Instance;
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Update, Instance);
await SaveDelta(item);
}
}
private async Task SaveDelta(SimpleDatabaseModification item)
{
var Delta = DeltaStore.CreateDelta(Identity,item);
await DeltaStore.SaveDeltasAsync(new List<IDelta>() { Delta }, default);
}
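public async Task Delete(SimpleDatabaseRecord Instance)
{
var ObjectToDelete = Data.FirstOrDefault(x => x.Key == Instance.Key);
if (ObjectToDelete != null)
{
Data.Remove(ObjectToDelete);
// Record the deletion as a delta so other nodes can replay it.
// Assumes the OperationType enum defines a Delete member.
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Delete, Instance);
await SaveDelta(item);
}
}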
public async Task Add(SimpleDatabaseRecord Instance)
{
Data.Add(Instance);
SimpleDatabaseModification item = new SimpleDatabaseModification(OperationType.Add, Instance);
await SaveDelta(item);
}
}
In the class above I have implemented methods to add, delete and update a record. Each method creates an instance of an object called SimpleDatabaseModification. I use that object to keep track of which operation is happening and to keep a copy of the instance being handled at the moment; that is what we are going to save as a delta.
public class SimpleDatabaseModification
{
public OperationType Operation { get; set; }
public SimpleDatabaseModification(OperationType operation, SimpleDatabaseRecord record)
{
Operation = operation;
Record = record;
}
public SimpleDatabaseRecord Record { get; set; }
}
Now, since the SimpleDatabase is saving the records in a list, the next step is to create a processor that gets the information out of the delta and uses it to recreate that list, so here is the delta processor
Now that we have defined the base contracts necessary for synchronization, we can define some base classes that implement those contracts. The main idea behind these base classes is to, later on, add the possibility to inject configurations with .NET dependency injection.
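As a simplified sketch of what such a processor does, the code below replays a sequence of modifications against a list. It is not the framework's processor: the real one receives deltas and has to extract the modification from each delta's content first, while this sketch works directly on already-extracted modifications. All names (OpSketch, RecordSketch, ModificationSketch, ListDeltaProcessorSketch) are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for OperationType, SimpleDatabaseRecord and SimpleDatabaseModification.
public enum OpSketch { Add, Update, Delete }

public class RecordSketch
{
    public Guid Key { get; set; }
    public string Text { get; set; } = "";
}

public class ModificationSketch
{
    public OpSketch Operation { get; set; }
    public RecordSketch Record { get; set; } = new RecordSketch();
}

public class ListDeltaProcessorSketch
{
    private readonly List<RecordSketch> _data;

    public ListDeltaProcessorSketch(List<RecordSketch> data)
    {
        _data = data;
    }

    // Replays each modification, in order, against the local list.
    public void Process(IEnumerable<ModificationSketch> modifications)
    {
        foreach (var m in modifications)
        {
            int index = _data.FindIndex(r => r.Key == m.Record.Key);
            switch (m.Operation)
            {
                case OpSketch.Add:
                    if (index < 0) _data.Add(m.Record);       // ignore duplicates
                    break;
                case OpSketch.Update:
                    if (index >= 0) _data[index] = m.Record;  // replace the stored copy
                    break;
                case OpSketch.Delete:
                    if (index >= 0) _data.RemoveAt(index);
                    break;
            }
        }
    }
}

public static class ListDeltaProcessorDemo
{
    public static List<RecordSketch> Replay()
    {
        var a = Guid.NewGuid();
        var b = Guid.NewGuid();
        var target = new List<RecordSketch>();
        new ListDeltaProcessorSketch(target).Process(new[]
        {
            new ModificationSketch { Operation = OpSketch.Add, Record = new RecordSketch { Key = a, Text = "A1" } },
            new ModificationSketch { Operation = OpSketch.Add, Record = new RecordSketch { Key = b, Text = "B1" } },
            new ModificationSketch { Operation = OpSketch.Update, Record = new RecordSketch { Key = a, Text = "A2" } },
            new ModificationSketch { Operation = OpSketch.Delete, Record = new RecordSketch { Key = b } },
        });
        return target;
    }
}
```

Applying the modifications in the order they were recorded matters here: processing the update before the add, for example, would silently drop the update.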