Outbound Pipeline

From Discovery Data Service
Latest revision as of 13:30, 24 June 2021

Introduction

The DDS Outbound Pipeline takes published data from the Inbound Pipeline, already standardised into FHIR, and transforms it into customer databases.

Input: FHIR resources, stored as JSON in EHR databases.

Output: Populated customer databases of patient data, some hosted internally in DDS environment and some hosted by third parties.

Technical overview

Multiple applications make up the outbound transformation pipeline:

  • White boxes are DDS applications
  • Arrows show communication method between components
  • Not all databases shown
  • Interaction with databases simplified

[Image: Outbound Pipeline Overview.png]

Databases

  • Multiple databases are used in the Outbound Pipeline
  • Architecture is designed to avoid cross-database joins, so databases do not need to be co-located on the same instance
  • In production DDS instance, there are approx. 10 database instances
  • Most database access layer code in EdsCore repository
  • Admin (MySQL): Stores details on services publishing to and subscribing from the DDS
  • Audit (MySQL): Stores an audit of published data and the transformations applied to it
  • Config (MySQL): Stores most configuration, including database connection strings, logging, and RabbitMQ routing
  • Data_generator (MySQL): Staging area for sending transformed data to externally hosted databases
  • Ehr (MySQL): Stores all patient and reference data as FHIR JSON. Multiple ehr databases are supported, with each publishing service configured to write to a specific ehr (many to one)
  • Enterprise (MySQL): Original schema of the “subscriber” customer database for storing patient records in a relational format. Also known as “compass” or “compass v1”. There are multiple instances of this database, each for a specific customer.
  • Subscriber (MySQL & SQL Server): Newer schema of the “subscriber” customer database. Also known as “compass v2”. There are multiple instances of this database, each for a specific customer.
  • Subscriber_transform (MySQL): Stores persistent mappings between FHIR UUIDs from the ehr database and integer record IDs in the enterprise and subscriber databases. Each enterprise and subscriber database has a corresponding subscriber_transform database.

Subscriber database schemas

The term subscriber database is an umbrella term for databases of three different schemas:

Enterprise PI
  • Original schema
  • Supports only patient identifiable data
  • Schema only supports MySQL
  • Transform code in org.endeavourhealth.transform.enterprise package of transform repository

Enterprise Pseudo
  • Same schema as Enterprise PI, except modified to support de-identified (pseudonymised) data only
  • Schema only supports MySQL
  • Same transform code as Enterprise PI, with de-identification handled by configuration

Subscriber
  • Newest schema
  • Similar to the older schemas, but with some revisions
  • Supports both patient identifiable and de-identified data in the same schema
  • Same schema works on both MySQL and SQL Server
  • Transform code in org.endeavourhealth.transform.subscriber package of transform repository

In this document, any subscriber database mentioned may be any of the above three schemas, except when a subscriber database is hosted by a third party, in which case the database will always be the newest schema.

Application: Protocol queue reader

  • Same Queue Reader application as used for Inbound Pipeline, but configured to run as a Protocol Queue Reader
  • Consumes messages from RabbitMQ EdsProtocol queue
  • Determines which Sharing Agreements are relevant for new published data (which subscribers the new published data should go to)
  • Publishes results to RabbitMQ EdsTransform exchange, with one message per Sharing Agreement

[Image: Protocol queue reader.png]

  • Component type: Java application
  • Code location: eds-queuereader module in eds repository
  • Arguments: Config ID, which tells the application which config record to use; this determines which queue is consumed from and what type of Queue Reader it is. Other arguments are supported for specific one-off tasks, but are not used in normal operation.

Processing steps:

  1. Consume message from EdsProtocol queue (message contains metadata about new FHIR data for a specific patient)
  2. Determine which data sharing agreements the publishing organisation is part of
  3. For each protocol, determine if the new data for the patient falls under the data sharing agreement
  4. For each matching protocol, a message is published to the EdsTransform exchange (containing same metadata as consumed, plus the protocol ID)
  5. ACK message in EdsProtocol queue to remove it
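The routing decision in steps 2 to 4 can be sketched in plain Java. This is a minimal, illustrative model only: the class, record, and field names (ProtocolRouter, publisherOdsCode, etc.) are assumptions, not the real DDS code, and the actual application consumes from and publishes to RabbitMQ rather than working on in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the Protocol Queue Reader's routing step.
public class ProtocolRouter {

    // A sharing agreement (protocol) that a publishing organisation may belong to.
    public record SharingAgreement(String protocolId, String publisherOdsCode) {}

    // Metadata about newly published FHIR data for one patient.
    public record ExchangeMessage(String exchangeId, String publisherOdsCode) {}

    // Outbound message: same metadata plus the matched protocol ID,
    // mirroring "one message per Sharing Agreement" above.
    public record TransformMessage(String exchangeId, String protocolId) {}

    public static List<TransformMessage> route(ExchangeMessage msg,
                                               List<SharingAgreement> agreements) {
        List<TransformMessage> out = new ArrayList<>();
        for (SharingAgreement a : agreements) {
            // Steps 2-3: keep only agreements the publishing organisation is part of
            if (a.publisherOdsCode().equals(msg.publisherOdsCode())) {
                out.add(new TransformMessage(msg.exchangeId(), a.protocolId()));
            }
        }
        return out; // step 4: each entry would be published to the EdsTransform exchange
    }
}
```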

[Image: Protocol queue reader details.png]

Transform queue reader

  • Same Queue Reader application as used for inbound pipeline, but configured to run as a Transform Queue Reader
  • Consumes messages from RabbitMQ EdsTransform queue
  • Performs outbound FHIR to CSV transform on the new data, CSV mapping directly to tables in subscriber DB
  • Stores resulting CSV data in holding table
  • Publishes messages to RabbitMQ EdsSubscriber exchange

[Image: Transform queue reader.png]

  • Component type: Java application
  • Code location: eds-queuereader module in eds repository, but all transform logic is in the transforms repository
  • Arguments: Config ID, which tells the application which config record to use; this determines which queue is consumed from and what type of Queue Reader it is. Other arguments are supported for specific one-off tasks, but are not used in normal operation.

Processing steps:

  1. Consume message from EdsTransform queue (message contains metadata about new FHIR data for a specific patient and a protocol ID)
  2. Perform relevant FHIR to CSV transform for newly published data (FHIR retrieved from ehr DB), storing persistent mappings in subscriber transform DB
  3. Store CSV data (as base64 encoded zip file) in holding table
  4. Publish message to EdsSubscriber exchange (message containing same metadata as consumed, plus reference to the holding table record)
  5. ACK message in EdsTransform queue to remove it
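Step 3's packaging of CSV into a base64-encoded zip can be sketched with the JDK's built-in java.util.zip and Base64 classes. The CsvHolding class and its method names are assumptions for illustration; the real pipeline writes the resulting string to a holding-table column.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Illustrative round trip: CSV -> zipped bytes -> base64 string, and back.
public class CsvHolding {

    public static String toBase64Zip(String entryName, String csv) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ZipOutputStream zip = new ZipOutputStream(buf)) {
                zip.putNextEntry(new ZipEntry(entryName));
                zip.write(csv.getBytes(StandardCharsets.UTF_8));
                zip.closeEntry();
            }
            // Base64 keeps the binary zip safe in a text column
            return Base64.getEncoder().encodeToString(buf.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static String fromBase64Zip(String b64) {
        byte[] zipped = Base64.getDecoder().decode(b64);
        try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(zipped))) {
            zip.getNextEntry(); // position at the single CSV entry
            return new String(zip.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```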

[Image: Transform queue reader details.png]

Subscriber queue reader

  • Same Queue Reader application as used for inbound pipeline, but configured to run as a Subscriber Queue Reader
  • Consumes messages from RabbitMQ EdsSubscriber queue
  • Writes transformed CSV data to subscriber DB where possible
  • Stages transformed CSV data for sending to third party hosting otherwise

[Image: Subscriber queue reader.png]

  • Component type: Java application
  • Code location: eds-queuereader module in eds repository
  • Arguments: Config ID, which tells the application which config record to use; this determines which queue is consumed from and what type of Queue Reader it is. Other arguments are supported for specific one-off tasks, but are not used in normal operation.

Processing steps:

  1. Consume message from EdsSubscriber queue (message contains metadata about new FHIR data for a specific patient, a protocol ID, and a reference to the transformed CSV data)
  2. Read CSV data from holding table
  3. Check if subscriber is locally hosted or remote
  4. If locally hosted, write CSV data directly to subscriber DB
  5. If remote hosted, write CSV data to staging table for sending externally
  6. ACK message in EdsSubscriber queue to remove it
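The branch in steps 3 to 5 can be sketched as below. The names (SubscriberFiler, StagingTable) are illustrative assumptions; in the real pipeline the locally/remotely hosted decision comes from the subscriber's configuration record, and the staging table lives in the data_generator database.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: local subscribers are written directly, remote
// subscribers are staged for the Database File Sender.
public class SubscriberFiler {

    public interface SubscriberDb { void writeCsv(String csv); }

    // Stand-in for the staging table used for externally hosted subscribers.
    public static class StagingTable {
        public final List<String> pending = new ArrayList<>();
    }

    public static void file(String csv, boolean locallyHosted,
                            SubscriberDb localDb, StagingTable staging) {
        if (locallyHosted) {
            localDb.writeCsv(csv);    // step 4: direct write to subscriber DB
        } else {
            staging.pending.add(csv); // step 5: stage for SFTP transfer
        }
        // step 6 (ACK in EdsSubscriber queue) happens only after one of the above succeeds
    }
}
```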

[Image: Subscriber queue reader details.png]

Database file sender

  • When subscriber DB is remotely hosted, this application is used to make the transformed CSV data available over SFTP
  • Transformed CSV is encrypted and written to directory on SFTP server for download by external party
  • Run multiple times per day by cron

[Image: Database file sender.png]

  • Component type: Java application
  • Code location: cegdatabasefilesender module in DataGenerator repository
  • Arguments: “sending” runs the application in sending mode; “feedback” runs it in feedback mode

Processing steps:

  1. Retrieve the new CSV data from the staging table
  2. PGP-encrypt the data
  3. Write the encrypted data to a folder on the SFTP server
  4. Update the staging table to record that the data has been sent for filing
  5. Check for feedback files on the SFTP server (sent by the third party), which confirm successful application of previously sent data, and update the staging table
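The feedback step can be sketched as a pure function over the staging state. This is a guess at the shape of the mechanism only: the comma-separated "batchId,outcome" feedback format and the FeedbackProcessor name are assumptions for illustration, not the real DDS file layout.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: apply a downloaded feedback file to the staging
// table's per-batch status.
public class FeedbackProcessor {

    // staging "table": batch ID -> status (e.g. "sent", "applied", "failed")
    public static Map<String, String> applyFeedback(Map<String, String> staging,
                                                    String feedbackFile) {
        Map<String, String> updated = new HashMap<>(staging);
        for (String line : feedbackFile.split("\n")) {
            if (line.isBlank()) continue;
            String[] parts = line.split(",", 2); // assumed "batchId,outcome" lines
            updated.put(parts[0].trim(), parts[1].trim());
        }
        return updated;
    }
}
```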

[Image: Database file sender details.png]


Remote server filer

  • When subscriber DB is remotely hosted, this application is used by the third party in their hosted environment
  • Downloads CSV data from the DDS SFTP server
  • Applies the CSV data to the subscriber DB in this hosted environment
  • Writes back feedback files to report outcome of the above
  • Run several times per day by cron

[Image: Remote server filer.png]

  • Component type: Java application
  • Code location: filer module in DataGenerator repository
  • Arguments: Configuration provided by resource file

Processing steps:

  1. Connect to DDS SFTP server
  2. Download any new files and write to local disk
  3. Decrypt CSV files
  4. Apply CSV data to subscriber DB
  5. Write feedback files to SFTP server to report outcome of updating database
  6. Delete files
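Step 4 (applying CSV data to the subscriber DB) can be sketched as generating one INSERT per data row from a header-plus-rows CSV file. This string-based form is illustrative only: the real filer uses JDBC with proper parameterisation, and this sketch deliberately ignores quoting, escaping, and column typing.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: turn one transformed CSV file into INSERT statements.
public class CsvToSql {

    public static List<String> toInserts(String table, String csv) {
        String[] lines = csv.strip().split("\n");
        String columns = lines[0]; // first row is the header, matching table columns
        List<String> inserts = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            inserts.add("INSERT INTO " + table + " (" + columns + ") VALUES ("
                    + lines[i] + ");");
        }
        return inserts;
    }
}
```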

[Image: Remote server filer details.png]

Enterprise age updater

  • When the older “enterprise” DB schema is configured to de-identify patients, the patient’s age is stored as an integer in the database
  • To keep the age value correct, the Enterprise Age Updater application is run daily, via cron, to re-calculate the age for patients in these databases

[Image: Enterprise age updater.png]

  • Component type: Java application
  • Code location: utility-enterprise-age_updater module in Eds repository
  • Arguments: Configuration record name for the subscriber DB to be updated

Processing steps:

  1. Finds all patients whose age will have changed since the last run date
  2. Calculates the new age for each patient
  3. Updates the subscriber DB with the new age
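The core calculation can be sketched with the JDK's java.time package: a stored integer age only changes when a birthday falls between the last run and today. The AgeUpdater class and method names are assumptions for illustration, not the real DDS code.

```java
import java.time.LocalDate;
import java.time.Period;

// Illustrative sketch of the Enterprise Age Updater's age calculation.
public class AgeUpdater {

    // Whole years between date of birth and a given date.
    public static int ageOn(LocalDate dateOfBirth, LocalDate onDate) {
        return Period.between(dateOfBirth, onDate).getYears();
    }

    // True if the patient's age changed since the last run,
    // i.e. a birthday occurred in the interval (lastRun, today].
    public static boolean needsUpdate(LocalDate dateOfBirth,
                                      LocalDate lastRun, LocalDate today) {
        return ageOn(dateOfBirth, today) != ageOn(dateOfBirth, lastRun);
    }
}
```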

[Image: Enterprise age updater details.png]

Population Health

  • Population Health is a web application that provides a UI for performing analytics on a subscriber DB
  • The intention is for customers to use this application to run reports and queries on their patient data
  • This will mean customers can perform powerful analytics without needing direct database access
  • Still essentially in beta

[Image: Population health.png]

  • Component type: Tomcat servlet and Angular4 frontend (will be migrated to Angular8)
  • Code location: enterprise-web module in Enterprise repository
  • Arguments: ID for config table record, provided by web.xml in the war file

DDS-UI

  • The same application used to monitor the Inbound Pipeline also provides similar functionality for the Outbound Pipeline
  • Used to configure subscribing services
  • Used to queue messages into RabbitMQ when new subscribers are added to DDS – publishes to the EdsProtocol queue to send existing published data through the outbound transformation pipeline

[Image: DDS-UI.png]

  • Component type: Tomcat servlet and Angular2 frontend (no plans to migrate to Angular8 as it’s an internal tool)
  • Code location: eds-ui module in eds repository
  • Arguments: none