Data Parsing Pipeline
This pipeline will parse the extracted data and convert it into data structures compatible with the CaseOLAP pipeline.
Installation of required Python packages: Install lxml (e.g., with pip install lxml) into the current Python environment; itertools, json, and the other modules imported below are part of the Python standard library.
import re
import itertools
import json
import sys
import os
import time
import traceback
from lxml import etree
Set up output data dir
DATA_DIR = './'
MeSH statistics dictionary
# Maps number of MeSH terms per article -> count of articles with that many terms
mesh_statistics = {}
Data parsing strategy: The extracted data is an XML file, and the text data is embedded in the tree structure of the XML document. The steps for parsing the data are:
- Use the etree functionality of the lxml module to traverse the tree structure of the XML document.
- Obtain the separate components of each record (e.g., PMID, authors, abstract, MeSH headings) via the tags that represent them.
- Use the chain functionality of itertools to create a single iterator over the element lists obtained with etree (as sketched below).
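A minimal sketch of this strategy on a toy XML string (illustrative only; the element names mirror the PubMed schema, but the real pipeline parses the downloaded files):

# Toy example: traverse a small XML tree and chain two element lists
import itertools
from lxml import etree

xml = '''<PubmedArticleSet>
  <PubmedArticle><MedlineCitation><PMID>123</PMID></MedlineCitation></PubmedArticle>
  <BookDocument><PMID>456</PMID></BookDocument>
</PubmedArticleSet>'''

root = etree.fromstring(xml)

# chain() yields the PubmedArticle elements first, then the BookDocument elements
records = itertools.chain(root.findall('PubmedArticle'),
                          root.findall('BookDocument'))
for record in records:
    print(record.find('.//PMID').text)   # prints 123, then 456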
# Search for the tag in the XML element.
# Return the tag's text if the tag exists, otherwise return an empty string.
def get_text(element, tag):
    e = element.find(tag)
    if e is not None:
        return e.text
    else:
        return ''

# <!ELEMENT AuthorList (Author+) >
# <!ELEMENT Author (((LastName, ForeName?, Initials?, Suffix?)
#                    | CollectiveName), Identifier*, AffiliationInfo*) >
def parse_author(authors):
    result = []
    for author in authors:
        item = {}
        item['LastName'] = get_text(author, 'LastName')
        item['ForeName'] = get_text(author, 'ForeName')
        item['Initials'] = get_text(author, 'Initials')
        item['Suffix'] = get_text(author, 'Suffix')
        item['CollectiveName'] = get_text(author, 'CollectiveName')
        result.append(item)
    return result
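As a quick usage check, parse_author can be exercised on a hand-built Author element (the author name here is made up):

# Illustrative call with a fabricated AuthorList element
author_list = etree.fromstring(
    '<AuthorList>'
    '<Author><LastName>Doe</LastName><ForeName>Jane</ForeName>'
    '<Initials>J</Initials></Author>'
    '</AuthorList>')
print(parse_author(author_list.findall('Author')))
# [{'LastName': 'Doe', 'ForeName': 'Jane', 'Initials': 'J',
#   'Suffix': '', 'CollectiveName': ''}]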
Creation of a dictionary of parsed data: A Python dictionary is created with all the components as key-value pairs. This JSON-like data structure makes the data compatible for indexing and searching in Elasticsearch, which is described in step 3 of the protocol.
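For illustration, one line of the resulting pubmed.json holds a dictionary of the following shape (all field values here are hypothetical):

# Hypothetical example of a single parsed record (one line of pubmed.json)
record = {
    'PMID': '29355051',                                # made-up PMID
    'Abstract': 'Cardiac proteins were analyzed ...',  # '' when absent
    'PubDate': {'Year': '2018'},
    'MeshHeadingList': ['Humans', 'Myocardium', 'metabolism'],
}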
Creation of the MeSH to PMID mapping: While the dictionary of parsed data is being created, a MeSH to PMID mapping table can also be built. This mapping is later used to create the text-cube document structure (in step 4) required by the CaseOLAP algorithm.
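The companion pmid2mesh record for the same (hypothetical) article keeps only the PMID and its MeSH terms; inverting many such records later yields the MeSH-to-PMID table:

# Hypothetical pmid2mesh record written alongside the record above
pmid2mesh = {'PMID': '29355051',
             'MeshHeadingList': ['Humans', 'Myocardium', 'metabolism']}

# After inversion, a mesh2pmid entry would look like:
# {'Myocardium': ['29355051', '29355072', ...]}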
def parse_pubmed_file(file, pubmed_output_file, pmid2mesh_output_file):
    print('Start parsing %s' % file)
    sys.stdout.flush()
    t1 = time.time()
    f = open(file, 'r')
    tree = etree.parse(f)
    # Regular articles and book documents are handled in one pass
    articles = itertools.chain(tree.findall('PubmedArticle'),
                               tree.findall('BookDocument'))
    count = 0
    noabs = 0
    for article in articles:
        count += 1
        result = {}
        pmid2mesh = {}

        # PMID - Exactly One Occurrence
        result['PMID'] = get_text(article, './/PMID')
        pmid2mesh['PMID'] = get_text(article, './/PMID')

        # # Article title - Zero or One Occurrences
        # result['ArticleTitle'] = get_text(article, './/ArticleTitle')

        # Abstract - Zero or One Occurrences
        abstractList = article.find('.//Abstract')
        if abstractList is not None:
            try:
                abstract = '\n'.join([line.text for line in
                                      abstractList.findall('AbstractText')])
                result['Abstract'] = abstract
            except:
                result['Abstract'] = ''
                noabs += 1
        else:
            result['Abstract'] = ''
            noabs += 1

        # # Author List - Zero or More Occurrences
        # authors = article.findall('.//Author')
        # result['AuthorList'] = parse_author(authors)

        # Journal - Exactly One Occurrence (absent for BookDocument entries)
        journal = article.find('.//Journal')
        # result['Journal'] = get_text(journal, 'Title')
        result['PubDate'] = {}
        if journal is not None:
            result['PubDate']['Year'] = get_text(journal,
                                                 'JournalIssue/PubDate/Year')
        else:
            result['PubDate']['Year'] = ''
        # result['PubDate']['Month'] = get_text(journal,
        #                                       'JournalIssue/PubDate/Month')
        # result['PubDate']['Day'] = get_text(journal,
        #                                     'JournalIssue/PubDate/Day')
        # result['PubDate']['Season'] = get_text(journal,
        #                                        'JournalIssue/PubDate/Season')
        # result['PubDate']['MedlineDate'] = get_text(journal,
        #                                             'JournalIssue/PubDate/MedlineDate')

        # MeshHeading - Zero or More Occurrences
        headings = article.findall('.//MeshHeading')
        result['MeshHeadingList'] = []
        pmid2mesh['MeshHeadingList'] = []
        if headings:
            for heading in headings:
                descriptor_names = heading.findall('DescriptorName')
                qualifier_names = heading.findall('QualifierName')
                if descriptor_names:
                    for descriptor_name in descriptor_names:
                        result['MeshHeadingList'].append(descriptor_name.text)
                        pmid2mesh['MeshHeadingList'].append(descriptor_name.text)
                if qualifier_names:
                    for qualifier_name in qualifier_names:
                        result['MeshHeadingList'].append(qualifier_name.text)
                        pmid2mesh['MeshHeadingList'].append(qualifier_name.text)

        # Record how many MeSH terms this article carries
        mesh_count = len(result['MeshHeadingList'])
        if mesh_count in mesh_statistics:
            mesh_statistics[mesh_count] += 1
        else:
            mesh_statistics[mesh_count] = 1

        # Dump to pubmed json file (one JSON object per line)
        json.dump(result, pubmed_output_file)
        pubmed_output_file.write('\n')

        # Dump to pmid2mesh json file
        json.dump(pmid2mesh, pmid2mesh_output_file)
        pmid2mesh_output_file.write('\n')

    print('Finish parsing %s, %d articles parsed in total. Total time: %fs'
          % (file, count, time.time() - t1))
    print('%d articles have no abstract' % noabs)
    sys.stdout.flush()
    f.close()
Set up the parsing loop over the source directories
def parse_dir(source_dir, pubmed_output_file, pmid2mesh_output_file):
    if os.path.isdir(source_dir):
        for file in os.listdir(source_dir):
            if re.search(r'^pubmed18n\d\d\d\d\.xml$', file) is not None:
                try:
                    parse_pubmed_file(os.path.join(source_dir, file),
                                      pubmed_output_file,
                                      pmid2mesh_output_file)
                except:
                    print("XXXX Unexpected error happened when parsing %s XXXX" % file)
                    traceback.print_exc()
                    sys.stdout.flush()
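Note that the filename filter is tied to the 2018 release naming scheme (PubMed files are named pubmedYYnNNNN.xml); a quick check of the pattern:

# The pattern matches 2018 baseline/update file names and nothing else
pattern = r'^pubmed18n\d\d\d\d\.xml$'
print(re.search(pattern, 'pubmed18n0001.xml') is not None)  # True
print(re.search(pattern, 'pubmed19n0001.xml') is not None)  # False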
Run the Parsing Pipeline
t1 = time.time()

# Output directories must exist before the files can be opened
os.makedirs(os.path.join(DATA_DIR, 'data'), exist_ok=True)
os.makedirs(os.path.join(DATA_DIR, 'pmid2mesh'), exist_ok=True)

pubmed_output_file_path = os.path.join(DATA_DIR, 'data/pubmed.json')
pmid2mesh_output_file_path = os.path.join(DATA_DIR, 'pmid2mesh/pmid2mesh_from_parsing.json')
pubmed_output_file = open(pubmed_output_file_path, 'w')
pmid2mesh_output_file = open(pmid2mesh_output_file_path, 'w')

# Parse both the annual baseline and the daily update files
parse_dir(os.path.join(DATA_DIR, 'ftp.ncbi.nlm.nih.gov/pubmed/baseline'),
          pubmed_output_file, pmid2mesh_output_file)
parse_dir(os.path.join(DATA_DIR, 'ftp.ncbi.nlm.nih.gov/pubmed/updatefiles'),
          pubmed_output_file, pmid2mesh_output_file)

pubmed_output_file.close()
pmid2mesh_output_file.close()

# Dump the MeSH-count statistics collected during parsing
mesh_file = open(os.path.join(DATA_DIR, 'data/mesh_statistics.json'), 'w')
json.dump(mesh_statistics, mesh_file)
mesh_file.close()

print("==== Parsing finished, results dumped to %s ====" % pubmed_output_file_path)
print("==== TOTAL TIME: %fs ====" % (time.time() - t1))
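As a minimal sanity check after the run (assuming parsing completed and pubmed.json is non-empty), the first record can be read back from the line-delimited JSON output:

# Read the first parsed record back and inspect it
with open(pubmed_output_file_path) as f:
    first_record = json.loads(f.readline())
print(first_record['PMID'], len(first_record['MeshHeadingList']))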
MeSH to PMID Mapping
inputFilePath = "data/pubmed.json"
meshFilePath = "mesh2pmid/"
os.makedirs(meshFilePath, exist_ok=True)
mesh2pmid_output_file = open(meshFilePath + 'mesh2pmid.json', "w")
mesh2pmid = dict()

with open(inputFilePath, "r") as fin:
    start = time.time()
    k = 0
    for line in fin:  # each line is a single document
        try:
            k = k + 1
            paperInfo = json.loads(line.strip())
            data_dict = {}

            # Update PMID
            data_dict["pmid"] = paperInfo.get("PMID", "-1")

            # Update MeSH headings
            data_dict["mesh_heading"] = " ".join(paperInfo["MeshHeadingList"])

            # Collect mesh2pmid (invert the PMID -> MeSH relation)
            if data_dict["pmid"] != "-1":
                for mesh in paperInfo["MeshHeadingList"]:
                    if mesh not in mesh2pmid:
                        mesh2pmid[mesh] = []
                    mesh2pmid[mesh].append(data_dict["pmid"])

            if k % 500000 == 0:
                print(k, 'done!')
        except:
            print("XXXX Unexpected error happened at line %d XXXX" % k)

# Dump the accumulated mapping, one {mesh: [pmid, ...]} object per line
for key, value in mesh2pmid.items():
    json.dump({key: value}, mesh2pmid_output_file)
    mesh2pmid_output_file.write('\n')
mesh2pmid = dict()

mesh2pmid_output_file.close()

end = time.time()
print("Finished. Total elapsed time: %s seconds" % (end - start))
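Since mesh2pmid.json is also line-delimited, it can be loaded back into a single dictionary with a short sketch like this (assuming the file above was written successfully):

# Rebuild the full MeSH -> PMID dictionary from the line-delimited file
mesh2pmid_loaded = {}
with open('mesh2pmid/mesh2pmid.json') as fin:
    for line in fin:
        mesh2pmid_loaded.update(json.loads(line))
print(len(mesh2pmid_loaded), 'MeSH terms mapped')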