Saturday, July 18, 2015

Custom RDD in Apache Spark using Python (using custom classes to create an RDD).

An RDD (Resilient Distributed Dataset) is the abstraction Apache Spark provides for distributed operations on data. This post is about how to write a custom RDD in Python (PySpark), i.e. an RDD whose elements are instances of your own class.
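For reference, here is a minimal sketch (standalone, not part of the files below) of an ordinary RDD of plain integers; the rest of the post replaces these integers with instances of a custom class.

 from pyspark import SparkContext

 sc = SparkContext()
 #an ordinary RDD of plain integers, distributed across the cluster
 rdd = sc.parallelize([1, 2, 3, 4])
 #transformations such as map run on the executors
 print(rdd.map(lambda x: x + 1).collect())   #[2, 3, 4, 5]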

Following code shows simple python class with single data member.

 class Data(object):
     def __init__(self, data):
         self.data = data
     def increment(self):
         self.data += 1
     def printData(self):
         print(self.data)
Currently, Apache Spark cannot pickle a class that is defined in the driver script itself, so the class definition has to live in a separate module (say mod.py) that is shipped to the executors. The following listing shows how to use the above class to create a custom RDD.

#code.py file
from pyspark import SparkContext
from mod import Data

sc = SparkContext()

#creating the rdd from a list of integers
rdd = sc.parallelize([1, 2, 3, 4])

#function to transform each element of the plain list rdd into a Data object
def func(item):
    return Data(item)

#function to call increment on each element of the custom rdd
def func2(item):
    item.increment()
    return item

#actual list rdd to custom rdd conversion
rdd1 = rdd.map(func)
print(rdd1.count())

#increment the data field of each item by calling the member function increment;
#RDDs are immutable, so map returns a new RDD holding the incremented objects
rdd2 = rdd1.map(func2)
for i in rdd2.collect():
    i.printData()
To execute the code, just type: spark-submit --py-files mod.py code.py (the --py-files option ships mod.py to the worker nodes).

Friday, July 10, 2015

Installing Apache Spark in standalone mode on a cluster.

Hello all,

This post is about how to install Apache Spark on a cluster. Apache Spark supports the following cluster managers:
1) The standalone cluster manager embedded in Spark itself
2) Apache Mesos
3) Apache YARN

This post elaborates how to install Apache Spark in standalone mode on a cluster.

1) The key thing for any cluster is passwordless SSH.

I am creating a cluster of 2 machines (192.168.5.11 and 192.168.9.159), so I have to configure passwordless communication between them. Assume that a spark user exists on both machines.

Follow the steps given below.

 #on machine 192.168.5.11
 spark@192.168.5.11]# ssh-keygen -t rsa -P ""
 #copy id_rsa.pub to 192.168.9.159
 spark@192.168.5.11]# ssh-copy-id spark@192.168.9.159
 #on machine 192.168.9.159
 spark@192.168.9.159]# ssh-keygen -t rsa -P ""
 #copy id_rsa.pub to 192.168.5.11
 spark@192.168.9.159]# ssh-copy-id spark@192.168.5.11

2) Download the compiled binary of Apache Spark, then copy and untar it at the same location on both machines. In my case it is /opt/spark.

3) Go inside ${SPARK_HOME}/conf (in my case /opt/spark/conf).
It contains a slaves.template file. Just execute the following command:
 

 #on machine 192.168.9.159, as I am considering it the master machine
 spark@192.168.9.159 conf]# cp slaves.template slaves

Modify this file and add the IP addresses of the worker nodes.
The content of the file will look like this:

 # A Spark Worker will be started on each of the machines listed below.  
 192.168.9.159  
 192.168.5.11  
4) Then execute the following commands:
 #on machine 192.168.5.11  
 spark@192.168.5.11 conf]#cp spark-env.sh.template spark-env.sh  
 #on machine 192.168.9.159  
 spark@192.168.9.159 conf]#cp spark-env.sh.template spark-env.sh  
5) Modify the spark-env.sh file on both machines and add SPARK_MASTER_IP=192.168.9.159
6) Now go inside the ${SPARK_HOME}/sbin directory on the master machine (in my case 192.168.9.159) and execute ./start-all.sh. It will start a master and a worker on the master node (192.168.9.159) and a worker on the slave node (192.168.5.11).
You can check it by executing the jps command on both machines.
7) Assume that we want to submit a Python script to the Spark cluster. Go inside ${SPARK_HOME}/bin and execute:
 ./spark-submit --master spark://192.168.9.159:7077 path-to-python-script
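For a quick sanity check, the script below is a minimal sketch of what could be submitted this way; the file name test_cluster.py and the app name are just illustrative.

 #test_cluster.py -- hypothetical example script for testing the cluster
 from pyspark import SparkContext

 sc = SparkContext(appName="cluster-test")
 #distribute a small range of numbers and sum them on the workers
 rdd = sc.parallelize(range(1000))
 print(rdd.sum())
 sc.stop()

Submit it with ./spark-submit --master spark://192.168.9.159:7077 test_cluster.py and the application should show up in the master web UI (port 8080 by default).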

Thursday, July 9, 2015

Custom Accumulators in Spark using Python

Apache Spark is an open source framework for big data processing, with support for machine learning, graph processing, and in-memory computation.
This post is about how to write custom accumulators in Spark using Python (PySpark).

Accumulators are shared variables that can be used to maintain counters or sums across an RDD (Resilient Distributed Dataset).
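Before writing a custom one, here is a minimal sketch of the built-in numeric accumulator, just to show the basic pattern of adding to it from an action and reading it back on the driver:

 from pyspark import SparkContext

 sc = SparkContext()
 count = sc.accumulator(0)   #built-in integer accumulator starting at 0

 def add_one(x):
     global count
     count += 1   #workers only add to the accumulator; only the driver reads .value

 sc.parallelize([1, 2, 3, 4]).foreach(add_one)
 print(count.value)   #4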

While writing this page I am assuming that readers are aware of RDDs and Apache Spark. I am using Python (PySpark) to demonstrate the accumulator.

Assume that we have multidimensional vectors and we want to add them up into a single vector. For example,
if vec1 = {'a':10, 'b':30} and vec2 = {'a':10, 'b':40}, we want vec = vec1 + vec2,
i.e. vec = {'a':20, 'b':70}.
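In plain Python (without Spark), this element-wise addition is just a loop over the keys; it is exactly the logic that the addInPlace method below will implement:

 vec1 = {'a': 10, 'b': 30}
 vec2 = {'a': 10, 'b': 40}
 #add the two vectors key by key
 vec = {k: vec1[k] + vec2[k] for k in vec1}
 print(vec)   #{'a': 20, 'b': 70}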

To create a custom accumulator, the programmer has to subclass the AccumulatorParam interface and implement its zero and addInPlace methods. An accumulator supports operations that are associative and commutative, like the + operator, so the programmer has to provide the zero element for this operator.

Here I am considering the addition of vectors represented as Python dictionaries, where each vector has 100 dimensions.


Complete code is given below:



 from pyspark import SparkContext
 from pyspark import AccumulatorParam
 import random

 #creating spark context
 sc = SparkContext()

 #custom accumulator class
 class VectorAccumulatorParam(AccumulatorParam):
     def zero(self, value):
         #zero vector with the same dimensions as the initial value
         dict1 = {}
         for i in range(0, len(value)):
             dict1[i] = 0
         return dict1

     def addInPlace(self, val1, val2):
         #element-wise addition of the two vectors
         for i in val1.keys():
             val1[i] += val2[i]
         return val1

 #creating the zero vector used to initialize the accumulator
 c = {}
 for i in range(0, 100):
     c[i] = 0

 #creating 10 vectors, each with dimension 100 and randomly initialized
 rand = []
 for j in range(0, 10):
     dict1 = {}
     for i in range(0, 100):
         dict1[i] = random.random()
     rand.append(dict1)

 #creating rdd from the 10 vectors
 rdd1 = sc.parallelize(rand)

 #creating the accumulator
 va = sc.accumulator(c, VectorAccumulatorParam())

 #action to be executed on the rdd in order to sum up the vectors
 def sum_vec(x):
     global va
     va += x

 rdd1.foreach(sum_vec)

 #print the value of the accumulator
 print(va.value)
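As a quick check (assuming the rand list and the accumulator va from the listing above are still in scope), the accumulator's value can be compared against a local, non-distributed sum of the same vectors:

 #local reference sum computed on the driver, without Spark
 local_sum = {i: 0.0 for i in range(0, 100)}
 for vec in rand:
     for i in vec:
         local_sum[i] += vec[i]
 #the two sums should agree up to floating point rounding
 print(all(abs(local_sum[i] - va.value[i]) < 1e-9 for i in range(0, 100)))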