Thursday, February 18, 2016

Finding the most frequent word using Apache Spark.

Hello all, this post shows how to find the most frequent word in a corpus (a text file) using Apache Spark. I am assuming that readers are aware of basic RDD concepts. RDDs provide transformations and actions for writing the logic, and Spark provides APIs for creating RDDs from various data sources (HDFS, MongoDB, etc.).

The flow of the algorithm is as follows.
1. Load the data from HDFS.
  
 from pyspark import SparkContext  
 sc=SparkContext()  
 rddD=sc.textFile("hdfs://localhost:54310/data/hadoop/spark-input/wc.txt")   

2. Count the total occurrences of every word using transformations.

 flM=rddD.flatMap(lambda x: x.split()).map(lambda x: (x.lower(),1)).reduceByKey(lambda x,y:x+y)  

3. Subtract the stop words from the RDD.
 A normal text corpus has a very high proportion of stop words, so we have to eliminate them. To achieve this we create an RDD of stop words and subtract it from rddD. The list of stop words can be maintained as a file in HDFS, and that file can be loaded directly to create the corresponding RDD.
 rddS=sc.textFile("hdfs://localhost:54310/data/hadoop/spark-input/stopwords_en.txt")  
 flS=rddS.flatMap(lambda x: x.split("\n")).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)  
 flM=flM.subtractByKey(flS)  

4. Find the most frequent word. This can be done with the reduce action on the RDD. The logic is to compare pairs of (word, count) elements, using the frequency as the comparison criterion.
 def comp(x, y):
      if x[1] < y[1]:
           return y
      else:
           return x
 maxOcc = flM.reduce(comp)
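
If you prefer not to write the comparator yourself, the same result can be obtained with takeOrdered; a minimal sketch, assuming a Spark version whose takeOrdered accepts a key argument (the variable name top is just for illustration):

 #alternative: take the single (word, count) pair with the highest count
 top = flM.takeOrdered(1, key=lambda kv: -kv[1])[0]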

maxOcc will hold the most frequent word along with its number of occurrences.

You can also get the source code at https://github.com/shaileshcheke/spark-examples.git.
     

Friday, February 12, 2016

MPI clustering using a BeagleBone and a host machine.


This page explains how to set up a 2-node MPI cluster, where one node is my laptop with Fedora 19 and a Core 2 Duo processor and the other node is a BeagleBone Black with Debian Linux and an ARM processor (Open MPI supports heterogeneous nodes). I am using openmpi-1.6.5 to build the cluster.
1. Plug the BeagleBone into the host machine using a USB cable. This automatically configures passwordless ssh between the root user of the host and that of the BeagleBone. You can check it by executing ssh 192.168.7.2; 192.168.7.2 is the IP address assigned to the USB interface of the BeagleBone. It can be changed by modifying the /etc/network/interfaces file. On my BeagleBone device the usb0 interface is configured as given below.

iface usb0 inet static
address 192.168.7.2
netmask 255.255.255.0
network 192.168.7.0
gateway 192.168.7.1

You can change these settings to whatever IP address you want.

Once plugged in, the host machine gets 192.168.7.1 as the IP address of its end of the USB connection.

4. On the master machine, open a terminal:
$su
$mkdir /opt/mpi

Open a new terminal:
$su
$ssh 192.168.7.2

You will land in a BeagleBone shell, logged in as root.

$mkdir /opt/mpi

5. Download openmpi-1.6.5.tar.gz on the master machine.

6. On the master machine, open a terminal:
$su
$scp {path to }/openmpi-1.6.5.tar.gz root@192.168.7.2:/root/
This command copies the Open MPI tarball to the BeagleBone.
7. On the master machine, extract the tar file and build:
$tar -xvf openmpi-1.6.5.tar.gz
$cd openmpi-1.6.5
$./configure --prefix=/opt/mpi --exec-prefix=/opt/mpi
$make install
Follow the same steps on the BeagleBone.
8. The installation is now complete. We have to set environment variables so that the changes take effect.
On the master machine, open a terminal:
$su
$vi ~/.bashrc
Add the following lines to it:
export PATH=$PATH:/opt/mpi/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/mpi/lib
Now
$ssh 192.168.7.2
and repeat the same steps on the BeagleBone (open a new shell or source ~/.bashrc afterwards so the changes are picked up).

9. We are done with the base work. The next few steps are what distinguish a BeagleBone-host cluster from a normal Beowulf cluster.
In my case the host machine runs Fedora, which requires iptables configuration to accept incoming connections from the BeagleBone. Follow these steps to allow the incoming traffic on the Fedora host machine:

$su
$iptables -I INPUT -s 192.168.7.2 -p tcp -j ACCEPT

10. Now let's check whether the compute cluster is configured correctly. In the following code, every MPI process that is spawned prints its hostname and rank.

/* Hello_c.c */
#include <stdio.h>
#include <unistd.h>
#include "mpi.h"

int main(int argc, char* argv[])
{
    int rank, size;
    char hostname[64];                        /* large enough for typical hostnames */

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);     /* rank of this process */
    MPI_Comm_size(MPI_COMM_WORLD, &size);     /* total number of processes */
    gethostname(hostname, sizeof(hostname));  /* name of the machine running this rank */
    printf("Hello, world, I am %d of %d, my host name is %s\n", rank, size, hostname);
    MPI_Finalize();
    return 0;
}

To compile the code execute

$mpicc Hello_c.c


11. Now create a text file, let's say “hostaddress”, and add the IP addresses of the host and the BeagleBone, one IP address per line.
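
For the two-node setup described in this post, hostaddress would simply contain the two addresses:

192.168.7.1
192.168.7.2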

12. When you submit a job, the MPI runtime environment goes through all interfaces that are up in order to locate the machines mentioned in hostaddress. Sometimes this approach fails to locate a machine, so to avoid that we have to configure MPI to use a specific interface. To do it, just execute
$export HYDRA_IFACE=enp0s29f7u3

where enp0s29f7u3 is the USB interface name with IP address 192.168.7.1 on the host side. (Note that HYDRA_IFACE is honoured by MPICH's Hydra launcher; with Open MPI the equivalent is to restrict the TCP interfaces through MCA parameters, e.g. passing --mca btl_tcp_if_include enp0s29f7u3 to mpirun.)

13. With the configuration done, let's submit a job to the cluster.

$mpirun -machinefile hostaddress -np 10 ./a.out

If everything is configured correctly, you should see output from ranks running on both machines.


Thursday, October 8, 2015

Resilient Distributed Datasets and related operations (transformations and actions) in Apache Spark (part 1).

This post is about the RDD (Resilient Distributed Dataset), the backbone of Apache Spark. Apache Spark is a memory-based distributed computing framework and an alternative to Hadoop for some scenarios. It is built on an immutable abstraction called the RDD. Unlike similar distributed computing frameworks such as Hadoop, Apache Spark is just an execution engine. It supports various input sources like HDFS, MongoDB, Cassandra, etc. The core of Apache Spark is written in Scala, but it also supports Java and Python for writing driver code.

The developers kept it as simple and lightweight as possible for the end user. The user only has to focus on the main logic rather than on the complete flow of the algorithm. In a runtime environment like MapReduce, the user has to code against the MapReduce design pattern while writing the application. Even in MPI (Message Passing Interface), the programmer takes care of communication between multiple processes and of sending data to them. In Apache Spark, the user focuses on the driver code and the required operations on the data, and the runtime engine takes care of parallelizing and scheduling the computation.
Nowadays Apache Spark has a number of use cases where we want to handle huge data and the computation pattern is mainly iterative and demands user interaction. The most common use cases are machine learning algorithms like K-Means. Here Apache Spark stays ahead of Hadoop and MPI.
As I said, Apache Spark is a distributed computing framework, so it must support the traditional requirements of distributed computing frameworks:
1) Distributed computation
2) Fault tolerance

It also supports requirements that are very common in today's big data era:

1) Persistence levels
2) Lazy evaluation

Now, let's try to discuss them in detail.

1) Distributed computation:

Like other distributed and parallel computing frameworks, Apache Spark supports parallelization, using RDDs and partitions. Partitions are logical divisions of an RDD, and the number of partitions is the number of independent tasks that we want to execute in parallel. An RDD is immutable, i.e. you can create a new RDD but you cannot modify an existing one; an operation on an RDD creates a new RDD. It is like a string in Java or a tuple in Python.
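
A minimal pyspark sketch of both ideas (the numbers and variable names are just for illustration):

 from pyspark import SparkContext
 sc=SparkContext()
 #an RDD split into 4 partitions, so up to 4 tasks can run in parallel
 rdd=sc.parallelize(range(8),4)
 print rdd.getNumPartitions()
 #map does not modify rdd; it returns a brand new RDD
 doubled=rdd.map(lambda x: x*2)
 print doubled.collect()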

2) Fault tolerance:

Once the user creates the first RDD, he/she can execute a number of operations on it, some of which create new RDDs and their partitions. In other words, an RDD is better described as metadata than as actual data: it contains the information about how to compute it from another RDD or from a data source. Thus the complete driver program may have more than one RDD, each carrying the complete details of the operations required to compute it. If a single node fails, its partitions are lost, but because the RDD has all the information required to compute them, the lost partitions can be recomputed as and when needed. This is how Spark provides fault tolerance.

3) Lazy evaluation:
Most mining algorithms that work on big data need to shuffle and load data that does not fit in the main memory of a single machine. Even if we successfully club together the memory of all machines to form a single logical main memory, it is not possible to accommodate all the intermediate results there. Apache Spark therefore postpones the computation of such intermediate results until they are required by some action. This is called lazy evaluation.
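
A small illustration of lazy evaluation, reusing the sc context from the sketch above and the HDFS path from the earlier word-count post:

 lines=sc.textFile("hdfs://localhost:54310/data/hadoop/spark-input/wc.txt")
 #nothing has been read yet; flatMap only records the transformation
 words=lines.flatMap(lambda l: l.split())
 #count is an action, so only now is the whole chain actually computed
 print words.count()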

4) Persistence levels:
Many algorithms, like K-Means (with its centroid list), require the same RDD again and again. So instead of maintaining it on the hard disk, Apache Spark allows you to maintain it in memory by using the persistence level concept. You can keep it on the hard disk, in memory, or both, as required.
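
A sketch of how this looks in pyspark, again reusing sc (the RDD here is just a placeholder; the storage level names come from the pyspark StorageLevel class):

 from pyspark import StorageLevel
 centroids=sc.parallelize(range(10))
 #keep the RDD in memory so later iterations can reuse it
 centroids.cache()
 #or pick the level explicitly, e.g. memory first, spilling to disk:
 #centroids.persist(StorageLevel.MEMORY_AND_DISK)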

This is a logical view of the Apache Spark RDD concept. In the next part we will discuss the transformations and actions possible on an RDD.

Saturday, July 18, 2015

Custom RDDs in Apache Spark using Python (using custom classes for creating an RDD).

RDD stands for Resilient Distributed Dataset. It is the abstraction provided by Apache Spark for distributed operations on data. This post is about how to write a custom RDD in Python (pyspark).

The following code shows a simple Python class with a single data member.

 class Data(object):  
           def __init__(self,data):  
                     self.data=data  
           def increment(self):  
                     self.data += 1  
           def printData(self):  
                     print self.data  
Currently, Apache Spark cannot pickle a class that is defined in the driver script itself, so we have to maintain the class definition in a separate module (say mod.py, containing exactly the Data class above). The following listing shows how to use this class to create the custom RDD.

#code.py file.
from pyspark import SparkContext  
from mod import Data as Data  
sc=SparkContext()  
#creating the rdd from list of integers.
rdd=sc.parallelize([1,2,3,4])  
#Function to transform simple list rdd to custom rdd
def func(item):  
           d=Data(item)  
           return d  
#Function to call increment on each element of rdd1
def func2(item):  
           item.increment()  
           return item  
#actual list rdd to custom rdd conversion 
rdd1=rdd.map(func)  
print rdd1.count()  
#increment the data field of each element; map returns a new RDD with the updated objects
#(a plain foreach would modify copies on the workers and the changes would be lost)
rdd2=rdd1.map(func2)
for i in rdd2.collect():
           i.printData()
To execute the code just type spark-submit --py-files mod.py code.py.
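
With the four-element input list above, the script should print the count 4 followed by the incremented values 2, 3, 4 and 5, one per line.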

Friday, July 10, 2015

Installing Apache Spark in standalone mode on a cluster.

Hello all,

This post is about how to install Apache Spark on a cluster. Apache Spark supports the following cluster managers:
1) The standalone cluster manager embedded in Spark itself
2) Apache Mesos
3) Apache YARN

This post elaborates how to install Apache Spark standalone on a cluster.

1) The key requirement for any cluster is passwordless ssh.

I am creating a cluster of 2 machines (192.168.5.11 and 192.168.9.159), so I have to configure passwordless communication between them. Assume that we have a spark user on both machines.

Follow the steps given below.

 #on machine 192.168.5.11
 spark@192.168.5.11]#ssh-keygen -t rsa -P ""
 #copy id_rsa.pub to 192.168.9.159
 spark@192.168.5.11]#ssh-copy-id spark@192.168.9.159
 #on machine 192.168.9.159
 spark@192.168.9.159]#ssh-keygen -t rsa -P ""
 #copy id_rsa.pub to 192.168.5.11
 spark@192.168.9.159]#ssh-copy-id spark@192.168.5.11

2) Download the compiled binaries of Apache Spark, then copy and untar them at the same location on every machine. In my case it is /opt/spark.

3) Go inside /{$spark-home}/conf (in my case it is /opt/spark/conf).
It contains a slaves.template file. Just execute the following commands:
 

 #on machine 192.168.9.159, as I am considering it the master machine
 spark@192.168.9.159 conf]#cp slaves.template slaves  

Modify this file and add the IP addresses of the worker nodes.
The content of the file will look like this:

 # A Spark Worker will be started on each of the machines listed below.  
 192.168.9.159  
 192.168.5.11  
4)Then execute the following commands
 #on machine 192.168.5.11  
 spark@192.168.5.11 conf]#cp spark-env.sh.template spark-env.sh  
 #on machine 192.168.9.159  
 spark@192.168.9.159 conf]#cp spark-env.sh.template spark-env.sh  
5) Modify the spark-env.sh file on both machines and add SPARK_MASTER_IP=192.168.9.159
6) Now go inside the /{$spark-home}/sbin directory on the master machine (in my case 192.168.9.159) and execute ./start-all.sh. It will start the master and a worker on the master node (192.168.9.159) and a worker on the slave node (192.168.5.11).
You can check this by executing the jps command on both machines.
7) Assume that we want to submit a Python script to the Spark cluster. Go inside
 /{$spark-home}/bin and execute ./spark-submit --master spark://192.168.9.159:7077 path-to-python-script
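
As a quick sanity check, here is a tiny pyspark script you could submit; the file name test_job.py and the numbers are just for illustration:

 #test_job.py
 from pyspark import SparkContext
 sc=SparkContext(appName="ClusterCheck")
 #distribute 0..999 over 4 partitions and add them up on the workers
 rdd=sc.parallelize(range(1000),4)
 #prints 499500 once the job completes
 print rdd.sum()

Submit it with ./spark-submit --master spark://192.168.9.159:7077 test_job.py.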

Thursday, July 9, 2015

Custom Accumulators in Spark using python

Apache Spark is an open source framework for big data processing, with support for machine learning, graph processing and in-memory computation.
This post is about how to write custom accumulators in Spark using Python (pyspark).

Accumulators are shared variables and can be used to maintain counters or sums across an RDD (Resilient Distributed Dataset).

While writing this page I am assuming that visitors are aware of RDDs and Apache Spark. I am using Python and pyspark to demonstrate the accumulator.

Assume that we have multidimensional vectors and we want to add them up into a single vector. For example,
if vec1={'a':10,'b':30} and vec2={'a':10,'b':40}, we want vec=vec1+vec2,
i.e. vec={'a':20,'b':70}.

To create a custom accumulator, the programmer has to subclass AccumulatorParam and implement its zero and addInPlace methods. Accumulators support operations that are associative, like the + operator, so the programmer has to provide the zero element for this operator.

Here I am considering the addition of vectors in Python dictionary form, where each vector has 100 dimensions.


Complete code is given below:



 from pyspark import SparkContext
 from pyspark import AccumulatorParam
 import random
 #creating spark context
 sc=SparkContext()
 #custom accumulator class
 class VectorAccumulatorParam(AccumulatorParam):
      def zero(self, value):
           #a vector of the same dimension as value, with every component set to 0
           dict1={}
           for i in range(0,len(value)):
                dict1[i]=0
           return dict1
      def addInPlace(self, val1, val2):
           #component-wise addition of the two vectors
           for i in val1.keys():
                val1[i] += val2[i]
           return val1
 #creating the zero vector for the addition
 c={}
 for i in range(0,100):
      c[i]=0
 #creating 10 vectors, each with dimension 100 and randomly initialized
 rand=[]
 for j in range(0,10):
      dict1={}
      for i in range(0,100):
           dict1[i]=random.random()
      rand.append(dict1)
 #creating rdd from the 10 vectors
 rdd1=sc.parallelize(rand)
 #creating the accumulator
 va = sc.accumulator(c, VectorAccumulatorParam())
 #function executed on every element of the rdd in order to sum up the vectors
 def sum(x):
      global va
      va += x
 rdd1.foreach(sum)
 #print the value of the accumulator
 print va.value
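
Once the job completes, va.value will be a 100-entry dictionary whose i-th entry is the sum of the i-th components of the 10 random vectors.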

Friday, May 15, 2015

Integration Of Maui with TORQUE.

Maui is an open source cluster scheduler written in C. TORQUE is an open source resource manager, originally known as PBS, also written purely in C. It manages both jobs and nodes and provides a simple scheduler, but the default scheduler is not customizable. Maui is customizable and provides various algorithms and policies (for example fairshare, backfill, etc.).
While integrating Maui with TORQUE we have to specify some options at the time of installing Maui.
These options are listed below.
--prefix=<location where you want to install the architecture-independent files>
--exec-prefix=<location where you want to install the architecture-dependent files>
--with-pbs=<location where the pbs lib directory is located>
--with-spooldir=<location where you want to maintain the config files, logs and stat directories>

Suppose the location for installing the architecture-dependent as well as the architecture-independent files is /opt/maui, TORQUE is installed at /opt/torque, and I am using maui-3.2.6p20 for the installation. Following are the steps for installation.
1)tar -xvf  maui-3.2.6p20.tar.gz
2)cd  maui-3.2.6p20
3)./configure --prefix=/opt/maui --exec-prefix=/opt/maui --with-pbs=/opt/torque and press enter.
4)make 
5)make install

And maui is integrated with TORQUE.

Enjoy.....