Well, it’s time to create our first implementation. First, we need a place to store the deltas generated in the process of tracking changes in a data object.
To keep the implementation simple, we will create a delta store that saves the deltas in memory. This delta store can also be used for testing purposes.
MemoryDeltaStore
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/MemoryDeltaStore.cs
/// <summary>
/// Delta store that keeps every delta, plus the "last processed" and
/// "last pushed" markers, in memory. Nothing is persisted, which makes it
/// suitable for simple scenarios and for tests.
/// </summary>
public class MemoryDeltaStore : BIT.Data.Sync.DeltaStoreBase
{
    // Initialized inline so that every constructor yields a usable (empty) store;
    // previously only the IEnumerable<IDelta> constructor created the list and
    // the other constructors left it null.
    readonly IList<IDelta> Deltas = new List<IDelta>();

    Guid LastProcessedDelta;
    Guid LastPushedDelta;

    /// <summary>Creates a store pre-populated with a copy of <paramref name="Deltas"/>.</summary>
    /// <param name="Deltas">Initial deltas; the sequence is copied into a new list.</param>
    public MemoryDeltaStore(IEnumerable<IDelta> Deltas)
    {
        this.Deltas = new List<IDelta>(Deltas);
    }

    /// <summary>Creates an empty store.</summary>
    protected MemoryDeltaStore()
    {
    }

    //TODO fix the use of MemoryDb
    /// <summary>Creates an empty store configured with <paramref name="deltaStoreSettings"/>.</summary>
    /// <param name="deltaStoreSettings">Settings forwarded to the base class.</param>
    public MemoryDeltaStore(DeltaStoreSettings deltaStoreSettings) : base(deltaStoreSettings)
    {
    }

    /// <summary>Appends a copy (<c>new Delta(delta)</c>) of each incoming delta to the store.</summary>
    /// <param name="deltas">Deltas to store.</param>
    /// <param name="cancellationToken">Checked before the loop and before each delta is added.</param>
    public async override Task SaveDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        foreach (IDelta delta in deltas)
        {
            cancellationToken.ThrowIfCancellationRequested();
            Deltas.Add(new Delta(delta));
        }
    }

    /// <summary>
    /// Returns the deltas whose index is greater than <paramref name="startindex"/> and whose
    /// identity differs (ordinal comparison) from <paramref name="identity"/>.
    /// </summary>
    /// <param name="startindex">Exclusive lower bound for the delta index.</param>
    /// <param name="identity">Identity whose own deltas are excluded from the result.</param>
    /// <param name="cancellationToken">Checked once before querying.</param>
    /// <returns>A snapshot of the matching deltas.</returns>
    public override Task<IEnumerable<IDelta>> GetDeltasFromOtherNodes(Guid startindex, string identity, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Materialize the query (as GetDeltasAsync does) so the caller gets a stable
        // snapshot instead of a lazy view over a list that may change later.
        List<IDelta> result = Deltas
            .Where(d => d.Index.CompareTo(startindex) > 0
                        && !string.Equals(d.Identity, identity, StringComparison.Ordinal))
            .ToList();

        return Task.FromResult<IEnumerable<IDelta>>(result);
    }

    /// <summary>Returns the deltas whose index is greater than <paramref name="startindex"/>.</summary>
    /// <param name="startindex">Exclusive lower bound for the delta index.</param>
    /// <param name="cancellationToken">Checked once before querying.</param>
    /// <returns>A snapshot of the matching deltas.</returns>
    public override Task<IEnumerable<IDelta>> GetDeltasAsync(Guid startindex, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        List<IDelta> result = Deltas
            .Where(d => d.Index.CompareTo(startindex) > 0)
            .ToList();

        return Task.FromResult<IEnumerable<IDelta>>(result);
    }

    /// <summary>Returns the index most recently passed to <see cref="SetLastProcessedDeltaAsync"/>.</summary>
    /// <param name="cancellationToken">Checked before reading, like the other members of this class.</param>
    public override async Task<Guid> GetLastProcessedDeltaAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        return LastProcessedDelta;
    }

    /// <summary>Remembers <paramref name="Index"/> as the last processed delta.</summary>
    /// <param name="Index">Index of the last processed delta.</param>
    /// <param name="cancellationToken">Checked before the value is stored.</param>
    public override async Task SetLastProcessedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        LastProcessedDelta = Index;
    }

    /// <summary>Returns the index most recently passed to <see cref="SetLastPushedDeltaAsync"/>.</summary>
    /// <param name="cancellationToken">Checked before reading, like the other members of this class.</param>
    public async override Task<Guid> GetLastPushedDeltaAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        return LastPushedDelta;
    }

    /// <summary>Remembers <paramref name="Index"/> as the last pushed delta.</summary>
    /// <param name="Index">Index of the last pushed delta.</param>
    /// <param name="cancellationToken">Checked before the value is stored.</param>
    public async override Task SetLastPushedDeltaAsync(Guid Index, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        LastPushedDelta = Index;
    }

    /// <summary>Counts the stored deltas whose index is greater than <paramref name="startindex"/>.</summary>
    /// <param name="startindex">Exclusive lower bound for the delta index.</param>
    /// <param name="cancellationToken">Checked once before counting.</param>
    public async override Task<int> GetDeltaCountAsync(Guid startindex, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        return Deltas.Count(d => d.Index.CompareTo(startindex) > 0);
    }

    /// <summary>Removes every delta from the store; the last processed/pushed markers are left untouched.</summary>
    /// <param name="cancellationToken">Checked once before clearing.</param>
    public async override Task PurgeDeltasAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        Deltas.Clear();
    }
}
Now that we have a delta store in place, we need a data object: something that we can use to generate data and track how the data is changing. So, again for test purposes, I have implemented a small in-memory database.
SimpleDatabase
https://github.com/egarim/SyncFramework/blob/main/src/BIT.Data.Sync/Imp/SimpleDatabase.cs
/// <summary>
/// Minimal in-memory "database": a list of <see cref="SimpleDatabaseRecord"/> whose
/// add, update and delete operations are each recorded as a delta in <see cref="DeltaStore"/>.
/// </summary>
public class SimpleDatabase
{
    /// <summary>Delta processor associated with this database (not used by this class itself).</summary>
    public IDeltaProcessor DeltaProcessor { get; set; }

    /// <summary>Identity stamped on every delta this database creates.</summary>
    public string Identity { get; set; }

    /// <summary>Store that receives the deltas created by this database.</summary>
    public IDeltaStore DeltaStore { get; set; }

    List<SimpleDatabaseRecord> Data;

    /// <summary>Creates a database over an existing record list.</summary>
    /// <param name="deltaStore">Store in which deltas are saved.</param>
    /// <param name="identity">Identity used when creating deltas.</param>
    /// <param name="Data">Backing list; it is used directly, not copied.</param>
    public SimpleDatabase(IDeltaStore deltaStore, string identity, List<SimpleDatabaseRecord> Data)
    {
        Identity = identity;
        DeltaStore = deltaStore;
        this.Data = Data;
    }

    /// <summary>
    /// Fire-and-forget form of <see cref="UpdateAsync"/>, kept so existing callers keep compiling.
    /// Because it returns <c>void</c>, a failure while saving the delta cannot be observed by
    /// the caller; prefer <see cref="UpdateAsync"/>.
    /// </summary>
    /// <param name="Instance">Record carrying the key to look up and the new values.</param>
    public async void Update(SimpleDatabaseRecord Instance) => await UpdateAsync(Instance);

    /// <summary>
    /// Replaces the stored record that has the same key as <paramref name="Instance"/> and
    /// saves an <see cref="OperationType.Update"/> delta. Does nothing when no record matches.
    /// </summary>
    /// <param name="Instance">Record carrying the key to look up and the new values.</param>
    /// <returns>A task that completes once the delta has been saved.</returns>
    public async Task UpdateAsync(SimpleDatabaseRecord Instance)
    {
        int position = Data.FindIndex(x => x.Key == Instance.Key);
        if (position < 0)
        {
            return;
        }

        Data[position] = Instance;
        await SaveDelta(new SimpleDatabaseModification(OperationType.Update, Instance));
    }

    /// <summary>
    /// Fire-and-forget form of <see cref="DeleteAsync"/>, kept so existing callers keep compiling.
    /// The record is removed before this method returns; prefer <see cref="DeleteAsync"/> when the
    /// outcome of saving the delta matters.
    /// </summary>
    /// <param name="Instance">Record carrying the key of the record to remove.</param>
    public async void Delete(SimpleDatabaseRecord Instance) => await DeleteAsync(Instance);

    /// <summary>
    /// Removes the stored record that has the same key as <paramref name="Instance"/> and
    /// saves an <see cref="OperationType.Delete"/> delta so the deletion can be replayed
    /// elsewhere. Does nothing when no record matches.
    /// </summary>
    /// <param name="Instance">Record carrying the key of the record to remove.</param>
    /// <returns>A task that completes once the delta has been saved.</returns>
    public async Task DeleteAsync(SimpleDatabaseRecord Instance)
    {
        int position = Data.FindIndex(x => x.Key == Instance.Key);
        if (position < 0)
        {
            return;
        }

        Data.RemoveAt(position);

        // Previously no delta was produced here, so deletions were never replicated.
        await SaveDelta(new SimpleDatabaseModification(OperationType.Delete, Instance));
    }

    /// <summary>Appends <paramref name="Instance"/> to the list and saves an <see cref="OperationType.Add"/> delta.</summary>
    /// <param name="Instance">Record to add.</param>
    /// <returns>A task that completes once the delta has been saved.</returns>
    public async Task Add(SimpleDatabaseRecord Instance)
    {
        Data.Add(Instance);
        await SaveDelta(new SimpleDatabaseModification(OperationType.Add, Instance));
    }

    /// <summary>Wraps <paramref name="item"/> in a delta created by the store and saves it.</summary>
    private async Task SaveDelta(SimpleDatabaseModification item)
    {
        var delta = DeltaStore.CreateDelta(Identity, item);
        await DeltaStore.SaveDeltasAsync(new List<IDelta>() { delta }, default);
    }
}
In the class above, I have implemented methods to add, delete, and update a record. Inside each method, I create an instance of an object called SimpleDatabaseModification. I use that object to keep track of which operation is happening and to keep a copy of the instance being handled at the moment; that is what we are going to save as a delta.
SimpleDatabaseModification
/// <summary>
/// Describes a single change to a record: which operation was performed
/// and the record instance it was performed with.
/// </summary>
public class SimpleDatabaseModification
{
    /// <summary>The kind of operation that was performed.</summary>
    public OperationType Operation { get; set; }

    /// <summary>The record instance involved in the operation.</summary>
    public SimpleDatabaseRecord Record { get; set; }

    /// <summary>Creates a modification for the given operation and record.</summary>
    /// <param name="operation">The kind of operation that was performed.</param>
    /// <param name="record">The record instance involved in the operation.</param>
    public SimpleDatabaseModification(OperationType operation, SimpleDatabaseRecord record)
    {
        this.Operation = operation;
        this.Record = record;
    }
}
Now, since the SimpleDatabase saves its records in a list, the next step is to create a processor that gets the information out of each delta and uses it to recreate that list. So here is the delta processor:
SimpleDatabaseDeltaProcessor
/// <summary>
/// Delta processor that replays <see cref="SimpleDatabaseModification"/> deltas
/// against a list of <see cref="SimpleDatabaseRecord"/>.
/// </summary>
public class SimpleDatabaseDeltaProcessor : DeltaProcessorBase
{
    // The list the deltas are applied to; it is the caller's list, not a copy.
    readonly List<SimpleDatabaseRecord> _currentData;

    /// <summary>Creates a processor that applies deltas to <paramref name="CurrentData"/>.</summary>
    /// <param name="deltaStoreSettings">Settings forwarded to the base class.</param>
    /// <param name="CurrentData">List that the processed deltas will modify in place.</param>
    public SimpleDatabaseDeltaProcessor(DeltaStoreSettings deltaStoreSettings, List<SimpleDatabaseRecord> CurrentData) : base(deltaStoreSettings)
    {
        _currentData = CurrentData;
    }

    /// <summary>
    /// Applies each delta, in order, to the current data: adds the record, removes the record
    /// with the same key, or replaces the record with the same key. Delete and update deltas
    /// whose key is not present are skipped.
    /// </summary>
    /// <param name="deltas">Deltas to apply.</param>
    /// <param name="cancellationToken">Checked before the loop and before each delta.</param>
    /// <returns>A completed task; all work is done synchronously.</returns>
    public override Task ProcessDeltasAsync(IEnumerable<IDelta> deltas, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        foreach (IDelta delta in deltas)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var modification = this.GetDeltaOperations<SimpleDatabaseModification>(delta);

            switch (modification.Operation)
            {
                case OperationType.Add:
                    _currentData.Add(modification.Record);
                    break;

                case OperationType.Delete:
                {
                    int position = _currentData.FindIndex(x => x.Key == modification.Record.Key);
                    if (position >= 0)
                    {
                        _currentData.RemoveAt(position);
                    }
                    break;
                }

                case OperationType.Update:
                {
                    int position = _currentData.FindIndex(x => x.Key == modification.Record.Key);

                    // An unknown key used to index the list with -1 and throw
                    // ArgumentOutOfRangeException; skip it instead, as Delete does.
                    if (position >= 0)
                    {
                        _currentData[position] = modification.Record;
                    }
                    break;
                }
            }
        }

        return Task.CompletedTask;
    }
}
Well, that is it for this post. In the next post, we will create some test scenarios to test our implementations.