Saturday, November 26, 2016

DEMYSTIFYING SPARK SERIALIZATION

To serialize an object means to convert its state to a byte stream so that the byte stream can be converted back into a copy of the object. A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable.
  • A class is never serialized; only an object of a class is serialized. Object serialization is needed when an object has to be persisted or transmitted over the network.



Class component             Serialized?
Instance variable           yes
Static instance variable    no
Methods                     no
Static methods              no
Static inner class          no
Local variables             no
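
The first two rows of the table can be checked with plain Java serialization, no Spark required. The following is a minimal sketch: the class names FieldSerializationDemo and Counter and the helper methods are made up for this illustration. It serializes an object, changes the static field afterwards, and then deserializes the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class FieldSerializationDemo {

    // Hypothetical class used only for this illustration.
    static class Counter implements Serializable {
        private static final long serialVersionUID = 1L;
        int instanceValue = 10;       // object state: written to the byte stream
        static int staticValue = 20;  // class state: never written to the byte stream
    }

    // Serializes an object with standard Java serialization.
    static byte[] toBytes(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Rebuilds an object from bytes produced by toBytes().
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Counter original = new Counter();
        original.instanceValue = 11;
        Counter.staticValue = 21;
        byte[] bytes = toBytes(original);

        // Change the static field after serializing; the bytes carry no copy of it.
        Counter.staticValue = 99;
        Counter copy = (Counter) fromBytes(bytes);

        System.out.println(copy.instanceValue); // prints 11: restored from the bytes
        System.out.println(Counter.staticValue); // prints 99: current class state of this JVM
    }
}
```

On a Spark executor the same thing happens across JVMs: the static field has whatever value the class holds on that executor, not the value it had on the driver.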





Let's take a sample Spark program and go through various scenarios.

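import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

// Serializable because the anonymous class in run() holds a reference to this outer object
public class SparkSample implements Serializable {

      private static final Pattern SPACE = Pattern.compile(" ");

      public int instanceVariable                =10 ;
      public static int staticInstanceVariable   =20 ;

      // inputPath and outputPath are taken from the command line (see main)
      public void run(String inputPath, String outputPath){

         // not declared final on purpose: a final int initialized with a literal would be inlined as a constant
         int localVariable                       =30;

         // create Spark conf (the app name is a placeholder)
         final SparkConf sparkConf = new SparkConf().setAppName("SparkSample").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

         // create spark context
         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

         // read DATA
         JavaRDD<String> lines = sparkContext.textFile(inputPath);

         // Anonymous class used in place of a lambda
         JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
             @Override
             public Iterator<String> call(String s) {
                 // How will the listed variables be accessed in the RDD across driver and executors?
                 System.out.println("Output :" + instanceVariable + " " + staticInstanceVariable + " " + localVariable);
                 return Arrays.asList(SPACE.split(s)).iterator();
             }
         });

         // SAVE OUTPUT
         words.saveAsTextFile(outputPath);

         sparkContext.stop();
      }

      // Static inner class for the functional interface, which can replace the anonymous class above
      // via lines.flatMap(new MapClass()). Only the static variable of the outer class is accessible here;
      // instanceVariable and localVariable would not compile.
      public static class MapClass implements FlatMapFunction<String, String> {
          @Override
          public Iterator<String> call(String s) {
              System.out.println("Output :" + staticInstanceVariable);
              return Arrays.asList(SPACE.split(s)).iterator();
          }
      }

      public static void main(String[] args) throws Exception {
          SparkSample sample = new SparkSample();
          sample.run(args[0], args[1]);
      }

}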






Accessibility and serializability of outer class variables inside inner class objects



Function             Instance variable (outer class)   Static instance variable (outer class)   Local variable (outer class)
Anonymous class      Accessible and serialized         Accessible yet not serialized            Accessible and serialized
Inner static class   Not accessible                    Accessible yet not serialized            Not accessible



Rules of thumb for understanding a Spark job:

1. Every function passed to an RDD operation (anonymous class or lambda) is instantiated on the driver; the resulting object is serialized and sent to the executors.

2. If outer class variables are accessed within the inner class, the compiler generates different code to reach them depending on their kind, so whether the outer class object gets serialized depends on what you access.

3. In Java terms, the whole debate is about outer class vs. inner class, and how accessing outer class references and variables leads to serialization issues.
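
The second rule can be reproduced without Spark. The sketch below is only an illustration: OuterCaptureDemo, SerFunction and isSerializable are made-up names, and SerFunction stands in for Spark's function interfaces, which also extend Serializable. The outer class is deliberately not serializable, so the anonymous class that reads an instance variable fails to serialize, while the static inner class goes through.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class OuterCaptureDemo { // deliberately NOT Serializable

    // Stand-in for a Spark function interface (assumption for this sketch).
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    int instanceVariable = 10;
    static int staticVariable = 20;

    // Anonymous class that reads an outer instance field,
    // so it keeps a reference to the enclosing OuterCaptureDemo object.
    SerFunction<String, String> anonymous() {
        return new SerFunction<String, String>() {
            @Override
            public String apply(String s) {
                return s + instanceVariable;
            }
        };
    }

    // Static inner class: it has no reference to an outer object.
    static class Nested implements SerFunction<String, String> {
        @Override
        public String apply(String s) {
            return s + staticVariable;
        }
    }

    // Returns true if Java serialization accepts the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // false: the captured outer object is not serializable
        System.out.println(isSerializable(new OuterCaptureDemo().anonymous()));
        // true: nothing from the outer object travels with it
        System.out.println(isSerializable(new Nested()));
    }
}
```

In a Spark job the first case typically surfaces as a "Task not serializable" error on the driver, before any task is sent to an executor.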



Various scenarios:

Outer class variables accessed within an anonymous inner class.

Instance variable (outer class)

By default the compiler inserts a constructor into the byte code of the anonymous class that takes a reference to the outer class object. The outer class object is used to access the instance variable.

Anonymous-class(){
     final Outer-class reference;
     Anonymous-class( Outer-class outer-reference){
         reference = outer-reference;
     }
}

The outer class object is serialized and sent along with the serialized object of the inner anonymous class.

Static instance variable (outer class)

Static variables are not serialized, but the outer class object is still inserted into the anonymous class constructor. The value of the static variable is taken from the class state present on that executor.

Local variable (outer class)

By default the compiler inserts a constructor into the byte code of the anonymous class that takes a reference to the outer class object AND a reference to the local variable. The outer class object is used to access the instance variable.

Anonymous-class(){
     final Outer-class reference;
     final Local-variable localReference;
     Anonymous-class( Outer-class outer-reference, Local-variable localReference){
         reference = outer-reference;
         this.localReference = localReference;
     }
}

The outer class object is serialized, and the local variable object is also serialized and sent along with the serialized object of the inner anonymous class.
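
The fields the compiler generates for an anonymous class can also be listed with reflection instead of javap. The sketch below uses made-up names (CapturedFieldsDemo, capturedFieldTypes). The local variable is passed in as a method parameter because a final local initialized with a literal is a compile-time constant that the compiler inlines rather than captures. The names of the generated fields are a compiler detail, so the sketch looks only at their types.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class CapturedFieldsDemo {

    int instanceVariable = 10;

    // The anonymous class uses both an outer instance field and a local variable.
    Supplier<String> capture(int localVariable) {
        return new Supplier<String>() {
            @Override
            public String get() {
                return instanceVariable + " " + localVariable;
            }
        };
    }

    // Lists the types of the fields declared by the object's class,
    // including the ones generated by the compiler.
    static List<String> capturedFieldTypes(Object o) {
        List<String> types = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            types.add(f.getType().getSimpleName());
        }
        return types;
    }

    public static void main(String[] args) {
        Supplier<String> s = new CapturedFieldsDemo().capture(30);
        // Expect one field of type CapturedFieldsDemo (the outer reference)
        // and one of type int (the copied local variable).
        System.out.println(capturedFieldTypes(s));
    }
}
```

Both fields are ordinary instance fields of the anonymous class, which is why both travel with it when it is serialized.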




                                                                                                
Outer class variables accessed within a static inner class.

Instance variable (outer class)

Cannot be accessed.

Static instance variable (outer class)

Static variables are not serialized, so no outer class object is serialized. The value of the static variable is taken from the class state present on that executor. The outer class object is not serialized and is not sent along with the serialized static inner class object.

Local variable (outer class)

Cannot be accessed.




Points to think through:
  1. Java serialization rules are followed to select which class objects
    need to be serialized.
  2. Use javap -p -c "abc.class" to unwrap the byte code and see the compiler-generated code.
  3. Depending on what you access from the outer class within the inner class, the compiler generates different byte code.
  4. Classes that are only accessed on the driver do not need to implement Serializable.
  5. Any anonymous/static class used within an RDD operation is instantiated on the driver (lambda functions behave much like anonymous classes here).
  6. Any class/variable used inside an RDD operation is instantiated on the driver and sent to the executors.
  7. Any instance variable declared transient is not serialized on the driver.
  8. By default, anonymous classes force you to make the outer class serializable.
  9. A local variable/object does not have to be serializable in general.
  10. A local variable needs to be serializable only if it is used inside the anonymous class.
  11. One can create a singleton inside the call() method of the function passed to an operation such as mapToPair, thus making sure it is never initialized on the driver.
  12. Static variables are never serialized, hence they are never sent from the driver to the executors.
  13. If you need a service to run only on the executors, make it a static field inside the function class, or make it transient and a singleton and check for null before instantiating it.
  14. When to use Spark broadcast: http://g-chi.github.io/2015/10/21/Spark-why-use-broadcast-variables/
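
The transient-plus-null-check idea from the points above can be sketched in plain Java. LazyServiceDemo, ExpensiveService, LazyFunction and roundTrip are hypothetical names invented for this example; the serialization round trip stands in for shipping the function from the driver to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class LazyServiceDemo {

    // Hypothetical expensive, non-serializable resource (for example a client).
    static class ExpensiveService {
        static int instancesCreated = 0;
        ExpensiveService() { instancesCreated++; }
        String handle(String s) { return s.toUpperCase(); }
    }

    static class LazyFunction implements Function<String, String>, Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by serialization, so it is null after deserialization
        private transient ExpensiveService service;

        @Override
        public String apply(String s) {
            if (service == null) {            // first call on the receiving side
                service = new ExpensiveService();
            }
            return service.handle(s);
        }
    }

    // Serializes and deserializes an object, as if it were sent to another JVM.
    static Object roundTrip(Object o) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(o);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazyFunction onDriver = new LazyFunction();               // no service created yet
        LazyFunction onExecutor = (LazyFunction) roundTrip(onDriver);
        System.out.println(onExecutor.apply("spark"));            // prints SPARK
        System.out.println(ExpensiveService.instancesCreated);    // prints 1
    }
}
```

The service is created on first use after deserialization, never before the function is shipped. This sketch keeps one service per function object; a static field would instead give one per JVM and would need synchronization if several tasks share it.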