Thursday, July 9, 2015

Custom Accumulators in Spark using Python

Apache Spark is an open source framework for big data processing, with support for machine learning, graph processing, and in-memory computation.
This post shows how to write custom accumulators in Spark using Python (pyspark).

Accumulators are shared variables that can be used to maintain counters or sums across an RDD (Resilient Distributed Dataset).

While writing this post I am assuming that readers are familiar with RDDs and Apache Spark. I am using Python and pyspark to demonstrate the accumulator.

Assume that we have multidimensional vectors and we want to add them up into a single vector. For example,
if vec1={'a':10,'b':30} and vec2={'a':10,'b':40}, we want vec=vec1+vec2,
i.e. vec={'a':20,'b':70}.
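In plain Python, without Spark, this element-wise addition could be written as a small helper. Here is a minimal sketch to make the goal concrete (the name add_vectors is just for illustration):

 def add_vectors(v1, v2):
      # element-wise sum of two dicts that share the same keys
      return {k: v1[k] + v2[k] for k in v1}

 vec1 = {'a': 10, 'b': 30}
 vec2 = {'a': 10, 'b': 40}
 print(add_vectors(vec1, vec2))  # {'a': 20, 'b': 70}

The challenge in Spark is doing this same addition across vectors that live in a distributed RDD, which is where accumulators come in.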

To create a custom accumulator, the programmer has to subclass the AccumulatorParam interface and implement its zero and addInPlace methods. Accumulators only support operations that are associative, like the + operator, so the programmer also has to provide the zero element for that operation.
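For plain numeric counters no custom class is needed, since Spark's built-in accumulator already handles ints and floats. A minimal sketch, assuming sc is an existing SparkContext:

 # built-in accumulator: works out of the box for numeric types
 counter = sc.accumulator(0)
 sc.parallelize([1, 2, 3, 4]).foreach(lambda x: counter.add(x))
 print(counter.value)  # 10

A custom AccumulatorParam is only required when the accumulated value is a non-numeric type, such as the dictionary vectors used here.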

Here I am considering the addition of vectors in Python dictionary form, where each vector has 100 dimensions.


The complete code is given below:



 from pyspark import SparkContext
 from pyspark import AccumulatorParam
 import random

 # creating the spark context
 sc = SparkContext()

 # custom accumulator class
 class VectorAccumulatorParam(AccumulatorParam):
      def zero(self, value):
           # return a vector of the same shape with all components set to 0
           dict1 = {}
           for key in value.keys():
                dict1[key] = 0
           return dict1
      def addInPlace(self, val1, val2):
           # element-wise addition; val1 is updated in place and returned
           for key in val1.keys():
                val1[key] += val2[key]
           return val1

 # creating the zero vector used as the accumulator's initial value
 c = {}
 for i in range(100):
      c[i] = 0

 # creating 10 vectors, each with dimension 100 and randomly initialized
 rand = []
 for j in range(10):
      dict1 = {}
      for i in range(100):
           dict1[i] = random.random()
      rand.append(dict1)

 # creating an rdd from the 10 vectors
 rdd1 = sc.parallelize(rand)

 # creating the accumulator
 va = sc.accumulator(c, VectorAccumulatorParam())

 # function executed on each rdd element in order to sum up the vectors
 def add_vector(x):
      global va
      va += x

 rdd1.foreach(add_vector)

 # print the value of the accumulator (only readable on the driver)
 print(va.value)
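To sanity-check the result, one can compare the accumulator's value against a sum computed locally on the driver. A quick sketch, reusing rand and va from the code above:

 # local reference sum, computed without Spark
 expected = {i: sum(vec[i] for vec in rand) for i in range(100)}
 # the accumulator should match the local sum (up to floating point error)
 print(all(abs(va.value[i] - expected[i]) < 1e-9 for i in range(100)))

This should print True, confirming that the workers' partial sums were merged correctly by addInPlace.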
