
In this lecture, I will firstly give a brief introduction to the new parallel computational framework, Apache Spark, and its applications in Biology. The introduction will be mainly focused on Spark`s new data sharing primitive called Resilient Distributed Dataset (RDD) and its advantages compared to the conventional Apache Hadoop platform. In the hands on section, we will deploy our own Spark clusters on the de.NBI cloud and analyse NGS data using Sparkhit, a bioinformatics framework builds on top of the Spark platform. The hands-on practice should facilitate your understanding to the architecture and the underlying mechanism of Spark.

Introductory slides

Hands-on overview and workflow

Download tools and dataset from SWIFT

Directory tree: all file locations

        ├── script
        │   ├── 00.spark-shell.sh
        │   ├── 01.mapping.sh
        │   ├── 02.mapping_docker.sh
        │   ├── 03.report_ear.sh
        │   ├── 04.report_stool.sh
        │   ├── 05.plot.krona.sh
        │   ├── 06.view.sh
        │   └── docker.sh
        ├── software
        │   ├── KronaTools-2.7
        │   └── sparkhit-latest
        ├── dataset
        │   ├── ear
        │   ├── reference.fa
        │   ├── reference.fa.amb
        │   ├── reference.fa.ann
        │   ├── reference.fa.bwt
        │   ├── reference.fa.pac
        │   ├── reference.fa.sa
        │   ├── SRS018978.fastq.gz
        │   └── stool


Public Master IP

The public Master node`s ip address can be used to access the Spark webUI.
Type in:
host myip.opendns.com resolver1.opendns.com

Using domain server:
Name: resolver1.opendns.com

myip.opendns.com has address
Host myip.opendns.com not found: 3(NXDOMAIN)
Host myip.opendns.com not found: 3(NXDOMAIN)

To list your public IP address.
Or check your openstack UI at Project / Network / Routers / sparkhitrouter

Use the master ip to browse the Spark webUI to monitor the cluster and submitted jobs:

Private Master IP

You are gona need your private master ip to submit your Spark jobs.
Type in:

docker0   Link encap:Ethernet  HWaddr 02:42:5a:04:83:25  
          inet addr:  Bcast:  Mask:
          UP BROADCAST MULTICAST  MTU:1500  Metric:1
          RX packets:0 errors:0 dropped:0 overruns:0 frame:0
          TX packets:0 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0 
          RX bytes:0 (0.0 B)  TX bytes:0 (0.0 B)

eth0      Link encap:Ethernet  HWaddr fa:16:3e:b5:a1:a3  
          inet addr:  Bcast:  Mask:
          inet6 addr: fe80::f816:3eff:feb5:a1a3/64 Scope:Link
          RX packets:1700393 errors:0 dropped:0 overruns:0 frame:0
          TX packets:1590152 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000 
          RX bytes:3882456204 (3.8 GB)  TX bytes:11170161352 (11.1 GB)

lo        Link encap:Local Loopback  
          inet addr:  Mask:
          inet6 addr: ::1/128 Scope:Host
          UP LOOPBACK RUNNING  MTU:65536  Metric:1
          RX packets:151909 errors:0 dropped:0 overruns:0 frame:0
          TX packets:151909 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0 
          RX bytes:60357490 (60.3 MB)  TX bytes:60357490 (60.3 MB)

Programming in Spark-shell:

Step 1: RDD loading and caching

Switch to /vol/spool/sparkhit/script directory with command:
cd /vol/spool/sparkhit/script
as working directory. You should be able to see a script 00.spark-shell.sh. Print the script by:
cat 00.spark-shell.sh
These are interactive console commands, please do not run them directly.

spark-shell --driver-memory 30G --executor-memory 30G --executor-cores 30

val fastqRDD =sc.textFile("/vol/spool/sparkhit/dataset/SRS018978.fastq.gz")

# RDD cache function
fastqRDD.cache()    # cache data into memroy, but since RDD is "lazy". Nothing actually happens
val a = fastqRDD.count()    # here "cache" step is now running, thus this step takes more time
val a = fastqRDD.count()
val a = fastqRDD.count()
val a = fastqRDD.count()
val a = fastqRDD.count()

There should be a run time difference between the cache RDD and the uncache RDD

Step 2: RDD repartitioning and distributing

Lets say you wanna write a program that counts all the lines with more than 4 "N" in the fastq file.
cat 00.spark-shell.sh

# RDD data distribution
val reparRDD = fastqRDD.repartition(30) # distributing data
reparRDD.filter(line => line.contains("NNNN")).count()
fastqRDD.filter(line => line.contains("NNNN")).count()
reparRDD.filter(line => line.contains("NNNN")).count()
fastqRDD.filter(line => line.contains("NNNN")).count()

There should be a run time difference between the partitioned RDD and the unpartitioned RDD

Use CTRL + D to quit.

Processing NGS data with Sparkhit:

Step 3: mapping metagenomic data using Sparkhit algorithm

cat 01.mapping.sh

/vol/spool/sparkhit/software/sparkhit-latest/bin/sparkhit mapper \
	--master spark:// \    <----- change IP address. With command: ifconfig
	--driver-memory 15G \
	--executor-memory 20G \
	-fastq "/vol/spool/sparkhit/dataset/ear/SRS*bz2" \
	-reference /vol/spool/sparkhit/dataset/reference.fa \
	-outfile ./01.ear_result \
	-kmer 12

Step 4: using Sparkhit to parallelize Docker tasks (e.g. bwa)

docker run lh3lh3/bwa

Unable to find image 'lh3lh3/bwa:latest' locally
latest: Pulling from lh3lh3/bwa
d56ac91634e2: Pull complete 
a3ed95caeb02: Pull complete 
Digest: sha256:ecb80258bdaebe4d42445eb34adea936c929b3a3439bea154a128939f7cce95d
Status: Downloaded newer image for lh3lh3/bwa:latest

Program: bwa (alignment via Burrows-Wheeler transformation)
Version: 0.7.12-r1044
Contact: Heng Li <lh3@sanger.ac.uk>

Usage:   bwa <command> [options]

Command: index         index sequences in the FASTA format
         mem           BWA-MEM algorithm
         fastmap       identify super-maximal exact matches
         pemerge       merge overlapping paired ends (EXPERIMENTAL)
         aln           gapped/ungapped alignment
         samse         generate alignment (single ended)
         sampe         generate alignment (paired ended)
         bwasw         BWA-SW for long queries

         shm           manage indices in shared memory
         fa2pac        convert FASTA to PAC format
         pac2bwt       generate BWT from PAC
         pac2bwtgen    alternative algorithm for generating BWT
         bwtupdate     update .bwt to the new format
         bwt2sa        generate SA from BWT and Occ

Note: To use BWA, you need to first index the genome with `bwa index'.
      There are three alignment algorithms in BWA: `mem', `bwasw', and
      `aln/samse/sampe'. If you are not sure which to use, try `bwa mem'
      first. Please `man ./bwa.1' for the manual.

cat 02.mapping_docker.sh

/vol/spool/sparkhit/software/sparkhit-latest/bin/sparkhit piper \
	--master spark:// \     <----- change IP address. With command: ifconfig
	--driver-memory 60G \
	--executor-memory 30G \
	-fastq "/vol/spool/sparkhit/dataset/stool/SRS*.tar.bz2" \
	-filter \
	-outfile ./02.stool_result \
	-tool "sh /vol/spool/sparkhit/script/docker.sh"

cat docker.sh

docker run -iv /vol/spool/sparkhit/dataset:/tmp -w /tmp --rm \
    lh3lh3/bwa \
    mem reference.fa /dev/stdin \
    | perl -ne 'my $seq = $_;next if $seq=~/^\@/; my @a=split /\t/,$seq; next if $a[2] eq "\*";print "$seq"'

Step 5: summarize mapping results of the two mapping tools

Sparkhit reporter summerizes tabular files by columns, for instance:
Use command: less ./01.ear_result/part-00000
to view the mapping result.

@HWI-EAS324_102408434:3:100:10079:21137/2       91nt    4.26e-31        88      4       91      +       89.01   NC_000913.3     2726600 2726687
@HWI-EAS324_102408434:3:100:10079:21137/2       91nt    2.30e-29        88      4       91      +       87.91   NC_000913.3     3424199 3424286
@HWI-EAS324_102408434:3:100:10079:21137/2       91nt    4.26e-31        88      1       88      -       89.01   NC_000913.3     228256  228343
@HWI-EAS324_102408434:3:100:10079:21137/2       91nt    4.26e-31        88      1       88      -       89.01   NC_000913.3     4040017 4040104
@HWI-EAS324_102408434:3:100:10079:21137/2       91nt    4.26e-31        88      1       88      -       89.01   NC_000913.3     4171138 4171225
@HWI-EAS324_102408434:3:100:10079:21137/2       91nt    4.26e-31        88      1       88      -       89.01   NC_000913.3     4212540 4212627
@HWI-EAS324_102408434:3:100:10079:21137/2       91nt    2.30e-29        88      1       88      -       87.91   NC_000913.3     3946201 3946288
@HWI-EAS324_102408434:3:100:10083:8631/2        100nt   4.08e-24        88      9       96      +       77.00   NC_000913.3     224064  224151
@HWI-EAS324_102408434:3:100:10083:8631/2        100nt   4.08e-24        88      9       96      +       77.00   NC_000913.3     3942101 3942188
@HWI-EAS324_102408434:3:100:10083:8631/2        100nt   4.08e-24        88      9       96      +       77.00   NC_000913.3     4035824 4035911
@HWI-EAS324_102408434:3:100:10083:8631/2        100nt   4.08e-24        88      9       96      +       77.00   NC_000913.3     4166952 4167039

cat 03.report_ear.sh

/vol/spool/sparkhit/software/sparkhit-latest/bin/sparkhit reporter \
	-input "./01.ear_result/part*" \
	-word 9 \
	-outfile ./ear.report.txt

cat 04.report_stool.sh

/vol/spool/sparkhit/software/sparkhit-latest/bin/sparkhit reporter \
	--driver-memory 60G \
	--executor-memory 30G \
	-input "./02.stool_result/part*" \
	-word 3 \
	-outfile ./stool.report.txt

Step 6: generate and visualize the mapping results

cat 05.plot.krona.sh

/vol/spool/sparkhit/software/KronaTools-2.7/bin/ktImportText -o HMP.krona.html ear.report.txt stool.report.txt

cat 06.view.sh

cp HMP.krona.html /vol/spool/www/index.html

After generating the Krona plot, go to to see the results.
Remember to change the IP address to your own IP.

Ear sample should have more reads than the Stool sample, as Sparkhit recruiter is more sensitive.