Module cnvpytor.export
Source code
import json
from pathlib import Path
from .io import *
from .genome import *
_logger = logging.getLogger("cnvpytor.export")
class Wiggle:
    """Small wrapper around a pyBigWig handle used to write CNVpytor signals."""

    def __init__(self, filename):
        """
        Open a bigwig file.

        A file that does not exist yet is opened for writing; an existing
        file is opened with pyBigWig's default mode.

        Parameters
        ----------
        filename : path
            Path for the bigwig filename

        """
        self.filename = filename
        self.file = None
        # Imported here rather than at module level, so pyBigWig is only
        # needed once a Wiggle is actually created.
        import pyBigWig

        if Path(filename).exists():
            self.file = pyBigWig.open(filename)
            return
        try:
            self.file = pyBigWig.open(filename, 'w')
        except (IOError, RuntimeError) as e:
            # Best effort: report the failure and leave self.file as None.
            _logger.error("Unable to open file {}! Error: {}".format(filename, e))

    def add_header_list(self, chr_len_list):
        """
        Add header to the bigwig file

        Parameters
        ----------
        chr_len_list : list of tuple
            chromosome name and length list.

        """
        self.file.addHeader(chr_len_list)

    def add_fixedstep(self, chrom, position_int, value_list, span=1, step=1):
        """
        Add fixed step formatted data

        Parameters
        ----------
        chrom : str
            chromosome name
        position_int : int
            start position
        value_list : list of values
            input values
        span : int
        step : int

        """
        self.file.addEntries(chrom, position_int, values=value_list, span=span, step=step)

    def get_cnvpytor_signal(self, md5, chrom, bin_size, signal, flag):
        """Return ``md5.get_signal(chrom, bin_size, signal, flag)`` unchanged."""
        return md5.get_signal(chrom, bin_size, signal, flag)

    def get_chrom_list(self, md5):
        """
        Read the "chromosome lengths" signal as (name, length) pairs.

        The signal is read as a flat array alternating name and length.

        Returns
        -------
        list of tuple
            (chromosome name as str, chromosome length as int)

        """
        chr_len = md5.get_signal(None, None, "chromosome lengths")
        return list(zip(chr_len[::2].astype(str), chr_len[1::2].astype(int)))

    @staticmethod
    def _offset_transform(values, offset):
        """Return |values + offset| for non-zero entries, leaving zeros and the input untouched."""
        # np.where builds a new array, so the caller's data is never modified
        # and integer input is promoted instead of failing on an in-place add.
        return np.absolute(np.where(values != 0, values + offset, values))

    def _write_chromosomes(self, md5, chr_list, bin_size, signal, flag, transform=None):
        """Write the header, then one fixed-step block per chromosome that has an array signal."""
        self.add_header_list(chr_list)
        for chrom, _length in chr_list:
            signal_details = md5.get_signal(chrom, bin_size, signal, flag)
            # Chromosomes without an ndarray signal are skipped.
            if not isinstance(signal_details, np.ndarray):
                continue
            values = signal_details[()]
            if transform is not None:
                values = transform(values)
            self.add_fixedstep(chrom, 0, values, span=bin_size, step=bin_size)

    def create_wig_offset_transform(self, md5, chr_list, bin_size, signal, flag, offset):
        """
        Write a signal with ``offset`` added to its non-zero bins, as absolute values.

        Parameters
        ----------
        md5 : object
            Source providing ``get_signal(chrom, bin_size, signal, flag)``
        chr_list : list of tuple
            (chromosome name, length) pairs; also used as the bigwig header
        bin_size : int
            Used as both span and step of the fixed-step entries
        signal : str
        flag : int
        offset : number
            Value added to every non-zero bin before taking the absolute value

        """
        self._write_chromosomes(
            md5, chr_list, bin_size, signal, flag,
            transform=lambda values: self._offset_transform(values, offset),
        )

    def create_wig(self, md5, chr_list, bin_size, signal, flag):
        """
        Write a signal unchanged, one fixed-step block per chromosome.

        Parameters
        ----------
        md5 : object
            Source providing ``get_signal(chrom, bin_size, signal, flag)``
        chr_list : list of tuple
            (chromosome name, length) pairs; also used as the bigwig header
        bin_size : int
            Used as both span and step of the fixed-step entries
        signal : str
        flag : int

        """
        self._write_chromosomes(md5, chr_list, bin_size, signal, flag)

    def close(self):
        """Close the underlying file once; later calls are no-ops."""
        handle = getattr(self, "file", None)
        if handle is not None:
            # Drop the reference first so the handle is never closed twice.
            self.file = None
            handle.close()

    def __del__(self):
        self.close()
class ExportJBrowse:
    """Export CNVpytor RD and SNP signals as bigwig files plus JBrowse config files."""

    # RD signals to export: the flags passed to get_signal and the plot
    # colour used for each flag (lists are index-aligned).
    rd_signal_dct = {
        "RD": {
            "FLAG": [0, 0x0010],
            "color": ["gray", "black"]
        },
        "RD partition": {
            "FLAG": [0x0010],
            "color": ["red"]
        },
        "RD call": {
            "FLAG": [0x0010],
            "color": ["green"]
        }
    }
    # SNP signals to export; an optional "offset" list (index-aligned with
    # "FLAG") is applied to the values before they are written.
    snp_signal_dct = {
        "SNP baf": {
            "FLAG": [0x0100],
            "color": ["gray"],
            "nonCont": [True],
        },
        "SNP i1": {
            "FLAG": [0x0100, 0x0100],
            "color": ["red", "red"],
            "nonCont": [True, True],
            "offset": [0.5, -0.5]
        },
    }
    # %-templates used to build the bigwig file stem of each signal.
    signal_dct = {
        "RD": "his_rd_p_%(bin_size)d%(rd_flag)s",
        "RD partition": "his_rd_p_%(bin_size)d_partition%(rd_flag)s",
        "RD call": "his_rd_p_%(bin_size)d_partition%(rd_flag)s_merge",
        "SNP baf": "snp_baf_%(bin_size)d%(snp_flag)s",
        "SNP maf": "snp_maf_%(bin_size)d%(snp_flag)s",
        "SNP i1": "snp_i1_%(bin_size)d%(snp_flag)s",
        "SNP i1 partition": "snp_i1_%(bin_size)d%(snp_flag)s_partition",
    }

    def __init__(self, files, dir_name):
        """
        Exports CNVpytor data

        Parameters
        ----------
        files : path
            CNVpytor files path
        dir_name: path
            Export directory path
        """
        self.files = files
        self.dir = Path(dir_name)
        self.io = [IO(f, ro=True) for f in files]
        self.export_dir = self.export_create_dir()

    @property
    def pytor_names(self):
        """File stems of the input files, in input order."""
        return [Path(filename).resolve().stem for filename in self.files]

    @property
    def export_directory(self):
        """
        Work out where the export should be written.

        If ``self.dir`` is an existing directory, a not-yet-existing
        sub-directory name is chosen inside it ("(1)", "(2)", ... is appended
        until the name is free). Otherwise ``self.dir`` itself is used,
        provided its parent exists.

        Raises
        ------
        SystemExit
            With a non-zero status when neither ``self.dir`` nor its parent exists.
        """
        if not self.dir.is_dir():
            if self.dir.parent.exists():
                return self.dir
            _logger.error("Error: incorrect export path: {}".format(self.dir))
            # Non-zero status: this is a failure, not a clean exit.
            raise SystemExit(1)

        if len(self.files) > 1:
            # for multiple input file
            base = self.dir.joinpath("cnvpytor_jbrowse_export")
        else:
            # for single_input_file
            base = self.dir.joinpath("jbrowse_{}".format(self.pytor_names[0]))

        candidate = base
        i = 1
        while candidate.exists():
            candidate = base.with_name("{}({})".format(base.name, i))
            i += 1
        return candidate

    def export_create_dir(self):
        """Create the export directory and return its path."""
        main_dir = self.export_directory
        main_dir.mkdir(parents=True, exist_ok=True)
        _logger.info("CNVpytor data exporting for JBrowse view in {}".format(main_dir))
        return main_dir

    @property
    def export_data_dir_list(self):
        """Per-input "bw/<name>" directories (created on access), in input order."""
        data_dir = self.export_dir.joinpath("bw")
        data_dir.mkdir(parents=True, exist_ok=True)
        data_dir_list = []
        for root_name in self.pytor_names:
            root_data = data_dir.joinpath(root_name)
            root_data.mkdir(parents=True, exist_ok=True)
            data_dir_list.append(root_data)
        return data_dir_list

    @property
    def export_seq_dir(self):
        """The "seq" directory inside the export directory (created on access)."""
        seq_dir = self.export_dir.joinpath("seq")
        seq_dir.mkdir(parents=True, exist_ok=True)
        return seq_dir

    @property
    def export_tracklist_file(self):
        """Path of "trackList.json" inside the export directory."""
        return self.export_dir.joinpath("trackList.json")

    @property
    def export_ref_file(self):
        """Path of "refSeqs.json" inside the "seq" directory."""
        return self.export_seq_dir.joinpath("refSeqs.json")

    def signal_name(self, bin_size, signal, flags=0):
        """
        Build the file stem for a signal from its ``signal_dct`` template.

        Returns
        -------
        str or None
            None when ``signal`` has no template or formatting raises TypeError.
        """
        template = self.signal_dct.get(signal)
        if template is None:
            return None
        try:
            suffixes = Signals()
            return template % {"bin_size": bin_size,
                               "rd_flag": suffixes.suffix_rd_flag(flags),
                               "snp_flag": suffixes.suffix_snp_flag(flags),
                               "flag": suffixes.suffix_flag(flags)}
        except TypeError:
            return None

    @staticmethod
    def _group_bin_sizes(chr_bs):
        """Turn (chromosome, bin size) pairs into ({chrom: [sizes]}, [unique sizes in first-seen order])."""
        chrs = {}
        bss = []
        for c, b in chr_bs:
            size = int(b)
            chrs.setdefault(c, []).append(size)
            if size not in bss:
                bss.append(size)
        return chrs, bss

    def rd_chr_bin(self, root_io):
        """Chromosomes and bin sizes for which ``root_io`` reports an "RD" signal."""
        return self._group_bin_sizes(root_io.chromosomes_bin_sizes_with_signal("RD"))

    def snp_chr_bin(self, root_io):
        """Chromosomes and bin sizes for which ``root_io`` reports "SNP likelihood" with FLAG_USEMASK."""
        return self._group_bin_sizes(
            root_io.chromosomes_bin_sizes_with_signal("SNP likelihood", FLAG_USEMASK))

    @staticmethod
    def _chrom_list(root_io):
        """Read "chromosome lengths" (flat name/length array) as (str, int) pairs."""
        chr_len = root_io.get_signal(None, None, "chromosome lengths")
        return list(zip(chr_len[::2].astype(str), chr_len[1::2].astype(int)))

    @staticmethod
    def _bigwig_name(signal, offset=None):
        """File name of a bigwig track; offset tracks carry the offset in the name."""
        if offset is None:
            return "{}.bw".format(signal)
        return "{}_offset{}.bw".format(signal, offset)

    @staticmethod
    def create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag, offset=None):
        """
        Write one signal of one input file to a bigwig file.

        The file is only created once the first chromosome with an ndarray
        signal is found, so signals without data produce no file.

        Parameters
        ----------
        root_io : object
            Source providing ``get_signal(chrom, bin_size, signal_name, flag)``
        bigwig_file : str
            Output path
        chr_list : list of tuple
            (chromosome name, length) pairs; also used as the bigwig header
        bin_size : int
            Used as both span and step of the fixed-step entries
        signal_name : str
        flag : int
        offset : number, optional
            When given, non-zero bins are shifted by it and the absolute
            value of the result is written.
        """
        wig = None
        try:
            for chrom, _length in chr_list:
                signal_details = root_io.get_signal(chrom, bin_size, signal_name, flag)
                if not isinstance(signal_details, np.ndarray):
                    continue
                values = signal_details[()]
                if offset is not None:
                    # np.where builds a new array, so the data returned by
                    # get_signal is left untouched.
                    values = np.absolute(np.where(values != 0, values + offset, values))
                if wig is None:
                    wig = Wiggle(bigwig_file)
                    wig.add_header_list(chr_list)
                wig.add_fixedstep(chrom, 0, values, span=bin_size, step=bin_size)
        finally:
            # Close explicitly instead of waiting for garbage collection;
            # clearing the attribute keeps Wiggle.__del__ from closing again.
            if wig is not None and wig.file is not None:
                wig.file.close()
                wig.file = None

    def rd_signal(self):
        """Write a bigwig file for every RD signal, flag and bin size of every input file."""
        _logger.debug("Create Read depth related signals")
        for root_index, root_io in enumerate(self.io):
            _logger.info("JBrowse export: RD related data for {}".format(self.pytor_names[root_index]))
            _rd_chr, rd_bin = self.rd_chr_bin(root_io)
            # get chr list
            chr_list = self._chrom_list(root_io)
            # The property creates directories on every access; read it once per input file.
            data_dir = self.export_data_dir_list[root_index] if rd_bin else None
            for signal_name, signal_dct in self.rd_signal_dct.items():
                _logger.info("JBrowse export: RD signal {}".format(signal_name))
                for flag in signal_dct['FLAG']:
                    for bin_size in rd_bin:
                        signal = self.signal_name(bin_size, signal_name, flag)
                        bigwig_file = str(data_dir.joinpath(self._bigwig_name(signal)))
                        self.create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag)

    def snp_signal(self):
        """Write a bigwig file for every SNP signal, flag (and offset) and bin size of every input file."""
        _logger.debug("Create SNP related signals")
        for root_index, root_io in enumerate(self.io):
            _logger.info("JBrowse export: SNP related data for {}".format(self.pytor_names[root_index]))
            _snp_chr, snp_bin = self.snp_chr_bin(root_io)
            # get chr list
            chr_list = self._chrom_list(root_io)
            data_dir = self.export_data_dir_list[root_index] if snp_bin else None
            for signal_name, signal_dct in self.snp_signal_dct.items():
                _logger.info("JBrowse export: SNP signal {}".format(signal_name))
                offsets = signal_dct.get('offset')
                for index, flag in enumerate(signal_dct['FLAG']):
                    offset = None if offsets is None else offsets[index]
                    for bin_size in snp_bin:
                        signal = self.signal_name(bin_size, signal_name, flag)
                        bigwig_file = str(data_dir.joinpath(self._bigwig_name(signal, offset)))
                        self.create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag,
                                           offset=offset)

    @staticmethod
    def add_config_reference():
        """Return the base trackList dictionary containing only the reference sequence track."""
        track_dct = {'formatVersion': 1, "plugins": ["MultiBigWig", "MultiScaleBigWig"], 'tracks': []}
        track_dct['tracks'].append({
            "category": "Reference sequence",
            "chunkSize": 20000,
            "key": "Reference sequence",
            "label": "DNA",
            "seqType": "dna",
            "storeClass": "JBrowse/Store/Sequence/StaticChunked",
            "type": "SequenceTrack",
            "urlTemplates": "seq/{refseq_dirpath}/{refseq}-"
        })
        return track_dct

    def _existing_scales(self, data_dir, bin_sizes, signal_name, flag, offset=None):
        """Map bin size -> "bw/<name>/<file>" for the bigwig files that exist in ``data_dir``."""
        scales = {}
        for bin_size in bin_sizes:
            signal = self.signal_name(bin_size, signal_name, flag)
            bigwig_file = data_dir.joinpath(self._bigwig_name(signal, offset))
            if bigwig_file.exists():
                # Keep only the last three path components (posix separators).
                scales[bin_size] = Path(bigwig_file.parent.parent.name).joinpath(
                    bigwig_file.parent.name, bigwig_file.name).as_posix()
        return scales

    def add_rd_config_track(self):
        """
        Build one RD track description per input file.

        Only bigwig files that already exist are referenced; input files
        without any RD bigwig get no track.

        Returns
        -------
        list of dict
        """
        _logger.debug("Get RD config track")
        track_dct_list = []
        for root_index, root_io in enumerate(self.io):
            _rd_chr, rd_bin = self.rd_chr_bin(root_io)
            root_name = self.pytor_names[root_index]
            data_dir = self.export_data_dir_list[root_index]
            url_template_dct = []
            for signal_name, signal_dct in self.rd_signal_dct.items():
                for index, flag in enumerate(signal_dct.get('FLAG', [])):
                    suffix_rd_flag = Signals.suffix_rd_flag(flag)
                    signal_id = "{}_{}{}".format(root_name, signal_name, suffix_rd_flag)
                    scales = self._existing_scales(data_dir, rd_bin, signal_name, flag)
                    if scales:
                        url_template_dct.append({
                            "storeClass": "MultiScaleBigWig/Store/SeqFeature/MultiScaleBigWig",
                            "scales": scales,
                            "name": signal_id,
                            "color": signal_dct['color'][index],
                        })
            if url_template_dct:
                track_dct_list.append({
                    "category": root_name,
                    'autoscale': 'local',
                    "storeClass": "MultiBigWig/Store/SeqFeature/MultiBigWig",
                    "showTooltips": True,
                    "showLabels": True,
                    "clickTooltips": True,
                    "key": "RD",
                    "label": "RD {}".format(root_name),
                    "type": "MultiBigWig/View/Track/MultiWiggle/MultiXYPlot",
                    'useStdDev': True,
                    'urlTemplates': url_template_dct
                })
        return track_dct_list

    def add_snp_config_track(self):
        """
        Build one SNP track description per input file.

        Only bigwig files that already exist are referenced; input files
        without any SNP bigwig get no track.

        Returns
        -------
        list of dict
        """
        _logger.debug("Get SNP config track info")
        track_dct_list = []
        for root_index, root_io in enumerate(self.io):
            snp_url_dct_list = []
            _snp_chr, snp_bin = self.snp_chr_bin(root_io)
            root_name = self.pytor_names[root_index]
            data_dir = self.export_data_dir_list[root_index]
            for signal_name, signal_dct in self.snp_signal_dct.items():
                offsets = signal_dct.get('offset')
                for index, flag in enumerate(signal_dct['FLAG']):
                    suffix_flag = Signals.suffix_snp_flag(flag)
                    if offsets is None:
                        offset = None
                        signal_id = "{}{}".format(signal_name, suffix_flag)
                    else:
                        offset = offsets[index]
                        signal_id = "{}{}{}".format(signal_name, suffix_flag, offset)
                    scales = self._existing_scales(data_dir, snp_bin, signal_name, flag, offset)
                    if scales:
                        snp_url_dct_list.append({
                            "storeClass": "MultiScaleBigWig/Store/SeqFeature/MultiScaleBigWig",
                            "scales": scales,
                            "name": signal_id,
                            "color": signal_dct['color'][index],
                            "nonCont": signal_dct['nonCont'][index]
                        })
            if snp_url_dct_list:
                track_dct_list.append({
                    "category": root_name,
                    'autoscale': 'local',
                    "storeClass": "MultiBigWig/Store/SeqFeature/MultiBigWig",
                    "showTooltips": True,
                    "showLabels": True,
                    "clickTooltips": True,
                    "max_score": 1,
                    "key": "SNP",
                    "label": "SNP {}".format(root_name),
                    "type": "MultiBigWig/View/Track/MultiWiggle/MultiXYPlot",
                    'urlTemplates': snp_url_dct_list,
                })
        return track_dct_list

    def create_tracklist_json(self):
        """Write trackList.json (reference, RD and SNP tracks) and return the written dictionary."""
        _logger.debug("Creates config file: {}".format(self.export_tracklist_file))
        # reference config
        track_dct = self.add_config_reference()
        # create rd config
        track_dct['tracks'].extend(self.add_rd_config_track())
        # create SNP config
        track_dct['tracks'].extend(self.add_snp_config_track())
        with open(self.export_tracklist_file, 'w') as f:
            json.dump(track_dct, f, indent=2)
        return track_dct

    def create_reference_json(self):
        """Write refSeqs.json from the "chromosome lengths" signal of the first input file."""
        _logger.debug("Exporting reference details")
        # get signal details
        chr_len = list(np.array(self.io[0].get_signal(None, None, "chromosome lengths")).astype("str"))
        chr_dct = dict(zip(chr_len[::2], chr_len[1::2]))
        # create signal list in proper format
        chr_dct_list = []
        for name, length in chr_dct.items():
            # Lengths arrive as text; emit them as JSON numbers like "start".
            length = int(length)
            chr_dct_list.append({"end": length, "length": length, "name": str(name), "start": 0})
        # save it to file
        with open(self.export_ref_file, 'w') as f:
            json.dump(chr_dct_list, f, indent=2)

    def __del__(self):
        # Nothing was exported if __init__ never got as far as creating the directory.
        export_dir = getattr(self, "export_dir", None)
        if export_dir is None:
            return
        _logger.info("JBrowse export: complete")
        # Use the directory that was actually created; re-evaluating
        # export_directory here would compute a new, unused name.
        _logger.info("Copy this directory to jbrowse directory if export path is not set to JBrowse path, "
                     "To access this via localhost: http://localhost/jbrowse/?data={}"
                     .format(export_dir.name))
Classes
class ExportJBrowse (files, dir_name)
-
Exports CNVpytor data. Parameters:
files
:path
- CNVpytor files path
dir_name
:path
- Export directory path
Source code
class ExportJBrowse:
    """Export CNVpytor RD and SNP signals as bigwig files plus JBrowse config files."""

    # RD signals to export: the flags passed to get_signal and the plot
    # colour used for each flag (lists are index-aligned).
    rd_signal_dct = {
        "RD": {
            "FLAG": [0, 0x0010],
            "color": ["gray", "black"]
        },
        "RD partition": {
            "FLAG": [0x0010],
            "color": ["red"]
        },
        "RD call": {
            "FLAG": [0x0010],
            "color": ["green"]
        }
    }
    # SNP signals to export; an optional "offset" list (index-aligned with
    # "FLAG") is applied to the values before they are written.
    snp_signal_dct = {
        "SNP baf": {
            "FLAG": [0x0100],
            "color": ["gray"],
            "nonCont": [True],
        },
        "SNP i1": {
            "FLAG": [0x0100, 0x0100],
            "color": ["red", "red"],
            "nonCont": [True, True],
            "offset": [0.5, -0.5]
        },
    }
    # %-templates used to build the bigwig file stem of each signal.
    signal_dct = {
        "RD": "his_rd_p_%(bin_size)d%(rd_flag)s",
        "RD partition": "his_rd_p_%(bin_size)d_partition%(rd_flag)s",
        "RD call": "his_rd_p_%(bin_size)d_partition%(rd_flag)s_merge",
        "SNP baf": "snp_baf_%(bin_size)d%(snp_flag)s",
        "SNP maf": "snp_maf_%(bin_size)d%(snp_flag)s",
        "SNP i1": "snp_i1_%(bin_size)d%(snp_flag)s",
        "SNP i1 partition": "snp_i1_%(bin_size)d%(snp_flag)s_partition",
    }

    def __init__(self, files, dir_name):
        """
        Exports CNVpytor data

        Parameters
        ----------
        files : path
            CNVpytor files path
        dir_name: path
            Export directory path
        """
        self.files = files
        self.dir = Path(dir_name)
        self.io = [IO(f, ro=True) for f in files]
        self.export_dir = self.export_create_dir()

    @property
    def pytor_names(self):
        """File stems of the input files, in input order."""
        return [Path(filename).resolve().stem for filename in self.files]

    @property
    def export_directory(self):
        """
        Work out where the export should be written.

        If ``self.dir`` is an existing directory, a not-yet-existing
        sub-directory name is chosen inside it ("(1)", "(2)", ... is appended
        until the name is free). Otherwise ``self.dir`` itself is used,
        provided its parent exists.

        Raises
        ------
        SystemExit
            With a non-zero status when neither ``self.dir`` nor its parent exists.
        """
        if not self.dir.is_dir():
            if self.dir.parent.exists():
                return self.dir
            _logger.error("Error: incorrect export path: {}".format(self.dir))
            # Non-zero status: this is a failure, not a clean exit.
            raise SystemExit(1)

        if len(self.files) > 1:
            # for multiple input file
            base = self.dir.joinpath("cnvpytor_jbrowse_export")
        else:
            # for single_input_file
            base = self.dir.joinpath("jbrowse_{}".format(self.pytor_names[0]))

        candidate = base
        i = 1
        while candidate.exists():
            candidate = base.with_name("{}({})".format(base.name, i))
            i += 1
        return candidate

    def export_create_dir(self):
        """Create the export directory and return its path."""
        main_dir = self.export_directory
        main_dir.mkdir(parents=True, exist_ok=True)
        _logger.info("CNVpytor data exporting for JBrowse view in {}".format(main_dir))
        return main_dir

    @property
    def export_data_dir_list(self):
        """Per-input "bw/<name>" directories (created on access), in input order."""
        data_dir = self.export_dir.joinpath("bw")
        data_dir.mkdir(parents=True, exist_ok=True)
        data_dir_list = []
        for root_name in self.pytor_names:
            root_data = data_dir.joinpath(root_name)
            root_data.mkdir(parents=True, exist_ok=True)
            data_dir_list.append(root_data)
        return data_dir_list

    @property
    def export_seq_dir(self):
        """The "seq" directory inside the export directory (created on access)."""
        seq_dir = self.export_dir.joinpath("seq")
        seq_dir.mkdir(parents=True, exist_ok=True)
        return seq_dir

    @property
    def export_tracklist_file(self):
        """Path of "trackList.json" inside the export directory."""
        return self.export_dir.joinpath("trackList.json")

    @property
    def export_ref_file(self):
        """Path of "refSeqs.json" inside the "seq" directory."""
        return self.export_seq_dir.joinpath("refSeqs.json")

    def signal_name(self, bin_size, signal, flags=0):
        """
        Build the file stem for a signal from its ``signal_dct`` template.

        Returns
        -------
        str or None
            None when ``signal`` has no template or formatting raises TypeError.
        """
        template = self.signal_dct.get(signal)
        if template is None:
            return None
        try:
            suffixes = Signals()
            return template % {"bin_size": bin_size,
                               "rd_flag": suffixes.suffix_rd_flag(flags),
                               "snp_flag": suffixes.suffix_snp_flag(flags),
                               "flag": suffixes.suffix_flag(flags)}
        except TypeError:
            return None

    @staticmethod
    def _group_bin_sizes(chr_bs):
        """Turn (chromosome, bin size) pairs into ({chrom: [sizes]}, [unique sizes in first-seen order])."""
        chrs = {}
        bss = []
        for c, b in chr_bs:
            size = int(b)
            chrs.setdefault(c, []).append(size)
            if size not in bss:
                bss.append(size)
        return chrs, bss

    def rd_chr_bin(self, root_io):
        """Chromosomes and bin sizes for which ``root_io`` reports an "RD" signal."""
        return self._group_bin_sizes(root_io.chromosomes_bin_sizes_with_signal("RD"))

    def snp_chr_bin(self, root_io):
        """Chromosomes and bin sizes for which ``root_io`` reports "SNP likelihood" with FLAG_USEMASK."""
        return self._group_bin_sizes(
            root_io.chromosomes_bin_sizes_with_signal("SNP likelihood", FLAG_USEMASK))

    @staticmethod
    def _chrom_list(root_io):
        """Read "chromosome lengths" (flat name/length array) as (str, int) pairs."""
        chr_len = root_io.get_signal(None, None, "chromosome lengths")
        return list(zip(chr_len[::2].astype(str), chr_len[1::2].astype(int)))

    @staticmethod
    def _bigwig_name(signal, offset=None):
        """File name of a bigwig track; offset tracks carry the offset in the name."""
        if offset is None:
            return "{}.bw".format(signal)
        return "{}_offset{}.bw".format(signal, offset)

    @staticmethod
    def create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag, offset=None):
        """
        Write one signal of one input file to a bigwig file.

        The file is only created once the first chromosome with an ndarray
        signal is found, so signals without data produce no file.

        Parameters
        ----------
        root_io : object
            Source providing ``get_signal(chrom, bin_size, signal_name, flag)``
        bigwig_file : str
            Output path
        chr_list : list of tuple
            (chromosome name, length) pairs; also used as the bigwig header
        bin_size : int
            Used as both span and step of the fixed-step entries
        signal_name : str
        flag : int
        offset : number, optional
            When given, non-zero bins are shifted by it and the absolute
            value of the result is written.
        """
        wig = None
        try:
            for chrom, _length in chr_list:
                signal_details = root_io.get_signal(chrom, bin_size, signal_name, flag)
                if not isinstance(signal_details, np.ndarray):
                    continue
                values = signal_details[()]
                if offset is not None:
                    # np.where builds a new array, so the data returned by
                    # get_signal is left untouched.
                    values = np.absolute(np.where(values != 0, values + offset, values))
                if wig is None:
                    wig = Wiggle(bigwig_file)
                    wig.add_header_list(chr_list)
                wig.add_fixedstep(chrom, 0, values, span=bin_size, step=bin_size)
        finally:
            # Close explicitly instead of waiting for garbage collection;
            # clearing the attribute keeps Wiggle.__del__ from closing again.
            if wig is not None and wig.file is not None:
                wig.file.close()
                wig.file = None

    def rd_signal(self):
        """Write a bigwig file for every RD signal, flag and bin size of every input file."""
        _logger.debug("Create Read depth related signals")
        for root_index, root_io in enumerate(self.io):
            _logger.info("JBrowse export: RD related data for {}".format(self.pytor_names[root_index]))
            _rd_chr, rd_bin = self.rd_chr_bin(root_io)
            # get chr list
            chr_list = self._chrom_list(root_io)
            # The property creates directories on every access; read it once per input file.
            data_dir = self.export_data_dir_list[root_index] if rd_bin else None
            for signal_name, signal_dct in self.rd_signal_dct.items():
                _logger.info("JBrowse export: RD signal {}".format(signal_name))
                for flag in signal_dct['FLAG']:
                    for bin_size in rd_bin:
                        signal = self.signal_name(bin_size, signal_name, flag)
                        bigwig_file = str(data_dir.joinpath(self._bigwig_name(signal)))
                        self.create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag)

    def snp_signal(self):
        """Write a bigwig file for every SNP signal, flag (and offset) and bin size of every input file."""
        _logger.debug("Create SNP related signals")
        for root_index, root_io in enumerate(self.io):
            _logger.info("JBrowse export: SNP related data for {}".format(self.pytor_names[root_index]))
            _snp_chr, snp_bin = self.snp_chr_bin(root_io)
            # get chr list
            chr_list = self._chrom_list(root_io)
            data_dir = self.export_data_dir_list[root_index] if snp_bin else None
            for signal_name, signal_dct in self.snp_signal_dct.items():
                _logger.info("JBrowse export: SNP signal {}".format(signal_name))
                offsets = signal_dct.get('offset')
                for index, flag in enumerate(signal_dct['FLAG']):
                    offset = None if offsets is None else offsets[index]
                    for bin_size in snp_bin:
                        signal = self.signal_name(bin_size, signal_name, flag)
                        bigwig_file = str(data_dir.joinpath(self._bigwig_name(signal, offset)))
                        self.create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag,
                                           offset=offset)

    @staticmethod
    def add_config_reference():
        """Return the base trackList dictionary containing only the reference sequence track."""
        track_dct = {'formatVersion': 1, "plugins": ["MultiBigWig", "MultiScaleBigWig"], 'tracks': []}
        track_dct['tracks'].append({
            "category": "Reference sequence",
            "chunkSize": 20000,
            "key": "Reference sequence",
            "label": "DNA",
            "seqType": "dna",
            "storeClass": "JBrowse/Store/Sequence/StaticChunked",
            "type": "SequenceTrack",
            "urlTemplates": "seq/{refseq_dirpath}/{refseq}-"
        })
        return track_dct

    def _existing_scales(self, data_dir, bin_sizes, signal_name, flag, offset=None):
        """Map bin size -> "bw/<name>/<file>" for the bigwig files that exist in ``data_dir``."""
        scales = {}
        for bin_size in bin_sizes:
            signal = self.signal_name(bin_size, signal_name, flag)
            bigwig_file = data_dir.joinpath(self._bigwig_name(signal, offset))
            if bigwig_file.exists():
                # Keep only the last three path components (posix separators).
                scales[bin_size] = Path(bigwig_file.parent.parent.name).joinpath(
                    bigwig_file.parent.name, bigwig_file.name).as_posix()
        return scales

    def add_rd_config_track(self):
        """
        Build one RD track description per input file.

        Only bigwig files that already exist are referenced; input files
        without any RD bigwig get no track.

        Returns
        -------
        list of dict
        """
        _logger.debug("Get RD config track")
        track_dct_list = []
        for root_index, root_io in enumerate(self.io):
            _rd_chr, rd_bin = self.rd_chr_bin(root_io)
            root_name = self.pytor_names[root_index]
            data_dir = self.export_data_dir_list[root_index]
            url_template_dct = []
            for signal_name, signal_dct in self.rd_signal_dct.items():
                for index, flag in enumerate(signal_dct.get('FLAG', [])):
                    suffix_rd_flag = Signals.suffix_rd_flag(flag)
                    signal_id = "{}_{}{}".format(root_name, signal_name, suffix_rd_flag)
                    scales = self._existing_scales(data_dir, rd_bin, signal_name, flag)
                    if scales:
                        url_template_dct.append({
                            "storeClass": "MultiScaleBigWig/Store/SeqFeature/MultiScaleBigWig",
                            "scales": scales,
                            "name": signal_id,
                            "color": signal_dct['color'][index],
                        })
            if url_template_dct:
                track_dct_list.append({
                    "category": root_name,
                    'autoscale': 'local',
                    "storeClass": "MultiBigWig/Store/SeqFeature/MultiBigWig",
                    "showTooltips": True,
                    "showLabels": True,
                    "clickTooltips": True,
                    "key": "RD",
                    "label": "RD {}".format(root_name),
                    "type": "MultiBigWig/View/Track/MultiWiggle/MultiXYPlot",
                    'useStdDev': True,
                    'urlTemplates': url_template_dct
                })
        return track_dct_list

    def add_snp_config_track(self):
        """
        Build one SNP track description per input file.

        Only bigwig files that already exist are referenced; input files
        without any SNP bigwig get no track.

        Returns
        -------
        list of dict
        """
        _logger.debug("Get SNP config track info")
        track_dct_list = []
        for root_index, root_io in enumerate(self.io):
            snp_url_dct_list = []
            _snp_chr, snp_bin = self.snp_chr_bin(root_io)
            root_name = self.pytor_names[root_index]
            data_dir = self.export_data_dir_list[root_index]
            for signal_name, signal_dct in self.snp_signal_dct.items():
                offsets = signal_dct.get('offset')
                for index, flag in enumerate(signal_dct['FLAG']):
                    suffix_flag = Signals.suffix_snp_flag(flag)
                    if offsets is None:
                        offset = None
                        signal_id = "{}{}".format(signal_name, suffix_flag)
                    else:
                        offset = offsets[index]
                        signal_id = "{}{}{}".format(signal_name, suffix_flag, offset)
                    scales = self._existing_scales(data_dir, snp_bin, signal_name, flag, offset)
                    if scales:
                        snp_url_dct_list.append({
                            "storeClass": "MultiScaleBigWig/Store/SeqFeature/MultiScaleBigWig",
                            "scales": scales,
                            "name": signal_id,
                            "color": signal_dct['color'][index],
                            "nonCont": signal_dct['nonCont'][index]
                        })
            if snp_url_dct_list:
                track_dct_list.append({
                    "category": root_name,
                    'autoscale': 'local',
                    "storeClass": "MultiBigWig/Store/SeqFeature/MultiBigWig",
                    "showTooltips": True,
                    "showLabels": True,
                    "clickTooltips": True,
                    "max_score": 1,
                    "key": "SNP",
                    "label": "SNP {}".format(root_name),
                    "type": "MultiBigWig/View/Track/MultiWiggle/MultiXYPlot",
                    'urlTemplates': snp_url_dct_list,
                })
        return track_dct_list

    def create_tracklist_json(self):
        """Write trackList.json (reference, RD and SNP tracks) and return the written dictionary."""
        _logger.debug("Creates config file: {}".format(self.export_tracklist_file))
        # reference config
        track_dct = self.add_config_reference()
        # create rd config
        track_dct['tracks'].extend(self.add_rd_config_track())
        # create SNP config
        track_dct['tracks'].extend(self.add_snp_config_track())
        with open(self.export_tracklist_file, 'w') as f:
            json.dump(track_dct, f, indent=2)
        return track_dct

    def create_reference_json(self):
        """Write refSeqs.json from the "chromosome lengths" signal of the first input file."""
        _logger.debug("Exporting reference details")
        # get signal details
        chr_len = list(np.array(self.io[0].get_signal(None, None, "chromosome lengths")).astype("str"))
        chr_dct = dict(zip(chr_len[::2], chr_len[1::2]))
        # create signal list in proper format
        chr_dct_list = []
        for name, length in chr_dct.items():
            # Lengths arrive as text; emit them as JSON numbers like "start".
            length = int(length)
            chr_dct_list.append({"end": length, "length": length, "name": str(name), "start": 0})
        # save it to file
        with open(self.export_ref_file, 'w') as f:
            json.dump(chr_dct_list, f, indent=2)

    def __del__(self):
        # Nothing was exported if __init__ never got as far as creating the directory.
        export_dir = getattr(self, "export_dir", None)
        if export_dir is None:
            return
        _logger.info("JBrowse export: complete")
        # Use the directory that was actually created; re-evaluating
        # export_directory here would compute a new, unused name.
        _logger.info("Copy this directory to jbrowse directory if export path is not set to JBrowse path, "
                     "To access this via localhost: http://localhost/jbrowse/?data={}"
                     .format(export_dir.name))
Class variables
var rd_signal_dct
var signal_dct
var snp_signal_dct
Static methods
def add_config_reference()
-
Source code
@staticmethod
def add_config_reference():
    """Return the base trackList dictionary holding only the reference sequence track."""
    reference_track = {
        "category": "Reference sequence",
        "chunkSize": 20000,
        "key": "Reference sequence",
        "label": "DNA",
        "seqType": "dna",
        "storeClass": "JBrowse/Store/Sequence/StaticChunked",
        "type": "SequenceTrack",
        "urlTemplates": "seq/{refseq_dirpath}/{refseq}-"
    }
    return {
        'formatVersion': 1,
        "plugins": ["MultiBigWig", "MultiScaleBigWig"],
        'tracks': [reference_track],
    }
def create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag, offset=None)
-
Source code
@staticmethod
def create_bigwig(root_io, bigwig_file, chr_list, bin_size, signal_name, flag, offset=None):
    """
    Write one signal of one input file to a bigwig file.

    The file is only created once the first chromosome with an ndarray
    signal is found, so signals without data produce no file.

    Parameters
    ----------
    root_io : object
        Source providing ``get_signal(chrom, bin_size, signal_name, flag)``
    bigwig_file : str
        Output path
    chr_list : list of tuple
        (chromosome name, length) pairs; also used as the bigwig header
    bin_size : int
        Used as both span and step of the fixed-step entries
    signal_name : str
    flag : int
    offset : number, optional
        When given, non-zero bins are shifted by it and the absolute value
        of the result is written.
    """
    wig = None
    try:
        for chrom, _length in chr_list:
            signal_details = root_io.get_signal(chrom, bin_size, signal_name, flag)
            if not isinstance(signal_details, np.ndarray):
                continue
            values = signal_details[()]
            if offset is not None:
                # np.where builds a new array, so the data returned by
                # get_signal is left untouched.
                values = np.absolute(np.where(values != 0, values + offset, values))
            if wig is None:
                wig = Wiggle(bigwig_file)
                wig.add_header_list(chr_list)
            wig.add_fixedstep(chrom, 0, values, span=bin_size, step=bin_size)
    finally:
        # Close explicitly instead of waiting for garbage collection;
        # clearing the attribute keeps Wiggle.__del__ from closing again.
        if wig is not None and wig.file is not None:
            wig.file.close()
            wig.file = None
Instance variables
var export_data_dir_list
-
Source code
@property
def export_data_dir_list(self):
    """Per-input "bw/<name>" directories inside the export directory, created on access."""
    bw_root = self.export_dir.joinpath("bw")
    bw_root.mkdir(parents=True, exist_ok=True)
    sample_dirs = [bw_root.joinpath(name) for name in self.pytor_names]
    for sample_dir in sample_dirs:
        sample_dir.mkdir(parents=True, exist_ok=True)
    return sample_dirs
var export_directory
-
Source code
@property
def export_directory(self):
    """
    Work out where the export should be written.

    If ``self.dir`` is an existing directory, a not-yet-existing
    sub-directory name is chosen inside it ("(1)", "(2)", ... is appended
    until the name is free). Otherwise ``self.dir`` itself is used, provided
    its parent exists.

    Raises
    ------
    SystemExit
        With a non-zero status when neither ``self.dir`` nor its parent exists.
    """
    if not self.dir.is_dir():
        if self.dir.parent.exists():
            return self.dir
        _logger.error("Error: incorrect export path: {}".format(self.dir))
        # Non-zero status: this is a failure, not a clean exit.
        raise SystemExit(1)

    if len(self.files) > 1:
        # for multiple input file
        base = self.dir.joinpath("cnvpytor_jbrowse_export")
    else:
        # for single_input_file
        base = self.dir.joinpath("jbrowse_{}".format(self.pytor_names[0]))

    candidate = base
    i = 1
    while candidate.exists():
        candidate = base.with_name("{}({})".format(base.name, i))
        i += 1
    return candidate
var export_ref_file
-
Source code
@property
def export_ref_file(self):
    """Path of "refSeqs.json" inside the directory given by ``export_seq_dir``."""
    return self.export_seq_dir.joinpath("refSeqs.json")
var export_seq_dir
-
Source code
@property
def export_seq_dir(self):
    """The "seq" directory inside the export directory; created (with parents) on access."""
    target = self.export_dir.joinpath("seq")
    target.mkdir(parents=True, exist_ok=True)
    return target
var export_tracklist_file
-
Source code
@property
def export_tracklist_file(self):
    """Path of "trackList.json" inside the export directory."""
    return self.export_dir.joinpath("trackList.json")
var pytor_names
-
Source code
@property
def pytor_names(self):
    """File stems (name without directory and last suffix) of the input files, in input order."""
    return [Path(filename).resolve().stem for filename in self.files]
Methods
def add_rd_config_track(self)
-
Source code
def add_rd_config_track(self):
    """
    Build one RD track description per input file.

    Only bigwig files that already exist are referenced; input files without
    any RD bigwig get no track.

    Returns
    -------
    list of dict
    """
    _logger.debug("Get RD config track")
    track_dct_list = []
    for root_index, root_io in enumerate(self.io):
        _rd_chr, rd_bin = self.rd_chr_bin(root_io)
        # Both properties touch the filesystem on every access, so read
        # them once per input file instead of once per bin size.
        root_name = self.pytor_names[root_index]
        data_dir = self.export_data_dir_list[root_index]
        url_template_dct = []
        for signal_name, signal_dct in self.rd_signal_dct.items():
            for index, flag in enumerate(signal_dct.get('FLAG', [])):
                suffix_rd_flag = Signals.suffix_rd_flag(flag)
                signal_id = "{}_{}{}".format(root_name, signal_name, suffix_rd_flag)
                scales = {}
                for bin_size in rd_bin:
                    signal = self.signal_name(bin_size, signal_name, flag)
                    bigwig_file = data_dir.joinpath("{}.bw".format(signal))
                    if bigwig_file.exists():
                        # Keep only the last three path components (posix separators).
                        scales[bin_size] = Path(bigwig_file.parent.parent.name).joinpath(
                            bigwig_file.parent.name, bigwig_file.name).as_posix()
                if scales:
                    url_template_dct.append({
                        "storeClass": "MultiScaleBigWig/Store/SeqFeature/MultiScaleBigWig",
                        "scales": scales,
                        "name": signal_id,
                        "color": signal_dct['color'][index],
                    })
        if url_template_dct:
            track_dct_list.append({
                "category": root_name,
                'autoscale': 'local',
                "storeClass": "MultiBigWig/Store/SeqFeature/MultiBigWig",
                "showTooltips": True,
                "showLabels": True,
                "clickTooltips": True,
                "key": "RD",
                "label": "RD {}".format(root_name),
                "type": "MultiBigWig/View/Track/MultiWiggle/MultiXYPlot",
                'useStdDev': True,
                'urlTemplates': url_template_dct
            })
    return track_dct_list
def add_snp_config_track(self)
-
Source code
def add_snp_config_track(self):
    """
    Build the SNP track configuration entries.

    For every input file in ``self.io`` and every signal/flag pair of
    ``self.snp_signal_dct`` the bigwig files that already exist in the
    matching export data directory are collected per bin size. When the
    signal description has an ``"offset"`` entry, the offset is part of
    both the signal id and the bigwig file name. A track is only produced
    for an input file when at least one bigwig file was found.

    Returns
    -------
    track_dct_list : list of dict
        One "SNP" track description per input file that has exported data.

    """
    _logger.debug("Get SNP config track info")
    tracks = []
    for root_index, root_io in enumerate(self.io):
        stores = []
        _, bin_sizes = self.snp_chr_bin(root_io)
        for signal_name, signal_dct in self.snp_signal_dct.items():
            for flag_index, flag in enumerate(signal_dct['FLAG']):
                flag_suffix = Signals.suffix_snp_flag(flag)

                # The two variants differ only in id and file name pattern.
                if "offset" in signal_dct:
                    offset = signal_dct['offset'][flag_index]
                    signal_id = "{}{}{}".format(signal_name, flag_suffix, offset)
                    filename_template, extra_fields = "{}_offset{}.bw", (offset,)
                else:
                    signal_id = "{}{}".format(signal_name, flag_suffix)
                    filename_template, extra_fields = "{}.bw", ()

                # Map each bin size to the exported file, if it exists.
                scales = {}
                for bin_size in bin_sizes:
                    signal = self.signal_name(bin_size, signal_name, flag)
                    bigwig_filename = filename_template.format(signal, *extra_fields)
                    bigwig_file = self.export_data_dir_list[root_index].joinpath(bigwig_filename)
                    if not bigwig_file.exists():
                        continue
                    # Keep only the last two directory names plus the file
                    # name, written with forward slashes.
                    data_dir = bigwig_file.parent
                    scales[bin_size] = Path(data_dir.parent.name).joinpath(
                        data_dir.name, bigwig_file.name).as_posix()

                if scales:
                    stores.append({
                        "storeClass": "MultiScaleBigWig/Store/SeqFeature/MultiScaleBigWig",
                        "scales": scales,
                        "name": signal_id,
                        "color": signal_dct['color'][flag_index],
                        "nonCont": signal_dct['nonCont'][flag_index]
                    })

        if stores:
            tracks.append({
                "category": self.pytor_names[root_index],
                'autoscale': 'local',
                "storeClass": "MultiBigWig/Store/SeqFeature/MultiBigWig",
                "showTooltips": True,
                "showLabels": True,
                "clickTooltips": True,
                "max_score": 1,
                "key": "SNP",
                "label": "SNP {}".format(self.pytor_names[root_index]),
                "type": "MultiBigWig/View/Track/MultiWiggle/MultiXYPlot",
                'urlTemplates': stores,
            })
    return tracks
def create_reference_json(self)
-
Source code
def create_reference_json(self):
    """
    Write the reference sequence description to ``self.export_ref_file``.

    The "chromosome lengths" signal of the first input file is read as a
    flat sequence of alternating names and lengths. One record per
    chromosome name is written as JSON; the length values are written as
    strings, exactly as obtained after the conversion to ``str``.

    Returns
    -------

    """
    _logger.debug("Exporting reference details")

    # Flat [name, length, name, length, ...] sequence of strings.
    raw_lengths = self.io[0].get_signal(None, None, "chromosome lengths")
    flat = list(np.array(raw_lengths).astype("str"))
    length_by_name = dict(zip(flat[::2], flat[1::2]))

    # One record per chromosome, in dictionary order.
    records = [
        {"end": length, "length": length, "name": name, "start": 0}
        for name, length in length_by_name.items()
    ]

    with open(self.export_ref_file, 'w') as handle:
        json.dump(records, handle, indent=2)
def create_tracklist_json(self)
-
Source code
def create_tracklist_json(self):
    """
    Assemble the complete track configuration and save it as JSON.

    Starts from the configuration returned by ``add_config_reference``,
    appends the RD tracks followed by the SNP tracks to its ``'tracks'``
    list and writes the result to ``self.export_tracklist_file``.

    Returns
    -------
    track_dct : dict
        The configuration that was written.

    """
    _logger.debug("Creates config file: {}".format(self.export_tracklist_file))

    config = self.add_config_reference()

    # RD tracks first, SNP tracks second.
    for build_tracks in (self.add_rd_config_track, self.add_snp_config_track):
        for track in build_tracks():
            config['tracks'].append(track)

    with open(self.export_tracklist_file, 'w') as handle:
        json.dump(config, handle, indent=2)
    return config
def export_create_dir(self)
-
Source code
def export_create_dir(self):
    """
    Create the top-level export directory.

    ``self.export_directory`` is created together with missing parents;
    an already existing directory is accepted.

    Returns
    -------
    main_dir : path
        The export directory.

    """
    export_root = self.export_directory
    export_root.mkdir(parents=True, exist_ok=True)
    _logger.info("CNVpytor data exporting for JBrowse view in {}".format(export_root))
    return export_root
def rd_chr_bin(self, root_io)
-
Source code
def rd_chr_bin(self, root_io):
    """
    Collect chromosomes and bin sizes that have an "RD" signal.

    Parameters
    ----------
    root_io : object
        Source providing ``chromosomes_bin_sizes_with_signal``.

    Returns
    -------
    chrs : dict
        Chromosome name -> list of bin sizes (as ``int``) in reported order.
    bss : list of int
        Distinct bin sizes in order of first appearance.

    """
    per_chrom = {}
    distinct_sizes = []
    for chrom, raw_size in root_io.chromosomes_bin_sizes_with_signal("RD"):
        size = int(raw_size)
        per_chrom.setdefault(chrom, []).append(size)
        if size not in distinct_sizes:
            distinct_sizes.append(size)
    return per_chrom, distinct_sizes
def rd_signal(self)
-
Source code
def rd_signal(self):
    """
    Export all read depth related signals as bigwig files.

    For every input file in ``self.io``, every signal/flag pair of
    ``self.rd_signal_dct`` and every bin size reported by ``rd_chr_bin``,
    ``create_bigwig`` is called with a target file named after the signal
    inside the matching export data directory.

    Returns
    -------

    """
    _logger.debug("Create Read depth related signals")
    for root_index, root_io in enumerate(self.io):
        _logger.info("JBrowse export: RD related data for {}".format(self.pytor_names[root_index]))
        _, bin_sizes = self.rd_chr_bin(root_io)

        # (name, length) pairs from the flat "chromosome lengths" signal.
        chr_len = root_io.get_signal(None, None, "chromosome lengths")
        names = chr_len[::2].astype(str)
        lengths = chr_len[1::2].astype(int)
        chr_list = list(zip(names, lengths))

        for signal_name, signal_dct in self.rd_signal_dct.items():
            _logger.info("JBrowse export: RD signal {}".format(signal_name))
            for flag in signal_dct['FLAG']:
                for bin_size in bin_sizes:
                    signal = self.signal_name(bin_size, signal_name, flag)
                    target = self.export_data_dir_list[root_index].joinpath("{}.bw".format(signal))
                    self.create_bigwig(root_io, str(target), chr_list, bin_size, signal_name, flag)
def signal_name(self, bin_size, signal, flags=0)
-
Source code
def signal_name(self, bin_size, signal, flags=0):
    """
    Expand the name template registered for a signal.

    Parameters
    ----------
    bin_size : int
        Bin size substituted for ``bin_size`` in the template.
    signal : str
        Key into ``self.signal_dct``.
    flags : int
        Flag value turned into the ``rd_flag``, ``snp_flag`` and ``flag``
        suffixes.

    Returns
    -------
    name : str or None
        The formatted name, or None when the signal is unknown or the
        formatting raises ``TypeError``.

    """
    if signal not in self.signal_dct:
        return None
    try:
        template = self.signal_dct[signal]
        substitutions = {
            "bin_size": bin_size,
            "rd_flag": Signals().suffix_rd_flag(flags),
            "snp_flag": Signals().suffix_snp_flag(flags),
            "flag": Signals().suffix_flag(flags),
        }
        return template % substitutions
    except TypeError:
        return None
def snp_chr_bin(self, root_io)
-
Source code
def snp_chr_bin(self, root_io):
    """
    Collect chromosomes and bin sizes that have a "SNP likelihood" signal.

    The lookup is done with ``FLAG_USEMASK``.

    Parameters
    ----------
    root_io : object
        Source providing ``chromosomes_bin_sizes_with_signal``.

    Returns
    -------
    chrs : dict
        Chromosome name -> list of bin sizes (as ``int``) in reported order.
    bss : list of int
        Distinct bin sizes in order of first appearance.

    """
    per_chrom = {}
    distinct_sizes = []
    for chrom, raw_size in root_io.chromosomes_bin_sizes_with_signal("SNP likelihood", FLAG_USEMASK):
        size = int(raw_size)
        per_chrom.setdefault(chrom, []).append(size)
        if size not in distinct_sizes:
            distinct_sizes.append(size)
    return per_chrom, distinct_sizes
def snp_signal(self)
-
Source code
def snp_signal(self):
    """
    Export all SNP related signals as bigwig files.

    For every input file in ``self.io``, every signal/flag pair of
    ``self.snp_signal_dct`` and every bin size reported by ``snp_chr_bin``,
    ``create_bigwig`` is called. When the signal description has an
    ``"offset"`` entry, the offset for that flag is added to the file name
    and passed on to ``create_bigwig``.

    Returns
    -------

    """
    _logger.debug("Create SNP related signals")
    for root_index, root_io in enumerate(self.io):
        _logger.info("JBrowse export: SNP related data for {}".format(self.pytor_names[root_index]))
        _, bin_sizes = self.snp_chr_bin(root_io)

        # (name, length) pairs from the flat "chromosome lengths" signal.
        chr_len = root_io.get_signal(None, None, "chromosome lengths")
        names = chr_len[::2].astype(str)
        lengths = chr_len[1::2].astype(int)
        chr_list = list(zip(names, lengths))

        for signal_name, signal_dct in self.snp_signal_dct.items():
            _logger.info("JBrowse export: SNP signal {}".format(signal_name))
            for flag_index, flag in enumerate(signal_dct['FLAG']):
                with_offset = "offset" in signal_dct
                if with_offset:
                    offset = signal_dct['offset'][flag_index]
                for bin_size in bin_sizes:
                    signal = self.signal_name(bin_size, signal_name, flag)
                    if with_offset:
                        bigwig_filename = "{}_offset{}.bw".format(signal, offset)
                    else:
                        bigwig_filename = "{}.bw".format(signal)
                    target = str(self.export_data_dir_list[root_index].joinpath(bigwig_filename))
                    if with_offset:
                        self.create_bigwig(root_io, target, chr_list, bin_size, signal_name, flag, offset=offset)
                    else:
                        self.create_bigwig(root_io, target, chr_list, bin_size, signal_name, flag)
class Wiggle (filename)
-
creates bigwig file Parameters
filename
:path
- Path for the bigwig filename
Source code
class Wiggle:
    def __init__(self, filename):
        """
        creates bigwig file

        A file that does not exist yet is opened for writing; an existing
        file is opened with pyBigWig's default mode. When opening for
        writing fails, a message is printed and ``self.file`` stays None.

        Parameters
        ----------
        filename : path
            Path for the bigwig filename

        """
        self.filename = filename
        self.file = None
        import pyBigWig

        if Path(filename).exists():
            self.file = pyBigWig.open(filename)
            return
        try:
            self.file = pyBigWig.open(filename, 'w')
        except (IOError, RuntimeError) as e:
            print("Unable to open file {}! Error: {}".format(filename, e))

    def add_header_list(self, chr_len_list):
        """
        Add header to the bigwig file

        Parameters
        ----------
        chr_len_list : list of tuple
            chromosome name and length list.

        Returns
        -------

        """
        self.file.addHeader(chr_len_list)

    def add_fixedstep(self, chrom, position_int, value_list, span=1, step=1):
        """
        Add fixed step formatted data

        Parameters
        ----------
        chrom : str
            chromosome name
        position_int : int
            start position
        value_list : list of values
            input values
        span : int
            passed to ``addEntries`` as ``span``
        step : int
            passed to ``addEntries`` as ``step``

        Returns
        -------

        """
        self.file.addEntries(chrom, position_int, values=value_list, span=span, step=step)

    def get_cnvpytor_signal(self, md5, chrom, bin_size, signal, flag):
        """
        Read one signal from a source object.

        Parameters
        ----------
        md5 : object
            Source providing ``get_signal``.
        chrom : str
            chromosome name
        bin_size : int
            bin size
        signal : str
            signal name
        flag : int
            flag value

        Returns
        -------
        signal_details
            Whatever ``md5.get_signal`` returns.

        """
        return md5.get_signal(chrom, bin_size, signal, flag)

    def get_chrom_list(self, md5):
        """
        Read chromosome names and lengths from a source object.

        Parameters
        ----------
        md5 : object
            Source providing ``get_signal``; its "chromosome lengths"
            signal is read as alternating names and lengths.

        Returns
        -------
        chr_len_list : list of tuple
            (name, length) pairs with names as ``str`` and lengths as ``int``.

        """
        chr_len = md5.get_signal(None, None, "chromosome lengths")
        return list(zip(chr_len[::2].astype(str), chr_len[1::2].astype(int)))

    def create_wig_offset_transform(self, md5, chr_list, bin_size, signal, flag, offset):
        """
        Write a signal with an offset applied to its non-zero values.

        Non-zero values are shifted by ``offset`` and the absolute value
        of the result is written; zeros stay zero. The array returned by
        ``md5.get_signal`` is not modified, and integer signals combined
        with a fractional offset are supported.

        Parameters
        ----------
        md5 : object
            Source providing ``get_signal``.
        chr_list : list of tuple
            chromosome name and length list, also used as header.
        bin_size : int
            bin size, used as both span and step.
        signal : str
            signal name
        flag : int
            flag value
        offset : number
            value added to every non-zero entry.

        Returns
        -------

        """
        # add chr_list to add wig header
        self.add_header_list(chr_list)

        # add the data
        for chrom, _ in chr_list:
            signal_details = md5.get_signal(chrom, bin_size, signal, flag)
            if not isinstance(signal_details, np.ndarray):
                continue
            values = signal_details[()]
            # Build a new array instead of an in-place masked "+=" so the
            # source data stays untouched and dtype casting cannot fail.
            shifted = np.where(values != 0, values + offset, values)
            self.add_fixedstep(chrom, 0, np.absolute(shifted), span=bin_size, step=bin_size)

    def create_wig(self, md5, chr_list, bin_size, signal, flag):
        """
        Write a signal unchanged for every chromosome that has data.

        Chromosomes for which ``md5.get_signal`` does not return a numpy
        array are skipped.

        Parameters
        ----------
        md5 : object
            Source providing ``get_signal``.
        chr_list : list of tuple
            chromosome name and length list, also used as header.
        bin_size : int
            bin size, used as both span and step.
        signal : str
            signal name
        flag : int
            flag value

        Returns
        -------

        """
        # add chr_list to add wig header
        self.add_header_list(chr_list)

        # add the data
        for chrom, _ in chr_list:
            signal_details = md5.get_signal(chrom, bin_size, signal, flag)
            if isinstance(signal_details, np.ndarray):
                self.add_fixedstep(chrom, 0, signal_details[()], span=bin_size, step=bin_size)

    def __del__(self):
        # "file" can be missing when the object was created without
        # running __init__; never raise from a finalizer.
        handle = getattr(self, "file", None)
        if handle:
            handle.close()
Methods
def add_fixedstep(self, chrom, position_int, value_list, span=1, step=1)
-
Add fixed step formatted data Parameters
chrom
:str
- chromosome name
position_int
:int
- start position
value_list
:list
of values
- input values
span
:int
step
:int
Returns
Source code
def add_fixedstep(self, chrom, position_int, value_list, span=1, step=1):
    """
    Append fixed step data to the open bigwig file.

    Parameters
    ----------
    chrom : str
        chromosome name
    position_int : int
        start position
    value_list : list of values
        input values
    span : int
        passed to ``addEntries`` as ``span``
    step : int
        passed to ``addEntries`` as ``step``

    Returns
    -------

    """
    entry_options = {"values": value_list, "span": span, "step": step}
    self.file.addEntries(chrom, position_int, **entry_options)
def add_header_list(self, chr_len_list)
-
Add header to the bigwig file Parameters
chr_len_list
:list
of tuple
- chromosome name and length list.
Returns
Source code
def add_header_list(self, chr_len_list):
    """
    Write the header of the open bigwig file.

    Parameters
    ----------
    chr_len_list : list of tuple
        chromosome name and length list.

    Returns
    -------

    """
    bigwig = self.file
    bigwig.addHeader(chr_len_list)
def create_wig(self, md5, chr_list, bin_size, signal, flag)
-
Source code
def create_wig(self, md5, chr_list, bin_size, signal, flag):
    """
    Write a signal unchanged for every chromosome that has data.

    The header is written first. Chromosomes for which ``md5.get_signal``
    does not return a numpy array are skipped.

    Parameters
    ----------
    md5 : object
        Source providing ``get_signal``.
    chr_list : list of tuple
        chromosome name and length list, also used as header.
    bin_size : int
        bin size, used as both span and step.
    signal : str
        signal name
    flag : int
        flag value

    Returns
    -------

    """
    # header first, data afterwards
    self.add_header_list(chr_list)

    for chrom, _ in chr_list:
        chrom_signal = md5.get_signal(chrom, bin_size, signal, flag)
        if not isinstance(chrom_signal, np.ndarray):
            continue
        self.add_fixedstep(chrom, 0, chrom_signal[()], span=bin_size, step=bin_size)
def create_wig_offset_transform(self, md5, chr_list, bin_size, signal, flag, offset)
-
Source code
def create_wig_offset_transform(self, md5, chr_list, bin_size, signal, flag, offset):
    """
    Write a signal with an offset applied to its non-zero values.

    The header is written first. For every chromosome whose signal is a
    numpy array, non-zero values are shifted by ``offset`` and the absolute
    value of the result is written; zeros stay zero. The array returned by
    ``md5.get_signal`` is not modified, and integer signals combined with a
    fractional offset are supported.

    Parameters
    ----------
    md5 : object
        Source providing ``get_signal``.
    chr_list : list of tuple
        chromosome name and length list, also used as header.
    bin_size : int
        bin size, used as both span and step.
    signal : str
        signal name
    flag : int
        flag value
    offset : number
        value added to every non-zero entry.

    Returns
    -------

    """
    # add chr_list to add wig header
    self.add_header_list(chr_list)

    # add the data
    for chrom, _ in chr_list:
        signal_details = md5.get_signal(chrom, bin_size, signal, flag)
        if not isinstance(signal_details, np.ndarray):
            continue
        values = signal_details[()]
        # Build a new array instead of an in-place masked "+=" so the
        # source data stays untouched and dtype casting cannot fail.
        shifted = np.where(values != 0, values + offset, values)
        self.add_fixedstep(chrom, 0, np.absolute(shifted), span=bin_size, step=bin_size)
def get_chrom_list(self, md5)
-
Source code
def get_chrom_list(self, md5):
    """
    Read chromosome names and lengths from a source object.

    Parameters
    ----------
    md5 : object
        Source providing ``get_signal``; its "chromosome lengths" signal is
        read as alternating names and lengths.

    Returns
    -------
    chr_len_list : list of tuple
        (name, length) pairs with names as ``str`` and lengths as ``int``.

    """
    flat = md5.get_signal(None, None, "chromosome lengths")
    names = flat[::2].astype(str)
    lengths = flat[1::2].astype(int)
    return list(zip(names, lengths))
def get_cnvpytor_signal(self, md5, chrom, bin_size, signal, flag)
-
Source code
def get_cnvpytor_signal(self, md5, chrom, bin_size, signal, flag):
    """
    Read one signal from a source object.

    Parameters
    ----------
    md5 : object
        Source providing ``get_signal``.
    chrom : str
        chromosome name
    bin_size : int
        bin size
    signal : str
        signal name
    flag : int
        flag value

    Returns
    -------
    signal_details
        Whatever ``md5.get_signal`` returns for these arguments.

    """
    return md5.get_signal(chrom, bin_size, signal, flag)