[Jboss-cvs] JBossAS SVN: r56520 - trunk/cluster/src/main/org/jboss/ha/framework/server

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Sep 2 00:14:28 EDT 2006


Author: bstansberry at jboss.com
Date: 2006-09-02 00:14:27 -0400 (Sat, 02 Sep 2006)
New Revision: 56520

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
[JBAS-3540] Streamable state transfer

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-09-02 02:12:19 UTC (rev 56519)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-09-02 04:14:27 UTC (rev 56520)
@@ -23,12 +23,16 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Vector;
 
 import javax.naming.Context;
@@ -39,12 +43,15 @@
 import javax.naming.StringRefAddr;
 import javax.management.MBeanServer;
 
+import org.jgroups.ExtendedMessageListener;
 import org.jgroups.JChannel;
+import org.jgroups.MembershipListener;
 import org.jgroups.MergeView;
 import org.jgroups.View;
 import org.jgroups.Message;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.stack.IpAddress;
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
@@ -67,17 +74,32 @@
  * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
  * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
  * @author Scott.Stark at jboss.org
+ * @author brian.stansberry at jboss.com
  * @version $Revision$
  */
 public class HAPartitionImpl
-   extends org.jgroups.blocks.RpcDispatcher
-   implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
+   extends RpcDispatcher
+   implements ExtendedMessageListener, MembershipListener,
       HAPartition, AsynchEventHandler.AsynchEventProcessor
 {
+   private static final byte NULL_VALUE   = 0;
+   private static final byte SERIALIZABLE_VALUE = 1;
+   // TODO add Streamable support
+   // private static final byte STREAMABLE_VALUE = 2;
+   
+   /**
+    * Returned when an RPC call arrives for a service that isn't registered.
+    */
    private static class NoHandlerForRPC implements Serializable
    {
       static final long serialVersionUID = -1263095408483622838L;
    }
+   
+   private static class StateStreamEnd implements Serializable
+   {
+      /** The serialVersionUID */
+      private static final long serialVersionUID = -3705345735451504946L;      
+   }
 
    // Constants -----------------------------------------------------
 
@@ -362,8 +384,8 @@
          log.warn("Failed to stop asynchHandler", e);
       }
 
-      // Stop the DRM and DS services
-      //
+      // Stop the DRM service
+      // TODO remove when DRM is independent
       try
       {
          this.replicantManager.stop();
@@ -373,21 +395,11 @@
          log.error("operation failed", e);
       }
 
-//      try
-//      {
-//         this.dsManager.stop();
-//      }
-//      catch (Exception e)
-//      {
-//         log.error("operation failed", e);
-//      }
-
 //    NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
 //    add the destroyPartition() step
       try
       {
-//          channel.close();
-          channel.disconnect();
+         channel.disconnect();
       }
       catch (Exception e)
       {
@@ -409,8 +421,7 @@
 
       log.info("Partition " + partitionName + " closed.");
    }
-
-// NR 200505 : [JBCLUSTER-38] destroy partition close the channel
+   
    public void destroyPartition()  throws Exception
    {
 
@@ -420,145 +431,250 @@
       }
       catch (Exception e)
       {
-         log.error("operation failed", e);
-      }
-
-//      try
-//      {
-//         this.dsManager.destroy();
-//      }
-//      catch (Exception e)
-//      {
-//         log.error("operation failed", e);
-//      }
+         log.error("Destroying DRM failed", e);
+      }      
       
-      
       try
       {
          channel.close();
       }
       catch (Exception e)
       {
-         log.error("operation failed", e);
+         log.error("Closing channel failed", e);
       }
 
       log.info("Partition " + partitionName + " destroyed.");
-  }
+   }
+   
    // org.jgroups.MessageListener implementation ----------------------------------------------
 
-   // MessageListener methods
-   //
    public byte[] getState()
    {
       logHistory ("getState called on partition");
-      boolean debug = log.isDebugEnabled();
       
       log.debug("getState called.");
       try
       {
-         // we now get the sub-state of each HAPartitionStateTransfer subscribers and
-         // build a "macro" state
-         //
-         HashMap state = new HashMap();
-         Iterator keys = stateHandlers.keySet().iterator();
-         while (keys.hasNext())
-         {
-            String key = (String)keys.next();
-            HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
-            if (debug)
-               log.debug("getState for " + key);
-            state.put(key, subscriber.getCurrentState());
-         }
-         return objectToByteBuffer(state);
+         ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+         getStateInternal(baos);
+         return baos.toByteArray();
       }
       catch (Exception ex)
       {
          log.error("getState failed", ex);
       }
-      return null;
+      return null; // This will cause the receiver to get a "false" on the channel.getState() call
    }
    
+   public void getState(OutputStream stream)
+   {
+      logHistory ("getState called on partition");
+      
+      log.debug("getState called.");
+      try
+      {
+         getStateInternal(stream);
+      }
+      catch (Exception ex)
+      {
+         log.error("getState failed", ex);
+      }
+      
+   }
+   
+   private void getStateInternal(OutputStream stream) throws IOException
+   {
+      MarshalledValueOutputStream mvos = null; // don't create until we know we need it
+      
+      for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
+      {
+         Map.Entry entry = (Map.Entry)keys.next();
+         HAPartition.HAPartitionStateTransfer subscriber = 
+            (HAPartition.HAPartitionStateTransfer) entry.getValue();
+         log.debug("getState for " + entry.getKey());
+         Object state = subscriber.getCurrentState();
+         if (state != null)
+         {
+            if (mvos == null)
+            {
+               // This is our first write, so need to write the header first
+               stream.write(SERIALIZABLE_VALUE);
+               
+               mvos = new MarshalledValueOutputStream(stream);
+            }
+            
+            mvos.writeObject(entry.getKey());
+            mvos.writeObject(state);
+         }
+      }
+      
+      if (mvos == null)
+      {
+         // We never wrote any state, so write the NULL header
+         stream.write(NULL_VALUE);
+      }
+      else
+      {
+         mvos.writeObject(new StateStreamEnd());
+      }
+      
+   }
+   
    public void setState(byte[] obj)
    {
       logHistory ("setState called on partition");
       try
       {
-         log.debug("setState called");
          if (obj == null)
          {
-            log.debug("state is null");
-            return;
+            log.debug("transferred state is null (may be first member in cluster)");
          }
-         
-         long used_mem_before, used_mem_after;
-         int state_size=obj != null? obj.length : 0;
-         Runtime rt=Runtime.getRuntime();
-         used_mem_before=rt.totalMemory() - rt.freeMemory();
-
-         HashMap state = (HashMap)objectFromByteBuffer(obj);
-         java.util.Iterator keys = state.keySet().iterator();
-         while (keys.hasNext())
+         else
          {
-            String key = (String)keys.next();
-            log.debug("setState for " + key);
-            Object someState = state.get(key);
-            HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
-            if (subscriber != null)
-            {
-               try
-               {
-                  subscriber.setCurrentState((java.io.Serializable)someState);
-               }
-               catch (Exception e)
-               {
-                  // Don't let issues with one subscriber affect others
-                  // unless it is DRM, which is really an internal function
-                  // of the HAPartition
-                  // FIXME remove this once DRM is JBC-based
-                  if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
-                  {
-                     if (e instanceof RuntimeException)
-                        throw (RuntimeException) e;
-                     else
-                        throw new RuntimeException(e);
-                  }
-                  else
-                  {
-                     log.error("Caught exception setting state to " + subscriber, e);
-                  }
-               }
-            }
-            else
-            {
-               log.debug("There is no stateHandler for: " + key);
-            }
+            ByteArrayInputStream bais = new ByteArrayInputStream(obj);
+            setStateInternal(bais);
+            bais.close();
          }
-
-         used_mem_after=rt.totalMemory() - rt.freeMemory();
-         log.debug("received a state of " + state_size + " bytes; expanded memory by " +
-               (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
-               ", used memory after: " + used_mem_after + ")");
          
          isStateSet = true;
       }
       catch (Throwable t)
       {
-         log.error("failed setting state", t);
-         if (t instanceof Exception)
-            setStateException = (Exception) t;
+         recordSetStateFailure(t);
+      }
+      finally
+      {
+         notifyStateTransferCompleted();
+      }
+   }
+   
+   public void setState(InputStream stream)
+   {
+      logHistory ("setState called on partition");
+      try
+      {
+         if (stream == null)
+         {
+            log.debug("transferred state is null (may be first member in cluster)");
+         }
          else
-            setStateException = new Exception(t);
+         {
+            setStateInternal(stream);
+         }
+         
+         isStateSet = true;
       }
+      catch (Throwable t)
+      {
+         recordSetStateFailure(t);
+      }
       finally
       {
-         synchronized (stateLock)
+         notifyStateTransferCompleted();
+      }
+   }
+   
+   private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
+   {
+      byte type = (byte) stream.read();
+         
+      if (type == NULL_VALUE)
+      {
+         log.debug("state is null");
+         return;
+      }
+      
+      long used_mem_before, used_mem_after;
+      Runtime rt=Runtime.getRuntime();
+      used_mem_before=rt.totalMemory() - rt.freeMemory();
+      
+      MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
+      
+      while (true)
+      {
+         Object obj = mvis.readObject(); 
+         if (obj instanceof StateStreamEnd)
+            break;
+         
+         String key = (String) obj;
+         log.debug("setState for " + key);
+         Object someState = mvis.readObject();
+         HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
+         if (subscriber != null)
          {
-            // Notify wait that state has been set.
-            stateLock.notifyAll();
+            try
+            {
+               subscriber.setCurrentState((Serializable)someState);
+            }
+            catch (Exception e)
+            {
+               // Don't let issues with one subscriber affect others
+               // unless it is DRM, which is really an internal function
+               // of the HAPartition
+               // FIXME remove this once DRM is JBC-based
+               if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
+               {
+                  if (e instanceof RuntimeException)
+                     throw (RuntimeException) e;
+                  else
+                     throw new RuntimeException(e);
+               }
+               else
+               {
+                  log.error("Caught exception setting state to " + subscriber, e);
+               }
+            }
          }
+         else
+         {
+            log.debug("There is no stateHandler for: " + key);
+         }      
       }
+
+      used_mem_after=rt.totalMemory() - rt.freeMemory();
+      log.debug("received state; expanded memory by " +
+            (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
+            ", used memory after: " + used_mem_after + ")");
    }
+
+   private void recordSetStateFailure(Throwable t)
+   {
+      log.error("failed setting state", t);
+      if (t instanceof Exception)
+         setStateException = (Exception) t;
+      else
+         setStateException = new Exception(t);
+   }
+
+   private void notifyStateTransferCompleted()
+   {
+      synchronized (stateLock)
+      {
+         // Notify wait that state has been set.
+         stateLock.notifyAll();
+      }
+   }
    
+   public void getState(String state_id, OutputStream ostream)
+   {
+      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+   }
+
+   public byte[] getState(String state_id)
+   {
+      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+   }
+
+   public void setState(String state_id, byte[] state)
+   {
+      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+   }
+
+   public void setState(String state_id, InputStream istream)
+   {
+      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+   }
+
    public void receive(org.jgroups.Message msg)
    { /* complete */}
    




More information about the jboss-cvs-commits mailing list