This project documents a service that demonstrates one approach to fetching, processing, and loading the data files created by the Gentrack data replication service into a replica RDBMS.
The tool has been written in a modular fashion with the intention that it could be extended, maintained and productionised by a 3rd party.
The providers are an access layer to lower-level services
This interface allows for an implementation of file fetching operations from a remote source
Current methods allow for
- List files at the source
- Download an object
- Delete an object
Currently, there is one implementation of this called S3Service to perform these actions with an AWS S3 bucket
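As a rough illustration, the interface might look something like the sketch below; the interface and method names here are assumptions (only S3Service is named in this document), and ReplicationFile stands in for the file object class described further down.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical shape of the remote source interface; names are assumptions.
public interface IFileService
{
    // List the files currently available at the source
    Task<IList<ReplicationFile>> ListFilesAsync();

    // Download an object from the source to the given local path
    Task DownloadFileAsync(ReplicationFile file, string localPath);

    // Delete an object at the source once it has been safely cached
    Task DeleteFileAsync(ReplicationFile file);
}
```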
This interface allows for an implementation of operations on the required RDBMS target
Current methods allow for
- Bulk Load a file to target database/table
- Bulk load a file to a temp table and then UPSERT
Currently, there is one implementation of this called SqlServerDatabaseService targeted at Microsoft SQL Server 2016+
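IDatabaseService is the interface name used elsewhere in this document; the member signatures below are an assumed sketch of the two operations described above.

```csharp
using System.Threading.Tasks;

// IDatabaseService is named in this document; the method names and signatures are assumptions.
public interface IDatabaseService
{
    // Bulk load a cached file directly into the target database/table
    Task BulkLoadFileAsync(ReplicationFile file);

    // Bulk load into a temporary table, then UPSERT (e.g. MERGE) into the target table
    Task BulkLoadAndUpsertFileAsync(ReplicationFile file);
}
```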
This interface implements operations to store and cache files in a local file system.
Storing the file locally prior to using the target RDBMS's bulk insert functionality is seen as the most memory-efficient approach
Current methods allow for
- List files that were downloaded but not yet processed
- Mark a file as "done" (renames the file to add a .done extension)
- Get a local file object
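A possible shape for this interface is sketched below; the interface and method names are assumptions.

```csharp
using System.Collections.Generic;

// Hypothetical shape of the local cache interface; names are assumptions.
public interface ILocalFileService
{
    // List files that have been downloaded but not yet processed
    IList<ReplicationFile> GetUnprocessedFiles();

    // Mark a file as processed by renaming it with a ".done" extension
    void MarkFileAsDone(ReplicationFile file);

    // Resolve the local file object for a cached file path
    ReplicationFile GetLocalFile(string path);
}
```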
A simple class that contains common details of files stored in both the remote source and local cache
It should be expected that the source endpoint from the replication service stores files using the following path/naming scheme
/<database name>/<table name>/<file name>
The file object stores the file location, and the path information is broken down to help downstream processes, both local and remote
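A minimal sketch of such a class is shown below; the class and property names are assumptions, but the path breakdown follows the scheme described above.

```csharp
using System;

// Hypothetical file object; splits "<database name>/<table name>/<file name>" into its parts.
public class ReplicationFile
{
    public string RemoteKey { get; }            // e.g. "customerdbname/Account/LOAD000001.csv"
    public string LocalPath { get; set; }       // location of the cached copy on disk
    public DateTime? LastModified { get; set; } // from the remote object metadata, if available
    public string DatabaseName { get; }
    public string TableName { get; }
    public string FileName { get; }

    public ReplicationFile(string remoteKey)
    {
        RemoteKey = remoteKey;
        var parts = remoteKey.Trim('/').Split('/');
        DatabaseName = parts[0];
        TableName = parts[1];
        FileName = parts[2];
    }
}
```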
A producer is targeted at performing the following logic:
- Poll a source endpoint
- Fetch any files found
- Store them into the local cache
- Delete the file at the source
- Add a file pointer to the shared queue
This interface currently defines a single generic method, "StartPolling", which is expected to perform the actions above
Currently, there is one implementation of this called S3FileProducer which uses the providers implemented in the section above to achieve this logic
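The polling loop might be structured roughly as follows; the field names (_fileService, _sharedQueue, _localCachePath, _pollInterval) are assumptions, not taken from the actual implementation.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical outline of S3FileProducer.StartPolling; names are assumptions.
public async Task StartPolling(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        foreach (var file in await _fileService.ListFilesAsync())
        {
            // Cache locally, remove from the source, then hand the file to the consumers
            await _fileService.DownloadFileAsync(file, _localCachePath);
            await _fileService.DeleteFileAsync(file);
            _sharedQueue.Enqueue(file);
        }

        await Task.Delay(_pollInterval, cancellationToken);
    }
}
```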
A consumer is targeted at performing the following logic:
- Poll the shared queue
- Dequeue a file entry with respect to concurrency
- Call the appropriate load operation for the file entry
- Mark the file as "done"
This interface currently defines a single generic method, "StartPolling", which is expected to perform the general actions above
Currently, the single implementation of this called FullLoadFileConsumer will perform the actions above with the following additional logic
- Looks for files whose names start with "LOAD" to ensure only full load files are processed
- If a file that does not begin with "LOAD" is found, it is assumed to be a delta file and the consumer enters shutdown mode; the assumption is that files are added to the queue in the correct order, so a delta file in the queue means all full load files have been processed
- The full load process uses multiple threads; because there are no concurrency or ordering concerns for the initial load, this allows the initial baseline to be loaded as fast as possible
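A simplified sketch of one full load worker loop is shown below; the names are assumptions, and in practice several of these workers run in parallel (up to ParallelFullLoadStreams).

```csharp
// Hypothetical outline of a FullLoadFileConsumer worker; names are assumptions.
while (!cancellationToken.IsCancellationRequested)
{
    if (!_sharedQueue.TryDequeue(out var file))
    {
        await Task.Delay(_pollInterval, cancellationToken);
        continue;
    }

    if (!file.FileName.StartsWith("LOAD"))
    {
        // A delta file means every full load file has already been queued; shut down
        break;
    }

    await _databaseService.BulkLoadFileAsync(file); // straight bulk load, no upsert required
    _localFileService.MarkFileAsDone(file);
}
```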
This interface currently defines a single generic method, "StartPolling", which is expected to perform the general actions above
Currently, the single implementation of this called DeltaFileConsumer will perform the actions with the following additional logic
- Looks for files that do not start with the name "LOAD" to ensure only delta files are processed
- If a file that begins with "LOAD" is found, it is assumed to be a full load file and the consumer enters shutdown mode; the assumption is that files are added to the queue in the correct order, so a full load file in the queue means the replication process has been restarted and requires investigation
- Unlike the full load process, the delta process will only process and load a single file at a time, so that changes are applied in order and concurrency issues are avoided
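The delta loop mirrors the full load sketch above, with the filter inverted and the temp-table/UPSERT path used; again, the names are assumptions.

```csharp
// Hypothetical core of the DeltaFileConsumer loop; mirrors the full load sketch above.
if (file.FileName.StartsWith("LOAD"))
{
    // A full load file here means replication was restarted; stop and investigate
    break;
}

// Deltas are applied one at a time, in order, via the temp-table/UPSERT path
await _databaseService.BulkLoadAndUpsertFileAsync(file);
_localFileService.MarkFileAsDone(file);
```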
This is the entry point for the solution
It currently enforces that a command-line option of -t / -type is provided with one of the following values, and will load and run the services based on this:
"full load", "delta"
The main method will also watch for user input of "Q" at any time and return to shut down the service
The startup class implements a standard configuration object using the appsettings.json file as well as dependency injection for all the services
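As a rough illustration only, the wiring might look something like the sketch below; the exact registration calls are assumptions, and IFileService/ILocalFileService are the assumed interface names used in the earlier sketches. The appsettings.json layout that the configuration object reads is shown immediately after.

```csharp
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical startup wiring; the registrations shown are assumptions.
var configuration = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json")
    .Build();

var serviceProvider = new ServiceCollection()
    .AddSingleton<IConfiguration>(configuration)
    .AddSingleton<IFileService, S3Service>()                    // remote source access
    .AddSingleton<IDatabaseService, SqlServerDatabaseService>() // target RDBMS access
    .AddSingleton<S3FileProducer>()
    .AddSingleton<FullLoadFileConsumer>()
    .AddSingleton<DeltaFileConsumer>()
    .BuildServiceProvider();
```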
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.EntityFrameworkCore": "Error"
    }
  },
  "ConnectionStrings": {
    "SqlServer": "Data Source=<hostname>;Initial Catalog=<initial database name>;User ID=<user>;Password=<password>"
  },
  "DatabaseMapping": [
    {
      "SourceDatabaseKey": "<sourceKey>",
      "TargetDatabaseKey": "<targetKey>"
    },
    {
      "SourceDatabaseKey": "<sourceKey>",
      "TargetDatabaseKey": "<targetKey>"
    }
  ],
  "replicationBucket": "<S3 bucket name>",
  "replicationBucketPrefix": "<filter name if required>",
  "ProjectName": "<project name>",
  "Environment": "<environment name>",
  "LocalCachePath": "<local folder to store cached files>",
  "ParallelFullLoadStreams": "<number of streams for full load>",
  "AWS": {
    "Profile": "<aws profile name>",
    "Region": "<aws region>"
  }
}
The recommended option is "Information" logging only; the "Debug" option is also supported
As per the example above, ensure that the hostname, initial catalog, user ID and password are populated. Currently only the SqlServer value is supported
Allows mapping between the source database names used and the target database names you may want to point at/rename to. *You should request that the support team provide the source names and ensure a 1-to-1 mapping is created
The name of the S3 Bucket where the files can be found
This parameter allows the filtering of objects from the S3 bucket; it is recommended that it only be used when testing and you want to limit the process to a single file,
e.g. by adding a value such as
/customerdbname/Account/LOAD000001.csv
The service will only look for the specified object
Currently unused
Currently unused
The local folder where the process can download and cache the replication files in to
The number of streams that should be used when processing full load files. The recommendation is to choose a number based on the resources of the target database instance
This is the name of the AWS profile, created inside the AWS credentials file, that holds the access details required for the source S3 bucket
- Install the AWS CLI and ensure the AWS credentials file is set up
- Add an entry to the credentials file using the access key details provided by Gentrack (see the example after this list)
- Ensure there is enough space provisioned and a folder created/identified for the local cache (referenced in appsettings.json)
- Ensure there is a SQL Server instance provisioned where an initial database has been created based on the schema details provided by Gentrack
- Compile and create a release of the solution for the target architecture required
- Create an appsettings.json file as per the details in the section above
- Run the tool in full load mode (use -m "full")
- Run the tool in delta mode (use -m "delta")
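For reference, a credentials file entry typically looks like the following; the profile name is a placeholder and must match the AWS Profile value configured in appsettings.json.

```
[<aws profile name>]
aws_access_key_id = <access key provided by Gentrack>
aws_secret_access_key = <secret access key provided by Gentrack>
```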
If we want to achieve perfect consistency with the source data, then we would need to look at changing the delta load process to only update the target tables once all records for all tables up to a given "commit date" have been received
The correct way to poll objects and order them is by "modified date time"; this will ensure we process files as expected in a production use case. The current demonstration code takes the approach of: a) first taking all the files starting with LOAD, not caring about the order of these; b) then taking all the delta files and ordering them by name (the file names contain the date time stamp of creation). The reason for this approach is that, when testing, AWS S3 buckets do not allow preserving the last modified date when copying test data. The ideal solution here is to update the code to have both methods controlled by a flag, as sketched below.
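A minimal sketch of how such a flag could work is shown below; the method is hypothetical, and LastModified is assumed to be populated from the S3 object metadata.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical ordering helper controlled by a configuration flag; names are assumptions.
static IEnumerable<ReplicationFile> OrderForProcessing(IEnumerable<ReplicationFile> files, bool orderByModifiedDate)
{
    return orderByModifiedDate
        ? files.OrderBy(f => f.LastModified)  // production: use the source's last modified timestamp
        : files.OrderBy(f => f.FileName);     // testing: rely on the timestamp encoded in the file name
}
```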
- Process a schema update file
- Implement a Postgres version of the IDatabaseService
- Implement a MySql version of the IDatabaseService
- Better error control and reporting