-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbeagle_phased_to_hap_sample.py
More file actions
executable file
·80 lines (64 loc) · 2.5 KB
/
beagle_phased_to_hap_sample.py
File metadata and controls
executable file
·80 lines (64 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python3
import sys
import gzip
import argparse
def _parse_sites(theBeagleSitesFile):
d = {}
with open(theBeagleSitesFile) as f:
for row in f:
tmp = row.rstrip().split()
# pos, ref, alt
d[tmp[0]] = [tmp[1], tmp[2], tmp[3]]
return d
def _output_sample_file(header, theOutPrefix):
"""
header of phased.gz: I id sample1 sample2 ...
"""
samples = header.rstrip().split()[2:]
out = open(f"{theOutPrefix}.samples", "w")
header_of_sample = "ID_1 ID_2 missing\n0 0 0\n"
out.write(header_of_sample)
for s in samples[::2]:
out.write(f"{s} {s} 0\n")
def run(theChr, theBeaglePhasedFile, theBeagleSitesFile, theOutPrefix):
"""
output: hap.gz/samples format of shapeit2 but coustomized for bcftools convert
see:
http://www.htslib.org/doc/1.1/bcftools.html#convert
https://mathgen.stats.ox.ac.uk/genetics_software/shapeit/shapeit.html#hapsample
"""
sites = _parse_sites(theBeagleSitesFile)
out = gzip.open(f"{theOutPrefix}.hap.gz", "wt")
# get sample from header of phased.gz
line = 0
with gzip.open(theBeaglePhasedFile, "rt") as f:
for row in f:
line += 1
if line == 1:
_output_sample_file(row, theOutPrefix)
else:
tmp = row.rstrip().split()
snp = tmp[1]
if sites.get(snp):
d = {sites[snp][1]: "0", sites[snp][2]: "1"}
gts = [d[g] for g in tmp[2:]]
out.write(
f"{theChr}:{sites[snp][0]}_{sites[snp][1]}_{sites[snp][2]} {theChr}:{sites[snp][0]}_{sites[snp][1]}_{sites[snp][2]} {sites[snp][0]} {sites[snp][1]} {sites[snp][2]} {' '.join(gts)}\n"
)
else:
sys.stderr.write("the sites file doesn't match the phased file!")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(
description="convert phased.gz file output by beagle v3.3 to hap/sample file\n"
)
parser.add_argument("chr", metavar="STRING", help="chromosome to use")
parser.add_argument(
"phased", metavar="phased.gz", help="phased.gz file by beagle v3"
)
parser.add_argument("sites", metavar="sites", help="sites file by beagle v3")
parser.add_argument("out", metavar="OUT", help="output prefix")
args = parser.parse_args()
run(args.chr, args.phased, args.sites, args.out)
if __name__ == "__main__":
main()