= Outbound Pipeline =
''Revision as of 13:46, 22 April 2021''

== Introduction ==
* The DDS outbound transformation pipeline takes published data, already standardised into FHIR, and transforms it into customer databases
* Input: FHIR resources, stored as JSON in "ehr" databases
* Output: populated customer databases of patient data, some hosted internally in the DDS environment and some hosted by third parties

== Technical Overview ==
Multiple applications make up the outbound transformation pipeline:
* White boxes are DDS applications
* Arrows show the communication method between components
* Not all databases are shown
* Interaction with databases is simplified
== Databases ==
* Multiple databases are used in the outbound transformation pipeline
* The architecture is designed to avoid cross-database joins, so databases do not need to be co-located on the same instance
* In the production DDS instance, there are approximately 10 database instances
* Most of the database access layer code is in the EdsCore repository
{| class="wikitable"
!Database
!Type
!Purpose
|-
|Admin
|MySQL
|Stores details on the services publishing to and subscribing from the DDS
|-
|Audit
|MySQL
|Stores an audit of published data and the transformations applied to it
|-
|Config
|MySQL
|Stores most configuration, including database connection strings, logging and RabbitMQ routing
|-
|Data_generator
|MySQL
|Staging area for sending transformed data to externally hosted databases
|-
|Ehr
|MySQL
|Stores all patient and reference data in FHIR JSON. Multiple ehr databases are supported, with each publishing service configured to write to a specific ehr (many to one)
|-
|Enterprise
|MySQL
|Original schema of the "subscriber" customer database, storing patient records in a relational format. Also known as "compass" or "compass v1". There are multiple instances of this database, each for a specific customer.
|-
|Subscriber
|MySQL & SQL Server
|Newer schema of the "subscriber" customer database. Also known as "compass v2". There are multiple instances of this database, each for a specific customer.
|-
|Subscriber_transform
|MySQL
|Stores persistent mappings between FHIR UUIDs from the ehr database and record ID integers in the enterprise and subscriber databases. Each enterprise and subscriber database has a corresponding subscriber_transform database.
|}
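The subscriber_transform mapping can be sketched in a few lines. The class below is purely illustrative (the real data access layer lives in the EdsCore repository and persists to MySQL; the class and method names here are invented): it shows the contract of mapping a FHIR resource UUID to a stable integer record ID, allocating a new ID the first time a UUID is seen.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the subscriber_transform mapping: each FHIR
 * resource UUID from the ehr database is assigned a stable integer
 * record ID for use in the enterprise/subscriber schema. The real
 * implementation persists these mappings in MySQL; this sketch keeps
 * them in memory purely to show the contract.
 */
public class SubscriberIdMap {
    private final Map<String, Long> uuidToId = new HashMap<>();
    private long nextId = 1;

    /** Returns the existing ID for this UUID, or allocates a new one. */
    public synchronized long getOrCreateId(String fhirUuid) {
        return uuidToId.computeIfAbsent(fhirUuid, k -> nextId++);
    }
}
```

Because the mapping is persistent, repeated transforms of the same FHIR resource always target the same row in the subscriber database.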
== Subscriber Database Schemas ==
The term '''subscriber database''' is an umbrella term for databases of three different schemas:
{| class="wikitable"
!
!Enterprise PI
!Enterprise Pseudo
!Subscriber
|-
!Schema
|Original schema
|Same schema as Enterprise PI, modified to support de-identified (pseudonymised) data only
|Newest schema; similar to the older two, but with some revisions
|-
!Data supported
|Patient identifiable data only
|De-identified data only
|Both patient identifiable and de-identified data in the same schema
|-
!Database engines
|MySQL only
|MySQL only
|MySQL and SQL Server
|-
!Transform code
|org.endeavourhealth.transform.enterprise package of the transform repository
|Same transform code as Enterprise PI, with de-identification handled by configuration
|org.endeavourhealth.transform.subscriber package of the transform repository
|}
In this document, '''subscriber database''' may refer to any of the three schemas above, ''except'' when the database is hosted by a third party, in which case it is always the newest (Subscriber) schema.
== Application: Protocol Queue Reader ==
* Same Queue Reader application as used for the inbound pipeline, but configured to run as a Protocol Queue Reader
* Consumes messages from the RabbitMQ EdsProtocol queue
* Determines which Sharing Agreements are relevant for the new published data (i.e. which subscribers the new published data should go to)
* Publishes results to the RabbitMQ EdsTransform exchange, with one message per Sharing Agreement
[[File:Protocol Queue Reader.jpg|frameless|400x400px]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Java application
|-
|Code location:
|'''eds-queuereader''' module in the '''eds''' repository
|-
|Arguments:
|Config ID – identifies the config record to use, which determines which queue is consumed from and what type of Queue Reader it is. Other arguments are supported for specific one-off tasks, but are not used in normal operation.
|}
'''Processing steps:'''
# Consume a message from the EdsProtocol queue (the message contains metadata about new FHIR data for a specific patient)
# Determine which data sharing agreements the publishing organisation is part of
# For each protocol, determine whether the new data for the patient falls under the data sharing agreement
# For each matching protocol, publish a message to the EdsTransform exchange (containing the same metadata as consumed, plus the protocol ID)
# ACK the message in the EdsProtocol queue to remove it
[[File:Protocol Queue Reader 2.jpg|frameless|500x500px]]
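Steps 2–4 above can be sketched as a simple filter. Everything here is hypothetical — the <code>Protocol</code> and <code>OutboundMessage</code> types are stand-ins, and <code>coversPatient</code> abstracts whatever agreement-matching rules the real DDS code applies. It only illustrates the one-message-per-matching-protocol shape; the real application publishes each result to the EdsTransform exchange rather than returning a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of the Protocol Queue Reader's matching logic:
 * of all sharing agreements the publishing organisation belongs to,
 * keep those the new patient data falls under, producing one outbound
 * message per matching protocol. Names and types are illustrative,
 * not the real DDS classes.
 */
public class ProtocolMatcher {

    public record Protocol(String id, Predicate<String> coversPatient) {}

    public record OutboundMessage(String patientId, String protocolId) {}

    public static List<OutboundMessage> match(String patientId, List<Protocol> orgProtocols) {
        List<OutboundMessage> out = new ArrayList<>();
        for (Protocol p : orgProtocols) {
            if (p.coversPatient().test(patientId)) {
                // in the real pipeline this is a publish to the EdsTransform exchange
                out.add(new OutboundMessage(patientId, p.id()));
            }
        }
        return out;
    }
}
```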
== Transform Queue Reader ==
* Same Queue Reader application as used for the inbound pipeline, but configured to run as a Transform Queue Reader
* Consumes messages from the RabbitMQ EdsTransform queue
* Performs the outbound FHIR-to-CSV transform on the new data, with the CSV mapping directly to tables in the subscriber DB
* Stores the resulting CSV data in a holding table
* Publishes messages to the RabbitMQ EdsSubscriber exchange
[[File:Transform Queue Reader.jpg|frameless|400x400px]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Java application
|-
|Code location:
|'''eds-queuereader''' module in the '''eds''' repository, but all transform logic is in the '''transforms''' repository
|-
|Arguments:
|Config ID – identifies the config record to use, which determines which queue is consumed from and what type of Queue Reader it is. Other arguments are supported for specific one-off tasks, but are not used in normal operation.
|}
'''Processing steps:'''
# Consume a message from the EdsTransform queue (the message contains metadata about new FHIR data for a specific patient, plus a protocol ID)
# Perform the relevant FHIR-to-CSV transform for the newly published data (FHIR retrieved from the ehr DB), storing persistent mappings in the subscriber_transform DB
# Store the CSV data (as a base64-encoded zip file) in a holding table
# Publish a message to the EdsSubscriber exchange (containing the same metadata as consumed, plus a reference to the holding table record)
# ACK the message in the EdsTransform queue to remove it
[[File:Transform Queue Reader 2.jpg|frameless|500x500px]]
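Step 3's packaging can be illustrated with standard Java classes. This is a minimal sketch, not the real holding-table code: the actual transform produces many CSV files per message, whereas this zips a single named entry and base64-encodes it so the zip can be stored in a text column.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
 * Sketch of packaging transformed CSV as a base64-encoded zip for the
 * holding table. Class and method names are illustrative.
 */
public class CsvPackager {

    public static String toBase64Zip(String entryName, String csv) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
            zip.putNextEntry(new ZipEntry(entryName));
            zip.write(csv.getBytes(StandardCharsets.UTF_8));
            zip.closeEntry();
        }
        // base64 so the binary zip can live in a text column
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }
}
```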
== Subscriber Queue Reader ==
* Same Queue Reader application as used for the inbound pipeline, but configured to run as a Subscriber Queue Reader
* Consumes messages from the RabbitMQ EdsSubscriber queue
* Writes transformed CSV data to the subscriber DB where possible
* Otherwise, stages the transformed CSV data for sending to third-party hosting
[[File:Subscriber Queue Reader 1.jpg|frameless|400x400px]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Java application
|-
|Code location:
|'''eds-queuereader''' module in the '''eds''' repository
|-
|Arguments:
|Config ID – identifies the config record to use, which determines which queue is consumed from and what type of Queue Reader it is. Other arguments are supported for specific one-off tasks, but are not used in normal operation.
|}
'''Processing steps:'''
# Consume a message from the EdsSubscriber queue (the message contains metadata about new FHIR data for a specific patient, a protocol ID, and a reference to the transformed CSV data)
# Read the CSV data from the holding table
# Check whether the subscriber is locally hosted or remote
# If locally hosted, write the CSV data directly to the subscriber DB
# If remotely hosted, write the CSV data to a staging table for sending externally
# ACK the message in the EdsSubscriber queue to remove it
[[File:Subscriber Queue Reader 2.jpg|frameless|500x500px]]
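The routing decision in steps 3–5 can be sketched as a two-way dispatch. The <code>Target</code> interface and both writer implementations are hypothetical stand-ins for the real database-access code; the point is only that locally hosted subscribers receive a direct write while remote ones are staged for the Database File Sender.

```java
/**
 * Sketch of routing transformed CSV either directly into a locally
 * hosted subscriber DB or into the staging table for later sending.
 * Interface and class names are illustrative, not the real DDS code.
 */
public class SubscriberRouter {

    public interface Target {
        void accept(String csvBatch);
    }

    private final Target localSubscriberDb;
    private final Target remoteStagingTable;

    public SubscriberRouter(Target localSubscriberDb, Target remoteStagingTable) {
        this.localSubscriberDb = localSubscriberDb;
        this.remoteStagingTable = remoteStagingTable;
    }

    /** Locally hosted subscribers are written directly; remote ones are staged. */
    public void route(String csvBatch, boolean locallyHosted) {
        if (locallyHosted) {
            localSubscriberDb.accept(csvBatch);
        } else {
            remoteStagingTable.accept(csvBatch);
        }
    }
}
```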
== Database File Sender ==
* When a subscriber DB is remotely hosted, this application makes the transformed CSV data available over SFTP
* The transformed CSV is encrypted and written to a directory on the SFTP server for download by the external party
* Run multiple times per day by cron
[[File:Database File Sender 1.jpg|frameless|450x450px]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Java application
|-
|Code location:
|'''cegdatabasefilesender''' module in the '''DataGenerator''' repository
|-
|Arguments:
|"sending" to run the application in sending mode; "feedback" to run it in feedback mode
|}
'''Processing steps:'''
# Retrieve the new CSV data from the staging table
# PGP-encrypt the data
# Write the encrypted data to a folder on the SFTP server
# Update the staging table to record that the data has been sent for filing
# Check for feedback files on the SFTP server (sent by the third party) reporting successful application of previously sent data, and update the staging table accordingly
[[File:Database File Sender 2.jpg|frameless|500x500px]]
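Steps 4–5 imply some lifecycle bookkeeping in the staging table: a batch is recorded as sent once its encrypted file reaches the SFTP server, and a later feedback file reports whether the third party applied it. The state names and transition rules below are assumptions for illustration — the real staging schema lives in the data_generator database and may track this differently.

```java
/**
 * Illustrative sketch of staging-table bookkeeping: a batch moves from
 * NEW to SENT when its encrypted file is written to the SFTP server,
 * then to APPLIED or FAILED once a feedback file reports the outcome.
 * States and transitions are hypothetical, not the real schema.
 */
public class StagingStatus {

    public enum State { NEW, SENT, APPLIED, FAILED }

    /** Returns the next state, rejecting transitions the lifecycle doesn't allow. */
    public static State advance(State current, State next) {
        boolean ok =
            (current == State.NEW && next == State.SENT)
            || (current == State.SENT && (next == State.APPLIED || next == State.FAILED));
        if (!ok) {
            throw new IllegalStateException(current + " -> " + next);
        }
        return next;
    }
}
```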
== Remote Server Filer ==
* When a subscriber DB is remotely hosted, this application is run by the third party in their hosted environment
* Downloads CSV data from the DDS SFTP server
* Applies the CSV data to the subscriber DB in the hosted environment
* Writes feedback files back to report the outcome of the above
* Run several times per day by cron
[[File:Remote server filer 1.jpg|frameless]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Java application
|-
|Code location:
|'''filer''' module in the '''DataGenerator''' repository
|-
|Arguments:
|Configuration provided by a resource file
|}
'''Processing steps:'''
# Connect to the DDS SFTP server
# Download any new files and write them to local disk
# Decrypt the CSV files
# Apply the CSV data to the subscriber DB
# Write feedback files to the SFTP server to report the outcome of updating the database
# Delete the files
[[File:Remote server filer 2.jpg|frameless|500x500px]]
== Enterprise Age Updater ==
* When the older "enterprise" DB schema is configured to de-identify patients, the patient's age is stored as an integer in the database
* To keep the age value correct, the '''Enterprise Age Updater''' application is run daily, via cron, to recalculate the age for patients in these databases
[[File:Enterprise Age Updater.jpg|frameless|300x300px]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Java application
|-
|Code location:
|'''utility-enterprise-age_updater''' module in the '''eds''' repository
|-
|Arguments:
|Configuration record name for the subscriber DB to be updated
|}
'''Processing steps:'''
# Find all patients whose age will have changed since the last run date
# Calculate the new age for each patient
# Update the subscriber DB with the new age
[[File:Enterprise Age Updater 2.jpg|frameless|450x450px]]
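The calculation behind these steps can be sketched with the standard <code>java.time</code> API. This is an illustration of the idea, not the real utility's code: the whole-year age comes from <code>Period.between</code>, and finding "patients whose age will have changed" amounts to knowing the date on which each patient's integer age next increases.

```java
import java.time.LocalDate;
import java.time.Period;

/**
 * Sketch of the age recalculation: when a de-identified enterprise DB
 * stores age as an integer, it goes stale and must be recomputed from
 * the date of birth. Method names are illustrative.
 */
public class AgeCalculator {

    /** Whole years between date of birth and the given "today". */
    public static int ageOn(LocalDate dateOfBirth, LocalDate today) {
        return Period.between(dateOfBirth, today).getYears();
    }

    /** The date on which this patient's integer age next increases. */
    public static LocalDate nextChange(LocalDate dateOfBirth, LocalDate today) {
        int age = ageOn(dateOfBirth, today);
        return dateOfBirth.plusYears(age + 1L);
    }
}
```

A daily run only needs to update patients whose <code>nextChange</code> date has passed since the last run, rather than recomputing every row.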
== Population Health ==
* Population Health is a web application providing a UI for performing analytics on a subscriber DB
* The intention is for customers to use this application to run reports and queries on their patient data
* This means customers can perform powerful analytics without needing direct database access
* Still essentially in beta
[[File:Population Health.jpg|frameless]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Tomcat servlet and Angular4 frontend (to be migrated to Angular8)
|-
|Code location:
|'''enterprise-web''' module in the '''Enterprise''' repository
|-
|Arguments:
|ID of the config table record, provided by web.xml in the war file
|}
== DDS-UI ==
* The same application used to monitor the inbound pipeline also provides similar functionality for the outbound pipeline
* Used to configure subscribing services
* Used to queue messages into RabbitMQ when new subscribers are added to the DDS – publishes to the EdsProtocol queue to send existing published data through the outbound transformation pipeline
[[File:DDS-UI.png|frameless]]
{| class="wikitable"
!Property
!Value
|-
|Component type:
|Tomcat servlet and Angular2 frontend (no plans to migrate to Angular8 as it's an internal tool)
|-
|Code location:
|'''eds-ui''' module in the '''eds''' repository
|-
|Arguments:
|none
|}