Outbound Pipeline: Difference between revisions
m (JoC moved page Outbound Transformation Pipeline to Outbound Pipeline) |
No edit summary |
||
Line 1: | Line 1: | ||
== Introduction == | == Introduction == | ||
DDS outbound | The DDS outbound pipeline takes published data, that is already standardised into FHIR, and transforms it to customer databases. | ||
'''Input''': FHIR resources, stored as JSON in ''' | '''Input''': FHIR resources, stored as JSON in '''EHR''' databases. | ||
'''Output''': Populated customer databases of patient data, some hosted internally in DDS environment and some hosted by third parties. | '''Output''': Populated customer databases of patient data, some hosted internally in DDS environment and some hosted by third parties. | ||
== Technical overview == | == Technical overview == |
Revision as of 09:23, 24 June 2021
Introduction
The DDS outbound pipeline takes published data, that is already standardised into FHIR, and transforms it to customer databases.
Input: FHIR resources, stored as JSON in EHR databases.
Output: Populated customer databases of patient data, some hosted internally in DDS environment and some hosted by third parties.
Technical overview
Multiple applications make up the outbound transformation pipeline:
- White boxes are DDS applications
- Arrows show communication method between components
- Not all databases shown
- Interaction with databases simplified
Databases
- Multiple databases are used in the outbound transformation pipeline
- Architecture is designed to avoid cross-database joins, so databases do not need to be co-located on the same instance
- In production DDS instance, there are approx. 10 database instances
- Most database access layer code in EdsCore repository
Database Name | Platform | Description |
---|---|---|
Admin | MySQL | Stores details on services publishing to and subscribing from the DDS |
Audit | MySQL | Stores audit of published data and transformations to it |
Config | MySQL | Stores most configuration including database connection strings, logging, RabbitMQ routing |
Data_generator | MySQL | Staging area for sending transformed data to externally hosted databases |
Ehr | MySQL | Stores all patient and reference data in FHIR JSON. There is support for multiple ehr databases, with each publishing service configured to write to a specific ehr (many to one) |
Enterprise | MySQL | Original schema of the “subscriber” customer database for storing patient records in a relational format. Also known as “compass” or “compass v1”. There are multiple instances of this database, each for a specific customer. |
Subscriber | MySQL & SQL Server | Newer schema of the “subscriber” customer database. Also known as “compass v2”. There are multiple instances of this database, each for a specific customer. |
Subscriber_transform | MySQL | Stores persistent mappings between FHIR UUIDs from ehr database and record ID integers in enterprise and subscriber databases. Each enterprise and subscriber database has a corresponding subscriber_transform database. |
Subscriber database schemas
The term subscriber database is an umbrella term for databases of three different schemas:
Enterprise PI | Enterprise Psudeo | Subscriber |
---|---|---|
Original schema | Same schema as Enterprise PI except modified to support de-identified (pseudonymised) data only | Newest schema |
Supports only patient identifiable data | Schema only supports MySQL | Similar to the older ones, but with some revisions |
Schema only supports MySQL | Same transform code as Enterprise PI with de-identification handled by configuration | Supports both patient identifiable and de-identified data in same schema |
Transform code in org.endeavourhealth.transform.enterprise package of transform repository | Same schema works on both MySQL and SQL Server | |
Transform code in org.endeavourhealth.transform.subscriber package of transform repository |
In this document, any subscriber database mentioned may be any of the above three schemas, except when a subscriber database is hosted by a third party, in which case the database will always be the newest schema.
Application: Protocol queue reader
- Same Queue Reader application as used for inbound pipeline, but configured to run as a Protocol Queue Reader
- Consumes messages from RabbitMQ EdsProtocol queue Determines which Sharing Agreements are relevant for new published data (i.e. which subscribers the new published data should go to)
- Publishes results to RabbitMQ EdsTransform exchange, with one message per Sharing Agreement
Property | Value |
---|---|
Component type: | Java application |
Code location: | eds-queuereader module in eds repository |
Arguments: | Config ID – tells it what config record to use which determines which queue is consumed from and what type of Queue Reader it is. Note other arguments are supported for specific one-off tasks, but aren’t used for normal operation. |
Processing steps:
- Consume message from EdsProtocol queue (message contains metadata about new FHIR data for a specific patient)
- Determine which data sharing agreements the publishing organisation is part of
- For each protocol, determine if the new data for the patient falls under the data sharing agreement
- For each matching protocol, a message is published to the EdsTransform exchange (containing same metadata as consumed, plus the protocol ID)
- ACK message in EdsProtocol queue to remove it
Transform queue reader
- Same Queue Reader application as used for inbound pipeline, but configured to run as a Transform Queue Reader
- Consumes messages from RabbitMQ EdsTransform queue
- Performs outbound FHIR to CSV transform on the new data, CSV mapping directly to tables in subscriber DB
- Stores resulting CSV data in holding table
- Publishes messages to RabbitMQ EdsSubscriber exchange
Property | Value |
---|---|
Component type: | Java application |
Code location: | eds-queuereader module in eds repository, but all transform logic in transforms repository |
Arguments: | Config ID – tells it what config record to use which determines which queue is consumed from and what type of Queue Reader it is. Note other arguments are supported for specific one-off tasks, but aren’t used for normal operation. |
Processing steps:
- Consume message from EdsTransform queue (message contains metadata about new FHIR data for a specific patient and a protocol ID)
- Perform relevant FHIR to CSV transform for newly published data (FHIR retrieved from ehr DB), storing persistent mappings in subscriber transform DB
- Store CSV data (as base64 encoded zip file) in holding table
- Publish message to EdsSubscriber exchange (message containing same metadata as consumed, plus reference to the holding table record)
- ACK message in EdsTransform queue to remove it
Subscriber queue reader
- Same Queue Reader application as used for inbound pipeline, but configured to run as a Subscriber Queue Reader
- Consumes messages from RabbitMQ EdsSubscriber queue
- Writes transformed CSV data to subscriber DB where possible
- Stages transformed CSV data for sending to third party hosting otherwise
Property | Value |
---|---|
Component type: | Java application |
Code location: | eds-queuereader module in eds repository |
Arguments: | Config ID – tells it what config record to use which determines which queue is consumed from and what type of Queue Reader it is. Note other arguments are supported for specific one-off tasks, but aren’t used for normal operation. |
Processing steps:
- Consume message from EdsSubscriber queue (message contains metadata about new FHIR data for a specific patient, a protocol ID, and a reference to the transformed CSV data)
- Read CSV data from holding table
- Check if subscriber is locally hosted or remote
- If locally hosted, write CSV data directly to subscriber DB
- If remote hosted, write CSV data to staging table for sending externally
- ACK message in EdsSubscriber queue to remove it
Database file sender
- When subscriber DB is remotely hosted, this application is used to make the transformed CSV data available over SFTP
- Transformed CSV is encrypted and written to directory on SFTP server for download by external party
- Run multiple times per day by cron
Property | Value |
---|---|
Component type: | Java application |
Code location: | cegdatabasefilesender module in DataGenerator repository |
Arguments: | “sending” as argument to run the application in sending mode
“feedback” as argument to run the application in feedback mode |
Processing steps:
- Retrieve the new CSV data in the staging table
- PGP encrypt the data
- Encrypted data written to folder on SFTP server
- Updated staging table to record data has been sent for filing
- Check for feedback files on SFTP server (sent by third party) to inform of successful application of previously sent data and update staging table
Remote server filer
- When subscriber DB is remotely hosted, this application is used by the third party in their hosted environment
- Downloads CSV data from the DDS SFTP server
- Applies the CSV data to the subscriber DB in this hosted environment
- Writes back feedback files to report outcome of the above
- Run several times per day by cron
Property | Value |
---|---|
Component type: | Java application |
Code location: | filer module in DataGenerator repository |
Arguments: | Configuration provided by resource file |
Processing steps:
- Connect to DDS SFTP server
- Download any new files and write to local disk
- Decrypt CSV files
- Apply CSV data to subscriber DB
- Write feedback files to SFTP server to report outcome of updating database
- Delete files
Enterprise age updater
- When the older “enterprise” DB schema is configured to de-identify patients, the patient’s age is stored as an integer in the database
- To keep the age value correct, the Enterprise Age Updater application is run daily, via cron, to re-calculate the age for patients in these databases
Property | Value |
---|---|
Component type: | Java application |
Code location: | utility-enterprise-age_updater module in Eds repository |
Arguments: | Configuration record name for the subscriber DB to be updated |
Processing steps:
- Finds all patients whose age will have changed since the last run date
- Calculates the new age for each patient
- Updates the subscriber DB with the new age
Population Health
- Population Health is a web application that provides a UI for performing analytics on a subscriber DB
- The intention is for customers to use this application to run reports and queries on their patient data
- This will mean customers can perform powerful analytics without needing direct database access
- Still essentially in beta
Property | Value |
---|---|
Component type: | Tomcat servlet and Angular4 frontend (will be migrated to Angular8) |
Code location: | enterprise-web module in Enterprise repository |
Arguments: | ID for config table record provided by web.xml in war file |
DDS-UI
- Same application as used to monitor inbound pipeline also has similar functionality for outbound pipeline
- Used to configure subscribing services
- Used to queue messages into RabbitMQ when new subscribers are added to DDS – publishes to the EdsProtocol queue to send existing published data through outbound transformation pipeline
Property | Value |
---|---|
Component type: | Tomcat servlet and Angular2 frontend (no plans to migrate to Angular8 as it’s an internal tool) |
Code location: | eds-ui module in eds repository |
Arguments: | none |