Overview
Reflexiv command line options consist of four parts:
spark-submit + [spark-options] + Reflexiv.jar + [Reflexiv-options].
The Reflexiv executable wraps this command and simplifies it to
Reflexiv + [command] + [Spark options] + [Reflexiv options]
where [command] specifies a particular function of the assembler, [Spark options] are parameters for the Spark framework (e.g. to configure the Spark cluster), and [Reflexiv options] are parameters for Reflexiv.
The examples below compare the two types of input commands:
#Created by rhinempi on 23/12/17.
#
# Reflexiv
#
# Copyright (c) 2015-2015
# Liren Huang <huanglr at cebitec.uni-bielefeld.de>
#
# Reflexiv is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; Without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more detail.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses>.
# path
sparkbin="/root/spark/bin"
Reflexivlib="/mnt/software/Reflexiv/lib/"
# spark submit
time $sparkbin/spark-submit \ # spark-submit
--conf "spark.eventLog.enabled=true" \ # [spark-options]
--driver-memory 15G \
--executor-memory 57G \
--class uni.bielefeld.cmg.Reflexiv.main.Main \
$Reflexivlib/original-Reflexiv-0.3.jar \ # Reflexiv.jar
-fastq /mnt/HMP/stool/part* \ # [Reflexiv-options]
-outfile /mnt/Reflexiv/stool-result \
-kmer 31 \
-partition 3200 \
> Reflexiv.log 2> Reflexiv.err
The second one, using the Reflexiv executable:
#Created by rhinempi on 06/01/18.
#
# Reflexiv
#
# Copyright (c) 2015-2015
# Liren Huang <huanglr at cebitec.uni-bielefeld.de>
#
# Reflexiv is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; Without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more detail.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses>.
# path
Reflexiv_HOME="/vol/ec2-user/Reflexiv"
# Reflexiv command
time $Reflexiv_HOME/bin/reflexiv run \ # Reflexiv + [command]
--driver-memory 3G \ # [Spark options]
--executor-memory 3G \
-fastq ./example/'paired_dat*.fq.gz' \ # [Reflexiv options]
-outfile ./example/result \
-kmer 31
Commands and Modules
Reflexiv integrates several functions to facilitate the assembly process. These functions are invoked by placing a command after the Reflexiv executable. To see all commands and their functions, simply run reflexiv.
$ ./bin/reflexiv
Reflexiv - on the cloud.
Version: 0.3
Commands:
run Run the entire assembly pipeline
counter counting Kmer frequency
reassembler re-assemble and extend genome fragments
Type each command to view its options, eg. Usage: ./reflexiv run
Spark cluster configuration:
--spark-conf Spark cluster configuration file or spark input parameters
--spark-param Spark cluster parameters in quotation marks "--driver-memory 4G --executor-memory 16G"
--spark-help View spark-submit options. You can include spark`s options directly.
Usage: reflexiv [commands] --spark-conf spark_cluster_default.conf [option...]
reflexiv [commands] --spark-param "--driver-memory 4G --executor-memory 16G" [option...]
reflexiv [commands] --driver-memory 4G --executor-memory 16G --executor-cores 2 [option...]
For detailed cluster submission, please refer to scripts located in:
./sbin
As shown above, Reflexiv provides three commands. Detailed descriptions are as follows:
| Command | Description |
|---|---|
| run | This command runs the standard Reflexiv assembly pipeline, from sequencing data to assembled genomes. |
| counter | This command counts all K-mer frequencies. This result can be used to analyze the genome composition (GC, K-mer distribution etc.). The result can also be used for tuning the downstream assemblies (assemble the genome with different K-mer sizes or different K-mer coverages). |
| reassembler | This command reassembles (extends) a collection of sequence fragments using external NGS data (e.g. metagenomic datasets). |
- All commands are pointers (not pointers in the programming sense) to a specific main Java class in the program. As shown in the first script above, the --class option passes a main Java class to the Spark framework, whereas in the second script the run command assigns the main Java class to Spark.
Spark options
Spark options are the options for the Apache Spark framework. You can find detailed explanations of all options at the Spark configuration page.
Reflexiv identifies these options through parameters starting with two dashes --, just like the options used in Spark. Reflexiv also provides three additional input options for assigning Spark options. See the table below:
| Option | Description |
|---|---|
| --spark-conf | This option specifies a configuration file as input options for Spark. It has the same function as the --properties-file option of the spark-submit executable. |
| --spark-param | This option reads a series of command line options enclosed in quotation marks, e.g. --spark-param "--driver-memory 2G --executor-memory 4G" passes the two parameters --driver-memory and --executor-memory to the Spark framework. |
| --spark-help | This option invokes spark-submit to print out all available options. This is useful when users want to directly check all the options of Spark. |
- In case you are not familiar with Spark, several important Spark options deserve attention: --master is important if you want to submit a job in cluster mode; --driver-memory sets the memory for the main program (e.g. in the Reflexiv reassembler, the driver memory is consumed when building the Reflexible Distributed K-mer (RDK) from the input fragments); --executor-memory sets the memory for each executor (usually the worker nodes). Increase the executor memory accordingly if you want to cache the input data across cluster nodes (see the example below).
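For illustration, the sketch below shows three equivalent ways of passing the same Spark settings to reflexiv run. The file name spark_cluster_default.conf, the memory values and the input paths are placeholders chosen for this example; the configuration file follows the standard Spark properties-file format.
# spark_cluster_default.conf -- assumed example content in Spark properties-file format:
#   spark.master           spark://master-host:7077
#   spark.driver.memory    4g
#   spark.executor.memory  16g

# 1) pass a configuration file
reflexiv run --spark-conf spark_cluster_default.conf \
    -fastq input.fq -kmer 31 -outfile output_file

# 2) pass quoted Spark parameters
reflexiv run --spark-param "--driver-memory 4G --executor-memory 16G" \
    -fastq input.fq -kmer 31 -outfile output_file

# 3) pass Spark options directly (double-dash options are forwarded to spark-submit)
reflexiv run --driver-memory 4G --executor-memory 16G --executor-cores 2 \
    -fastq input.fq -kmer 31 -outfile output_file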
Reflexiv run options
The run command starts a standard assembly pipeline. The options of the run command mainly consist of input sequencing data (usually a fastq file), a K-mer size and an output result file.
To print out all options, simply type the command:
$ reflexiv run
Reflexiv 16:31:52 Reflexiv main initiating ...
Reflexiv 16:31:52 interpreting parameters.
Name:
Reflexiv Main
Options:
-fastq <input fastq file> Input NGS data, fastq file format, four
line per unit
-fasta <input fasta file> Also input NGS data, but in fasta file
format, two line per unit
-outfile <output file> Output assembly result
-kmer <kmer size> Kmer length for reads mapping
-overlap <kmer overlap> Overlap size between two adjacent kmers
-miniter <minimum iterations> Minimum iterations for contig
construction
-maxiter <maximum iterations> Maximum iterations for contig
construction
-clipf <clip front nt> Clip N number of nucleotides from the
beginning of the reads
-clipe <clip end nt> Clip N number of nucleotides from the end
of the reads
-cover <minimal kmer coverage> Minimal coverage to filter low freq kmers
-maxcov <maximal kmer coverage> Maximal coverage to filter high freq
kmers
-error <minimum error correction cover> Minimum coverage for correcting
sequencing errors. Used for low coverage
sequencing. Should be higher than minimal
kmer coverage -cover
-minlength <minimal read length> Minimal read length required for assembly
-mincontig <minimal contig length> Minimal contig length to be reported
-partition <re-partition number> re generate N number of partitions
-bubble Set to NOT remove bubbles.
-cache weather to store data in memory or not
-version show version information
-h
-help print and show this information
Usage:
run de novo genome assembly :
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.Main reflexiv.jar [parameters] -fastq input.fq -kmer 31 -outfile output_file
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.Main reflexiv.jar [parameters] -fasta input.txt -kmer 31 -outfile output_file
reflexiv run [spark parameter] [parameters] -fastq input.fq -kmer 31 -outfile output_file
| Options | Description |
|---|---|
| -fastq | Input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
| -fasta | Input sequencing data in the fasta format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -fasta /vol/data/samples/*.fa.gz. |
| -outfile | Directory of output files. |
| -kmer | The K-mer length for building the Reflexible Distributed K-mer (RDK). Default is 31. |
| -overlap | When extracting k-mers from the input sequences, there is a shift between adjacent k-mers. The k-mer length minus the shift is the overlap length. In most cases, the overlap should be K-1. |
| -miniter | The minimum number of iterations for contig extension. The program starts checking for convergence once this value is reached. As the convergence check consumes a certain amount of resources, it is better to set a minimum number of iterations to avoid unnecessary checks. The default is 20. |
| -maxiter | The maximum number of iterations for contig extension. In the rare case that convergence is never reached (this has not been observed so far), the maximum iteration threshold stops the loop. |
| -clipf | Trim a given number of nucleotides from the beginning of the sequencing reads. This is useful for clipping the high-error region at the start of an Illumina read. |
| -clipe | Trim a given number of nucleotides from the end of the sequencing reads. This is useful for clipping the high-error region at the end of an Illumina read. |
| -cover | The minimal coverage used to filter out low-frequency K-mers. A higher threshold gives more accurate results; a lower threshold gives more sensitive results. |
| -maxcov | The maximal coverage used to filter out high-frequency K-mers. When sequencing artifacts produce K-mers with extremely high coverage, this threshold filters them out. |
| -error | The maximal coverage for error correction. Error nucleotides with lower coverage will be corrected to the higher coverage nucleotides. |
| -minlength | The minimal length of input reads. The default is the length of the K-mer. |
| -mincontig | The minimal length of the output contigs. The default is 100. |
| -partition | Re-distribute the input data into a given number of partitions. Each partition runs an individual task. This is useful when you want to evenly distribute your data for parallel computations (also see the note below). |
| -bubble | Set to NOT remove bubbles in the assembly. Mainly used for debugging. |
| -cache | Set to cache input data into memory. Default is not. |
| -version | Print out the Reflexiv version. |
| -help | Print the help information. |
- The peak memory-consuming step of Reflexiv is counting K-mer frequencies. If you do not have enough RAM for a certain dataset, try increasing the number of partitions by setting -partition to a higher value. This option trades runtime for lower memory consumption (see the example below).
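A sketch of a typical run invocation that combines the options above; the paths, memory sizes and thresholds are placeholders for illustration only, not recommended values.
# [Spark options] followed by [Reflexiv options]; all values below are placeholders
reflexiv run \
    --driver-memory 8G \
    --executor-memory 32G \
    -fastq '/vol/data/samples/*.fq.gz' \
    -outfile /vol/data/assembly_result \
    -kmer 31 \
    -cover 2 \
    -partition 800 \
    -cache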
Reflexiv counter options
counter allows users to run a separate K-mer counting step before the assembly. This step can also be replaced by other K-mer counters: Reflexiv can read other K-mer counting results (in tabular format) directly as assembly input.
To list all options, simply type the command:
$ reflexiv counter
Reflexiv 17:10:21 Reflexiv KmerCounter initiating ...
Reflexiv 17:10:21 interpreting parameters.
Name:
Reflexiv Kmer Counter
Options:
-fastq <input fastq file> Input NGS data, fastq file format, four line per
unit
-fasta <input fasta file> Also input NGS data, but in fasta file format,
two line per unit
-outfile <output file> Output assembly result
-kmer <kmer size> Kmer length for reads mapping
-overlap <kmer overlap> Overlap size between two adjacent kmers
-clipf <clip front nt> Clip N number of nucleotides from the beginning
of the reads
-clipe <clip end nt> Clip N number of nucleotides from the end of the
reads
-cover <minimal kmer coverage> Minimal coverage to filter low freq kmers
-maxcov <maximal kmer coverage> Maximal coverage to filter high freq kmers
-minlength <minimal read length> Minimal read length required for assembly
-partition <re-partition number> re generate N number of partitions
-cache weather to store data in memory or not
-version show version information
-h
-help print and show this information
Usage:
run K-mer counting :
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfCounter reflexiv.jar [parameters] -fastq input.fq -kmer 31 -outfile kmerCount.out
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfCounter reflexiv.jar [parameters] -fasta input.txt -kmer 31 -outfile kmerCount.out
reflexiv counter [spark parameter] [parameters] -fastq input.fq -kmer 31 -outfile kmerCount.out
| Options | Description |
|---|---|
| -fastq | Input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
| -fasta | Input sequencing data in the fasta format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -fasta /vol/data/samples/*.fa.gz. |
| -outfile | Directory of output files. |
| -kmer | The K-mer length for building the Reflexible Distributed K-mer (RDK). Default is 31. |
| -overlap | When extracting k-mers from the input sequences, there is a shift between adjacent k-mers. The k-mer length minus the shift is the overlap length. In most cases, the overlap should be K-1. |
| -clipf | Trim a given number of nucleotides from the beginning of the sequencing reads. This is useful for clipping the high-error region at the start of an Illumina read. |
| -clipe | Trim a given number of nucleotides from the end of the sequencing reads. This is useful for clipping the high-error region at the end of an Illumina read. |
| -cover | The minimal coverage used to filter out low-frequency K-mers. A higher threshold gives more accurate results; a lower threshold gives more sensitive results. |
| -maxcov | The maximal coverage used to filter out high-frequency K-mers. When sequencing artifacts produce K-mers with extremely high coverage, this threshold filters them out. |
| -minlength | The minimal length of input reads. The default is the length of the K-mer. |
| -partition | Re-distribute the input data into a given number of partitions. Each partition runs an individual task. This is useful when you want to evenly distribute your data for parallel computations (also see the note below). |
| -cache | Set to cache input data into memory. Default is not. |
| -version | Print out the Reflexiv version. |
| -help | Print the help information. |
- This is the most memory-consuming step of the entire assembly process. Thus, we recommend running the K-mer counting step first. The result can then be reused for several downstream assembly attempts with different input parameters, e.g. K-mer length or minimal K-mer coverage (see the sketch below).
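A sketch of this workflow under assumed placeholder paths: the K-mers are counted once with counter, and the result is then passed to the reassembler through its documented -kmerc option.
# 1) count K-mers once (the most memory-consuming step); paths and memory values are placeholders
reflexiv counter \
    --driver-memory 8G \
    --executor-memory 32G \
    -fastq '/vol/data/samples/*.fq.gz' \
    -outfile /vol/data/kmerCount.out \
    -kmer 31

# 2) reuse the counted K-mers in a downstream re-assembly attempt
reflexiv reassembler \
    --driver-memory 8G \
    --executor-memory 32G \
    -kmerc '/vol/data/kmerCount.out/part*' \
    -frag /vol/data/gene_fragments.fa \
    -outfile /vol/data/reassembly_result \
    -kmer 31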
Reflexiv reassembler options
reassembler is a tool to extend pre-assembled or probe-targeted sequence fragments using other NGS (next-generation sequencing) datasets. Reflexiv reads the sequence fragments and extracts an RDK from them. It also extracts an RDK from the NGS datasets and combines the two into one. Reflexiv then assembles and extends the fragments using the standard pipeline. Once the assembly is complete, the re-assembled fragments are extracted and saved in a separate folder.
To list all options, type the command:
$ ./bin/reflexiv reassembler
Reflexiv 15:35:47 Reflexiv ReAssembler initiating ...
Reflexiv 15:35:47 interpreting parameters.
Name:
Reflexiv Re-Assembler
Options:
-fastq <input fastq file> Input NGS data, fastq file format, four line
per unit
-kmerc <input Kmer file> Input counted kmer file, tabular file format
or spark RDD pair text file
-fasta <input fasta file> Also input NGS data, but in fasta file format,
two line per unit
-frag <input pre assemblies> Input pre-assembled contig fragments to be
extended
-outfile <output file> Output assembly result
-kmer <kmer size> Kmer length for reads mapping
-overlap <kmer overlap> Overlap size between two adjacent kmers
-miniter <minimum iterations> Minimum iterations for contig construction
-maxiter <maximum iterations> Maximum iterations for contig construction
-clipf <clip front nt> Clip N number of nucleotides from the
beginning of the reads
-clipe <clip end nt> Clip N number of nucleotides from the end of
the reads
-cover <minimal kmer coverage> Minimal coverage to filter low freq kmers
-maxcov <maximal kmer coverage> Maximal coverage to filter high freq kmers
-minlength <minimal read length> Minimal read length required for assembly
-mincontig <minimal contig length> Minimal contig length to be reported
-partition <re-partition number> re generate N number of partitions
-bubble Set to NOT remove bubbles.
-cache weather to store data in memory or not
-version show version information
-h
-help print and show this information
Usage:
run de novo genome Re-assembly :
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfReAssembler reflexiv.jar [parameters] -fastq input.fq -frag gene_frag.fa -kmer 31 -outfile reassemble.out
spark-submit [spark parameter] --class uni.bielefeld.cmg.reflexiv.main.MainOfReAssembler reflexiv.jar [parameters] -fasta input.txt -frag gene_frag.fa -kmer 31 -outfile reassemble.out
reflexiv reassembler [spark parameter] [parameters] -fastq input.fq -frag gene_frag.fa -kmer 31 -outfile reassemble.out
| Options | Description |
|---|---|
| -fastq | Input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
| -kmerc | Input K-mer counts, either the direct output of counter or files in tabular format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -kmerc /vol/data/samples/*.kmers.txt. |
| -fasta | Input sequencing data in the fasta format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -fasta /vol/data/samples/*.fa.gz. |
| -frag | The input sequence fragments, in the fasta format, that will be extended. The files can be compressed or uncompressed. Users can also use wildcards or comma-separated file names to input a batch of files, e.g. -frag /vol/data/samples/gene_fragment1.fa,/vol/data/samples/gene_fragment2.fa. |
| -outfile | Directory of output files. |
| -kmer | The K-mer length for building the Reflexible Distributed K-mer (RDK). Default is 31. |
| -overlap | When extracting k-mers from the input sequences, there is a shift between adjacent k-mers. The k-mer length minus the shift is the overlap length. In most cases, the overlap should be K-1. |
| -miniter | The minimum number of iterations for contig extension. The program starts checking for convergence once this value is reached. As the convergence check consumes a certain amount of resources, it is better to set a minimum number of iterations to avoid unnecessary checks. The default is 20. |
| -maxiter | The maximum number of iterations for contig extension. In the rare case that convergence is never reached (this has not been observed so far), the maximum iteration threshold stops the loop. |
| -clipf | Trim a given number of nucleotides from the beginning of the sequencing reads. This is useful for clipping the high-error region at the start of an Illumina read. |
| -clipe | Trim a given number of nucleotides from the end of the sequencing reads. This is useful for clipping the high-error region at the end of an Illumina read. |
| -cover | The minimal coverage used to filter out low-frequency K-mers. A higher threshold gives more accurate results; a lower threshold gives more sensitive results. |
| -maxcov | The maximal coverage used to filter out high-frequency K-mers. When sequencing artifacts produce K-mers with extremely high coverage, this threshold filters them out. |
| -minlength | The minimal length of input reads. The default is the length of the K-mer. |
| -mincontig | The minimal length of the output contigs. The default is 100. |
| -partition | Re-distribute the input data into a given number of partitions. Each partition runs an individual task. This is useful when you want to evenly distribute your data for parallel computations (also see the note below). |
| -bubble | Set to NOT remove bubbles in the assembly. Mainly used for debugging. |
| -cache | Set to cache input data into memory. Default is not. |
| -version | Print out the Reflexiv version. |
| -help | Print the help information. |
- The extended result is in the fasta format. The ID of each contig provides the loci of the extended fragment(s) in the contig, e.g. >ReCon-length:739-begin:455-end:665-NC0130078_102956401|NC0130078_105631461-begin:503-end:731-NC0130078_103544151-34. Here, length denotes the number of nucleotides in the assembled contig, begin denotes the start position of the following fragment and end denotes its end position. Accordingly, NC0130078_102956401 is the ID of the fragment. The | symbol connects two fragments that have identical start and end K-mers (redundancy). The sketch below shows one way to parse these headers.
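As a convenience, the sketch below extracts the contig length from such headers to list re-assembled contigs above a length cutoff; the header layout follows the example above, while the output path and the cutoff are placeholders.
# list re-assembled contigs longer than 500 nt (path and cutoff are placeholders)
grep -h '^>ReCon' /vol/data/reassembly_result/part* \
  | sed 's/^>//' \
  | awk -F'-' '{ split($2, a, ":"); if (a[2] + 0 > 500) print $0 "\t" a[2] }'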
Setup a Spark cluster on Sun Grid Engine (SGE)
To set up a Spark cluster on SGE, we can use the scripts located in the ./sbin folder of the Spark installation.
First, start a Spark master node on the SGE login node:
sh start-master.sh
A success message should then be displayed on your screen:
starting org.apache.spark.deploy.master.Master, logging to $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out
Open the log file to see the master node info:
less $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out
Spark Command: /usr/lib/jvm/java-1.8.0/jre//bin/java -cp $SPARK_HOME/conf/:$SPARK_HOME/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host zorin --port 7077 --webui-port 8080
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/03/10 13:38:36 INFO Master: Started daemon with process name: 12018@zorin
17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for TERM
17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for HUP
17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for INT
17/03/10 13:38:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/10 13:38:38 INFO SecurityManager: Changing view acls to: huanglr
17/03/10 13:38:38 INFO SecurityManager: Changing modify acls to: huanglr
17/03/10 13:38:38 INFO SecurityManager: Changing view acls groups to:
17/03/10 13:38:38 INFO SecurityManager: Changing modify acls groups to:
17/03/10 13:38:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(huanglr); groups with view permissions: Set(); users with modify permissions: Set(huanglr); groups with modify permissions: Set()
17/03/10 13:38:39 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
17/03/10 13:38:39 INFO Master: Starting Spark master at spark://zorin:7077
17/03/10 13:38:39 INFO Master: Running Spark version 2.0.0
17/03/10 13:38:40 INFO Utils: Successfully started service 'MasterUI' on port 8080.
17/03/10 13:38:40 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://zorin:8080
17/03/10 13:38:40 INFO Utils: Successfully started service on port 6066.
17/03/10 13:38:40 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
17/03/10 13:38:41 INFO Master: I have been elected leader! New state: ALIVE
Now, you can browse the WebUI of the master node at http://zorin:8080, where zorin is the hostname of the master node.
Next, we will add worker nodes to the master node. To do that, we use the start-slave.sh script located in the ./sbin folder where Spark is installed.
To submit a worker daemon to an SGE compute node, we can write a script for the SGE submission:
#!/bin/sh
# Set SGE options:
## run the job in the current working directory (where qsub is called)
#$ -cwd
## Specify an architecture
#$ -l arch=lx24-amd64
## Specify multislot pe
#$ -pe multislot 48
## Specify memory for each pe job
#$ -l vf=2G

sh $SPARK_HOME/sbin/start-slave.sh spark://zorin:7077
sleep 100000000
Here, spark://zorin:7077 assigns this worker node to the master node we launched before. The sleep 100000000 command keeps the daemon running on the submitted SGE compute node. The script can then be submitted with qsub, as sketched below.
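Assuming the script above is saved under a placeholder name such as start-worker.sge, it can be submitted with qsub; submitting it several times launches several workers.
qsub start-worker.sge    # launch one worker on an SGE compute node
qsub start-worker.sge    # submit again to add another worker
qstat                    # check that the worker jobs are running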
Check the log file again to see if the worker node connection is successful:
less $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out
17/03/10 13:38:40 INFO Utils: Successfully started service on port 6066.
17/03/10 13:38:40 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
17/03/10 13:38:41 INFO Master: I have been elected leader! New state: ALIVE
17/03/10 14:03:58 INFO Master: Registering worker statler:45341 with 48 cores, 250.8 GB RAM
To shut down worker nodes, simply use the qdel command to delete the submitted jobs.
- To launch a group of worker nodes, use the start-slaves.sh script located in the ./sbin folder. Before running the start-slaves.sh script, add the IP addresses of all compute nodes to the ./conf/slaves file in $SPARK_HOME (see the sketch below).
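A minimal sketch of that setup; the hostnames are placeholders, the slaves file lists one worker host per line, and start-slaves.sh reaches the listed hosts via passwordless SSH.
# $SPARK_HOME/conf/slaves -- one worker hostname (or IP) per line (placeholder hosts)
#   statler
#   waldorf
#   compute-node-03

# start a worker on every host listed in conf/slaves
sh $SPARK_HOME/sbin/start-slaves.sh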
Setup a Spark cluster on the AWS EC2 cloud
We use Spark-ec2 to set up a Spark cluster on the Amazon EC2 cloud. Detailed instructions can be found at Spark-ec2. Here, we provide an example that sets up a 5-node Spark cluster.
To view all options:
$ $SPARK_HOME/ec2/spark-ec2
Usage: spark-ec2 [options] <action> <cluster_name>
<action> can be: launch, destroy, login, stop, start, get-master, reboot-slaves
Options:
--version show program's version number and exit
-h, --help show this help message and exit
-s SLAVES, --slaves=SLAVES
Number of slaves to launch (default: 1)
-w WAIT, --wait=WAIT DEPRECATED (no longer necessary) - Seconds to wait for
nodes to start
-k KEY_PAIR, --key-pair=KEY_PAIR
Key pair to use on instances
-i IDENTITY_FILE, --identity-file=IDENTITY_FILE
SSH private key file to use for logging into instances
-p PROFILE, --profile=PROFILE
If you have multiple profiles (AWS or boto config),
you can configure additional, named profiles by using
this option (default: none)
-t INSTANCE_TYPE, --instance-type=INSTANCE_TYPE
Type of instance to launch (default: m1.large).
WARNING: must be 64-bit; small instances won't work
-m MASTER_INSTANCE_TYPE, --master-instance-type=MASTER_INSTANCE_TYPE
Master instance type (leave empty for same as
instance-type)
-r REGION, --region=REGION
EC2 region used to launch instances in, or to find
them in (default: us-east-1)
-z ZONE, --zone=ZONE Availability zone to launch instances in, or 'all' to
spread slaves across multiple (an additional $0.01/Gb
for bandwidthbetween zones applies) (default: a single
zone chosen at random)
-a AMI, --ami=AMI Amazon Machine Image ID to use
-v SPARK_VERSION, --spark-version=SPARK_VERSION
Version of Spark to use: 'X.Y.Z' or a specific git
hash (default: 1.6.0)
--spark-git-repo=SPARK_GIT_REPO
Github repo from which to checkout supplied commit
hash (default: https://github.com/apache/spark)
--spark-ec2-git-repo=SPARK_EC2_GIT_REPO
Github repo from which to checkout spark-ec2 (default:
https://github.com/amplab/spark-ec2)
--spark-ec2-git-branch=SPARK_EC2_GIT_BRANCH
Github repo branch of spark-ec2 to use (default:
branch-1.5)
--deploy-root-dir=DEPLOY_ROOT_DIR
A directory to copy into / on the first master. Must
be absolute. Note that a trailing slash is handled as
per rsync: If you omit it, the last directory of the
--deploy-root-dir path will be created in / before
copying its contents. If you append the trailing
slash, the directory is not created and its contents
are copied directly into /. (default: none).
--hadoop-major-version=HADOOP_MAJOR_VERSION
Major version of Hadoop. Valid options are 1 (Hadoop
1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
1)
-D [ADDRESS:]PORT Use SSH dynamic port forwarding to create a SOCKS
proxy at the given local address (for use with login)
--resume Resume installation on a previously launched cluster
(for debugging)
--ebs-vol-size=SIZE Size (in GB) of each EBS volume.
--ebs-vol-type=EBS_VOL_TYPE
EBS volume type (e.g. 'gp2', 'standard').
--ebs-vol-num=EBS_VOL_NUM
Number of EBS volumes to attach to each node as
/vol[x]. The volumes will be deleted when the
instances terminate. Only possible on EBS-backed AMIs.
EBS volumes are only attached if --ebs-vol-size > 0.
Only support up to 8 EBS volumes.
--placement-group=PLACEMENT_GROUP
Which placement group to try and launch instances
into. Assumes placement group is already created.
--swap=SWAP Swap space to set up per node, in MB (default: 1024)
--spot-price=PRICE If specified, launch slaves as spot instances with the
given maximum price (in dollars)
--ganglia Setup Ganglia monitoring on cluster (default: True).
NOTE: the Ganglia page will be publicly accessible
--no-ganglia Disable Ganglia monitoring for the cluster
-u USER, --user=USER The SSH user you want to connect as (default: root)
--delete-groups When destroying a cluster, delete the security groups
that were created
--use-existing-master
Launch fresh slaves, but use an existing stopped
master if possible
--worker-instances=WORKER_INSTANCES
Number of instances per worker: variable
SPARK_WORKER_INSTANCES. Not used if YARN is used as
Hadoop major version (default: 1)
--master-opts=MASTER_OPTS
Extra options to give to master through
SPARK_MASTER_OPTS variable (e.g
-Dspark.worker.timeout=180)
--user-data=USER_DATA
Path to a user-data file (most AMIs interpret this as
an initialization script)
--authorized-address=AUTHORIZED_ADDRESS
Address to authorize on created security groups
(default: 0.0.0.0/0)
--additional-security-group=ADDITIONAL_SECURITY_GROUP
Additional security group to place the machines in
--additional-tags=ADDITIONAL_TAGS
Additional tags to set on the machines; tags are
comma-separated, while name and value are colon
separated; ex: "Task:MySparkProject,Env:production"
--copy-aws-credentials
Add AWS credentials to hadoop configuration to allow
Spark to access S3
--subnet-id=SUBNET_ID
VPC subnet to launch instances in
--vpc-id=VPC_ID VPC to launch instances in
--private-ips Use private IPs for instances rather than public if
VPC/subnet requires that.
--instance-initiated-shutdown-behavior=INSTANCE_INITIATED_SHUTDOWN_BEHAVIOR
Whether instances should terminate when shut down or
just stop
--instance-profile-name=INSTANCE_PROFILE_NAME
IAM profile name to launch instances under
To set up a Spark cluster, we have to specify the master instance type and the number of worker nodes with the options -m and -s. If you want to use spot instances, set the option --spot-price=0.8, where 0.8 is the bid price in dollars.
You also have to provide your AWS key pair with -k rhinempi and -i /vol/AWS/Key/rhinempi.pem. Here, rhinempi is the name of the key pair.
Also add your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to your environment:
$ vi ~/.bash_profile
# --- spark ec2 env --- #
export AWS_ACCESS_KEY_ID="xxxxxxxxxx"
export AWS_SECRET_ACCESS_KEY="xxxxxxxxxxx"
Update your environment:
source ~/.bash_profile
$ $SPARK_HOME/ec2/spark-ec2 -m m1.xlarge -k rhinempi -i /vol/AWS/Key/rhinempi.pem -s 5 --spot-price=0.8 --instance-type=c3.8xlarge --region=eu-west-1 launch Map-All-HMP
Setting up security groups...
Searching for existing cluster Map-All-HMP in region eu-west-1...
Spark AMI: ami-1ae0166d
Launching instances...
Requesting 5 slaves as spot instances with price $0.800
Waiting for spot instances to be granted...
All 5 slaves granted
Launched master in eu-west-1a, regid = r-0cd1d32a495250e99
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state......
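Once the cluster reaches the 'ssh-ready' state, you can query the master address and log in using the get-master and login actions listed in the help output above; the key pair, region and cluster name below simply mirror the launch example.
# print the public hostname of the master node
$SPARK_HOME/ec2/spark-ec2 -k rhinempi -i /vol/AWS/Key/rhinempi.pem --region=eu-west-1 get-master Map-All-HMP

# open an SSH session on the master node
$SPARK_HOME/ec2/spark-ec2 -k rhinempi -i /vol/AWS/Key/rhinempi.pem --region=eu-west-1 login Map-All-HMP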
To terminate the cluster, use the destroy command of Spark-ec2:
$ $SPARK_HOME/ec2/spark-ec2 --region=eu-west-1 destroy Map-All-HMP
Searching for existing cluster Map-All-HMP in region eu-west-1...
Found 1 master, 5 slaves.
The following instances will be terminated:
> ec2-54-194-240-108.eu-west-1.compute.amazonaws.com
> ec2-54-154-111-11.eu-west-1.compute.amazonaws.com
> ec2-54-154-69-167.eu-west-1.compute.amazonaws.com
> ec2-54-229-5-191.eu-west-1.compute.amazonaws.com
> ec2-54-194-140-87.eu-west-1.compute.amazonaws.com
> ec2-54-154-186-102.eu-west-1.compute.amazonaws.com
ALL DATA ON ALL NODES WILL BE LOST!!
Are you sure you want to destroy the cluster huanglrHMP? (y/N) y
Terminating master...
Terminating slaves...
- For Spark version 2.0.0, the spark-ec2 script is no longer included in the package distribution. To download spark-ec2, use the following link: https://github.com/amplab/spark-ec2/archive/branch-2.0.zip. A short sketch of downloading and unpacking it follows.
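A quick sketch, assuming you want the script placed where the examples above expect it; the target location is an assumption, not a requirement.
wget https://github.com/amplab/spark-ec2/archive/branch-2.0.zip
unzip branch-2.0.zip
mv spark-ec2-branch-2.0 $SPARK_HOME/ec2   # GitHub archives unpack to <repo>-<branch>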