Thursday, October 8, 2015

Resilient Distributed Dataset and Related Operations (Transformations and Actions) in Apache Spark (Part 1).

This post is about the RDD (Resilient Distributed Dataset), the backbone of Apache Spark. Apache Spark is a memory-based distributed computing framework and an alternative to Hadoop for some scenarios. It is built on an immutable abstraction called the RDD. Unlike Hadoop, which bundles its own storage layer (HDFS), Apache Spark is just an execution engine. It supports various input sources such as HDFS, MongoDB and Cassandra. The core of Apache Spark is written in Scala, but it also supports Java and Python for writing driver code.

The developers kept it as simple and lightweight as possible for the end user. The user only has to focus on the main logic rather than on the complete flow of the algorithm. In a runtime environment like MapReduce, the user has to structure the application around MapReduce design patterns. Even with MPI (Message Passing Interface), the programmer takes care of the communication between processes and of sending data to them. In Apache Spark the user focuses on the driver code and the required operations on data; the runtime engine takes care of parallelizing and scheduling the computation.
Nowadays Apache Spark has a number of use cases where we want to handle huge data and the computation pattern is mainly iterative or demands user interaction. The most common use cases are machine learning algorithms like K-Means. Here Apache Spark has an edge over Hadoop MapReduce, because it can keep the working data in memory between iterations, and over MPI, because the user does not have to program the communication.
As I said, Apache Spark is a distributed computing framework, so it must support the traditional requirements of such frameworks:
1)Distributed Computation
2)Fault tolerance

It also supports requirements that are very common in today's big data era:

1)Persistence level.
2)Lazy evaluation.

Now, let's discuss them in detail.

1) Distributed Computation:

Like other distributed and parallel computing frameworks, Apache Spark supports parallelization, and it does so through RDDs and partitions. Partitions are logical divisions of an RDD, and the number of partitions is the number of independent tasks that can be executed in parallel. An RDD is immutable, i.e. you can create a new RDD but you cannot modify an existing one. An operation on an RDD creates a new RDD. It is like a string in Java or a tuple in Python.
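To make the idea of partitions and immutability concrete, here is a minimal sketch in plain Python. It does not use Spark at all; the helper names split_into_partitions and map_partitions are made up for this illustration, and tuples stand in for the immutable RDD.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for index in range(num_partitions):
        # The first `extra` partitions take one additional element.
        end = start + size + (1 if index < extra else 0)
        partitions.append(tuple(items[start:end]))
        start = end
    return tuple(partitions)


def map_partitions(partitions, func):
    """Apply func to every element, one partition (one task) at a time.

    A new structure is returned; the input partitions are left untouched.
    """
    return tuple(tuple(func(x) for x in part) for part in partitions)


numbers = split_into_partitions(list(range(10)), 3)
doubled = map_partitions(numbers, lambda x: x * 2)

print(numbers)  # the original partitions are unchanged
print(doubled)  # the operation produced a new set of partitions
```

Each inner tuple plays the role of one partition, so in a real cluster each of them could be processed by a separate task.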

2)Fault tolerance:

Once the user creates the first RDD, they can execute a number of operations on it, some of which create new RDDs and their respective partitions. In other words, an RDD is closer to metadata than to the actual data: it contains the information about how to compute it from another RDD or from a data source. Thus the complete driver program may have more than one RDD, each carrying the full details of the operations required to compute it. If a single node fails, the partitions it held are lost, but because the RDD has all the information required to compute them, the lost partitions can be recomputed as and when needed. This is how Spark provides fault tolerance.
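The recompute-from-lineage idea can be sketched with a toy class in plain Python. ToyRDD and its methods are hypothetical names for this illustration only and are much simpler than what Spark does internally; the point is just that a lost partition can be rebuilt from the recipe.

```python
class ToyRDD(object):
    """Toy stand-in for an RDD: it stores how to build each partition."""

    def __init__(self, num_partitions, compute):
        self.num_partitions = num_partitions
        self._compute = compute    # lineage: partition index -> list of values
        self._materialized = {}    # partitions currently held by "nodes"

    @classmethod
    def from_source(cls, partitions):
        # The data source is assumed to be re-readable at any time.
        return cls(len(partitions), lambda i: list(partitions[i]))

    def map(self, func):
        parent = self
        return ToyRDD(self.num_partitions,
                      lambda i: [func(x) for x in parent.get(i)])

    def get(self, index):
        # Compute the partition only if no node currently holds it.
        if index not in self._materialized:
            self._materialized[index] = self._compute(index)
        return self._materialized[index]

    def lose_partition(self, index):
        """Simulate the failure of the node holding this partition."""
        self._materialized.pop(index, None)


source = ToyRDD.from_source([[1, 2], [3, 4]])
squares = source.map(lambda x: x * x)

before = squares.get(1)
squares.lose_partition(1)   # the node holding partition 1 "fails"
after = squares.get(1)      # rebuilt from the lineage, not from a backup
print(before, after)
```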

3)Lazy evaluation:
Most mining algorithms that work on big data need to shuffle and load data that is too large to fit in the main memory of a single machine. Even if we club the memory of all machines together into a single logical main memory, it may not be possible to hold all intermediate results in it. Thus Apache Spark postpones the computation of such intermediate results until some action requires them. This is called lazy evaluation.
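Lazy evaluation can be imitated in plain Python with generator expressions. This is only an analogy, not Spark code: the two generator expressions play the role of transformations and list() plays the role of the action. The log list and traced_square are names invented for this sketch.

```python
log = []  # records every element that actually gets computed


def traced_square(x):
    log.append(x)
    return x * x


data = range(1, 6)

# "Transformations": nothing is computed here, only the recipe is built.
squares = (traced_square(x) for x in data)
evens = (v for v in squares if v % 2 == 0)
calls_before_action = len(log)

# "Action": asking for the result finally triggers the computation.
result = list(evens)
calls_after_action = len(log)

print(calls_before_action, result, calls_after_action)
```

No intermediate list of squares is ever stored; each value flows through the pipeline only when the final result is requested.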

4)Persistence level:
 Many algorithms, like K-Means (with its centroid list), need the same RDD again and again. So instead of keeping it only on hard disk, Apache Spark allows you to keep it in memory using the persistence level concept. You can keep it on hard disk, in memory, or both, as required.
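The benefit of persisting a dataset that is reused in every iteration can be sketched in plain Python. The Dataset class below is hypothetical and only counts how often the data has to be recomputed; it is not how Spark implements persistence.

```python
class Dataset(object):
    """Toy dataset that is recomputed on every use unless it is persisted."""

    def __init__(self, compute):
        self._compute = compute
        self._persist = False
        self._cached = None
        self.computations = 0   # how many times the data was rebuilt

    def persist(self):
        self._persist = True
        return self

    def collect(self):
        if self._cached is not None:
            return self._cached
        self.computations += 1
        values = self._compute()
        if self._persist:
            self._cached = values
        return values


def run_iterations(dataset, iterations):
    """An iterative job that needs the same dataset in every round."""
    total = 0
    for _ in range(iterations):
        total += sum(dataset.collect())
    return total


points_plain = Dataset(lambda: [x * 0.5 for x in range(4)])
points_cached = Dataset(lambda: [x * 0.5 for x in range(4)]).persist()

total_plain = run_iterations(points_plain, 5)
total_cached = run_iterations(points_cached, 5)
print(points_plain.computations, points_cached.computations)
```

In PySpark itself the corresponding call is rdd.persist() with a pyspark.StorageLevel such as MEMORY_ONLY, DISK_ONLY or MEMORY_AND_DISK.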

This is a logical view of the Apache Spark RDD concept. In the next part we will discuss the transformations and actions possible on an RDD.

Saturday, July 18, 2015

Custom RDD in apache spark using python(Using custom classes for creating RDD).

An RDD is a resilient distributed dataset. It is an abstraction provided by Apache Spark for distributed operations on data. This post is about how to write a custom RDD in Python (pyspark).

The following code shows a simple Python class with a single data member.

 class Data(object):
     def __init__(self, data):
         self.data = data
     def increment(self):
         self.data += 1
     def printData(self):
         print(self.data)
Currently, Apache Spark does not support the class definition being in the current (driver) script because of a pickling issue. That is why we have to keep the class definition in a different module (mod, going by the import below). The following listing shows how to use the class above to create the custom RDD.
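from pyspark import SparkContext
from mod import Data
#creating the spark context (the app name is an assumption)
sc = SparkContext(appName="customRDD")
#creating the rdd from list of integers (the values are placeholders)
rdd = sc.parallelize([1, 2, 3, 4, 5])
#Function to transform simple list rdd to custom rdd
def func(item):
    d = Data(item)
    return d
#Function to call increment on each element of rdd1
def func2(item):
    item.increment()
    return item
#actual list rdd to custom rdd conversion
rdd1 = rdd.map(func)
print(rdd1.count())
#increment data field of each item from rdd1 by calling member function increment
rdd1 = rdd1.map(func2)
for i in rdd1.collect():
    i.printData()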
To execute the code just type spark-submit --py-files
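The pickling restriction mentioned above is easier to understand once you see what a pickled object contains. Here is a small sketch using only the standard library; Fraction is just a convenient stand-in for a user-defined class like Data. My assumption is that this is the mechanism behind the restriction: the worker must be able to import the module that defines the class.

```python
import pickle
from fractions import Fraction

# Pickle an instance of a class that lives in an importable module.
payload = pickle.dumps(Fraction(1, 2))

# The payload names the module and the class ...
has_module_name = b"fractions" in payload
has_class_name = b"Fraction" in payload
# ... but it does not carry the code of the class, e.g. its methods.
has_method_code = b"limit_denominator" in payload

# Unpickling works here only because `fractions` can be imported.
restored = pickle.loads(payload)
print(has_module_name, has_class_name, has_method_code, restored)
```

Since only a reference to the class travels with the data, a class defined in the driver script cannot be found by the worker processes, while a class in a module shipped with --py-files can.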

Friday, July 10, 2015

Installing Apache Spark in standalone mode on a cluster.

Hello all,

This post is about how to install Apache Spark on a cluster. Apache Spark supports cluster managers such as
1)The standalone cluster manager embedded in Spark itself.
2)Apache Mesos
3)Apache YARN

This post explains how to install Apache Spark in standalone mode on a cluster.

1)The key thing for any cluster is passwordless ssh.

I am creating a cluster of 2 machines, so I have to configure passwordless communication between them. Assume that we have a spark user on both machines.

Follow the steps given below.

 #on machine  
 spark@]#ssh-keygen -t RSA -P ""  
 #copy to  
 spark@]#ssh-copy-id  spark@  
 #on machine  
 spark@]#ssh-keygen -t RSA -P ""  
 #copy to  
 spark@]# ssh-copy-id  spark@  

2)Download the compiled binary of Apache Spark, then copy and untar it at the same location on both machines. In my case it is /opt/spark.

3)Go inside /{$spark-home}/conf (in my case it is /opt/spark/conf).
It contains the slaves.template file. Just execute the following commands

 #on machine  as I am considering it as master machine.
 spark@ conf]#cp slaves.template slaves  

Modify this file and add the IP addresses of the worker nodes.
The content of the file will look like this.

 # A Spark Worker will be started on each of the machines listed below.  
4)Then execute the following commands
 #on machine  
 spark@ conf]#cp  
 #on machine  
 spark@ conf]#cp  
5)Modify file on both the machine and add SPARK_MASTER_IP=
6)Now go inside the /{$spark-home}/sbin directory on the master machine and execute the start script. It will start the master and a worker on the master node, and a worker on the 2nd (slave) node.
You can check it by executing the jps command on both machines.
7)Assume that we want to submit a Python script to the Spark cluster. Go inside
 /{$spark-home}/bin and execute ./spark-submit --master spark://  path-to-python-script

Thursday, July 9, 2015

Custom Accumulators in Spark using python

Apache Spark is an open source framework for big data processing, with support for machine learning, graph processing and in-memory computation.
This post is about how to write custom accumulators in Spark using Python (pyspark).

Accumulators are shared variables that can be used to maintain counters or sums across an RDD (Resilient Distributed Dataset).

While writing this page I am assuming that visitors are familiar with RDDs and Apache Spark. I am using Python and pyspark to demonstrate the accumulator.

Assume that we have multidimensional vectors and we want to add them up into a single vector. For example,
 if vec1={'a':10,'b':30} and vec2={'a':10,'b':40}, we want vec=vec1+vec2={'a':20,'b':70}

To create a custom accumulator, the programmer has to subclass the AccumulatorParam interface and implement the zero and addInPlace methods. Accumulators support operations that are associative (and commutative), like the + operator, so the programmer has to provide the zero vector (the identity element) for this operator.
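Why the operation has to be associative can be seen without Spark. The sketch below is plain Python; zero, add_in_place and sum_partition are stand-alone functions written for this illustration (not the pyspark API), and vec3 is a made-up third vector. It adds the same vectors with two different groupings, the way partial sums from different partitions might be merged.

```python
from functools import reduce


def zero(value):
    """Return the identity element: a vector with the same keys, all 0."""
    return dict((key, 0) for key in value)


def add_in_place(val1, val2):
    """Add val2 into val1 component by component and return val1."""
    for key in val1:
        val1[key] += val2[key]
    return val1


def sum_partition(vectors, template):
    """Sum the vectors of one partition, starting from the zero vector."""
    return reduce(add_in_place, vectors, zero(template))


vec1 = {'a': 10, 'b': 30}
vec2 = {'a': 10, 'b': 40}
vec3 = {'a': 1, 'b': 2}   # hypothetical third vector

pair_sum = sum_partition([vec1, vec2], vec1)

# The same three vectors, grouped into partitions in two different ways.
grouping_one = add_in_place(sum_partition([vec1, vec2], vec1),
                            sum_partition([vec3], vec1))
grouping_two = add_in_place(sum_partition([vec1], vec1),
                            sum_partition([vec2, vec3], vec1))
print(pair_sum, grouping_one, grouping_two)
```

Because every partial sum starts from a fresh zero vector, the input vectors themselves are never modified.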

Here I am considering addition of vectors in Python dictionary form, where each vector has 100 dimensions.

Complete code is given below:

 from pyspark import SparkContext
 from pyspark import AccumulatorParam
 import random
 #creating spark context (the app name is an assumption)
 sc = SparkContext(appName="vectorAccumulator")
 #custom accumulator class
 class VectorAccumulatorParam(AccumulatorParam):
      def zero(self, value):
           dict1 = {}
           for i in range(0, len(value)):
                dict1[i] = 0
           return dict1
      def addInPlace(self, val1, val2):
           for i in val1.keys():
                val1[i] += val2[i]
           return val1
 #creating zero vector for addition
 c = {}
 for i in range(0, 100):
      c[i] = 0
 #creating 10 vectors each with dimension 100 and randomly initialized
 #(the list name rand and the value range are assumptions)
 rand = []
 for j in range(0, 10):
      dict1 = {}
      for i in range(0, 100):
           dict1[i] = random.random()
      rand.append(dict1)
 #creating rdd from 10 vectors
 rdd1 = sc.parallelize(rand)
 #creating accumulator
 va = sc.accumulator(c, VectorAccumulatorParam())
 #action to be executed on rdd in order to sumup vectors
 def sum_up(x):
      global va
      va += x
 rdd1.foreach(sum_up)
 #print the value of accumulator
 print(va.value)

Friday, May 15, 2015

Integration Of Maui with TORQUE.

Maui is an open source job scheduler for clusters, written in C. TORQUE is an open source resource manager derived from PBS, also written in C. It manages both jobs and nodes. It ships with a simple scheduler, but this default scheduler is not customizable. Maui is customizable and provides various algorithms and policies (for example fairshare and backfill).
While integrating Maui with TORQUE we have to specify some options when configuring the Maui installation.
These options are listed below.
--prefix=<location where you want to install architecture independent files.>
--exec-prefix=<location where you want to install architecture dependent files.>
--with-pbs=<location where the pbs lib directory is located.>
--with-spooldir=<location where you want to maintain config files, logs and the stat directory.>

Suppose the location for installing architecture dependent as well as independent files is /opt/maui, TORQUE is installed at /opt/torque, and I am using maui-3.2.6p20 for installation. Following are the steps for installation.
1)tar -xvf  maui-3.2.6p20.tar.gz
2)cd  maui-3.2.6p20
3)./configure --prefix=/opt/maui --exec-prefix=/opt/maui --with-pbs=/opt/torque
4)make
5)make install

And maui is integrated with TORQUE.


How to allow root user to submit and run job in TORQUE.

By default TORQUE (Terascale Open-source Resource and QUEue Manager) does not allow the root user to submit and run jobs. But you can change this behavior using the following commands.
1)Type qmgr
You will get something like this on screen.
Max open servers: 4 
2)Now enter the following command at the Qmgr: prompt.
Qmgr: set server acl_roots+=root@*
(From the shell, the same can be done in one step with qmgr -c 's s acl_roots+=root@*'.)


Creating user with authentication in mongodb.

1.Start mongodb server
 ./mongod --dbpath databaseDir  

2.Start mongo client command line interface.

3.Create admin user for all databases.
 >use admin  

4.Press CTRL+C

5.Now type
 mongo --port 27017 -u adminDatabase -p 123 --authenticationDatabase admin  

6.Now you will get a prompt with adminDatabase as the logged-in user. Assume that you want to create a new database newTest with an admin user mahesh for it, and that mahesh must be able to add new users to newTest. The steps are given below.
 > use newTest  
 switched to db newTest  
 > db.createUser({user:"mahesh",pwd:"123",roles:[{role:"userAdmin",db:"newTest"}]})  

Now you can log in as the mahesh user.
 mongo newTest --port 27017 -u mahesh -p  

But if you try to create a document as mahesh, it will show an error.
 > db.doc1.insert({"name":"shailesh"})  
     "writeError" : {  
         "code" : 13,  
         "errmsg" : "not authorized on newTest to execute command { insert: \"doc1\", documents: [ { _id: ObjectId('5555991b5102d7a82ac1daef'), name: \"shailesh\" } ], ordered: true }"  

This is because of the permissions given to mahesh. The userAdmin role only allows managing users, not inserting documents or creating collections.

7.Let's create a new user on the same database newTest, but with read and write permissions. Log in again as mahesh.
 mongo newTest --port 27017 -u mahesh -p  
 > db.createUser({user:"mahesh1",pwd:"123",roles:[{role:"readWrite",db:"newTest"}]})  

Now mahesh1 can insert documents into newTest.
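The difference between the two users can be summed up in a toy model in plain Python. This is not MongoDB code: ROLE_ACTIONS lists only a simplified subset of the actions behind the two built-in roles, and is_authorized is a hypothetical helper written for this sketch.

```python
# Simplified subset of the actions granted by two built-in roles.
ROLE_ACTIONS = {
    "userAdmin": {"createUser", "dropUser", "grantRole", "revokeRole"},
    "readWrite": {"find", "insert", "update", "remove", "createCollection"},
}

# The users created above on the newTest database.
USERS = {
    "mahesh": ["userAdmin"],
    "mahesh1": ["readWrite"],
}


def is_authorized(user, action):
    """Return True if any role of the user grants the action."""
    return any(action in ROLE_ACTIONS[role] for role in USERS[user])


print(is_authorized("mahesh", "createUser"))   # mahesh can manage users
print(is_authorized("mahesh", "insert"))       # but cannot insert documents
print(is_authorized("mahesh1", "insert"))      # mahesh1 can insert documents
```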

Thursday, May 14, 2015

MongoDB java connectivity "Exception in thread "main" java.lang.NoClassDefFoundError: org/bson/codecs/Decoder"


MongoDB is a cross-platform, document-based NoSQL database. I am using MongoDB 3.0.3. If you search for the MongoDB Java driver, you will get the following link as a search result.

This page is the download page for the MongoDB Java driver. You have to select the operating system and the driver version. At the time of writing, the driver had 2 versions, 3.0.1 and 2.13.1. I used 3.0.1.

Here is the simple java code.
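 import java.util.List;
 import com.mongodb.MongoClient;
 public class mongoCon {
      public static void main(String[] args) {
           MongoClient c1 = new MongoClient("localhost", 27017);
           List<String> dbs = c1.getDatabaseNames();
           // print the name of every database on the server
           for (String db : dbs)
                System.out.println(db);
      }
 }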

I used the Eclipse IDE and configured it for the MongoDB Java driver by adding the driver as an external jar file.

When I executed the code, it crashed.

It showed the following error.
 Exception in thread "main" java.lang.NoClassDefFoundError: org/bson/codecs/Decoder  
   at mongo_java.mongoCon.main(  
 Caused by: java.lang.ClassNotFoundException: org.bson.codecs.Decoder  

This is because of the jar file that I was using. When I opened it with an archive manager, there was no class file for Decoder. There was not even a codecs folder inside org/bson.
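Instead of opening the jar in an archive manager, the same check can be done with a few lines of Python, since a jar is just a zip archive. This is a minimal sketch; jar_contains_class is a helper written for this post, and the throwaway demo-driver.jar built below only stands in for the real driver jar.

```python
import os
import tempfile
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar has a .class entry for the given class name."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


# Build a small throwaway jar that lacks the codecs package.
demo_dir = tempfile.mkdtemp()
demo_jar = os.path.join(demo_dir, "demo-driver.jar")
with zipfile.ZipFile(demo_jar, "w") as jar:
    jar.writestr("org/bson/BSONObject.class", b"")

found_decoder = jar_contains_class(demo_jar, "org.bson.codecs.Decoder")
found_bsonobject = jar_contains_class(demo_jar, "org.bson.BSONObject")
print(found_decoder, found_bsonobject)
```

To check a real jar, pass its path and the class named in the NoClassDefFoundError message.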

The solution is to download the source code of the driver and build it to get the jar.
I downloaded it from
Then extract it, change directory to the extracted location and run ./gradlew jar.
 tar -xf mongo-java-driver-r3.0.1.tar.gz  
 cd mongo-java-driver-r3.0.1  
 ./gradlew jar  
  It will build the project and show "BUILD SUCCESSFUL" if everything is OK. If your network requires a proxy setting, just change "./gradlew jar" to
 ./gradlew -Dhttps.proxyHost= -Dhttps.proxyPort=3128 jar  
After this, you will find the jar file in the mongo-java-driver directory of mongo-java-driver-r3.0.1. Just add this jar file as an external jar to your Eclipse project and it will work.

Thursday, March 26, 2015

OpenCL Installation on Fedora 19(32 bit).

OpenCL is a well-known framework for parallel programming across heterogeneous platforms such as CPUs and GPUs.

This post is about how to install OpenCL on Fedora 19 (32 bit). The target machine has an Intel C2D processor and no other OpenCL hardware (device). I am using the AMD SDK for installation, which supports both AMD and Intel platforms.

Steps are listed below.

These steps require superuser permissions.
2.Download  AMD-APP-SDK-linux-v2.9-1.599.381-GA-x64.tar.bz2

3. tar -xvf  AMD-APP-SDK-linux-v2.9-1.599.381-GA-x64.tar.bz2

It will be extracted to

4. chmod 755

5.Create directory for installation "$ mkdir /opt/opencl"

6.Execute the extracted shell script


It will show a long license agreement. Just press q. It will then ask whether you want to install. Type yes and press Enter.

7.It will ask for installation location.Just specify /opt/opencl.

8.Install the OpenCL headers: yum install opencl-headers

9.Modify the .bashrc file of the root user and add

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/intel/opencl/lib64:/opt/intel/opencl/libmic:/opt/intel/opencl-1.2-

10.To compile a file, use this command (the library is listed after the source file so that the linker resolves the OpenCL symbols reliably)
gcc main.c -o binary -L /opt/opencl/AMDAPPSDK-2.9-1/lib/x86_64/ -lOpenCL