[jboss-cvs] JBossAS SVN: r104449 - trunk/cluster/src/main/java/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 4 13:34:17 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-05-04 13:34:16 -0400 (Tue, 04 May 2010)
New Revision: 104449

Modified:
   trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java
Log:
[JBAS-3594] First crack at implementation

Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java	2010-05-04 17:03:12 UTC (rev 104448)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java	2010-05-04 17:34:16 UTC (rev 104449)
@@ -28,20 +28,26 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.ref.WeakReference;
+import java.security.AccessController;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
@@ -80,6 +86,8 @@
 import org.jboss.managed.api.annotation.ViewUse;
 import org.jboss.naming.NonSerializableFactory;
 import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.util.loading.ContextClassLoaderSwitcher;
+import org.jboss.util.loading.ContextClassLoaderSwitcher.SwitchContext;
 import org.jgroups.Address;
 import org.jgroups.Channel;
 import org.jgroups.Event;
@@ -284,9 +292,14 @@
    private HAPartitionDependencyCreator  haPartitionDependencyCreator;
    private KernelControllerContext kernelControllerContext;
    
+   private final Map<String, StateTransferTask> stateTransferTasks = new Hashtable<String, StateTransferTask>(); // state transfer tasks, keyed by service name
+   // Supplies the SwitchContext that StateTransferTask.setStateInternal obtains before unmarshalling and resets afterwards
+   @SuppressWarnings("unchecked")
+   private final ContextClassLoaderSwitcher classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
+
    // Static --------------------------------------------------------
 
-   // Constructors --------------------------------------------------
+    // Constructors --------------------------------------------------
    
    public ClusterPartition()
    {
@@ -1473,7 +1486,22 @@
    
    public Future<Serializable> getServiceState(String serviceName, ClassLoader classloader)
    {
-      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+      RunnableFuture<Serializable> future = null;
+      StateTransferTask task = stateTransferTasks.get(serviceName);
+      if (task == null)
+      {
+         task = new StateTransferTask(serviceName, classloader);
+         stateTransferTasks.put(serviceName, task);
+         future = new FutureTask<Serializable>(task);
+         Executor e = threadPool == null ? Executors.newSingleThreadExecutor() : threadPool;
+         e.execute(future);
+      }
+      else
+      {
+         // Unlikely scenario
+         future = new FutureTask<Serializable>(task);
+      }
+      return future;
    }
 
    public Future<Serializable> getServiceState(String serviceName)
@@ -2159,12 +2187,81 @@
       
       public void getState(String state_id, OutputStream ostream)
       {
-         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         // FIXME refactor to share logic
+         ClusterPartition.this.log.debug("getState called for service " + state_id);
+         // Marshals the current state of the provider mapped to state_id onto ostream
+         StateTransferProvider provider = stateProviders.get(state_id);
+         if (provider != null) // without a provider nothing is written and ostream is left untouched
+         {
+            MarshalledValueOutputStream mvos = null;
+            // FIXME add a streaming api to StateTransferProvider
+            Object state = provider.getCurrentState();
+            try
+            {
+               mvos = new MarshalledValueOutputStream(ostream);
+               mvos.writeObject(state);
+            }
+            catch (Exception ex) // logged only; the failure is not propagated to the caller
+            {
+               ClusterPartition.this.log.error("getState failed for service " + state_id, ex);
+            }
+            finally
+            {
+               if (mvos != null) // still null if creating the wrapper stream failed
+               {
+                  try
+                  {
+                     mvos.flush();
+                     mvos.close(); // NOTE(review): presumably also closes the wrapped ostream - confirm
+                  }
+                  catch (IOException ignored)
+                  {
+                     log.debug("Caught exception closing stream used for marshalling state", ignored);
+                  }
+               }
+            }
+         }
       }
 
       public byte[] getState(String state_id)
       {
-         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         ClusterPartition.this.log.debug("getState called for service " + state_id);
+         // Returns the marshalled state of the provider mapped to state_id, or null if there is none or marshalling fails
+         StateTransferProvider provider = stateProviders.get(state_id);
+         if (provider != null)
+         {
+            MarshalledValueOutputStream mvos = null;
+            Object state = provider.getCurrentState();
+            try
+            {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); // 1024 is only the initial buffer size
+               mvos = new MarshalledValueOutputStream(baos);
+               mvos.writeObject(state);
+               mvos.flush();
+               mvos.close(); // flushed and closed before the buffer is copied out
+               return baos.toByteArray();
+            }
+            catch (Exception ex) // logged only; execution falls through to the null return below
+            {
+               ClusterPartition.this.log.error("getState failed for service " + state_id, ex);
+            }
+            finally
+            {
+               if (mvos != null)
+               {
+                  try
+                  {
+                     mvos.close(); // runs on every path, so the success path closes the stream a second time
+                  }
+                  catch (IOException ignored)
+                  {
+                     log.debug("Caught exception closing stream used for marshalling state", ignored);
+                  }
+               }
+            }
+         }
+
+         return null; // This will cause the receiver to get a "false" on the channel.getState() call
       }
       
       public void setState(InputStream stream)
@@ -2214,12 +2311,30 @@
 
       public void setState(String state_id, byte[] state)
       {
-         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         StateTransferTask task = ClusterPartition.this.stateTransferTasks.get(state_id); // tasks are stored under the service name
+         if (task == null) // no task is registered under this state id; the state is dropped with a warning
+         {
+            ClusterPartition.this.log.warn("No " + StateTransferTask.class.getSimpleName() + 
+                  " registered to receive state for service " + state_id);
+         }
+         else
+         {
+            task.setState(state); // the task unmarshals the bytes and notifies the thread blocked in its call()
+         }
       }
 
       public void setState(String state_id, InputStream istream)
       {
-         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         StateTransferTask task = ClusterPartition.this.stateTransferTasks.get(state_id); // tasks are stored under the service name
+         if (task == null) // no task is registered under this state id; istream is not read
+         {
+            ClusterPartition.this.log.warn("No " + StateTransferTask.class.getSimpleName() + 
+                  " registered to receive state for service " + state_id);
+         }
+         else
+         {
+            task.setState(istream); // the task unmarshals from the stream and notifies the thread blocked in its call()
+         }
       }
 
       public void receive(org.jgroups.Message msg)
@@ -2619,203 +2734,203 @@
       }
    }
    
-//   private class StateTransferTask implements Callable<Serializable>
-//   {
-//      private final String serviceName;
-//      private final WeakReference<ClassLoader> classloader;
-//      private Serializable result;
-//      private boolean isStateSet;
-//      private Exception setStateException;
-//      
-//      StateTransferTask(String serviceName, ClassLoader cl)
-//      {
-//         this.serviceName = serviceName;
-//         if (cl != null)
-//         {
-//            classloader = null;
-//         }
-//         else
-//         {
-//            classloader = new WeakReference<ClassLoader>(cl);
-//         }
-//      }
-//
-//      public Serializable call() throws Exception
-//      {
-//         boolean intr = false;
-//         try
-//         {
-//            long start, stop;
-//            this.isStateSet = false;
-//            start = System.currentTimeMillis();
-//            boolean rc = ClusterPartition.this.channel.getState(null, serviceName, ClusterPartition.this.getStateTransferTimeout());
-//            if (rc)
-//            {
-//               synchronized (this)
-//               {
-//                  while (!this.isStateSet)
-//                  {
-//                     if (this.setStateException != null)
-//                     {
-//                        throw this.setStateException;
-//                     }
-//
-//                     try
-//                     {
-//                        wait();
-//                     }
-//                     catch (InterruptedException iex)
-//                     {
-//                        intr = true;
-//                     }
-//                  }
-//               }
-//               stop = System.currentTimeMillis();
-//               ClusterPartition.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
-//            }
-//            else
-//            {
-//               // No one provided us with serviceState.
-//               // We need to find out if we are the coordinator, so we must
-//               // block until viewAccepted() is called at least once
-//
-//               synchronized (ClusterPartition.this.members)
-//               {
-//                  while (ClusterPartition.this.members.size() == 0)
-//                  {
-//                     ClusterPartition.this.log.debug("waiting on viewAccepted()");
-//                     try
-//                     {
-//                        ClusterPartition.this.members.wait();
-//                     }
-//                     catch (InterruptedException iex)
-//                     {
-//                        intr = true;
-//                     }
-//                  }
-//               }
-//
-//               if (ClusterPartition.this.isCurrentNodeCoordinator())
-//               {
-//                  ClusterPartition.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
-//               }
-//               else
-//               {
-//                  throw new IllegalStateException("Initial serviceState transfer failed: " +
-//                     "Channel.getState() returned false");
-//               }
-//            }
-//         }
-//         finally
-//         {
-//            if (intr) Thread.currentThread().interrupt();
-//         }
-//         
-//         return result;
-//      }     
-//      
-//      void setState(byte[] state)
-//      {
-//         try
-//         {
-//            if (state == null)
-//            {
-//               ClusterPartition.this.log.debug("transferred state for service " + 
-//                     serviceName + " is null (may be first member in cluster)");
-//            }
-//            else
-//            {
-//               ByteArrayInputStream bais = new ByteArrayInputStream(state);
-//               setStateInternal(bais);
-//               bais.close();
-//            }
-//            
-//            this.isStateSet = true;
-//         }
-//         catch (Throwable t)
-//         {
-//            recordSetStateFailure(t);
-//         }
-//         finally
-//         {
-//            // Notify waiting thread that serviceState has been set.
-//            synchronized(this)
-//            {
-//               notifyAll();
-//            }
-//         }
-//      }     
-//      
-//      void setState(InputStream state)
-//      {
-//         try
-//         {
-//            if (state == null)
-//            {
-//               ClusterPartition.this.log.debug("transferred state for service " + 
-//                     serviceName + " is null (may be first member in cluster)");
-//            }
-//            else
-//            {
-//               setStateInternal(state);
-//            }
-//            
-//            this.isStateSet = true;
-//         }
-//         catch (Throwable t)
-//         {
-//            recordSetStateFailure(t);
-//         }
-//         finally
-//         {
-//            // Notify waiting thread that serviceState has been set.
-//            synchronized(this)
-//            {
-//               notifyAll();
-//            }
-//         }
-//         
-//      }
-//      
-//      private void setStateInternal(InputStream is) throws IOException, ClassNotFoundException
-//      {
-//         ClassLoader cl = getStateTransferClassLoader();
-//         SwitchContext switchContext = ClusterPartition.this.classLoaderSwitcher.getSwitchContext(cl);
-//         try
-//         {
-//            MarshalledValueInputStream mvis = new MarshalledValueInputStream(is);
-//            this.result = (Serializable) mvis.readObject();
-//         }
-//         finally
-//         {
-//            switchContext.reset();
-//         }
-//      }
-//
-//      private void recordSetStateFailure(Throwable t)
-//      {
-//         ClusterPartition.this.log.error("failed setting serviceState for service " + serviceName, t);
-//         if (t instanceof Exception)
-//         {
-//            this.setStateException = (Exception) t;
-//         }
-//         else
-//         {
-//            this.setStateException = new Exception(t);
-//         }
-//      }
-//      
-//      private ClassLoader getStateTransferClassLoader()
-//      {
-//         ClassLoader cl = classloader == null ? null : classloader.get();
-//         if (cl == null)
-//         {
-//            cl = this.getClass().getClassLoader();
-//         }
-//         return cl;
-//      }
-//      
-//   }
+   private class StateTransferTask implements Callable<Serializable>
+   { // Requests one service's state from the channel and waits for a setState callback to deliver it
+      private final String serviceName; // passed to channel.getState() as the state id
+      private final WeakReference<ClassLoader> classloader; // null when the caller supplied no loader
+      private Serializable result; // unmarshalled state; remains null if none was transferred
+      private boolean isStateSet; // flipped by setState(...) once the state has been processed
+      private Exception setStateException; // failure recorded by setState(...), rethrown from call()
+      // cl may be null, in which case getStateTransferClassLoader() falls back to this class's own loader
+      StateTransferTask(String serviceName, ClassLoader cl)
+      {
+         this.serviceName = serviceName;
+         if (cl == null) // only wrap a loader that was actually supplied
+         {
+            classloader = null;
+         }
+         else
+         {
+            classloader = new WeakReference<ClassLoader>(cl);
+         }
+      }
+      // Asks the channel for the state, then blocks until setState(...) has stored it or recorded a failure
+      public Serializable call() throws Exception
+      {
+         boolean intr = false;
+         try
+         {
+            long start, stop;
+            this.isStateSet = false;
+            start = System.currentTimeMillis();
+            boolean rc = ClusterPartition.this.channel.getState(null, serviceName, ClusterPartition.this.getStateTransferTimeout());
+            if (rc)
+            {
+               synchronized (this)
+               {
+                  while (!this.isStateSet)
+                  {
+                     if (this.setStateException != null) // setState(...) failed: surface its exception to the caller
+                     {
+                        throw this.setStateException;
+                     }
+
+                     try
+                     {
+                        wait(); // woken by the notifyAll() at the end of setState(...)
+                     }
+                     catch (InterruptedException iex)
+                     {
+                        intr = true; // keep waiting; the interrupt status is restored in the finally block
+                     }
+                  }
+               }
+               stop = System.currentTimeMillis();
+               ClusterPartition.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
+            }
+            else
+            {
+               // No one provided us with serviceState.
+               // We need to find out if we are the coordinator, so we must
+               // block until viewAccepted() is called at least once
+
+               synchronized (ClusterPartition.this.members)
+               {
+                  while (ClusterPartition.this.members.size() == 0)
+                  {
+                     ClusterPartition.this.log.debug("waiting on viewAccepted()");
+                     try
+                     {
+                        ClusterPartition.this.members.wait();
+                     }
+                     catch (InterruptedException iex)
+                     {
+                        intr = true;
+                     }
+                  }
+               }
+
+               if (ClusterPartition.this.isCurrentNodeCoordinator())
+               {
+                  ClusterPartition.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
+               }
+               else
+               {
+                  throw new IllegalStateException("Initial serviceState transfer failed: " +
+                     "Channel.getState() returned false");
+               }
+            }
+         }
+         finally
+         {
+            if (intr) Thread.currentThread().interrupt();
+         }
+         
+         return result; // null when no state was transferred
+      }
+      // Unmarshals state delivered as a byte array (null means no state) and wakes the thread blocked in call()
+      void setState(byte[] state)
+      {
+         try
+         {
+            if (state == null)
+            {
+               ClusterPartition.this.log.debug("transferred state for service " + 
+                     serviceName + " is null (may be first member in cluster)");
+            }
+            else
+            {
+               ByteArrayInputStream bais = new ByteArrayInputStream(state);
+               setStateInternal(bais);
+               bais.close();
+            }
+            
+            this.isStateSet = true;
+         }
+         catch (Throwable t) // recorded rather than thrown, so call() can rethrow it on its own thread
+         {
+            recordSetStateFailure(t);
+         }
+         finally
+         {
+            // Notify waiting thread that serviceState has been set.
+            synchronized(this)
+            {
+               notifyAll();
+            }
+         }
+      }
+      // Unmarshals state delivered as a stream (null means no state) and wakes the thread blocked in call()
+      void setState(InputStream state)
+      {
+         try
+         {
+            if (state == null)
+            {
+               ClusterPartition.this.log.debug("transferred state for service " + 
+                     serviceName + " is null (may be first member in cluster)");
+            }
+            else
+            {
+               setStateInternal(state);
+            }
+            
+            this.isStateSet = true;
+         }
+         catch (Throwable t) // recorded rather than thrown, so call() can rethrow it on its own thread
+         {
+            recordSetStateFailure(t);
+         }
+         finally
+         {
+            // Notify waiting thread that serviceState has been set.
+            synchronized(this)
+            {
+               notifyAll();
+            }
+         }
+         
+      }
+      // Reads one object from the stream into result, inside a SwitchContext for the state transfer class loader
+      private void setStateInternal(InputStream is) throws IOException, ClassNotFoundException
+      {
+         ClassLoader cl = getStateTransferClassLoader();
+         SwitchContext switchContext = ClusterPartition.this.classLoaderSwitcher.getSwitchContext(cl);
+         try
+         {
+            MarshalledValueInputStream mvis = new MarshalledValueInputStream(is);
+            this.result = (Serializable) mvis.readObject();
+         }
+         finally
+         {
+            switchContext.reset();
+         }
+      }
+      // Logs the failure and stores it as an Exception (wrapping other Throwables) for call() to rethrow
+      private void recordSetStateFailure(Throwable t)
+      {
+         ClusterPartition.this.log.error("failed setting serviceState for service " + serviceName, t);
+         if (t instanceof Exception)
+         {
+            this.setStateException = (Exception) t;
+         }
+         else
+         {
+            this.setStateException = new Exception(t);
+         }
+      }
+      // Returns the supplied loader if one was given and is still reachable, otherwise this class's own loader
+      private ClassLoader getStateTransferClassLoader()
+      {
+         ClassLoader cl = classloader == null ? null : classloader.get();
+         if (cl == null)
+         {
+            cl = this.getClass().getClassLoader();
+         }
+         return cl;
+      }
+      
+   }
    
    @SuppressWarnings("unchecked")
    private static Vector<Address> cloneMembers(View view)




More information about the jboss-cvs-commits mailing list