Saturday, July 18, 2015

Custom RDD in Apache Spark using Python (using custom classes to create an RDD).

An RDD (Resilient Distributed Dataset) is the abstraction Apache Spark provides for distributed operations on data. This post is about how to build an RDD of custom objects in Python (PySpark).

The following code shows a simple Python class with a single data member.

 class Data(object):  
      def __init__(self, data):  
           self.data = data  
      def increment(self):  
           self.data += 1  
      def printData(self):  
           print self.data  
Currently, Apache Spark cannot pickle a class defined in the driver script itself. That is why we have to keep the class definition in a separate module (say mod.py, matching the import below). The following listing shows how to use the above class to create the custom RDD.
 from pyspark import SparkContext  
 from mod import Data as Data  
 sc = SparkContext()  
 #creating the rdd from a list of integers  
 rdd = sc.parallelize(range(10))  
 #function to transform the simple list rdd into a custom rdd  
 def func(item):  
      d = Data(item)  
      return d  
 #function to call increment on each element of rdd1  
 def func2(item):  
      item.increment()  
      return item  
 #actual list rdd to custom rdd conversion  
 rdd1 = rdd.map(func)  
 print rdd1.count()  
 #increment the data field of each item in rdd1 by calling the member function increment  
 rdd1 = rdd1.map(func2)  
 for i in rdd1.collect():  
      i.printData()  
To execute the code, just type spark-submit --py-files mod.py path-to-your-script, so that mod.py is shipped to the worker nodes.
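The transformation logic itself can be checked without a cluster. Here is a minimal sketch, assuming the same Data class as in mod.py above, using the plain list operations that map and collect distribute:

```python
class Data(object):
    # same class as in mod.py above
    def __init__(self, data):
        self.data = data

    def increment(self):
        self.data += 1


# wrap plain integers into Data objects, then increment each one,
# mirroring what func and func2 do element-wise on the RDD
items = [Data(x) for x in range(5)]
for d in items:
    d.increment()
print([d.data for d in items])  # [1, 2, 3, 4, 5]
```

The cluster version only changes where the loop runs: Spark applies func and func2 to partitions on the workers instead of in a local loop.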

Friday, July 10, 2015

Installing Apache Spark in standalone mode on a cluster.

Hello all,

This post is about how to install Apache Spark on a cluster. Apache Spark supports cluster managers like
1)The standalone cluster manager embedded in Spark itself.
2)Apache Mesos
3)Apache YARN

This post elaborates how to install Apache Spark in standalone mode on a cluster.

1)The key prerequisite for any cluster is passwordless SSH.

I am creating a cluster of 2 machines (their IP addresses are written as &lt;machine1-ip&gt; and &lt;machine2-ip&gt; below), so I have to configure passwordless communication between them. Assume that we have a spark user on both machines.

Follow the steps given below.

 #on machine1  
 [spark@machine1]# ssh-keygen -t rsa -P ""  
 #copy the public key to machine2  
 [spark@machine1]# ssh-copy-id spark@&lt;machine2-ip&gt;  
 #on machine2  
 [spark@machine2]# ssh-keygen -t rsa -P ""  
 #copy the public key to machine1  
 [spark@machine2]# ssh-copy-id spark@&lt;machine1-ip&gt;  

2)Download the compiled binary of Apache Spark, copy it to both machines, and untar it at the same location on each. In my case it is /opt/spark.

3)Go inside /{$spark-home}/conf (in my case it is /opt/spark/conf).
It contains a slaves.template file. Just execute the following commands:

 #on machine1, as I am considering it the master machine  
 [spark@machine1 conf]# cp slaves.template slaves  

Modify this file and add the IP addresses of the worker nodes.
The content of the file will look like this:

 # A Spark Worker will be started on each of the machines listed below.  
 &lt;machine1-ip&gt;  
 &lt;machine2-ip&gt;  
4)Then execute the following commands:
 #on machine1  
 [spark@machine1 conf]# cp spark-env.sh.template spark-env.sh  
 #on machine2  
 [spark@machine2 conf]# cp spark-env.sh.template spark-env.sh  
5)Modify spark-env.sh on both machines and add SPARK_MASTER_IP=&lt;machine1-ip&gt;
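Putting steps 3 to 5 together, the two edited files might look like the sketch below; the IP addresses are placeholders for your own machines, not values from the original setup:

```
# /opt/spark/conf/slaves (same file on the master)
# A Spark Worker will be started on each of the machines listed below.
192.168.1.10
192.168.1.11

# /opt/spark/conf/spark-env.sh (same line on both machines)
SPARK_MASTER_IP=192.168.1.10
```

Keeping the master's own IP in the slaves file is what makes a worker start on the master node as well.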
6)Now go inside the /{$spark-home}/sbin directory on the master machine (in my case /opt/spark/sbin) and execute ./start-all.sh. It will start the master and a worker on the master node (machine1) and a worker on the 2nd slave node (machine2).
You can check it by executing the jps command on both machines.
7)Assume that we want to submit a Python script to the Spark cluster. Go inside
 /{$spark-home}/bin and execute ./spark-submit --master spark://&lt;machine1-ip&gt;:7077 path-to-python-script

Thursday, July 9, 2015

Custom Accumulators in Spark using Python

Apache Spark is an open source framework for big data processing, with support for machine learning, graph processing, and in-memory computation.
This post is about how to write custom accumulators in Spark using Python (PySpark).

Accumulators are shared variables and can be used to maintain counters or sums across an RDD (Resilient Distributed Dataset).

While writing this page I am assuming that readers are aware of RDDs and Apache Spark. I am using Python (PySpark) to demonstrate the accumulator.

Assume that we have multidimensional vectors and we want to add them up into a single vector. For example,
 if vec1={'a':10,'b':30} and vec2={'a':10,'b':40}, we want vec=vec1+vec2={'a':20,'b':70}
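In plain Python (no Spark yet), that addition is just a key-wise sum. A minimal sketch, with add_vectors being a hypothetical helper name, not part of any API:

```python
def add_vectors(v1, v2):
    # key-wise sum of two dict vectors that share the same keys
    return {k: v1[k] + v2[k] for k in v1}

vec1 = {'a': 10, 'b': 30}
vec2 = {'a': 10, 'b': 40}
print(add_vectors(vec1, vec2))  # {'a': 20, 'b': 70}
```

The accumulator below performs exactly this operation, but merges partial sums arriving from distributed tasks instead of two local dicts.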

To create a custom accumulator, the programmer has to subclass the AccumulatorParam interface and implement its zero and addInPlace methods. Accumulators support only operations that are associative, like the + operator, so the programmer has to provide the zero vector for that operator.

Here I am considering addition of vectors in Python dictionary form, where each vector has 100 dimensions.

Complete code is given below:

 from pyspark import SparkContext  
 from pyspark import AccumulatorParam  
 import random  
 #creating the spark context  
 sc = SparkContext()  
 #custom accumulator class  
 class VectorAccumulatorParam(AccumulatorParam):  
      def zero(self, value):  
           dict1 = {}  
           for i in range(0, len(value)):  
                dict1[i] = 0  
           return dict1  
      def addInPlace(self, val1, val2):  
           for i in val1.keys():  
                val1[i] += val2[i]  
           return val1  
 #creating the zero vector for addition  
 c = {}  
 for i in range(0, 100):  
      c[i] = 0  
 #creating 10 vectors, each with dimension 100 and randomly initialized  
 vectors = []  
 for j in range(0, 10):  
      v = {}  
      for i in range(0, 100):  
           v[i] = random.randint(0, 10)  
      vectors.append(v)  
 #creating an rdd from the 10 vectors  
 rdd = sc.parallelize(vectors)  
 #creating the accumulator  
 va = sc.accumulator(c, VectorAccumulatorParam())  
 #function to be executed on each rdd element in order to sum up the vectors  
 def sum(x):  
      global va  
      va += x  
 rdd.foreach(sum)  
 #print the value of the accumulator  
 print va.value
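The zero/addInPlace contract can be checked without a cluster. A minimal sketch that drives the same merge logic from plain Python (no SparkContext, and string keys with 3 dimensions instead of 100, purely for brevity):

```python
class VectorAccumulatorParam(object):
    # same merge logic as the Spark version above, minus the pyspark base class
    def zero(self, value):
        # a vector of zeros with the same keys as the initial value
        return {k: 0 for k in value}

    def addInPlace(self, val1, val2):
        # key-wise accumulation of val2 into val1
        for k in val1.keys():
            val1[k] += val2[k]
        return val1


param = VectorAccumulatorParam()
acc = param.zero({'a': 1, 'b': 2, 'c': 3})   # start from {'a': 0, 'b': 0, 'c': 0}
for vec in [{'a': 1, 'b': 0, 'c': 2}, {'a': 4, 'b': 5, 'c': 6}]:
    acc = param.addInPlace(acc, vec)
print(acc)  # {'a': 5, 'b': 5, 'c': 8}
```

Spark calls these two methods the same way: zero initializes the accumulator on each task, and addInPlace merges the per-task results back on the driver.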