OMOP 转换:自定义映射示例

本文提供了如何通过自定义映射和配置来扩展 OMOP(观察性医疗结果伙伴关系) Common Data Model (CDM) 的示例。 这些示例将指导您完成添加新表、列和更新 OMOP 框架中的现有映射等场景。 有关如何创建这些自定义映射的概述,请参阅在医疗保健数据解决方案中自定义和扩展 OMOP 映射

先决条件

按照自定义和扩展 OMOP 映射中的步骤复制配置文件并更新库以使用自定义配置文件而不是默认配置文件。

示例 1:向表中添加列

将列对象添加到表的列集合时,可以复制现有列并根据需要进行调整。 如果表已存在,则库不会修改其架构。 因此,您需要使用 Spark SQL 脚本将列添加到表中。 同样,如果数据已加载到目标中,并且您添加了新列,则必须重新加载数据,因为没有自动回填。

在此示例中,让我们向 OMOP PERSON 表添加一个新列 custom_myid,来存储 FHIR 患者标识符集合中的值。

  1. 更新 dbTargetSchema.json 文件中的自定义列。

    {
      "name": "custom_myid",
      "originDataTypeName": {
        "typeName": "string",
        "length": 50,
        "properties": {
          "minValue": null,
          "maxValue": null,
          "description": "Custom myid"
        }
      },
      "isNullable": true
    }
    
  2. 更新 dmfAdapter.json 文件,将患者标识符集合中的第一个标识符(其中系统是 Synthea 系统)映射到新列。

    {
      "fieldName": "custom_myid_alias",
      "fieldCalculatedValue": "try_element_at(filter(patient.identifier, x -> x.system == 'https://github.com/synthetichealth/synthea'), 1).value",
      "enabled": true,
      "fieldType": "string",
      "targetFields": {
        "fields": [
          {
            "tableName": "person",
            "fieldName": "custom_myid"
          }
        ]
      }
    }
    
  3. 在医疗保健数据解决方案环境中创建新笔记本,并添加以下 Spark SQL 脚本。 对 OMOP 湖屋运行此脚本以使用新列修改 PERSON 表。

    ALTER table healthcare1_msft_gold_omop.PERSON ADD columns (custom_myid string);

  4. 在管理员湖屋文件中,更新 deploymentParametersConfiguration.json 文件以将 omop_config_path 参数设置为自定义映射文件的路径。

    例如:

    "omop_config_path": "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/healthcare1_msft_gold_omop.Lakehouse/Files/Mappings"

  5. 保存所有 JSON 文件以将更改提交到 Fabric 工作区。

  6. 运行 OMOP 笔记本/管道。

示例 2:添加新表

您可以复制现有表对象并根据需要对其进行修改。 但是,如果需要添加任何关系,请确保更新表对象上的关系集合。 有关详细信息,请查看 dbTargetSchema.json 中的关系属性。

添加表时,请包含以下列,以保持与 dmfAdapter.json 文件中其他表及其映射的一致性。

  • msftSourceTableName:存储主要参与此 OMOP 记录的银牌行的域资源类型。
  • msftModifiedDateTime:存储将行更新为银牌湖屋的时间戳。
  • msftSourceRecordId:存储主要负责此行的银牌域资源的 ID。

例如,我们添加一个表来存储 DiagnosticReport

  1. 使用 DiagnosticReport 的表更新 dbTargetSchema.json

    {
      "namespace": {
        "databaseName": ""
      },
      "tableType": "EXTERNAL",
      "storageDescriptor": {
        "columns": [
          {
            "name": "custom_diagnostic_report_id",
            "originDataTypeName": {
           "typeName": "bigint",
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "A unique identifier for the DiagnosticReport record."
              },
              "isNullable": false
            }
          },
          {
            "name": "custom_person_id",
            "originDataTypeName": {
              "typeName": "bigint",
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "The person record this Diagnostic Report is related to."
              },
              "isNullable": false
            }
          },
          {
            "name": "custom_conclusion",
            "originDataTypeName": {
              "typeName": "string",
              "length": 1000,
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "Holds the conclusion of the Diagnostic Report."
              },
              "isNullable": true
            }
          },
          {
            "name": "msftSourceRecordId",
            "originDataTypeName": {
              "typeName": "string",
              "length": 50,
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "This field is used to store the original record id from the source data. It is not intended for use in standard analytics and is for reference only."
              },
              "isNullable": true
            }
          },
          {
            "name": "msftSourceTableName",
            "originDataTypeName": {
              "typeName": "string",
              "length": 50,
              "properties": {
                "minValue": null,
                "maxValue": null,
                "description": "This field is used to store the original table name from the source data. It is not intended for use in standard analytics and is for reference only."
              },
              "isNullable": true
            }
          },
          {
            "name": "msftModifiedDatetime",
            "originDataTypeName": {
              "typeName": "timestamp",
              "properties": {
                "minValue": null,
                "maxValue": null,
                "timestampFormat": "yyyy-MM-ddTHH:mm:ssZ",
                "description": ""
              },
              "isNullable": true
            }
          }
        ]
      },
      "name": "custom_diagnostic_report",
      "entityType": "TABLE",
      "properties": {
        "businessArea": "",
        "path": "custom_diagnostic_report.cdm.json/custom_diagnostic_report",
        "description": "Holds basic info for the diagnostic report.",
        "displayName": "custom_diagnostic_report",
        "isDay0Entity": "False",
        "fromBusinessAreas": "",
        "primaryKeys": "custom_diagnostic_report_id",
        "industries": "",
        "relationships": "[{\"joinPairs\": [{\"fromAttribute\": \"custom_person_id\", \"toAttribute\": \"person_id\"}], \"fromEntity\": \"custom_diagnostic_report.cdm.json/custom_diagnostic_report\", \"toEntity\": \"person.cdm.json/person\"}]"
      }
    }
    
  2. 更新 dmfAdapter.json 以包含基本诊断报告映射。

    1. DiagnosticReport 添加到查询表中。

      {
      "name": "DiagnosticReport"
      }
      
    2. 添加具有基本诊断报告映射的新源表。

      {
          "tableName": "DiagnosticReport",
          "description": "diagnostic report",
          "query": "select *, msftModifiedDatetime as d_msftModifiedDatetime from DiagnosticReport",
          "modifiedonField": "msftModifiedDatetime",
          "targetAnchorTables": [
            {
              "tableName": "custom_diagnostic_report"
            }
          ],
          "enabled": true,
          "sourceFields": [
            {
              "fieldName": "id",
              "fieldType": "string",
              "isPrimaryKey": true,
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "custom_diagnostic_report_id"
                  },
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "msftSourceRecordId"
                  }
                ]
              }
            },
            {
              "fieldName": "conclusion",
              "fieldType": "string",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "custom_conclusion"
                  }
                ]
              }
            },
            {
              "fieldName": "subject_id",
              "fieldCalculatedValue": "subject.id",
              "fieldType": "string",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "person",
                    "fieldName": "person_id",
                    "targetField": "custom_person_id"
                  }
                ]
              }
            },
            {
              "fieldName": "d_msftModifiedDatetime",
              "fieldType": "datetime",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "msftModifiedDatetime"
                  }
                ]
              }
            },
            {
              "fieldName": "resourceType",
              "fieldType": "string",
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "custom_diagnostic_report",
                    "fieldName": "msftSourceTableName"
                  }
                ]
              }
            }
          ]
      }
      
  3. 更新 dbTargetSchemaConfig.json 并在 SourceModifiedOnSourceTable 的字段属性中包括 custom_diagnostic_report

    [
          {
            "name": "SourceModifiedOn",
            "description": "Used for comparing with source system records to know when a record was changed in the source.",
            "tables": [
              "person",
              "observation_period",
              "visit_occurrence",
              "visit_detail",
              "condition_occurrence",
              "drug_exposure",
              "procedure_occurrence",
              "device_exposure",
              "measurement",
              "observation",
              "death",
              "note",
              "specimen",
              "location",
              "provider",
              "care_site",
              "note_nlp",
              "image_occurrence",
              "custom_diagnostic_report"
            ],
            "type": "timestamp",
            "timestampFormat": "yyyy-MM-ddTHH:mm:ssZ",
            "enabled": true
          },
          {
            "name": "SourceTable",
            "description": "Used for comparing with source system records to know when a record was changed in the source.",
            "tables": [
              "person",
              "observation_period",
              "visit_occurrence",
              "visit_detail",
              "condition_occurrence",
              "drug_exposure",
              "procedure_occurrence",
              "device_exposure",
              "measurement",
              "observation",
              "death",
              "note",
              "specimen",
              "location",
              "provider",
              "care_site",
              "note_nlp",
              "image_occurrence",
              "custom_diagnostic_report"
            ],
            "type": "string",
            "enabled": true
          }
    ]
    
  4. 在管理员湖屋文件中,更新 deploymentParametersConfiguration.json 文件以将 omop_config_path 参数设置为自定义映射文件的路径。

    例如:

    "omop_config_path": "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/healthcare1_msft_gold_omop.Lakehouse/Files/Mappings"

  5. 保存所有 JSON 文件以将更改提交到 Fabric 工作区。

  6. 运行 OMOP 笔记本/管道。

域映射

OMOP 中的许多表都有一个主要用于分析的概念列,例如 observation_concept_idprocedure_concept_idmeasurement_concept_id。 使用源表时,可以使用此主分析列的域来确定要写入的目标表。 例如,FHIR observation.code 可以映射到 OMOP 词汇。 根据域的不同,此 FHIR 观察可能映射到 OMOP 观察、度量、条件或过程。 若要将基于域的映射合并到 OMOP,请按照本节的步骤操作。

使用快捷方式将金牌湖屋表 fhir_system_to_omop_vocab_mappingconcept 映射到银牌湖屋。 此快捷方式允许库从源湖屋访问相关词汇表。

使用以下更改更新 dmfAdapter.json 文件:

  1. fhir_system_to_omop_vocab_mappingconcept 添加到 queryTables 部分。

  2. 对于需要域映射的 sourceTables,请按照下列步骤作:

    1. 修改域需要在 SELECT 语句中将 c.domain_id 返回为 omop_domain 的查询。

    2. 更新需要域在初始 FROM 子句后再包含两个 SQL LEFT JOIN 语句的查询。 下面是几个示例,在这些示例中,您可以假定源表是观察表,别名为 o

      1. left join fhir_system_to_omop_vocab_mapping fhiromop on o.code.coding[0].system = fhiromop.fhir_uri
      2. left join concept c on c.concept_code = o.code.coding[0].code
    3. 此观察的查询应为:

        SELECT
            o.*,
            o.id AS o_id,
            p.id AS p_id,
            e.id AS encounter_id,
            c.domain_id AS omop_domain
        FROM Observation o
        LEFT JOIN fhir_system_to_omop_vocab_mapping fhiromop
            ON o.code.coding[0].system = fhiromop.fhir_uri
        LEFT JOIN concept c
            ON c.concept_code = o.code.coding[0].code
            AND fhiromop.vocabulary_id = c.vocabulary_id
        LEFT JOIN (
            SELECT
                MAX(child.id) AS id,
                child_i.value AS value,
                child_i.system AS system,
                child_i.type.coding.code AS type_code
            FROM Patient child
            LATERAL VIEW EXPLODE(identifier) AS child_i
            GROUP BY child_i.value, child_i.system, child_i.type.coding.code
        ) p
        ON p.value = o.subject.identifier.value
            AND (p.system = o.subject.identifier.system OR (p.system IS NULL AND o.subject.identifier.system IS NULL))
            AND (p.type_code = o.subject.identifier.type.coding.code OR (p.type_code IS NULL AND o.subject.identifier.type.coding.code IS NULL))
        LEFT JOIN (
            SELECT
                MAX(enc.id) AS id,
                enc_i.value AS value,
                enc_i.system AS system,
                enc_i.type.coding.code AS type_code
            FROM Encounter enc
            LATERAL VIEW EXPLODE(identifier) AS enc_i
            GROUP BY enc_i.value, enc_i.system, enc_i.type.coding.code
        ) e
        ON e.value = o.encounter.identifier.value
            AND (e.system = o.encounter.identifier.system OR (e.system IS NULL AND o.encounter.identifier.system IS NULL))
            AND (e.type_code = o.encounter.identifier.type.coding.code OR (e.type_code IS NULL AND o.encounter.identifier.type.coding.code IS NULL));
      
    4. 对于主键的 sourceFields 映射,包含更多字段以容纳潜在的目标表。 每个字段映射都必须包含一个条件,用于确保基于域进行映射。 如果不满足主键条件,则不会将记录写入目标表。 例如,可以修改观察字段映射,以允许根据域映射到观察、度量或过程。

      {
          "sourceFields": [
            {
              "fieldName": "o_id",
              "fieldType": "string",
              "isPrimaryKey": true,
              "enabled": true,
              "targetFields": {
                "fields": [
                  {
                    "tableName": "observation",
                    "fieldName": "observation_id",
                    "condition": "subject.type = 'Patient' and isnull(omop_domain) or (omop_domain != 'Measurement' and omop_domain != 'Procedure')"
                  },
                  {
                    "tableName": "measurement",
                    "fieldName": "measurement_id",
                    "condition": "subject.type = 'Patient' and omop_domain == 'Measurement'"
                  },
                  {
                    "tableName": "procedure_occurrence",
                    "fieldName": "procedure_occurrence_id",
                    "condition": "subject.type = 'Patient' and omop_domain == 'Procedure'"
                  },
                  { "tableName": "observation", "fieldName": "msftSourceRecordId" },
                  { "tableName": "measurement", "fieldName": "msftSourceRecordId" },
                  { "tableName": "procedure_occurrence", "fieldName": "msftSourceRecordId" }
                ]
              }
            }
          ]
      }
      
    5. 将任何必要的 sourceField 映射添加到新的目标表。 例如,您可能需要将 FHIR observation.effectiveDateTime 映射到 OMOP 的 procedure_occurrence.procedure_datetime

故障排除

查看 OMOP 笔记本单元格输出以诊断执行问题。 还可以查看 Azure Log Analytics 获取详细的库日志。

警告

首次运行期间可能会出现以下警告。 这些警告是预期之内的,因为尚未创建目标表。

  • Could not read table from 'abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/d5dd0e6f-0d4a-4d66-8de7-d80312418f52/DMHCheckpoint/dtt/dtt_state_db/KEY_MAPPING/CUSTOM_DIAGNOSTIC_REPORT_ID_MAPPING', error message: [PATH_NOT_FOUND] Path does not exist: abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/d5dd0e6f-0d4a-4d66-8de7-d80312418f52/DMHCheckpoint/dtt/dtt_state_db/KEY_MAPPING/CUSTOM_DIAGNOSTIC_REPORT_ID_MAPPING. Traceback: <traceback object at 0x793a44b03500>

  • Could not read table from 'abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>/Tables/custom_diagnostic_report', error message: [PATH_NOT_FOUND] Path does not exist: abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>/Tables/custom_diagnostic_report. Traceback: <traceback object at 0x793a3c5c2780>