Overview

Reflexiv command line options consist of four parts:
       spark-submit + [spark-options] + Reflexiv.jar + [Reflexiv-options].
The Reflexiv executable wraps this command and simplifies it to:
       Reflexiv + [command] + [Spark options] + [Reflexiv options]
where [command] selects a particular function of the assembler; [Spark options] are parameters for the Spark framework, e.g. to configure the Spark cluster; and [Reflexiv options] are parameters for Reflexiv itself.

The two scripts below compare the two forms. The first calls spark-submit directly:

#Created by rhinempi on 23/12/17.
 #
 #      Reflexiv
 #
 # Copyright (c) 2015-2015
 #      Liren Huang      <huanglr at cebitec.uni-bielefeld.de>
 # 
 # Reflexiv is free software: you can redistribute it and/or modify it
 # under the terms of the GNU General Public License as published by the Free
 # Software Foundation, either version 3 of the License, or (at your option)
 # any later version.
 #
 # This program is distributed in the hope that it will be useful, but WITHOUT
 # ANY WARRANTY; Without even the implied warranty of MERCHANTABILITY or
 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 # more detail.
 # 
 # You should have received a copy of the GNU General Public License along
 # with this program. If not, see <http://www.gnu.org/licenses>.


# path
sparkbin="/root/spark/bin"
Reflexivlib="/mnt/software/Reflexiv/lib"

# spark submit
# The command has four parts:
#   spark-submit        $sparkbin/spark-submit
#   [spark-options]     --conf ... --class
#   Reflexiv.jar        original-Reflexiv-0.3.jar
#   [Reflexiv-options]  -fastq ... -partition
# (a comment cannot follow a line-continuation backslash, so the parts are listed here)
time $sparkbin/spark-submit \
	--conf "spark.eventLog.enabled=true" \
	--driver-memory 15G \
	--executor-memory 57G \
	--class uni.bielefeld.cmg.reflexiv.main.Main \
	$Reflexivlib/original-Reflexiv-0.3.jar \
		-fastq /mnt/HMP/stool/part* \
		-outfile /mnt/Reflexiv/stool-result \
		-kmer 31 \
		-partition 3200 \
		> Reflexiv.log 2> Reflexiv.err

The second script uses the Reflexiv executable instead:

#Created by rhinempi on 06/01/18.
 
# path
Reflexiv_HOME="/vol/ec2-user/Reflexiv"

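# Reflexiv command
# The command has three parts:
#   Reflexiv + [command]  $Reflexiv_HOME/bin/reflexiv run
#   [Spark options]       --driver-memory, --executor-memory
#   [Reflexiv options]    -fastq, -outfile, -kmer
time $Reflexiv_HOME/bin/reflexiv run \
    --driver-memory 3G \
    --executor-memory 3G \
        -fastq ./example/'paired_dat*.fq.gz' \
        -outfile ./example/result \
        -kmer 31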
        

Commands and Modules

Reflexiv integrates a few functions to facilitate the assembly process. Each function is activated by a command placed after the Reflexiv executable. To see all commands and their functions, run reflexiv without arguments:
$ ./bin/reflexiv

Reflexiv - on the cloud.
Version: 0.3

Commands:
  run             Run the entire assembly pipeline
  counter         counting Kmer frequency
  reassembler     re-assemble and extend genome fragments

Type each command to view its options, eg. Usage: ./reflexiv run

Spark cluster configuration:
  --spark-conf       Spark cluster configuration file or spark input parameters
  --spark-param      Spark cluster parameters in quotation marks "--driver-memory 4G --executor-memory 16G"
  --spark-help       View spark-submit options. You can include spark`s options directly.

Usage: reflexiv [commands] --spark-conf spark_cluster_default.conf [option...]
       reflexiv [commands] --spark-param "--driver-memory 4G --executor-memory 16G" [option...]
       reflexiv [commands] --driver-memory 4G --executor-memory 16G --executor-cores 2 [option...]

For detailed cluster submission, please refer to scripts located in:
./sbin

As shown above, there is a list of commands. Detailed descriptions are as follows:

Table M-1: Descriptions of all Reflexiv commands
Command        Description
run            Runs the standard Reflexiv assembly pipeline, from sequencing data to assembled genomes.
counter        Counts all K-mer frequencies. The result can be used to analyze the genome composition (GC content, K-mer distribution etc.). It can also be used for tuning downstream assemblies (assembling the genome with different K-mer sizes or different K-mer coverages).
reassembler    Reassembles (extends) a collection of sequence fragments using external NGS data (e.g. metagenomic datasets).

☕  Notes
  1. Each command points to a specific main Java class in the program (not a pointer in the programming sense). As shown in the first script above, the --class option passes a main Java class to the Spark framework, whereas in the second script the run command assigns the main Java class to Spark.
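
The mapping from command to main class can be pictured with a small shell sketch. The class names are taken from the usage messages printed by each command in this manual; the function name main_class_for is hypothetical, and the real wrapper script may be implemented differently.

```shell
#!/bin/sh
# Hypothetical helper: map a Reflexiv command to the main class named in
# the usage messages of this manual. The real wrapper may differ.
main_class_for() {
    case "$1" in
        run)         echo "uni.bielefeld.cmg.reflexiv.main.Main" ;;
        counter)     echo "uni.bielefeld.cmg.reflexiv.main.MainOfCounter" ;;
        reassembler) echo "uni.bielefeld.cmg.reflexiv.main.MainOfReAssembler" ;;
        *)           echo "unknown command: $1" >&2; return 1 ;;
    esac
}

# Dry run: print the spark-submit call that "reflexiv counter" stands for.
echo "spark-submit --class $(main_class_for counter) reflexiv.jar [parameters]"
```

Nothing is submitted to Spark here; the sketch only prints the equivalent spark-submit call.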

Spark options

Spark options are the options for the Apache Spark framework. You can find detailed explanations of all options at the Spark configuration page.

Reflexiv identifies these options by their two leading dashes (--), just like the options used in Spark. Reflexiv also provides three additional input options for assigning Spark options. See the table below:

Table M-2: Reflexiv input options for assigning Spark options
Option           Description
--spark-conf     Specifies a configuration file that provides the input options for Spark. It has the same function as the --properties-file option of the spark-submit executable.
--spark-param    Reads a series of command line options enclosed in quotation marks, e.g. --spark-param "--driver-memory 2G --executor-memory 4G" passes the two parameters --driver-memory and --executor-memory to the Spark framework.
--spark-help     Invokes spark-submit to print all available options. This is useful when users want to check all Spark options directly.
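
As an illustration of --spark-conf, the sketch below writes a small Spark properties file and prints the corresponding Reflexiv call. The file name spark_cluster_default.conf comes from the usage message above; the property values, the temporary location and the input and output names are placeholders, assumed only for this example. The three keys are the property-file counterparts of --driver-memory, --executor-memory and --executor-cores.

```shell
#!/bin/sh
# Write an example Spark properties file (placeholder values, adjust to your cluster).
conf_file="${TMPDIR:-/tmp}/spark_cluster_default.conf"
cat > "$conf_file" <<'EOF'
spark.driver.memory      4g
spark.executor.memory    16g
spark.executor.cores     2
EOF

# Dry run: print the Reflexiv call that would read this file.
echo "reflexiv run --spark-conf $conf_file -fastq input.fq -kmer 31 -outfile output_file"
```

The command is only printed, so the sketch can be tried without a Spark installation.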

☕  Notes
  1. In case you are not familiar with Spark, we recommend paying attention to several important Spark options: --master is important if you want to submit a job in cluster mode; --driver-memory sets the memory for the main (driver) program, e.g. in the Reflexiv reassembler the driver memory is used for building the Reflexible Distributed K-mer (RDK) from the input fragments; --executor-memory sets the memory for each executor (usually the worker nodes). Increase the executor memory accordingly if you want to cache the input data across cluster nodes.

Reflexiv run options

The run command starts a standard assembly pipeline. The options of the run command mainly consist of input sequencing data (usually a fastq file), a K-mer size and an output result file.

To print out all options, simply type the command:
$ reflexiv run

Reflexiv 16:31:52 Reflexiv main initiating ... 
Reflexiv 16:31:52 interpreting parameters.
Name:
	Reflexiv Main

Options:
  -fastq <input fastq file>                 Input NGS data, fastq file format, four
                                            line per unit
  -fasta <input fasta file>                 Also input NGS data, but in fasta file
                                            format, two line per unit
  -outfile <output file>                    Output assembly result
  -kmer <kmer size>                         Kmer length for reads mapping
  -overlap <kmer overlap>                   Overlap size between two adjacent kmers
  -miniter <minimum iterations>             Minimum iterations for contig
                                            construction
  -maxiter <maximum iterations>             Maximum iterations for contig
                                            construction
  -clipf <clip front nt>                    Clip N number of nucleotides from the
                                            beginning of the reads
  -clipe <clip end nt>                      Clip N number of nucleotides from the end
                                            of the reads
  -cover <minimal kmer coverage>            Minimal coverage to filter low freq kmers
  -maxcov <maximal kmer coverage>           Maximal coverage to filter high freq
                                            kmers
  -error <minimum error correction cover>   Minimum coverage for correcting
                                            sequencing errors. Used for low coverage
                                            sequencing. Should be higher than minimal
                                            kmer coverage -cover
  -minlength <minimal read length>          Minimal read length required for assembly
  -mincontig <minimal contig length>        Minimal contig length to be reported
  -partition <re-partition number>          re generate N number of partitions
  -bubble                                   Set to NOT remove bubbles.
  -cache                                    weather to store data in memory or not
  -version                                  show version information
  -h
  -help                                     print and show this information

Usage:
	run de novo genome assembly : 
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.Main reflexiv.jar [parameters] -fastq input.fq -kmer 31 -outfile output_file
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.Main reflexiv.jar [parameters] -fasta input.txt -kmer 31 -outfile output_file
reflexiv run [spark parameter] [parameters] -fastq input.fq -kmer 31 -outfile output_file

Table M-3: Descriptions of Reflexiv run options
Option        Description
-fastq        Input sequencing data in fastq format. The input files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2.
-fasta        Input sequencing data in fasta format. The input files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -fasta /vol/data/samples/*.fa.gz.
-outfile      Directory of output files.
-kmer         The K-mer length for building the Reflexible Distributed K-mer (RDK). Default is 31.
-overlap      When K-mers are extracted from the input sequences, consecutive K-mers are shifted by a fixed number of nucleotides. The K-mer length minus the shift is the overlap length. In most cases, the overlap should be K-1.
-miniter      The minimal number of iterations for contig extension. The program starts checking for convergence only after this number of iterations is reached. As each convergence check consumes resources, setting a minimal number of iterations avoids unnecessary checks. Default is 20.
-maxiter      The maximum number of iterations for contig extension. In the rare case where convergence is never reached (not observed so far), this threshold stops the otherwise infinite loop.
-clipf        Trim a given number of nucleotides from the beginning of each sequencing read. This is useful for clipping the high-error region at the start of an Illumina read.
-clipe        Trim a given number of nucleotides from the end of each sequencing read. This is useful for clipping the high-error region at the end of an Illumina read.
-cover        The minimal coverage used to filter out low-frequency K-mers. A higher threshold gives a more accurate result; a lower threshold gives a more sensitive result.
-maxcov       The maximal coverage used to filter out high-frequency K-mers. When sequencing artifacts produce K-mers with extremely high coverage, this threshold removes them.
-error        The coverage threshold for error correction (described as a minimum in the help output above). Erroneous nucleotides with lower coverage are corrected to the higher-coverage nucleotides. According to the help output, the value should be higher than the minimal K-mer coverage set by -cover.
-minlength    The minimal length of input reads. Default is the K-mer length.
-mincontig    The minimal length of output contigs. Default is 100.
-partition    Redistribute the input data into a given number of partitions. Each partition runs as an individual task. This is useful when you want to distribute your data evenly for parallel computation (see also the Notes below).
-bubble       Set to NOT remove bubbles in the assembly. Mainly used for debugging.
-cache        Set to cache the input data in memory. By default, data is not cached.
-version      Print the Reflexiv version.
-help         Print the help information.
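
The relation between -kmer and -overlap can be shown on a toy sequence. This is a minimal sketch, not Reflexiv's implementation: the sequence, K = 5 and overlap = 4 are made-up values chosen to keep the output short, and the variable names are hypothetical.

```shell
#!/bin/sh
# Toy illustration of -kmer and -overlap; not Reflexiv's own code.
seq="ACGTACGTAC"             # made-up sequence, 10 nt
k=5                          # K-mer length (-kmer)
overlap=4                    # overlap between adjacent K-mers (-overlap), here K-1
shift_len=$((k - overlap))   # each K-mer starts shift_len nt after the previous one

kmers=""
start=1
while [ $((start + k - 1)) -le "${#seq}" ]; do
    kmer=$(printf '%s' "$seq" | cut -c "${start}-$((start + k - 1))")
    kmers="$kmers $kmer"
    start=$((start + shift_len))
done
kmers=${kmers# }             # drop the leading space
echo "$kmers"
```

With an overlap of K-1 the shift is one nucleotide, so the 10 nt sequence yields six K-mers; a smaller overlap would skip start positions and yield fewer.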

☕  Notes
  1. The most memory-consuming step of Reflexiv is counting K-mer frequencies. If you do not have enough RAM for a certain dataset, increase the number of partitions by setting -partition to a higher value. This trades longer runtime for lower memory consumption.

Reflexiv counter options

counter allows users to run a separate K-mer counting step before the assembly. This step can also be replaced by other K-mer counters: Reflexiv can read the results of other K-mer counters (in tabular format) directly as assembly input.

To list all options, simply type the command:
$ reflexiv counter

Reflexiv 17:10:21 Reflexiv KmerCounter initiating ... 
Reflexiv 17:10:21 interpreting parameters.
Name:
	Reflexiv Kmer Counter

Options:
  -fastq <input fastq file>          Input NGS data, fastq file format, four line per
                                     unit
  -fasta <input fasta file>          Also input NGS data, but in fasta file format,
                                     two line per unit
  -outfile <output file>             Output assembly result
  -kmer <kmer size>                  Kmer length for reads mapping
  -overlap <kmer overlap>            Overlap size between two adjacent kmers
  -clipf <clip front nt>             Clip N number of nucleotides from the beginning
                                     of the reads
  -clipe <clip end nt>               Clip N number of nucleotides from the end of the
                                     reads
  -cover <minimal kmer coverage>     Minimal coverage to filter low freq kmers
  -maxcov <maximal kmer coverage>    Maximal coverage to filter high freq kmers
  -minlength <minimal read length>   Minimal read length required for assembly
  -partition <re-partition number>   re generate N number of partitions
  -cache                             weather to store data in memory or not
  -version                           show version information
  -h
  -help                              print and show this information

Usage:
	run K-mer counting : 
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfCounter reflexiv.jar [parameters] -fastq input.fq -kmer 31 -outfile kmerCount.out
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfCounter reflexiv.jar [parameters] -fasta input.txt -kmer 31 -outfile kmerCount.out
reflexiv counter [spark parameter] [parameters] -fastq input.fq -kmer 31 -outfile kmerCount.out

Table M-4: Descriptions of Reflexiv counter options
Option        Description
-fastq        Input sequencing data in fastq format. The input files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2.
-fasta        Input sequencing data in fasta format. The input files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -fasta /vol/data/samples/*.fa.gz.
-outfile      Directory of output files.
-kmer         The K-mer length for building the Reflexible Distributed K-mer (RDK). Default is 31.
-overlap      When K-mers are extracted from the input sequences, consecutive K-mers are shifted by a fixed number of nucleotides. The K-mer length minus the shift is the overlap length. In most cases, the overlap should be K-1.
-clipf        Trim a given number of nucleotides from the beginning of each sequencing read. This is useful for clipping the high-error region at the start of an Illumina read.
-clipe        Trim a given number of nucleotides from the end of each sequencing read. This is useful for clipping the high-error region at the end of an Illumina read.
-cover        The minimal coverage used to filter out low-frequency K-mers. A higher threshold gives a more accurate result; a lower threshold gives a more sensitive result.
-maxcov       The maximal coverage used to filter out high-frequency K-mers. When sequencing artifacts produce K-mers with extremely high coverage, this threshold removes them.
-minlength    The minimal length of input reads. Default is the K-mer length.
-partition    Redistribute the input data into a given number of partitions. Each partition runs as an individual task. This is useful when you want to distribute your data evenly for parallel computation (see also the Notes below).
-cache        Set to cache the input data in memory. By default, data is not cached.
-version      Print the Reflexiv version.
-help         Print the help information.

☕  Notes
  1. K-mer counting is the most memory-consuming step of the entire assembly process. Thus, we recommend that users run the K-mer counting step first. The result can then be reused for several downstream assembly attempts with different input parameters (e.g. K-mer length or minimal K-mer coverage).
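
The count-once, reuse-many-times idea can be sketched as a dry run that only prints the commands. Several things here are assumptions: the file names are placeholders, the -cover values 2, 3 and 5 are arbitrary, and the counts are fed to reassembler through -kmerc because that is the only help output in this manual that lists a K-mer count input option.

```shell
#!/bin/sh
# Dry-run sketch: commands are collected and printed, not executed.
# File names and -cover values are placeholders.
counts="kmerCount.out"

plan=$(
    # Step 1: count K-mers once.
    echo "reflexiv counter -fastq input.fq -kmer 31 -outfile $counts"
    # Step 2: reuse the same counts for several attempts with different -cover.
    for cover in 2 3 5; do
        echo "reflexiv reassembler -kmerc $counts -frag gene_frag.fa -kmer 31 -cover $cover -outfile reassemble_cover$cover.out"
    done
)
echo "$plan"
```

Once the printed commands look right for your data, they can be run one by one, so the memory-heavy counting step is paid for only once.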

Reflexiv reassembler options

reassembler is a tool for extending pre-assembled or probe-targeted sequence fragments using other NGS (next-generation sequencing) datasets. Reflexiv reads the sequence fragments and extracts an RDK from them. It also extracts an RDK from the NGS datasets and combines the two RDKs into one. After combining the two RDKs, Reflexiv assembles and extends the fragments using the standard pipeline. Once the assembly is complete, the re-assembled fragments are extracted and saved in another folder.

To list all options, type the command:
$ ./bin/reflexiv reassembler

Reflexiv 15:35:47 Reflexiv ReAssembler initiating ... 
Reflexiv 15:35:47 interpreting parameters.
Name:
	Reflexiv Re-Assembler

Options:
  -fastq <input fastq file>            Input NGS data, fastq file format, four line
                                       per unit
  -kmerc <input Kmer file>             Input counted kmer file, tabular file format
                                       or spark RDD pair text file
  -fasta <input fasta file>            Also input NGS data, but in fasta file format,
                                       two line per unit
  -frag <input pre assemblies>         Input pre-assembled contig fragments to be
                                       extended
  -outfile <output file>               Output assembly result
  -kmer <kmer size>                    Kmer length for reads mapping
  -overlap <kmer overlap>              Overlap size between two adjacent kmers
  -miniter <minimum iterations>        Minimum iterations for contig construction
  -maxiter <maximum iterations>        Maximum iterations for contig construction
  -clipf <clip front nt>               Clip N number of nucleotides from the
                                       beginning of the reads
  -clipe <clip end nt>                 Clip N number of nucleotides from the end of
                                       the reads
  -cover <minimal kmer coverage>       Minimal coverage to filter low freq kmers
  -maxcov <maximal kmer coverage>      Maximal coverage to filter high freq kmers
  -minlength <minimal read length>     Minimal read length required for assembly
  -mincontig <minimal contig length>   Minimal contig length to be reported
  -partition <re-partition number>     re generate N number of partitions
  -bubble                              Set to NOT remove bubbles.
  -cache                               weather to store data in memory or not
  -version                             show version information
  -h
  -help                                print and show this information

Usage:
	run de novo genome Re-assembly : 
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfReAssembler reflexiv.jar [parameters] -fastq input.fq -frag gene_frag.fa -kmer 31 -outfile reassemble.out
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfReAssembler reflexiv.jar [parameters] -fasta input.txt -frag gene_frag.fa -kmer 31 -outfile reassemble.out
reflexiv reassembler [spark parameter] [parameters] -fastq input.fq -frag gene_frag.fa -kmer 31 -outfile reassemble.out

Table M-5: Descriptions of Reflexiv reassembler options
Option        Description
-fastq        Input sequencing data in fastq format. The input files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2.
-kmerc        Input K-mer counts, either the direct output of counter or files in tabular format. The input files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -kmerc /vol/data/samples/*.kmers.txt.
-fasta        Input sequencing data in fasta format. The input files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -fasta /vol/data/samples/*.fa.gz.
-frag         Input sequence fragments to be extended, in fasta format. The files can be compressed or uncompressed. Wildcards or comma-separated file lists can be used to input a batch of files, e.g. -frag /vol/data/samples/gene_fragment1.fa,/vol/data/samples/gene_fragment2.fa.
-outfile      Directory of output files.
-kmer         The K-mer length for building the Reflexible Distributed K-mer (RDK). Default is 31.
-overlap      When K-mers are extracted from the input sequences, consecutive K-mers are shifted by a fixed number of nucleotides. The K-mer length minus the shift is the overlap length. In most cases, the overlap should be K-1.
-miniter      The minimal number of iterations for contig extension. The program starts checking for convergence only after this number of iterations is reached. As each convergence check consumes resources, setting a minimal number of iterations avoids unnecessary checks. Default is 20.
-maxiter      The maximum number of iterations for contig extension. In the rare case where convergence is never reached (not observed so far), this threshold stops the otherwise infinite loop.
-clipf        Trim a given number of nucleotides from the beginning of each sequencing read. This is useful for clipping the high-error region at the start of an Illumina read.
-clipe        Trim a given number of nucleotides from the end of each sequencing read. This is useful for clipping the high-error region at the end of an Illumina read.
-cover        The minimal coverage used to filter out low-frequency K-mers. A higher threshold gives a more accurate result; a lower threshold gives a more sensitive result.
-maxcov       The maximal coverage used to filter out high-frequency K-mers. When sequencing artifacts produce K-mers with extremely high coverage, this threshold removes them.
-minlength    The minimal length of input reads. Default is the K-mer length.
-mincontig    The minimal length of output contigs. Default is 100.
-partition    Redistribute the input data into a given number of partitions. Each partition runs as an individual task. This is useful when you want to distribute your data evenly for parallel computation (see also the Notes below).
-bubble       Set to NOT remove bubbles in the assembly. Mainly used for debugging.
-cache        Set to cache the input data in memory. By default, data is not cached.
-version      Print the Reflexiv version.
-help         Print the help information.

☕  Notes
  1. The extended result is in the fasta format. The ID of each contig gives the loci of the extended fragment(s) within the contig, e.g. >ReCon-length:739-begin:455-end:665-NC0130078_102956401|NC0130078_105631461-begin:503-end:731-NC0130078_103544151-34. Here, length is the number of nucleotides in the assembled contig; begin is the start position and end is the end position of the fragment whose ID follows, so NC0130078_102956401 is the ID of a fragment spanning positions 455 to 665. The | symbol joins two fragments that have identical start and end K-mers (redundancy).
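
The contig ID can be taken apart with standard text tools. The sketch below is one possible way, assuming that fragment IDs never contain a '-' character; the trailing -34 of the example is not described in this manual and is simply ignored. The variable names are hypothetical.

```shell
#!/bin/sh
# Example header copied from the note above.
header='>ReCon-length:739-begin:455-end:665-NC0130078_102956401|NC0130078_105631461-begin:503-end:731-NC0130078_103544151-34'

# Contig length: the number after "length:".
length=$(printf '%s\n' "$header" | sed -E 's/.*length:([0-9]+).*/\1/')

# One line per fragment group: begin, end, fragment ID(s), space-separated.
# Assumption: fragment IDs contain no '-' characters.
fragments=$(printf '%s\n' "$header" \
    | grep -oE 'begin:[0-9]+-end:[0-9]+-[^-]+' \
    | sed -E 's/begin:([0-9]+)-end:([0-9]+)-(.*)/\1 \2 \3/')

echo "contig length: $length"
echo "$fragments"
```

For the example header this reports a length of 739 and two fragment groups; redundant fragments joined by | stay together in the third column.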

Setup a Spark cluster on Sun Grid Engine (SGE)

To set up a Spark cluster on SGE, we can use the scripts located in the ./sbin folder of the Spark installation.

First, start a Spark master node on the SGE login node:
sh start-master.sh
A message confirming the start should then be displayed on your screen:

starting org.apache.spark.deploy.master.Master, logging to $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out

Open the log file to see the master node info:
less $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out

Spark Command: /usr/lib/jvm/java-1.8.0/jre//bin/java -cp $SPARK_HOME/conf/:$SPARK_HOME/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host zorin --port 7077 --webui-port 8080
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/03/10 13:38:36 INFO Master: Started daemon with process name: 12018@zorin
17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for TERM
17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for HUP
17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for INT
17/03/10 13:38:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/10 13:38:38 INFO SecurityManager: Changing view acls to: huanglr
17/03/10 13:38:38 INFO SecurityManager: Changing modify acls to: huanglr
17/03/10 13:38:38 INFO SecurityManager: Changing view acls groups to: 
17/03/10 13:38:38 INFO SecurityManager: Changing modify acls groups to: 
17/03/10 13:38:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(huanglr); groups with view permissions: Set(); users  with modify permissions: Set(huanglr); groups with modify permissions: Set()
17/03/10 13:38:39 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
17/03/10 13:38:39 INFO Master: Starting Spark master at spark://zorin:7077
17/03/10 13:38:39 INFO Master: Running Spark version 2.0.0
17/03/10 13:38:40 INFO Utils: Successfully started service 'MasterUI' on port 8080.
17/03/10 13:38:40 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://zorin:8080
17/03/10 13:38:40 INFO Utils: Successfully started service on port 6066.
17/03/10 13:38:40 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
17/03/10 13:38:41 INFO Master: I have been elected leader! New state: ALIVE

Now, you can browse the WebUI of the master node via:
http://zorin:8080, where zorin is the hostname of the master node.

Next, we add worker nodes to the master node. To do that, we use the start-slave.sh script located in the ./sbin folder of the Spark installation.
To submit a worker daemon to an SGE compute node, we can write a script for the SGE submission:

#!/bin/sh

# Set SGE options:
## run the job in the current working directory (where qsub is called)
#$ -cwd
## Specify an architecture
#$ -l arch=lx24-amd64
## Specify multislot pe
#$ -pe multislot 48
## Specify memory for each pe job
#$ -l vf=2G

sh $SPARK_HOME/sbin/start-slave.sh spark://zorin:7077

sleep 100000000

Here, spark://zorin:7077 assigns this worker node to the master node we launched before. The sleep 100000000 command keeps the daemon running on the submitted SGE compute node.

Check the log file again to see if the worker node connection is successful:
less $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out

17/03/10 13:38:40 INFO Utils: Successfully started service on port 6066.
17/03/10 13:38:40 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
17/03/10 13:38:41 INFO Master: I have been elected leader! New state: ALIVE
17/03/10 14:03:58 INFO Master: Registering worker statler:45341 with 48 cores, 250.8 GB RAM
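
Instead of reading the log by eye, the registered workers can be counted with grep. The sketch below is self-contained: a temporary file stands in for the master log and holds two lines copied from the excerpt above. On a real cluster, point the log variable at the master log under $SPARK_HOME/logs. The variable names are hypothetical.

```shell
#!/bin/sh
# A temporary file stands in for the master log (lines copied from the excerpt above).
log=$(mktemp)
cat > "$log" <<'EOF'
17/03/10 13:38:41 INFO Master: I have been elected leader! New state: ALIVE
17/03/10 14:03:58 INFO Master: Registering worker statler:45341 with 48 cores, 250.8 GB RAM
EOF

# Count the registration messages and sum the reported cores.
workers=$(grep -c 'Master: Registering worker' "$log")
total_cores=$(sed -n 's/.*Registering worker .* with \([0-9][0-9]*\) cores.*/\1/p' "$log" \
    | awk '{ s += $1 } END { print s + 0 }')

echo "registered workers: $workers, total cores: $total_cores"
rm -f "$log"
```

For the two sample lines this reports one worker with 48 cores.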

To shut down worker nodes, use the qdel command to delete the submitted jobs.

☕  Notes
  1. To launch a group of worker nodes, use the start-slaves.sh script located in the ./sbin folder. Before running start-slaves.sh, add the IP addresses of all compute nodes to the ./conf/slaves file in $SPARK_HOME.

Setup a Spark cluster on the AWS EC2 cloud

We use Spark-ec2 to set up a Spark cluster on the Amazon EC2 cloud. Detailed instructions can be found at Spark-ec2. Here, we provide an example of setting up a 5-node Spark cluster.

To view all options:
$ $SPARK_HOME/ec2/spark-ec2

Usage: spark-ec2 [options] <action> <cluster_name>

<action> can be: launch, destroy, login, stop, start, get-master, reboot-slaves

Options:
  --version             show program's version number and exit
  -h, --help            show this help message and exit
  -s SLAVES, --slaves=SLAVES
                        Number of slaves to launch (default: 1)
  -w WAIT, --wait=WAIT  DEPRECATED (no longer necessary) - Seconds to wait for
                        nodes to start
  -k KEY_PAIR, --key-pair=KEY_PAIR
                        Key pair to use on instances
  -i IDENTITY_FILE, --identity-file=IDENTITY_FILE
                        SSH private key file to use for logging into instances
  -p PROFILE, --profile=PROFILE
                        If you have multiple profiles (AWS or boto config),
                        you can configure additional, named profiles by using
                        this option (default: none)
  -t INSTANCE_TYPE, --instance-type=INSTANCE_TYPE
                        Type of instance to launch (default: m1.large).
                        WARNING: must be 64-bit; small instances won't work
  -m MASTER_INSTANCE_TYPE, --master-instance-type=MASTER_INSTANCE_TYPE
                        Master instance type (leave empty for same as
                        instance-type)
  -r REGION, --region=REGION
                        EC2 region used to launch instances in, or to find
                        them in (default: us-east-1)
  -z ZONE, --zone=ZONE  Availability zone to launch instances in, or 'all' to
                        spread slaves across multiple (an additional $0.01/Gb
                        for bandwidthbetween zones applies) (default: a single
                        zone chosen at random)
  -a AMI, --ami=AMI     Amazon Machine Image ID to use
  -v SPARK_VERSION, --spark-version=SPARK_VERSION
                        Version of Spark to use: 'X.Y.Z' or a specific git
                        hash (default: 1.6.0)
  --spark-git-repo=SPARK_GIT_REPO
                        Github repo from which to checkout supplied commit
                        hash (default: https://github.com/apache/spark)
  --spark-ec2-git-repo=SPARK_EC2_GIT_REPO
                        Github repo from which to checkout spark-ec2 (default:
                        https://github.com/amplab/spark-ec2)
  --spark-ec2-git-branch=SPARK_EC2_GIT_BRANCH
                        Github repo branch of spark-ec2 to use (default:
                        branch-1.5)
  --deploy-root-dir=DEPLOY_ROOT_DIR
                        A directory to copy into / on the first master. Must
                        be absolute. Note that a trailing slash is handled as
                        per rsync: If you omit it, the last directory of the
                        --deploy-root-dir path will be created in / before
                        copying its contents. If you append the trailing
                        slash, the directory is not created and its contents
                        are copied directly into /. (default: none).
  --hadoop-major-version=HADOOP_MAJOR_VERSION
                        Major version of Hadoop. Valid options are 1 (Hadoop
                        1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
                        1)
  -D [ADDRESS:]PORT     Use SSH dynamic port forwarding to create a SOCKS
                        proxy at the given local address (for use with login)
  --resume              Resume installation on a previously launched cluster
                        (for debugging)
  --ebs-vol-size=SIZE   Size (in GB) of each EBS volume.
  --ebs-vol-type=EBS_VOL_TYPE
                        EBS volume type (e.g. 'gp2', 'standard').
  --ebs-vol-num=EBS_VOL_NUM
                        Number of EBS volumes to attach to each node as
                        /vol[x]. The volumes will be deleted when the
                        instances terminate. Only possible on EBS-backed AMIs.
                        EBS volumes are only attached if --ebs-vol-size > 0.
                        Only support up to 8 EBS volumes.
  --placement-group=PLACEMENT_GROUP
                        Which placement group to try and launch instances
                        into. Assumes placement group is already created.
  --swap=SWAP           Swap space to set up per node, in MB (default: 1024)
  --spot-price=PRICE    If specified, launch slaves as spot instances with the
                        given maximum price (in dollars)
  --ganglia             Setup Ganglia monitoring on cluster (default: True).
                        NOTE: the Ganglia page will be publicly accessible
  --no-ganglia          Disable Ganglia monitoring for the cluster
  -u USER, --user=USER  The SSH user you want to connect as (default: root)
  --delete-groups       When destroying a cluster, delete the security groups
                        that were created
  --use-existing-master
                        Launch fresh slaves, but use an existing stopped
                        master if possible
  --worker-instances=WORKER_INSTANCES
                        Number of instances per worker: variable
                        SPARK_WORKER_INSTANCES. Not used if YARN is used as
                        Hadoop major version (default: 1)
  --master-opts=MASTER_OPTS
                        Extra options to give to master through
                        SPARK_MASTER_OPTS variable (e.g
                        -Dspark.worker.timeout=180)
  --user-data=USER_DATA
                        Path to a user-data file (most AMIs interpret this as
                        an initialization script)
  --authorized-address=AUTHORIZED_ADDRESS
                        Address to authorize on created security groups
                        (default: 0.0.0.0/0)
  --additional-security-group=ADDITIONAL_SECURITY_GROUP
                        Additional security group to place the machines in
  --additional-tags=ADDITIONAL_TAGS
                        Additional tags to set on the machines; tags are
                        comma-separated, while name and value are colon
                        separated; ex: "Task:MySparkProject,Env:production"
  --copy-aws-credentials
                        Add AWS credentials to hadoop configuration to allow
                        Spark to access S3
  --subnet-id=SUBNET_ID
                        VPC subnet to launch instances in
  --vpc-id=VPC_ID       VPC to launch instances in
  --private-ips         Use private IPs for instances rather than public if
                        VPC/subnet requires that.
  --instance-initiated-shutdown-behavior=INSTANCE_INITIATED_SHUTDOWN_BEHAVIOR
                        Whether instances should terminate when shut down or
                        just stop
  --instance-profile-name=INSTANCE_PROFILE_NAME
                        IAM profile name to launch instances under
        

To set up a Spark cluster, specify the master instance type with -m and the number of worker nodes with -s. To use spot instances, set the option --spot-price=0.8, where 0.8 is the maximum bid price in dollars.

You also have to provide your AWS key pair with -k rhinempi and -i /vol/AWS/Key/rhinempi.pem. Here, rhinempi is the name of my key pair and rhinempi.pem is the matching private key file.

Also add your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to your environment:
$ vi ~/.bash_profile

# --- spark ec2 env --- #
export AWS_ACCESS_KEY_ID="xxxxxxxxxx"
export AWS_SECRET_ACCESS_KEY="xxxxxxxxxxx"

Reload your environment:
$ source ~/.bash_profile
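
Before launching a cluster, it can help to confirm that both variables are actually exported, since spark-ec2 only sees what the shell passes on to child processes. The following is a minimal sketch; check_aws_env is a hypothetical helper and not part of spark-ec2:

```shell
# Hypothetical helper (not part of spark-ec2): succeeds only if both
# credential variables are exported and non-empty.
check_aws_env() {
    status=0
    for name in AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY; do
        # printenv only reports exported variables, i.e. what a child
        # process such as spark-ec2 would inherit
        if [ -z "$(printenv "$name")" ]; then
            echo "missing: $name" >&2
            status=1
        fi
    done
    return "$status"
}

if check_aws_env; then
    echo "AWS credentials found in the environment"
fi
```

If a variable is reported as missing, check that the export lines are in ~/.bash_profile and that the file has been sourced in the current shell.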

Here is an example command:

$ $SPARK_HOME/ec2/spark-ec2 -m m1.xlarge -k rhinempi -i /vol/AWS/Key/rhinempi.pem -s 5 --spot-price=0.8 --instance-type=c3.8xlarge --region=eu-west-1 launch Map-All-HMP

Setting up security groups...
Searching for existing cluster Map-All-HMP in region eu-west-1...
Spark AMI: ami-1ae0166d
Launching instances...
Requesting 5 slaves as spot instances with price $0.800
Waiting for spot instances to be granted...
All 5 slaves granted
Launched master in eu-west-1a, regid = r-0cd1d32a495250e99
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state......
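
The example command above can also be assembled from variables, which makes it easier to review the settings before any instances are requested. This is only a sketch: the variable names and the launch_command helper are hypothetical, the values are copied from the example, and /root/spark is merely an assumed fallback when SPARK_HOME is not set.

```shell
# Values copied from the example command; adjust them to your own account.
CLUSTER="Map-All-HMP"
REGION="eu-west-1"
KEY_NAME="rhinempi"
KEY_FILE="/vol/AWS/Key/rhinempi.pem"
MASTER_TYPE="m1.xlarge"
WORKER_TYPE="c3.8xlarge"
WORKERS=5
SPOT_PRICE="0.8"
# Assumption: /root/spark is used only if SPARK_HOME is unset.
SPARK_EC2="${SPARK_HOME:-/root/spark}/ec2/spark-ec2"

# Hypothetical helper: print the launch command instead of running it.
launch_command() {
    echo "$SPARK_EC2" -m "$MASTER_TYPE" -k "$KEY_NAME" -i "$KEY_FILE" \
        -s "$WORKERS" --spot-price="$SPOT_PRICE" \
        --instance-type="$WORKER_TYPE" --region="$REGION" \
        launch "$CLUSTER"
}

# Dry run: shows the full command line for review.
launch_command
```

Once the printed line looks right, paste it into the shell to start the cluster. Keep in mind that a running cluster incurs AWS charges until it is destroyed.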

To terminate the cluster, use the destroy command of spark-ec2:

$ $SPARK_HOME/ec2/spark-ec2 --region=eu-west-1 destroy Map-All-HMP

Searching for existing cluster Map-All-HMP in region eu-west-1...
Found 1 master, 5 slaves.
The following instances will be terminated:
> ec2-54-194-240-108.eu-west-1.compute.amazonaws.com
> ec2-54-154-111-11.eu-west-1.compute.amazonaws.com
> ec2-54-154-69-167.eu-west-1.compute.amazonaws.com
> ec2-54-229-5-191.eu-west-1.compute.amazonaws.com
> ec2-54-194-140-87.eu-west-1.compute.amazonaws.com
> ec2-54-154-186-102.eu-west-1.compute.amazonaws.com
ALL DATA ON ALL NODES WILL BE LOST!!
Are you sure you want to destroy the cluster Map-All-HMP? (y/N) y
Terminating master...
Terminating slaves...
☕  Notes
  1. As of Spark 2.0.0, the spark-ec2 script is no longer included in the Spark package distribution. To download spark-ec2, use the following link: https://github.com/amplab/spark-ec2/archive/branch-2.0.zip.
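
Downloading and unpacking the archive from the note above could look like the sketch below. fetch_spark_ec2 is a hypothetical helper; the unpacked directory name assumes GitHub's usual <repo>-<branch> archive layout, and the script is assumed to sit at the top level of that directory.

```shell
# URL taken from the note above.
SPARK_EC2_URL="https://github.com/amplab/spark-ec2/archive/branch-2.0.zip"
ARCHIVE="${SPARK_EC2_URL##*/}"            # file name part of the URL
# Assumption: GitHub archives unpack into <repo>-<branch>.
UNPACK_DIR="spark-ec2-${ARCHIVE%.zip}"

# Hypothetical helper: download and unpack the archive (needs wget and unzip).
fetch_spark_ec2() {
    wget -O "$ARCHIVE" "$SPARK_EC2_URL" &&
    unzip -q "$ARCHIVE" &&
    echo "spark-ec2 script: $UNPACK_DIR/spark-ec2"
}

# fetch_spark_ec2    # uncomment to download
```

After unpacking, the script can be called by its full path in place of $SPARK_HOME/ec2/spark-ec2 in the commands above.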
