import snakemake
import abc
import re
import os
import pysam
import pyfaidx
import rnftools
[docs]class Source(object):
""" Abstract class for a genome from which read tuples are simulated.
Args:
fasta (str): File name of the genome from which reads are created (FASTA file).
reads_in_tuple (int): Number of reads in each read tuple.
rng_seed (int): Seed for simulator's random number generator.
sequences (set of int or str): FASTA sequences to extract. Sequences can be specified either by their ids, or by their names.
number_of_required_cores (int): Number of cores used by the simulator. This parameter is used to prevent running other threads or programs at the same time.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, fasta, reads_in_tuple, rng_seed, sequences, number_of_required_cores=1):
rnftools.mishmash.add_source(self)
self._rng_seed = rng_seed
self._reads_in_tuple = reads_in_tuple
self._sample = rnftools.mishmash.current_sample()
self._sample.add_source(self)
self.genome_id = len(self._sample.get_sources())
self._number_of_required_cores = number_of_required_cores
self._name = str(self.genome_id).zfill(3)
self._dir = os.path.join(self._sample.get_dir(), self._name)
self._fa0_fn = os.path.abspath(fasta)
self._fa_fn = os.path.abspath(os.path.join(self._dir, "reference.fa"))
self._fai_fn = self._fa_fn + ".fai"
self._seqs = sequences
self._fq_fn = os.path.join(self._dir, "_final_reads.fq")
self.dict_chr_ids = {}
self.dict_chr_lengths = {}
############################################################################
############################################################################
[docs] def get_dir(self):
"""Get working directory.
Returns:
str: Working directory.
"""
return self._dir
[docs] def get_genome_id(self):
"""Get genome ID.
Returns:
int: Genome ID.
"""
return self.genome_id
[docs] def get_reads_in_tuple(self):
"""Get number of entries in a read tuple.
Returns:
int: Number of reads in a read tuple.
"""
return self._reads_in_tuple
[docs] def get_number_of_required_cores(self):
"""Get number of required cores.
Returns:
int: Number of required cores.
"""
return self._number_of_required_cores
[docs] def clean(self):
"""Clean working directory.
"""
rnftools.utils.shell('rm -fR "{}"'.format(self.get_dir()))
############################################################################
############################################################################
[docs] def fa0_fn(self):
"""Get input FASTA file.
Returns:
str: Input FASTA file.
"""
return self._fa0_fn
[docs] def fa_fn(self):
"""Get output FASTA file (with selected chromosomes).
Returns:
str: Output FASTA file.
"""
return self._fa_fn
[docs] def fq_fn(self):
"""Get file name of the output FASTQ file.
Returns:
str: Output FASTQ file
"""
return self._fq_fn
############################################################################
############################################################################
[docs] def get_output(self):
"""Get list of output files (created during simulation).
Returns:
list: List of input files
"""
raise NotImplementedError
############################################################################
############################################################################
[docs] def create_fq(self):
"""Simulate reads.
"""
raise NotImplementedError
[docs] def create_fa(self):
"""Create a FASTA file with extracted sequences.
"""
if self._seqs is None:
os.symlink(self._fa0_fn, self._fa_fn)
else:
in_seqs = pyfaidx.Fasta(self._fa0_fn)
with open(self._fa_fn, "w+") as g:
for seq_desc in self._seqs:
x = in_seqs[seq_desc]
name, seq = x.name, str(x)
g.write(">" + name + "\n")
n = 80
seq_split = "\n".join([seq[i:i + n] for i in range(0, len(seq), n)])
g.write(seq_split + "\n")
[docs] @staticmethod
def recode_sam_reads(
sam_fn,
fastq_rnf_fo,
fai_fo,
genome_id,
number_of_read_tuples=10**9,
simulator_name=None,
allow_unmapped=False,
):
"""Transform a SAM file to RNF-compatible FASTQ.
Args:
sam_fn (str): SAM/BAM file - file name.
fastq_rnf_fo (str): Output FASTQ file - file object.
fai_fo (str): FAI index of the reference genome - file object.
genome_id (int): Genome ID for RNF.
number_of_read_tuples (int): Expected number of read tuples (to set width of read tuple id).
simulator_name (str): Name of the simulator. Used for comment in read tuple name.
allow_unmapped (bool): Allow unmapped reads.
Raises:
NotImplementedError
"""
fai_index = rnftools.utils.FaIdx(fai_fo)
# last_read_tuple_name=[]
read_tuple_id_width = len(format(number_of_read_tuples, 'x'))
fq_creator = rnftools.rnfformat.FqCreator(
fastq_fo=fastq_rnf_fo,
read_tuple_id_width=read_tuple_id_width,
genome_id_width=2,
chr_id_width=fai_index.chr_id_width,
coor_width=fai_index.coor_width,
info_reads_in_tuple=True,
info_simulator=simulator_name,
)
# todo: check if clipping corrections is well implemented
cigar_reg_shift = re.compile("([0-9]+)([MDNP=X])")
# todo: other upac codes
reverse_complement_dict = {
"A": "T",
"T": "A",
"C": "G",
"G": "C",
"N": "N",
}
read_tuple_id = 0
last_read_tuple_name = None
with pysam.AlignmentFile(
sam_fn,
check_header=False,
) as samfile:
for alignment in samfile:
if alignment.query_name != last_read_tuple_name and last_read_tuple_name is not None:
read_tuple_id += 1
last_read_tuple_name = alignment.query_name
if alignment.is_unmapped:
rnftools.utils.error(
"SAM files used for conversion should not contain unaligned segments. "
"This condition is broken by read tuple "
"'{}' in file '{}'.".format(alignment.query_name, sam_fn),
program="RNFtools",
subprogram="MIShmash",
exception=NotImplementedError,
)
if alignment.is_reverse:
direction = "R"
bases = "".join([reverse_complement_dict[nucl] for nucl in alignment.seq[::-1]])
qualities = str(alignment.qual[::-1])
else:
direction = "F"
bases = alignment.seq[:]
qualities = str(alignment.qual[:])
# todo: are chromosomes in bam sorted correctly (the same order as in FASTA)?
if fai_index.dict_chr_ids != {}:
chr_id = fai_index.dict_chr_ids[samfile.getrname(alignment.reference_id)]
else:
chr_id = "0"
left = int(alignment.reference_start) + 1
right = left - 1
for (steps, operation) in cigar_reg_shift.findall(alignment.cigarstring):
right += int(steps)
segment = rnftools.rnfformat.Segment(
genome_id=genome_id,
chr_id=chr_id,
direction=direction,
left=left,
right=right,
)
fq_creator.add_read(
read_tuple_id=read_tuple_id,
bases=bases,
qualities=qualities,
segments=[segment],
)
fq_creator.flush_read_tuple()