Overview
The Sparkhit command line consists of four parts:
spark-submit + [spark-options] + Sparkhit.jar + [sparkhit-options].
The Sparkhit executable file wraps this command and simplifies it to
sparkhit + [command] + [Spark options] + [Sparkhit options]
where [command] specifies a particular application (tool) or a functional module, [Spark options] are parameters for the Spark framework (e.g. to configure the Spark cluster), and [Sparkhit options] are parameters for the Sparkhit applications.
The examples below compare the two types of input commands:
#Created by rhinempi on 23/01/16.
#
# SparkHit
#
# Copyright (c) 2015-2015
# Liren Huang <huanglr at cebitec.uni-bielefeld.de>
#
# SparkHit is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; Without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more detail.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses>.

# path
sparkbin="/root/spark/bin"
sparkhitlib="/mnt/software/sparkhit/lib/"

# spark submit: spark-submit + [spark-options] + Sparkhit.jar + [sparkhit-options]
$sparkbin/spark-submit \
    --conf "spark.eventLog.enabled=true" \
    --driver-memory 15G \
    --executor-memory 57G \
    --class uni.bielefeld.cmg.sparkhit.main.Main \
    $sparkhitlib/original-sparkhit-0.8.jar \
    -line /mnt/HMP/tongue_dorsum3/part* \
    -reference /mnt/reference/reference.fa3 \
    -outfile /mnt/sparkhit/allmappedfasttongue3 \
    -global 1 -kmer 12 -partition 3200 \
    > sparkhit.log 2> sparkhit.err
The second one:
#Created by rhinempi on 16/10/16.
# (same GNU GPL v3 license header as in the script above)

# path
SPARKHIT_HOME="/vol/ec2-user/sparkhit"

# sparkhit command: sparkhit + [command] + [Spark options] + [Sparkhit options]
$SPARKHIT_HOME/bin/sparkhit recruiter \
    --driver-memory 2G \
    --executor-memory 4G \
    -fastq ./example/Stool-SRS016203.fq.gz \
    -reference ./example/Ecoli.fa \
    -outfile ./example/stool-result
Commands and Modules
Sparkhit integrates a variety of methods and tools that can be used for different genomic analyses. These functions are activated by placing a command after the Sparkhit executable. To see all commands and their functions, simply run sparkhit.
$ ./bin/sparkhit
sparkhit - on the cloud.
Version: 1.0

Commands:
  recruiter      Fragment recruitment
  mapper         NGS short read mapping
  reporter       Summarize recruitment result
  piper          Send data to external tools, eg. bwa, bowtie2 and fr-hit
  parallelizer   Parallel a task to each worker node
  cluster        Run cluster to a table
  tester         Run Chi-square test
  converter      Convert different file format: fastq, fasta or line based
  correlationer  Run Correlation test
  decompresser   Parallel decompression to splitable compressed files, eg. bzip2
  reductioner    Run Principle component analysis
  regressioner   Run logistic regression
  statisticer    Run Hardy–Weinberg Equilibrium
  variationer    Genotype with samtools mpileup

Type each command to view its options, eg. Usage: ./sparkhit mapper

Spark cluster configuration:
  --spark-conf   Spark cluster configuration file or spark input parameters
  --spark-param  Spark cluster parameters in quotation marks "--driver-memory 4G --executor-memory 16G"
  --spark-help   View spark-submit options. You can include spark`s options directly.

Usage: sparkhit [commands] --spark-conf spark_cluster_default.conf [option...]
       sparkhit [commands] --spark-param "--driver-memory 4G --executor-memory 16G" [option...]
       sparkhit [commands] --driver-memory 4G --executor-memory 16G --executor-cores 2 [option...]

For detailed cluster submission, please refer to scripts located in:
/vol/cluster-data/huanglr/sparkhit/rhinempi-sparkhit-769845e/sbin
As shown above, there is a list of commands. Detailed descriptions are as follows:
Command | Description |
---|---|
recruiter | This command starts the Sparkhit-recruiter, a fragment recruitment tool that implements a more tolerant algorithm (compared to the Sparkhit-mapper), allowing more mismatches during the sequence alignment process. |
mapper | This command starts the Sparkhit-mapper, a short read mapping tool that implements a stricter algorithm (compared to the Sparkhit-recruiter), allowing fewer mismatches during the sequence alignment process but improving runtime performance. |
reporter | This command starts the Sparkhit-reporter, which applies a 'reduceByKey' function to summarize the mapping results of Sparkhit-recruiter or Sparkhit-mapper. |
piper | This command starts the Sparkhit-piper. It can be used to invoke external tools and scripts in a Spark cluster. It manages the data distribution and sends the data to the invoked tools via the standard input stream (like a pipe in Linux). The standard output of the invoked tools is sent back to Spark. |
parallelizer | This command starts the Sparkhit-parallelizer. It parallelizes scripts or Linux commands and distributes them to each Spark worker node. Unlike the Sparkhit-piper, this operation does not load and distribute input data. This function is useful when users want an operation to run simultaneously on each node, e.g. downloading reference genomes to a cluster without a shared file system. |
cluster | This command starts the Sparkhit-cluster. It applies clustering to a matrix or a VCF file. |
tester | This command starts the Sparkhit-tester. It applies the chi-square test to a matrix or a VCF file. |
converter | This command starts the Sparkhit-converter. It converts different file formats in a distributed way. This is useful when some tools are only compatible with a specific file format, e.g. only the fasta file. |
correlationer | This command starts the Sparkhit-correlationer. It applies the Pearson correlation test to two lists of data points. |
decompresser | This command starts the Sparkhit-decompresser. It parallelizes the decompression of "splittable" compressed files such as bzip2 files. (Spark parallel decompression was not thread safe as of version 1.6.0, and it is unclear whether this has been fixed. Alternatively, users can use Sparkhit's Hadoop-based decompresser (Sparkhit-spadoop) located in the ./lib folder.) |
reductioner | This command starts the Sparkhit-reductioner. It applies principal component analysis (dimension reduction) to a high dimensional dataset. |
regressioner | This command starts the Sparkhit-regressioner. It applies logistic regression to a matrix or VCF file. |
statisticer | This command starts the Sparkhit-statisticer. It applies the Hardy–Weinberg Equilibrium (HWE) test to a matrix of genotypes. |
variationer | This command starts the Sparkhit-variationer. It invokes Samtools-mpileup for variant detection. |
- All commands are pointers (not pointers in the programming sense) to a specific main Java class in the program. As shown in the first script above, the --class option passes a main Java class to the Spark framework, whereas in the second script the command (e.g. recruiter) assigns the main Java class to Spark.
Spark options
Spark options are the options for the Apache Spark framework. You can find detailed explanations of all options at the Spark configuration page.
Sparkhit identifies these options by their two leading dashes (--), just like the options used in Spark. Sparkhit also provides three additional input options for assigning Spark options. See the table below:
Option | Description |
---|---|
--spark-conf | This option specifies a configuration file as input options for Spark. It has the same function as the --properties-file option of the spark-submit executable. |
--spark-param | This option reads a series of command line options enclosed in quotation marks, e.g. --spark-param "--driver-memory 2G --executor-memory 4G" passes the two parameters --driver-memory and --executor-memory to the Spark framework. |
--spark-help | This option invokes spark-submit to print out all available options. This is useful when users want to directly check all the options of Spark. |
- In case you are not familiar with Spark, here are several important Spark options that users should pay attention to: --master is important if you want to submit a job in cluster mode; --driver-memory sets the memory consumption of the main program (e.g. for sparkhit-recruiter, the driver memory is used for building the reference index); --executor-memory sets the memory for each executor (usually the worker nodes) and should be increased accordingly if users want to cache the input data across cluster nodes.
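To illustrate, the three ways of passing Spark options could look like the sketches below (the configuration file name, memory values and core counts are placeholders; the input files are the ones shipped in the ./example folder):

# 1) via a Spark properties file (hypothetical file name)
sparkhit recruiter --spark-conf ./conf/spark_cluster_default.conf \
    -fastq ./example/Stool-SRS016203.fq.gz -reference ./example/Ecoli.fa -outfile ./example/stool-result

# 2) via a quoted parameter string
sparkhit recruiter --spark-param "--driver-memory 4G --executor-memory 16G" \
    -fastq ./example/Stool-SRS016203.fq.gz -reference ./example/Ecoli.fa -outfile ./example/stool-result

# 3) by passing spark-submit options directly
sparkhit recruiter --driver-memory 4G --executor-memory 16G --executor-cores 2 \
    -fastq ./example/Stool-SRS016203.fq.gz -reference ./example/Ecoli.fa -outfile ./example/stool-result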
Sparkhit recruiter options
Sparkhit recruiter is a fragment recruitment tool built on top of the Apache Spark platform. The options of Sparkhit recruiter mainly consist of the input sequencing data (usually a fastq file), the input reference genomes and an output result file.
Fragment recruitment allows more mismatches during the alignment between sequencing reads and reference genomes. This function is useful in situations like evaluating the quality of metagenomic assemblies, where low-quality matches are also needed for the evaluation.
To print out all options, simply type the command:
$ sparkhit recruiter
SparkHit 15:32:41 SparkHit main initiating ...
SparkHit 15:32:41 interpreting parameters.

Name:
    SparkHit recruiter

Options:
  -fastq <input fastq file>          Input Next Generation Sequencing (NGS) data, fastq file format, four line per unit
  -line <input line file>            Input NGS data, line based text file format, one line per unit
  -tag                               Set to tag filename to sequence id. It is useful when you are processing lots of samples at the same time
  -reference <input reference>       Input genome reference file, usually fasta format file, as input file
  -outfile <output file>             Output line based file in text format
  -kmer <kmer size>                  Kmer length for reads mapping
  -evalue <e-value>                  e-value threshold, default 10
  -global <global or not>            Use global alignment or not. 0 for local, 1 for global, default 0
  -unmask <unmask>                   whether mask repeats of lower case nucleotides: 1: yes; 0: no; default=1
  -overlap <kmer overlap>            small overlap for long read
  -identity <identity threshold>     minimal identity for recruiting a read, default 75 (sensitive mode, fast mode starts from 94)
  -coverage <coverage threshold>     minimal coverage for recruiting a read, default 30
  -minlength <minimal read length>   minimal read length required for processing
  -attempts <number attempts>        maximum number of alignment attempts for one read to a block, default 20
  -hits <hit number>                 how many hits for output: 0: all; N: top N hits
  -strand <strand +/->
  -thread <number of threads>        How many threads to use for parallelizing processes, default is 1 cpu. set to 0 is the number of cpus available! local mode only, for Spark version, use spark parameter!
  -partition <re-partition number>   re generate number of partitions for .gz data, as .gz data only have one partition (spark parallelization)
  -version                           show version information
  -help                              print and show this information
  -h

Usage:
    run fragment recruitment :
spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.Main sparkhit.jar [parameters] -fastq query.fq -reference reference.fa -outfile output_file.txt
spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.Main sparkhit.jar [parameters] -line query.txt -reference reference.fa -outfile output_file.txt
sparkhit [command] [spark parameter] [parameters] -fastq query.fq -reference reference.fa -outfile output_file
Options | Description |
---|---|
-fastq | The input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated files to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
-line | The input sequencing data is a fastq file in the line-based format. This line-based sequencing data is useful when users have uncompressed files distributed across different nodes. In this case, a fastq unit stays intact on one worker node. See the Notes below. |
-tag | Whether to tag the file name to each sequence or not. This tag function can mark each sequence in the intermediate mapping result. This information is useful when users input a list of samples at the same time and would like them to be summarized separately. This option increases the size of the output file. |
-reference | Input file of the reference genome in the fasta format. Make sure it is located in a shared file system or downloaded to each worker node. |
-outfile | Directory of the output files. |
-kmer | The k-mer length for building the reference index; 8-12 is recommended. The default is 12. |
-evalue | Set a maximum E-value as a filter to remove mappings above the threshold. |
-global | Global alignment or local alignment. Set to 0 for local alignment, 1 for global alignment. The default is 0. |
-unmask | Whether to mask repeats (lower case nucleotides) in the reference genome or not. 0 for no masking; 1 for masking. The default is 1. |
-overlap | When extracting k-mers from the reference genome, there is a shift between consecutive k-mers. The k-mer length minus the shift length is the overlap length. The larger the overlap is, the more sensitive the mapping result will be. |
-identity | The minimal mapping identity between the sequencing read and the reference genome. The identity is calculated by dividing the number of perfectly matched nucleotides by the length of the sequencing read. |
-coverage | The minimal coverage to recruit a read. The default is 30. |
-minlength | The minimal read length required to recruit a read. |
-attempts | Sparkhit recruiter tries to map each sequence to a collection of q-grams (a block of sequence that is very likely to be recruited). This parameter sets the maximum number of attempts for mapping one read to q-grams. The default is 20. |
-hits | How many hits to report in the result for each sequencing read. 0 reports all possible matches. The default is 0. |
-strand | Strand selection. 0 maps to both strands of the reference genome, 1 maps only to the "+" strand, 2 maps only to the "-" strand. The default is 0. |
-thread | How many threads to use in parallel. This parameter is deprecated; use -partition instead. |
-partition | Re-distribute the input data into a number of partitions. Each partition runs an individual mapping task. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- Unlike standard short read alignment, fragment recruitment allows more mismatches during the alignment between sequencing reads and reference genomes. This function is useful in situations like evaluating the quality of metagenomic assemblies, where low-quality matches are also needed for the evaluation.
- As most distributed storage systems manage data distribution automatically (e.g. the Hadoop distributed file system (HDFS)), uncompressed text files are usually split and identified by lines. Fastq files consist of 4-line units: an ID header line starting with "@", a raw sequence line, an identifier line starting with "+" and a sequencing quality line. Most tools require the complete information of a fastq unit for their algorithms. So, after decompression, Sparkhit converts a fastq file into a tabular file where each line is a complete unit consisting of the four elements separated by tabs:
Fastq file:
@hiseq2000
ATCGGCTAATCGGCTAATCGGCTA
+
iiiibbbibibibibibbbbiiib

Line file:
@hiseq2000	ATCGGCTAATCGGCTAATCGGCTA	+	iiiibbbibibibibibbbbiiib
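As a sketch of a typical recruitment run (the paths and tuning values below are placeholders; the options themselves are documented above), recruiting a batch of compressed fastq files against a reference could look like:

# recruit reads from a batch of compressed fastq files (hypothetical paths and values)
sparkhit recruiter --driver-memory 4G --executor-memory 16G \
    -fastq /vol/data/samples/*.fq.bz2 \
    -reference /vol/data/reference/reference.fa \
    -outfile /vol/data/results/recruitment \
    -identity 75 -global 1 -kmer 12 -partition 320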
Sparkhit mapper options
Sparkhit mapper is a short read aligner built on top of the Apache Spark platform. The options for Sparkhit mapper are similar to those of standard alignment tools: an input file of sequencing data (usually a fastq file), an input reference genome and an output result file.
Unlike the Sparkhit-recruiter, the mapper runs much faster by using the pigeonhole principle as a stricter filter. Thus, Sparkhit-mapper is recommended when only high quality matches (more than 94% identity) are needed for the analysis.
To list all options, simply type command:
$ sparkhit mapper
SparkHit 15:32:41 SparkHit main initiating ... SparkHit 15:32:41 interpreting parameters. Name: SparkHit mapper Options: -fastq <input fastq file> Input Next Generation Sequencing (NGS) data, fastq file format, four line per unit -line <input line file> Input NGS data, line based text file format, one line per unit -tag Set to tag filename to sequence id. It is useful when you are processing lots of samples at the same time -reference <input reference> Input genome reference file, usually fasta format file, as input file -outfile <output file> Output line based file in text format -kmer <kmer size> Kmer length for reads mapping -evalue <e-value> e-value threshold, default 10 -global <global or not> Use global alignment or not. 0 for local, 1 for global, default 0 -unmask <unmask> whether mask repeats of lower case nucleotides: 1: yes; 0 :no; default=1 -overlap <kmer overlap> small overlap for long read -identity <identity threshold> minimal identity for recruiting a read, default 75 (sensitive mode, fast mode starts from 94) -coverage <coverage threshold> minimal coverage for recruiting a read, default 30 -minlength <minimal read length> minimal read length required for processing -attempts <number attempts> maximum number of alignment attempts for one read to a block, default 20 -hits <hit number> how many hits for output: 0:all; N: top N hits -strand <strand +/-> -thread <number of threads> How many threads to use for parallelizing processes,default is 1 cpu. set to 0 is the number of cpus available!local mode only, for Spark version, use spark parameter! -partition <re-partition number> re generate number of partitions for .gz data, as .gz data only have one partition (spark parallelization) -version show version information -help print and show this information -h Usage: run short read mapping : spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfMapper sparkhit.jar [parameters] -fastq query.fq -reference reference.fa -outfile output_file.txt spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfMapper sparkhit.jar [parameters] -line query.txt -reference reference.fa -outfile output_file.txt sparkhit [command] [spark parameter] [parameters] -fastq query.fq -reference reference.fa -outfile output_file
Options | Description |
---|---|
-fastq | The input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated files to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
-line | The input sequencing data is a fastq file in the line-based format. This line-based sequencing data is useful when users have uncompressed files distributed across different nodes. In this case, a fastq unit stays intact on one worker node. See the Notes above. |
-tag | Whether to tag the file name to each sequence or not. This tag function can mark each sequence in the intermediate mapping result. This information is useful when users input a list of samples at the same time and would like them to be summarized separately. This option increases the size of the output file. |
-reference | Input file of the reference genome in the fasta format. Make sure it is located in a shared file system or downloaded to each worker node. |
-outfile | Directory of the output files. |
-kmer | The k-mer length for building the reference index; 8-12 is recommended. The default is 12. |
-evalue | Set a maximum E-value as a filter to remove mappings above the threshold. |
-global | Global alignment or local alignment. Set to 0 for local alignment, 1 for global alignment. The default is 0. |
-unmask | Whether to mask repeats (lower case nucleotides) in the reference genome or not. 0 for no masking; 1 for masking. The default is 1. |
-overlap | When extracting k-mers from the reference genome, there is a shift between consecutive k-mers. The k-mer length minus the shift length is the overlap length. The larger the overlap is, the more sensitive the mapping result will be. |
-identity | The minimal mapping identity between the sequencing read and the reference genome. The identity is calculated by dividing the number of perfectly matched nucleotides by the length of the sequencing read. |
-coverage | The minimal coverage to recruit a read. The default is 30. |
-minlength | The minimal read length required to recruit a read. |
-attempts | Sparkhit mapper tries to map each sequence to a collection of q-grams (a block of sequence that is very likely to be recruited). This parameter sets the maximum number of attempts for mapping one read to q-grams. The default is 20. |
-hits | How many hits to report in the result for each sequencing read. 0 reports all possible matches. The default is 0. |
-strand | Strand selection. 0 maps to both strands of the reference genome, 1 maps only to the "+" strand, 2 maps only to the "-" strand. The default is 0. |
-thread | How many threads to use in parallel. This parameter is deprecated; use -partition instead. |
-partition | Re-distribute the input data into a number of partitions. Each partition runs an individual mapping task. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- Unlike the Sparkhit-recruiter, the mapper enables faster sequence alignment by implementing the pigeonhole principle. While increasing the mapping speed, Sparkhit-mapper is less tolerant of mismatches during the alignment, due to the nature of the pigeonhole principle. Thus, when users only need good quality matches (more than 94% identity), Sparkhit-mapper is recommended.
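A minimal mapper run could be sketched as follows (paths and memory values are placeholders; the higher identity threshold reflects the mapper's focus on high quality matches):

# map reads with a strict identity cut-off (hypothetical paths and values)
sparkhit mapper --driver-memory 4G --executor-memory 16G \
    -fastq /vol/data/samples/sample1.fq.gz \
    -reference /vol/data/reference/reference.fa \
    -outfile /vol/data/results/mapping \
    -identity 94 -partition 320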
Sparkhit reporter options
Sparkhit reporter is a tool to summarize the mapping results of sparkhit mapper. Like a standard word count application, it applies a simple Map-Reduce computation to summarize the frequency of each element; in our case, the number of reads mapped to each chromosome/contig/scaffold of the reference genomes.
To list all options, type command:
$ ./bin/sparkhit reporter
SparkHit 11:48:21 SparkHit Reporter initiating ... SparkHit 11:48:21 interpreting parameters. Name: SparkHit Reporter Options: -input <input sparkhit result file> Input spark hit result file in tabular format. Accept wild card, s3n schema, hdfs schema -word <columns for identifier> a list of column number used to represent a category you want to summarize. eg, 1,3,8 means counting column 1 (chr), column 3 (strain), column 8 (identity) -count <column for count number> the number of column value which will be used to aggregate for the report. set to 0 the value will be 1 for every identifier -outfile <output report file> Output report file in text format -partition <re-partition number> re generate number of partitions for .gz data, as .gz data only have one partition (spark parallelization) -version show version information -help print and show this information -h Usage: Report mapping summary spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfReporter Sparkhit.jar [parameters] -input ./sparkhit.out -outfile ./sparkhit.report sparkhit reporter [spark parameter] [parameters] -input ./sparkhit.out -outfile ./sparkhit.report
Options | Description |
---|---|
-input | Input files of the fragment recruitment result in the tabular format. |
-word | A list of column numbers used to represent a category you want to summarize. See Notes for details. |
-count | Users can specify a column as the count number. This is useful when the tabular format has one column already stating the number of matches. By default, it is always one read per line. |
-outfile | The path of the output result file, tabular format. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Display the Sparkhit version. |
-help | Print the help information. |
- The reporter usually summarizes the number of reads mapped to a certain chromosome/contig/scaffold above a certain identity, e.g. how many reads mapped to NM_086618 with more than 98% identity. Sometimes users might want to make changes, e.g. how many reads map to the NM_086618 "+" strand with more than 97% identity. We offer a flexible option for the summarization, the -word option. It allows you to specify the identifier as a combination of elements by selecting a list of columns. Each line of the input tabular file is a hit of a sequencing read. By choosing and combining several columns, users can assemble their identifier as a key word for the summarization (like a MapReduce WordCount).
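For instance, following the column layout given in the help output above (column 1 = chromosome, column 8 = identity), counting hits per chromosome and identity could be sketched as (input and output paths are placeholders):

# summarize mapping hits by chromosome and identity (hypothetical paths)
sparkhit reporter --driver-memory 2G --executor-memory 4G \
    -input ./results/recruitment/part* \
    -word 1,8 \
    -count 0 \
    -outfile ./results/recruitment-report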
Sparkhit piper options
Sparkhit piper is a functional module for invoking and parallelizing external tools and scripts at the "Map" step. The "Map" step in MapReduce is usually an independent process, and this process can also be carried out by an external tool. Sparkhit loads the sequencing data into an RDD and sends the RDD data to the external tool via STDIN. The invoked tool processes the input data as programmed and sends the result back via STDOUT (into a new RDD).
The command line of Sparkhit piper consists of seven parts: sparkhit piper --spark-param [input data] [external tool dependencies] [external tool path] [external tool options] [output]
. In this command line, [external tool dependencies], [external tool path] and [external tool options] are combined into one command that is executed on the worker nodes independently, e.g.:
-tooldepend "/usr/bin/java/1.7.0/jre/bin/java -jar"
-tool /home/ec2-user/javaMapper/javaMapper.jar
-toolparam "-input /dev/stdin -reference /home/ec2-user/reference/hg19.fa -output /dev/stdout"
will be a combined command:
$ /usr/bin/java/1.7.0/jre/bin/java -jar /home/ec2-user/javaMapper/javaMapper.jar -input /dev/stdin -reference /home/ec2-user/reference/hg19.fa -output /dev/stdout
running on the worker nodes as a Linux command. The input and output, however, are redirected to the standard input and standard output, where the data streams are managed by Sparkhit.
To list all options, type command:
$ ./bin/sparkhit piper
SparkHit 11:34:24 SparkHit ScriptPiper initiating ... SparkHit 11:34:24 interpreting parameters. Name: SparkHit ScriptPiper (bwa, bowtie2 or other aligner) Options: -fastq <input fastq file> Input spark hit result file in tabular format. Accept wild card, s3n schema, hdfs schema -line <input line file> Input NGS data, line based text file format, one line per unit -tag Set to tag filename to sequence id. It is useful when you are processing lots of samples at the same time -filter Weather to filter input fastq file or not, default not (big data with small error, who knows) -tofasta Convert input fastq file to fasta before sending to external tool to process -linetofa Convert input line file to fasta before sending to external tool to process -tool <external tool path> Path to an external tool you want to use, a script or a tool -toolparam <external tool param> Use "" quotation to quote Parameter for your tool, please exclude input and output for your tool as it should be STDIN and STDOUT for Spark Pipe. Please include reference and place it in the right position in your command line -tooldepend <tool dependencies> Use "" quotation to quote Dependencies for your tool. Or instead, put it in tool path in commandline logic. Default is NONE -outfile <output file> Output sequencing data directory -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Parallelize your own tool like bwa mem: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfPiper Sparkhit.jar [parameters] -fastq query.fq.tar.bz2 -outfile ./outbams_dir -tool "/mypath/bwa mem" -toolparam "/mypath/reference.fa -t 32" sparkhit piper [spark parameter] [parameters] -fastq query.fq.tar.bz2 -outfile ./outbams_dir -tool "/mypath/bwa mem" -toolparam "/mypath/reference.fa -t 32"
Options | Description |
---|---|
-fastq | The input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated files to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
-line | The input sequencing data is a fastq file in the line-based format. This line-based sequencing data is useful when users have uncompressed files distributed across different nodes. In this case, a fastq unit stays intact on one worker node. |
-tag | Whether to tag the file name to each sequence or not. This tag function can mark each sequence in the intermediate mapping result. This information is useful when users input a list of samples at the same time and would like them to be summarized separately. This option increases the size of the output file. |
-filter | Set to filter the input fastq files. This is useful when there are errors in a large number of fastq files. |
-tofasta | In case some tools only read fasta files, set this option to convert the input fastq files into the fasta format. |
-linetofa | Used in combination with the -line parameter. In case some tools only read fasta files, set this option to convert the input line files into the fasta format. |
-tool | A script or tool which you want to use. Make sure it is accessible by each worker node, e.g. place it in a shared file system or copy it to each worker node. If it is a script or a single executable file, add the -copy option to copy it to each worker node. |
-toolparam | The corresponding parameters for the invoked tool. |
-tooldepend | The dependencies of the invoked tool. |
-copy | Copy a script or a single executable file to each worker node so that it can be used by each Spark executor. |
-outfile | The directory of the output files. |
-partition | Re-distribute input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- When the invoked tool is multithreaded, e.g. it uses 4 cores for each RDD task, please also set --conf "spark.task.cpus=4" to match the number of threads used on each worker node. For example, a 32-core worker node usually allows 32 tasks to run in parallel; if each task invokes a 4-core multithreaded tool, the CPU number per task is 4 and only 8 tasks run in parallel. If this is not set correctly, tasks will compete for resources. An example script can be found at ./sbin/sparkhit-frhit.sh.
- To invoke external tools, we need to make sure the tools are accessible by Sparkhit on every worker node (including their dependencies). When copying them to each node or placing them on a shared file system, make sure their dependencies are also included (or use containers, e.g. Docker).
- The three parameters -tooldepend, -tool and -toolparam are used to formulate one line of command (which means they are flexible, as long as the combined command is valid). However, when using the -copy option, the file given by -tool will be copied to each worker node.
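Putting these notes together, a bwa-based piper run could be sketched as follows (paths, thread counts and memory values are placeholders; the -tool/-toolparam pattern follows the usage line in the help output):

# pipe reads through "bwa mem" on the worker nodes (hypothetical paths and values)
sparkhit piper --driver-memory 4G --executor-memory 16G \
    --conf "spark.task.cpus=4" \
    -fastq /vol/data/samples/*.fq.bz2 \
    -tool "/vol/tools/bwa mem" \
    -toolparam "/vol/data/reference/reference.fa -t 4" \
    -outfile /vol/data/results/bwa-out \
    -partition 320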
Sparkhit parallelizer options
Sparkhit parallelizer is quite similar to the piper. They both use the Spark pipe function to invoke external commands and tools. The only difference is that the parallelizer does not manage data distribution, but focuses only on parallelizing a specific command to each worker node. Thus, Sparkhit parallelizer has no input and output options. The parallelizer is designed to parallelize an operation to each worker node, e.g. downloading reference genomes to each worker node. See ./sbin/sparkhit-download.sh.
To list all options, type command:
$ ./bin/sparkhit parallelizer
SparkHit 15:12:46 SparkHit Parallelizer (parallel operation on different nodes) initiating ... SparkHit 15:12:46 interpreting parameters. Name: SparkHit Parallelizer Options: -nodes <input nodes number> Input nodes number for parallel action -tool <external tool path> Path to an external tool you want to use, a script or a tool -toolparam <external tool param> Use "" quotation to quote Parameter for your tool, please exclude input and output for your tool as it should be STDIN and STDOUT for Spark Pipe. Please include reference and place it in the right position in your command line -tooldepend <tool dependencies> Use "" quotation to quote Dependencies for your tool. Or instead, put it in tool path in commandline logic. Default is NONE -outfile <output file> Output sequencing data directory -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Parallelize an operation to each worker nodes: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfParallelizer Sparkhit.jar [parameters] -nodes 10 -tool "kill -u ec2-user" sparkhit parallelizer [spark parameter] [parameters] -nodes 10 -tool "kill -u ec2-user"
Options | Description |
---|---|
-nodes | The number of worker nodes in the Spark cluster. |
-tool | A script or tool which you want to use. Make sure it is accessible by Spark on each worker node, e.g. place it in a shared file system or copy it to each worker node. If it is a script or a single executable file, add the -copy option to copy it to each worker node. |
-toolparam | The corresponding parameters for the invoked tool. |
-tooldepend | The dependencies of the invoked tool. |
-copy | Copy a script or a single executable file to each worker node so that it can be used by each Spark executor. |
-outfile | Deprecated |
-partition | Deprecated |
-version | Print out the Sparkhit version |
-help | Print the help information |
- -tool, -toolparam and -tooldepend are options used to formulate a command which will be parallelized on all worker nodes. You can leave any of them empty, as long as the combined command is correct.
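As a sketch (the URL, node count and target path are placeholders), downloading a reference genome to every worker node could look like:

# download a reference genome onto each of 10 worker nodes (hypothetical URL and path)
sparkhit parallelizer --driver-memory 1G --executor-memory 2G \
    -nodes 10 \
    -tool "wget" \
    -toolparam "-q -O /tmp/reference.fa http://example.org/reference/reference.fa"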
Sparkhit cluster options
Sparkhit cluster implements clustering algorithms from the Spark machine learning library (MLlib). It can cluster biological cohorts based on their genotype information or cluster SNPs based on their profiles in different biological cohorts. Sparkhit cluster reads the input file(s) and builds a tabular matrix in which each line is a vector/unit (a sample or a SNP) to be clustered.
To list all options, type command:
$ ./bin/sparkhit cluster
SparkHit 16:21:35 SparkHit Clustering initiating ... SparkHit 16:21:35 interpreting parameters. Name: SparkHit Machine Learning library Options: -vcf <input VCF file> Input vcf file containing variation info -tab <input tabular file> Input tabular file containing variation info -outfile <output file> Output cluster index file -model <cluster model> clustering model, 0 for hierarchical, 1 for centroid (k-mean), default is 0 -window <SNP window size> window size for a block of snps -column <Columns for Alleles> columns where allele info is set, default is 2-3 -cache weather to cache data in memory or not, default no -cluster <Cluster number> how many leaf clusters, default is 1 -iteration <Iteration number> how many iterations for learning, default is 20 -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Machine learning library for vcf or tabular file: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfCluster Sparkhit.jar [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache sparkhit [command] [spark parameter] [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache
Options | Description |
---|---|
-vcf | Input a VCF format file that contains genotype information and annotations. |
-tab | Input a tabular file with the raw data (e.g. a table as a matrix). |
-outfile | The directory of the output file. |
-model | The clustering model: 0 for hierarchical, 1 for centroid (k-means). The default is 0. |
-window | An integer as the size of a window of SNPs. Instead of clustering based on each single SNP, the clustering will use the profile of the SNPs located within each window. |
-column | The column(s) where the genotypes are placed. See the Notes below. |
-cache | Set to cache the input data in memory. |
-cluster | The maximum number of clusters. |
-iteration | The maximum number of iterations for clustering. The default is 20. |
-partition | Re-distribute input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version |
-help | Print the help information |
- -vcf is used to read genotypes from VCF files. Genotypes are encoded as the corresponding integers of a matrix: 0|0 is encoded as 0; 0|1 and 1|0 are encoded as 1; 1|1 is encoded as 2. Please refer to the VCF file in the ./example folder.
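A clustering run on the example genotypes could be sketched as follows (it extends the usage line in the help output; the model, cluster count and file names are illustrative choices):

# k-mean clustering of genotypes stored in columns 2-10 (illustrative values)
sparkhit cluster --driver-memory 2G --executor-memory 4G \
    -vcf ./example/genotype.vcf \
    -column 2-10 \
    -model 1 -cluster 3 -iteration 20 \
    -cache \
    -outfile ./example/cluster-result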
Sparkhit tester options
Sparkhit tester implements the chi-square test to carry out statistical tests on the input data and returns the P-value for each line of the input file. Thus, each line of the input file should contain two groups of data points, which are assigned via the -column and -column2 options.
To list all options, type command:
$ ./bin/sparkhit tester
SparkHit 18:36:41 SparkHit ChisquareTester initiating ... SparkHit 18:36:41 interpreting parameters. Name: SparkHit Machine Learning library Options: -vcf <input VCF file> Input vcf file containing variation info -tab <input tabular file> Input tabular file containing variation info -outfile <output file> Output alleles p value -column <Columns for Alleles> 1, columns where allele info is set, as case set -column2 <Columns2 for Alleles> 2, columns where allele info is set, as control set -cache weather to cache data in memory or not, default no -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Machine learning library for vcf or tabular file: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfChisquareTester Sparkhit.jar [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache sparkhit [command] [spark parameter] [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache
Options | Description |
---|---|
-vcf | Input a VCF format file that contains genotype information and annotations. |
-tab | Input a tabular file with the raw data (e.g. a table as a matrix). |
-outfile | The directory of the output file. |
-column | A block of columns where the allele information or data points are placed. This dataset is group A (the case set) for the test. |
-column2 | A block of columns where the allele information or data points are placed. This dataset is group B (the control set) for the test. |
-cache | Set to cache the data in memory. Not cached by default. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- -vcf is used to read genotypes from VCF files. Genotypes are encoded as the corresponding integers of a matrix: 0|0 is encoded as 0; 0|1 and 1|0 are encoded as 1; 1|1 is encoded as 2. Please see the VCF file in the ./example folder.
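A chi-square test comparing two groups of genotype columns could be sketched like this (the column ranges and file names are illustrative; the pattern extends the usage line in the help output):

# chi-square test: columns 2-10 as the case group, columns 11-19 as the control group (illustrative values)
sparkhit tester --driver-memory 2G --executor-memory 4G \
    -vcf ./example/genotype.vcf \
    -column 2-10 \
    -column2 11-19 \
    -cache \
    -outfile ./example/chisq-result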
Sparkhit converter options
Some tools require a different input file format, e.g. a fasta file rather than a fastq file. Sparkhit converter provides parallel file format conversion for such cases.
To list all options, type command:
$ ./bin/sparkhit converter
SparkHit 22:17:18 SparkHit Converter initiating ... SparkHit 22:17:18 interpreting parameters. Name: SparkHit Converter Options: -fastq <input fastq file> Input Next Generation Sequencing (NGS) data, fastq file format, four line per unit -outfile <output file> Output sequencing data directory -outfm <output file format> Output sequencing data format: 0 line without quality, 1 line with quality, 2 fasta format -partition <re-partition number> re generate number of partitions for .gz data, as .gz data only have one partition (spark parallelization) -version show version information -help print and show this information -h Usage: Convert different file format : spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfConverter Sparkhit.jar [parameters] -fastq query.fq.tar.bz2 -outfile ./outdir sparkhit converter [spark parameter] [parameters] -fastq query.fq.tar.bz2 -outfile ./outdir
Options | Description |
---|---|
-fastq | The input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated files to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
-outfile | The directory of the output file. |
-outfm | The output file format. 0 is the line-based format without quality info; 1 is the line-based format with quality info; 2 is the fasta format. |
-partition | Re-distribute input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- When the input is a "splittable" compressed fastq file, e.g. fastq.bz2, Sparkhit uses the Spark parallel decompresser. Up to version 2.0.0, the Spark parallel decompressor has a potential thread-safety issue. Please use the Hadoop-based decompressor located at ./lib/sparkhit-hadoopDecompressor-1.0-SNAPSHOT.jar for decompressing large files.
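Converting a batch of compressed fastq files to fasta could be sketched as follows (paths are placeholders; -outfm 2 selects the fasta format as documented above):

# convert compressed fastq files to fasta in parallel (hypothetical paths)
sparkhit converter --driver-memory 2G --executor-memory 4G \
    -fastq /vol/data/samples/*.fq.bz2 \
    -outfm 2 \
    -partition 320 \
    -outfile /vol/data/samples-fasta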
Sparkhit correlationer options
Sparkhit correlationer calculates the Pearson correlation coefficient of two datasets.
SparkHit 22:18:45 SparkHit Correlationer initiating ... SparkHit 22:18:45 interpreting parameters. Name: SparkHit Machine Learning library Options: -vcf <input VCF file> Input vcf file containing variation info -tab <input tabular file> Input tabular file containing variation info -outfile <output file> Output alleles p value -column <Columns for Alleles> columns where allele info is set -cache weather to cache data in memory or not, default no -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Machine learning library for vcf or tabular file: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfCorrelationer Sparkhit.jar [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache sparkhit [command] [spark parameter] [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache
Options | Description |
---|---|
-vcf | Input a VCF format file that contains genotype information and annotations. |
-tab | Input a tabular file with the raw data (e.g. a table as a matrix). |
-outfile | The directory of the output file. |
-column | The column(s) where the genotypes are placed. |
-cache | Set to cache the input data into the memory. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- -vcf is used to read genotypes from VCF files. Genotypes are encoded as the corresponding integers of a matrix: 0|0 is encoded as 0; 0|1 and 1|0 are encoded as 1; 1|1 is encoded as 2. Please see the VCF file in the ./example folder.
Sparkhit decompresser options
SparkHit 22:20:42 SparkHit Decompresser initiating ... SparkHit 22:20:42 interpreting parameters. Name: SparkHit Decompresser Options: -fastq <input compressed fastq file> Input compressed fastq file. Accept wild card, s3n schema, hdfs schema -line <input compressed line file> Input compressed line file format, one line per unit -filter Weather to filter input fastq file or not, default not (big data with small error, who knows) -tofasta Convert input fastq file to fasta before sending to external tool to process -linetofa Convert input line file to fasta before sending to external tool to process -outfile <output file> Output sequencing data directory -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Decomress zipball and tarball using spark codec: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfDecompresser Sparkhit.jar sparkhit.jar [parameters] -fastq input.fq.bz2 -outfile ./decompressed sparkhit decompresser [spark parameter][parameters] -fastq input.fq.bz2 -outfile ./decompressed
Options | Description |
---|---|
-fastq | The input sequencing data in the fastq format. The input files can be compressed or uncompressed. Users can also use wildcards or comma-separated files to input a batch of files, e.g. -fastq /vol/data/samples/*.fq.bz2. |
-line | The input sequencing data is a fastq file in the line-based format. This line-based sequencing data is useful when users have uncompressed files distributed across different nodes. In this case, a fastq unit stays intact on one worker node. |
-filter | Set to filter input fastq files. This is useful when there are errors in a large number of fastq files. |
-tofasta | In case some tools only read fasta files. Set to convert input fastq files into the fasta file format. |
-linetofa | Used in combination with the -line parameter. In case some tools only read fasta files, set this option to convert the input line files into the fasta format. |
-outfile | The directory of the output file. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- Again, the parallel decompression function of Spark, up to version 2.0.0, has a potential thread-safety issue. Please use the Hadoop-based decompressor located at ./lib/sparkhit-hadoopDecompressor-1.0-SNAPSHOT.jar for decompressing large files.
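A decompression run following the usage line of the help output could be sketched as (paths are placeholders):

# decompress a splittable bzip2 fastq file in parallel (hypothetical paths)
sparkhit decompresser --driver-memory 2G --executor-memory 4G \
    -fastq /vol/data/samples/input.fq.bz2 \
    -partition 320 \
    -outfile /vol/data/samples/decompressed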
Sparkhit reductioner options
SparkHit 22:22:14 SparkHit Reductioner (PCA) initiating ... SparkHit 22:22:14 interpreting parameters. Name: SparkHit Machine Learning library Options: -vcf <input VCF file> Input vcf file containing variation info -tab <input tabular file> Input tabular file containing variation info -outfile <output file> Output major components file -window <SNP window size> window size for a block of snps -column <Columns for Alleles> columns where allele info is set -row Samples listed in row or column, default is column -cache weather to cache data in memory or not, default no -component <Number of components> How many major components to calculate -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Machine learning library for vcf or tabular file: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfReductioner Sparkhit.jar [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache sparkhit [command] [spark parameter] [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache
Options | Description |
---|---|
-vcf | Input a VCF format file which contains genotype information and annotations. |
-tab | Input a tabular file with the raw data (e.g. a table as a matrix). |
-outfile | The directory of the output file. |
-window | An integer as the size of a window of SNPs. Instead of analyzing each single SNP, the dimension reduction will use the profile of the SNPs located within each window. |
-column | The column(s) where the genotypes are placed. |
-row | Choose the orientation of your input table. By default, each column is a dimension and each row is a unit to be represented. Set this parameter to change the orientation. |
-cache | Set to cache the input data in memory. |
-component | The number of major components to present in the result. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- -vcf is used to read genotypes from VCF files. Genotypes are encoded as the corresponding integers of a matrix: 0|0 is encoded as 0; 0|1 and 1|0 are encoded as 1; 1|1 is encoded as 2. Please see the VCF file in the ./example folder.
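A PCA run on the example genotypes could look like this sketch (the component count and file names are illustrative; the pattern extends the usage line in the help output):

# principal component analysis on genotype columns 2-10, keeping 2 components (illustrative values)
sparkhit reductioner --driver-memory 2G --executor-memory 4G \
    -vcf ./example/genotype.vcf \
    -column 2-10 \
    -component 2 \
    -cache \
    -outfile ./example/pca-result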
Sparkhit regressioner options
SparkHit 22:23:27 SparkHit Regressioner initiating ... SparkHit 22:23:27 interpreting parameters. Name: SparkHit Machine Learning library Options: -train <training data> Input vcf file containing training data -vcf <input VCF file> Input vcf file containing variation info -tab <input tabular file> Input tabular file containing variation info -outfile <output file> Output cluster index file -model <cluster model> clustering model, 0 for hierarchical, 1 for centroid (k-mean), default is 0 -window <SNP window size> window size for a block of snps -column <Columns for Alleles> columns where allele info is set, default is 2-3 -cache weather to cache data in memory or not, default no -iteration <Iteration number> how many iterations for learning, default is 20 -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Machine learning library for vcf or tabular file: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfRegressioner Sparkhit.jar [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache sparkhit [command] [spark parameter] [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache
Options | Description |
---|---|
-vcf | Input a VCF format file which contains genotype information and annotations. |
-tab | Input a tabular file with the raw data (e.g. a table as a matrix). |
-outfile | The directory of the output file. |
-window | An integer as the size of a window of SNPs. Instead of analyzing each single SNP, the regression will use the profile of the SNPs located within each window. |
-column | The column(s) where the genotypes are placed. See the Notes below. |
-cache | Set to cache the input data in memory. |
-iteration | The maximum number of iterations for the regression. The default is 20. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- -vcf is used to read genotypes from VCF files. Genotypes are encoded as the corresponding integers of a matrix: 0|0 is encoded as 0; 0|1 and 1|0 are encoded as 1; 1|1 is encoded as 2. Please see the VCF file in the ./example folder.
Sparkhit statisticer options
SparkHit 22:25:27 SparkHit Statisticer (HWE, fisher EX, ChiSq) initiating ... SparkHit 22:25:27 interpreting parameters. Name: SparkHit Machine Learning library Options: -vcf <input VCF file> Input vcf file containing variation info -tab <input tabular file> Input tabular file containing variation info -outfile <output file> Output alleles p value -column <Columns for Alleles> columns where allele info is set -cache weather to cache data in memory or not, default no -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Machine learning library for vcf or tabular file: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfStatisticer Sparkhit.jar [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache sparkhit [command] [spark parameter] [parameters] -vcf genotype.vcf -outfile ./result -column 2-10 -cache
Options | Description |
---|---|
-vcf | Input a VCF format file which contains genotype information and annotations. |
-tab | Input a tabular file with the raw data (e.g. a table as a matrix). |
-outfile | The directory of the output file. |
-column | The column(s) where genotypes are placed. |
-cache | Set to cache the input data into the memory. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
- -vcf is used to read genotypes from VCF files. Genotypes are encoded as the corresponding integers of a matrix: 0|0 is encoded as 0; 0|1 and 1|0 are encoded as 1; 1|1 is encoded as 2. Please see the VCF file in the ./example folder.
Sparkhit variationer options
SparkHit 22:27:11 SparkHit VariantCaller (HDFS bam reader) initiating ... SparkHit 22:27:11 interpreting parameters. Name: SparkHit ScriptPiper (bwa, bowtie2 or other aligner) ###### to be changed Options: -list <input bamfile list> Input list file of HDFS (with hdfs schema) bam file path, one file per line -tool <external tool path> Path to an external tool you want to use, a script or a tool -toolparam <external tool param> Use "" quotation to quote Parameter for your tool, please exclude input and output for your tool as it should be STDIN and STDOUT for Spark Pipe. Please include reference and place it in the right position in your command line -tooldepend <tool dependencies> Use "" quotation to quote Dependencies for your tool. Or instead, put it in tool path in commandline logic. Default is NONE -outfile <output file> Output sequencing data directory -partition <re-partition num> even the load of each task, 1 partition for a task or 4 partitions for a task is recommended. Default, not re-partition -version show version information -help print and show this information -h Usage: Parallelize your own tool like samtools mipleup: spark-submit [spark parameter] --class uni.bielefeld.cmg.sparkhit.main.MainOfVariantioner Sparkhit.jar [parameters] -fastq query.fq.tar.bz2 -outfile ./outbams_dir -tool "/mypath/bwa mem" -toolparam "/mypath/reference.fa -t 32" sparkhit piper [spark parameter] [parameters] -fastq query.fq.tar.bz2 -outfile ./outbams_dir -tool "/mypath/bwa mem" -toolparam "/mypath/reference.fa -t 32"
Options | Description |
---|---|
-list | Input a list file of bam file paths on HDFS (with the hdfs schema), one file per line. |
-tool | A script or tool which you want to use. Make sure it is accessible by each worker node, e.g. place it in a shared file system or copy it to each worker node. If it is a script or a single executable file, add the -copy option to copy it to each worker node. |
-toolparam | The corresponding parameters for the invoked tool. |
-tooldepend | The dependencies of the invoked tool. |
-outfile | The directory of the output file. |
-partition | Re-distribute the input data into a number of partitions. This is useful when you want to evenly distribute your data for parallel computations. |
-version | Print out the Sparkhit version. |
-help | Print the help information. |
Set up a Spark cluster on Sun Grid Engine (SGE)
To set up a Spark cluster on SGE, we can use the scripts located in the ./sbin folder of the Spark installation.
First, we start a Spark master node on the SGE login node:
sh start-master.sh
A success message should then be displayed on your screen:
starting org.apache.spark.deploy.master.Master, logging to $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out
Open the log file to see the master node info:
less $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out
Spark Command: /usr/lib/jvm/java-1.8.0/jre//bin/java -cp $SPARK_HOME/conf/:$SPARK_HOME/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host zorin --port 7077 --webui-port 8080 ======================================== Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 17/03/10 13:38:36 INFO Master: Started daemon with process name: 12018@zorin 17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for TERM 17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for HUP 17/03/10 13:38:36 INFO SignalUtils: Registered signal handler for INT 17/03/10 13:38:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/03/10 13:38:38 INFO SecurityManager: Changing view acls to: huanglr 17/03/10 13:38:38 INFO SecurityManager: Changing modify acls to: huanglr 17/03/10 13:38:38 INFO SecurityManager: Changing view acls groups to: 17/03/10 13:38:38 INFO SecurityManager: Changing modify acls groups to: 17/03/10 13:38:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(huanglr); groups with view permissions: Set(); users with modify permissions: Set(huanglr); groups with modify permissions: Set() 17/03/10 13:38:39 INFO Utils: Successfully started service 'sparkMaster' on port 7077. 17/03/10 13:38:39 INFO Master: Starting Spark master at spark://zorin:7077 17/03/10 13:38:39 INFO Master: Running Spark version 2.0.0 17/03/10 13:38:40 INFO Utils: Successfully started service 'MasterUI' on port 8080. 17/03/10 13:38:40 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://zorin:8080 17/03/10 13:38:40 INFO Utils: Successfully started service on port 6066. 17/03/10 13:38:40 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066 17/03/10 13:38:41 INFO Master: I have been elected leader! New state: ALIVE
Now, you can browse the WebUI of the master node via http://zorin:8080, where zorin is the hostname of the master node.
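If the login node has no browser available, the WebUI can also be checked from the command line, for example:
# assumes the master WebUI is reachable on the default port 8080
curl -s http://zorin:8080 | head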
Next, we will add worker nodes to the master node. To do that, we use the start-slave.sh script located in the ./sbin folder of the Spark installation.
To submit a worker (slave) daemon to an SGE compute node, we can write a submission script for SGE:
#!/bin/sh # Set SGE options: ## run the job in the current working directory (where qsub is called) #$ -cwd ## Specify an architecture #$ -l arch=lx24-amd64 ## Specify multislot pe #$ -pe multislot 48 ## Specify memory for each pe job #$ -l vf=2G sh $SPARK_HOME/sbin/start-slave.sh spark://zorin:7077 sleep 100000000
Here, spark://zorin:7077 assigns this worker node to the master node we launched before. The sleep 100000000 command keeps the daemon running on the submitted SGE compute node.
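Since the SGE options are embedded in the script header, the submission itself is a single qsub call; the script file name below is only an assumption:
# submit the worker script to SGE (the file name is a placeholder)
qsub spark-worker-sge.sh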
Check the log file again to see if the worker node connection is successful:
less $SPARK_HOME/logs/spark-huanglr-org.apache.spark.deploy.master.Master-1-zorin.out
17/03/10 13:38:40 INFO Utils: Successfully started service on port 6066. 17/03/10 13:38:40 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066 17/03/10 13:38:41 INFO Master: I have been elected leader! New state: ALIVE 17/03/10 14:03:58 INFO Master: Registering worker statler:45341 with 48 cores, 250.8 GB RAM
To shut down worker nodes, simply use the qdel command to delete the submitted jobs.
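For example:
# list your submitted jobs and note the job IDs
qstat
# delete a worker job by its ID (the ID below is a placeholder)
qdel 1234567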
- To launch a group of worker nodes, use the start-slaves.sh script located in the ./sbin folder. Before running the start-slaves.sh script, include all IP addresses (or hostnames) of the compute nodes in the ./conf/slaves file in $SPARK_HOME, as sketched below.
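A sketch of this workflow, assuming the listed hostnames are placeholders and that passwordless SSH from the master node to each compute node is configured:
# list one compute node per line in conf/slaves (hostnames are placeholders)
cat > $SPARK_HOME/conf/slaves <<'EOF'
statler
waldorf
EOF
# start a worker on every node listed in conf/slaves
sh $SPARK_HOME/sbin/start-slaves.sh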
Setup a Spark cluster on the AWS EC2 cloud
We use Spark-ec2 to set up a Spark cluster on the Amazon EC2 cloud. Detailed instructions can be found at Spark-ec2. Here, we provide an example of setting up a 5-node Spark cluster.
To view all options:
$ $SPARK_HOME/ec2/spark-ec2
Usage: spark-ec2 [options] <action> <cluster_name> <action> can be: launch, destroy, login, stop, start, get-master, reboot-slaves Options: --version show program's version number and exit -h, --help show this help message and exit -s SLAVES, --slaves=SLAVES Number of slaves to launch (default: 1) -w WAIT, --wait=WAIT DEPRECATED (no longer necessary) - Seconds to wait for nodes to start -k KEY_PAIR, --key-pair=KEY_PAIR Key pair to use on instances -i IDENTITY_FILE, --identity-file=IDENTITY_FILE SSH private key file to use for logging into instances -p PROFILE, --profile=PROFILE If you have multiple profiles (AWS or boto config), you can configure additional, named profiles by using this option (default: none) -t INSTANCE_TYPE, --instance-type=INSTANCE_TYPE Type of instance to launch (default: m1.large). WARNING: must be 64-bit; small instances won't work -m MASTER_INSTANCE_TYPE, --master-instance-type=MASTER_INSTANCE_TYPE Master instance type (leave empty for same as instance-type) -r REGION, --region=REGION EC2 region used to launch instances in, or to find them in (default: us-east-1) -z ZONE, --zone=ZONE Availability zone to launch instances in, or 'all' to spread slaves across multiple (an additional $0.01/Gb for bandwidthbetween zones applies) (default: a single zone chosen at random) -a AMI, --ami=AMI Amazon Machine Image ID to use -v SPARK_VERSION, --spark-version=SPARK_VERSION Version of Spark to use: 'X.Y.Z' or a specific git hash (default: 1.6.0) --spark-git-repo=SPARK_GIT_REPO Github repo from which to checkout supplied commit hash (default: https://github.com/apache/spark) --spark-ec2-git-repo=SPARK_EC2_GIT_REPO Github repo from which to checkout spark-ec2 (default: https://github.com/amplab/spark-ec2) --spark-ec2-git-branch=SPARK_EC2_GIT_BRANCH Github repo branch of spark-ec2 to use (default: branch-1.5) --deploy-root-dir=DEPLOY_ROOT_DIR A directory to copy into / on the first master. Must be absolute. Note that a trailing slash is handled as per rsync: If you omit it, the last directory of the --deploy-root-dir path will be created in / before copying its contents. If you append the trailing slash, the directory is not created and its contents are copied directly into /. (default: none). --hadoop-major-version=HADOOP_MAJOR_VERSION Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default: 1) -D [ADDRESS:]PORT Use SSH dynamic port forwarding to create a SOCKS proxy at the given local address (for use with login) --resume Resume installation on a previously launched cluster (for debugging) --ebs-vol-size=SIZE Size (in GB) of each EBS volume. --ebs-vol-type=EBS_VOL_TYPE EBS volume type (e.g. 'gp2', 'standard'). --ebs-vol-num=EBS_VOL_NUM Number of EBS volumes to attach to each node as /vol[x]. The volumes will be deleted when the instances terminate. Only possible on EBS-backed AMIs. EBS volumes are only attached if --ebs-vol-size > 0. Only support up to 8 EBS volumes. --placement-group=PLACEMENT_GROUP Which placement group to try and launch instances into. Assumes placement group is already created. --swap=SWAP Swap space to set up per node, in MB (default: 1024) --spot-price=PRICE If specified, launch slaves as spot instances with the given maximum price (in dollars) --ganglia Setup Ganglia monitoring on cluster (default: True). 
NOTE: the Ganglia page will be publicly accessible --no-ganglia Disable Ganglia monitoring for the cluster -u USER, --user=USER The SSH user you want to connect as (default: root) --delete-groups When destroying a cluster, delete the security groups that were created --use-existing-master Launch fresh slaves, but use an existing stopped master if possible --worker-instances=WORKER_INSTANCES Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN is used as Hadoop major version (default: 1) --master-opts=MASTER_OPTS Extra options to give to master through SPARK_MASTER_OPTS variable (e.g -Dspark.worker.timeout=180) --user-data=USER_DATA Path to a user-data file (most AMIs interpret this as an initialization script) --authorized-address=AUTHORIZED_ADDRESS Address to authorize on created security groups (default: 0.0.0.0/0) --additional-security-group=ADDITIONAL_SECURITY_GROUP Additional security group to place the machines in --additional-tags=ADDITIONAL_TAGS Additional tags to set on the machines; tags are comma-separated, while name and value are colon separated; ex: "Task:MySparkProject,Env:production" --copy-aws-credentials Add AWS credentials to hadoop configuration to allow Spark to access S3 --subnet-id=SUBNET_ID VPC subnet to launch instances in --vpc-id=VPC_ID VPC to launch instances in --private-ips Use private IPs for instances rather than public if VPC/subnet requires that. --instance-initiated-shutdown-behavior=INSTANCE_INITIATED_SHUTDOWN_BEHAVIOR Whether instances should terminate when shut down or just stop --instance-profile-name=INSTANCE_PROFILE_NAME IAM profile name to launch instances under
To set up a Spark cluster, we have to specify the master instance type and the number of worker nodes with the -m and -s options. If you want to use spot instances, set the option --spot-price=0.8, where 0.8 is the bid price in dollars.
You also have to provide your AWS key pair with -k rhinempi and the corresponding private key file with -i /vol/AWS/Key/rhinempi.pem. Here, rhinempi is the name of my key pair.
Also add your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to your environment:
$ vi ~/.bash_profile
# --- spark ec2 env --- # export AWS_ACCESS_KEY_ID="xxxxxxxxxx" export AWS_SECRET_ACCESS_KEY="xxxxxxxxxxx"
Reload your environment:
source ~/.bash_profile
Then, launch the cluster:
$ $SPARK_HOME/ec2/spark-ec2 -m m1.xlarge -k rhinempi -i /vol/AWS/Key/rhinempi.pem -s 5 --spot-price=0.8 --instance-type=c3.8xlarge --region=eu-west-1 launch Map-All-HMP
Setting up security groups... Searching for existing cluster Map-All-HMP in region eu-west-1... Spark AMI: ami-1ae0166d Launching instances... Requesting 5 slaves as spot instances with price $0.800 Waiting for spot instances to be granted... All 5 slaves granted Launched master in eu-west-1a, regid = r-0cd1d32a495250e99 Waiting for AWS to propagate instance metadata... Waiting for cluster to enter 'ssh-ready' state......
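Once the cluster is in the 'ssh-ready' state, the same script can print the master's address and open an SSH session on it, for example:
# print the hostname of the master node
$SPARK_HOME/ec2/spark-ec2 -k rhinempi -i /vol/AWS/Key/rhinempi.pem --region=eu-west-1 get-master Map-All-HMP
# log in to the master node over SSH
$SPARK_HOME/ec2/spark-ec2 -k rhinempi -i /vol/AWS/Key/rhinempi.pem --region=eu-west-1 login Map-All-HMP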
To terminate the cluster, use the destroy command of Spark-ec2:
$ $SPARK_HOME/ec2/spark-ec2 --region=eu-west-1 destroy Map-All-HMP
Searching for existing cluster Map-All-HMP in region eu-west-1... Found 1 master, 5 slaves. The following instances will be terminated: > ec2-54-194-240-108.eu-west-1.compute.amazonaws.com > ec2-54-154-111-11.eu-west-1.compute.amazonaws.com > ec2-54-154-69-167.eu-west-1.compute.amazonaws.com > ec2-54-229-5-191.eu-west-1.compute.amazonaws.com > ec2-54-194-140-87.eu-west-1.compute.amazonaws.com > ec2-54-154-186-102.eu-west-1.compute.amazonaws.com ALL DATA ON ALL NODES WILL BE LOST!! Are you sure you want to destroy the cluster huanglrHMP? (y/N) y Terminating master... Terminating slaves...
- For Spark 2.0.0, the spark-ec2 script is not included in the package distribution. To download spark-ec2, use the following link: https://github.com/amplab/spark-ec2/archive/branch-2.0.zip, as sketched below.
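A sketch of fetching the standalone scripts; the name of the extracted directory follows GitHub's archive naming and may differ:
# download and unpack the spark-ec2 scripts from the branch-2.0 archive
wget https://github.com/amplab/spark-ec2/archive/branch-2.0.zip
unzip branch-2.0.zip
cd spark-ec2-branch-2.0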