[jboss-cvs] JBossAS SVN: r105665 - projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 3 12:21:57 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-06-03 12:21:56 -0400 (Thu, 03 Jun 2010)
New Revision: 105665

Added:
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java
Modified:
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
Log:
[JBCLUSTER-276] Fix state transfer bugs found in integration testing
Improve how the Cache-created Channel is integrated

Added: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java	                        (rev 0)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java	2010-06-03 16:21:56 UTC (rev 105665)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.core.framework.server;
+
+import org.jgroups.Channel;
+
+/**
+ * An object that provides a reference to a JGroups Channel.
+ * 
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public interface ChannelSource
+{
+   Channel getChannel();
+}

Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java	2010-06-03 16:21:31 UTC (rev 105664)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java	2010-06-03 16:21:56 UTC (rev 105665)
@@ -150,9 +150,11 @@
    // Attributes ----------------------------------------------------
 
    @SuppressWarnings("deprecation")
-   private   org.jgroups.ChannelFactory channelFactory;
-   private   String stackName;
-   private   String groupName;
+   private org.jgroups.ChannelFactory channelFactory;
+   private ChannelSource channelSource; 
+   private boolean channelInjected = true;
+   private String stackName;
+   private String groupName;
 
    private boolean channelSelfConnected;
    
@@ -785,27 +787,28 @@
    {
       RunnableFuture<SerializableStateTransferResult> future = null;
       StateTransferTask<?, ?> task = stateTransferTasks.get(serviceName);
-      if (task == null)
+      if (task == null || (task.result != null && !task.result.stateReceived()))
       {
          SerializableStateTransferTask newTask = new SerializableStateTransferTask(serviceName, classloader);
          stateTransferTasks.put(serviceName, newTask);
          future = new FutureTask<SerializableStateTransferResult>(newTask);
-         Executor e = getThreadPool();
-         if (e == null)
-         {
-            e = Executors.newSingleThreadExecutor();
-         }
-         e.execute(future);
       }
       else if (task instanceof SerializableStateTransferTask)
       {
          // Unlikely scenario
+         log.warn("Received concurrent requests to get service state for " + serviceName);
          future = new FutureTask<SerializableStateTransferResult>((SerializableStateTransferTask) task);
       }
       else
       {
          throw new IllegalStateException("State transfer task for " + serviceName + " that will return an input stream is already pending");
       }
+      Executor e = getThreadPool();
+      if (e == null)
+      {
+         e = Executors.newSingleThreadExecutor();
+      }
+      e.execute(future);
       return future;
    }
 
@@ -818,27 +821,28 @@
    {
       RunnableFuture<StreamStateTransferResult> future = null;
       StateTransferTask<?, ?> task = stateTransferTasks.get(serviceName);
-      if (task == null)
+      if (task == null || (task.result != null && !task.result.stateReceived()))
       {
          StreamStateTransferTask newTask = new StreamStateTransferTask(serviceName);
          stateTransferTasks.put(serviceName, newTask);
          future = new FutureTask<StreamStateTransferResult>(newTask);
-         Executor e = getThreadPool();
-         if (e == null)
-         {
-            e = Executors.newSingleThreadExecutor();
-         }
-         e.execute(future);
       }
       else if (task instanceof StreamStateTransferTask)
       {
          // Unlikely scenario
+         log.warn("Received concurrent requests to get service state for " + serviceName);
          future = new FutureTask<StreamStateTransferResult>((StreamStateTransferTask) task);
       }
       else
       {
          throw new IllegalStateException("State transfer task for " + serviceName + " that will return an deserialized object is already pending");
       }
+      Executor e = getThreadPool();
+      if (e == null)
+      {
+         e = Executors.newSingleThreadExecutor();
+      }
+      e.execute(future);
       return future;
    }
 
@@ -916,6 +920,16 @@
    {
       return Version.description + "( " + Version.cvs + ")";
    }
+   
+   public ChannelSource getChannelSource()
+   {
+      return this.channelSource;
+   }
+   
+   public void setChannelSource(ChannelSource source)
+   {
+      this.channelSource = source;
+   }
 
    @SuppressWarnings("deprecation")
    public org.jgroups.ChannelFactory getChannelFactory()
@@ -1021,10 +1035,8 @@
       state = STOPPING;
       try
       {
-         this.log.info("Stopping partition " + this.getGroupName());
          stopService();
          state = STOPPED;
-         this.log.info("Partition " + this.getGroupName() + " stopped.");
       }
       catch (InterruptedException e)
       {
@@ -1098,6 +1110,14 @@
       
       this.stateIdPrefix = getClass().getName() + "." + this.scopeId + ".";
       
+      if (this.channel == null)
+      {
+         this.channelInjected = false;
+         if (this.channelSource != null)
+         {
+            this.channel = this.channelSource.getChannel();
+         }
+      }
       
       if (this.channel == null || !this.channel.isOpen())
       {
@@ -1106,6 +1126,7 @@
    
          this.channel = this.createChannel();               
       }
+      
       // Subscribe to events generated by the channel
       MembershipListener meml = new MembershipListenerImpl();
       MessageListener msgl = this.stateIdPrefix == null ? null : new MessageListenerImpl();
@@ -1158,29 +1179,28 @@
          {
             this.channelSelfConnected = false;
             this.channel.disconnect();
+            this.channel.close();
          }
       }
       catch (Exception e)
       {
          this.log.error("channel disconnection failed", e);
       }
-   }
-
-   protected void destroyService()
-   {
-      try
+      finally
       {
-         if (this.channelSelfConnected && this.channel != null && this.channel.isOpen())
+         if (!this.channelInjected)
          {
-            this.channel.close();
+            // Recreate the channel next time
+            this.channel = null;
          }
       }
-      catch (Exception e)
-      {
-         this.log.error("Closing channel failed", e);
-      }
    }
 
+   protected void destroyService()
+   {
+      // no-op
+   }
+
    @SuppressWarnings("deprecation")
    protected Channel createChannel()
    {
@@ -2361,6 +2381,8 @@
       V state;
       private boolean isStateSet;
       private Exception setStateException;
+      private T result;
+      private final Object callMutex = new Object();
       
       StateTransferTask(String serviceName)
       {
@@ -2369,82 +2391,91 @@
 
       public T call() throws Exception
       {
-         boolean intr = false;
-         boolean rc = false;
-         try
+         synchronized (callMutex)
          {
-            long start, stop;
-            this.isStateSet = false;
-            start = System.currentTimeMillis();
-            String state_id = CoreGroupCommunicationService.this.stateIdPrefix + serviceName;
-            rc = CoreGroupCommunicationService.this.getChannel().getState(null, state_id, CoreGroupCommunicationService.this.getStateTransferTimeout());
-            if (rc)
+            if (result != null)
             {
-               synchronized (this)
+               return result;
+            }
+            
+            boolean intr = false;
+            boolean rc = false;
+            try
+            {
+               long start, stop;
+               this.isStateSet = false;
+               start = System.currentTimeMillis();
+               String state_id = CoreGroupCommunicationService.this.stateIdPrefix + serviceName;
+               rc = CoreGroupCommunicationService.this.getChannel().getState(null, state_id, CoreGroupCommunicationService.this.getStateTransferTimeout());
+               if (rc)
                {
-                  while (!this.isStateSet)
+                  synchronized (this)
                   {
-                     if (this.setStateException != null)
+                     while (!this.isStateSet)
                      {
-                        throw this.setStateException;
+                        if (this.setStateException != null)
+                        {
+                           throw this.setStateException;
+                        }
+   
+                        try
+                        {
+                           wait();
+                        }
+                        catch (InterruptedException iex)
+                        {
+                           intr = true;
+                        }
                      }
-
-                     try
-                     {
-                        wait();
-                     }
-                     catch (InterruptedException iex)
-                     {
-                        intr = true;
-                     }
                   }
+                  stop = System.currentTimeMillis();
+                  CoreGroupCommunicationService.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
                }
-               stop = System.currentTimeMillis();
-               CoreGroupCommunicationService.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
-            }
-            else
-            {
-               // No one provided us with serviceState.
-               // We need to find out if we are the coordinator, so we must
-               // block until viewAccepted() is called at least once
-               
-               synchronized (CoreGroupCommunicationService.this.channelLock)
+               else
                {
-                  while (CoreGroupCommunicationService.this.getCurrentView().size() == 0)
+                  // No one provided us with serviceState.
+                  // We need to find out if we are the coordinator, so we must
+                  // block until viewAccepted() is called at least once
+                  
+                  synchronized (CoreGroupCommunicationService.this.channelLock)
                   {
-                     CoreGroupCommunicationService.this.log.debug("waiting on viewAccepted()");
-                     try
+                     while (CoreGroupCommunicationService.this.getCurrentView().size() == 0)
                      {
-                        CoreGroupCommunicationService.this.channelLock.wait();
+                        CoreGroupCommunicationService.this.log.debug("waiting on viewAccepted()");
+                        try
+                        {
+                           CoreGroupCommunicationService.this.channelLock.wait();
+                        }
+                        catch (InterruptedException iex)
+                        {
+                           intr = true;
+                        }
                      }
-                     catch (InterruptedException iex)
-                     {
-                        intr = true;
-                     }
                   }
+   
+                  if (CoreGroupCommunicationService.this.isCurrentNodeCoordinator())
+                  {
+                     CoreGroupCommunicationService.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
+                  }
+                  else
+                  {
+                     throw new IllegalStateException("Initial serviceState transfer failed: " +
+                        "Channel.getState() returned false");
+                  }
                }
-
-               if (CoreGroupCommunicationService.this.isCurrentNodeCoordinator())
-               {
-                  CoreGroupCommunicationService.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
-               }
-               else
-               {
-                  throw new IllegalStateException("Initial serviceState transfer failed: " +
-                     "Channel.getState() returned false");
-               }
+               
+               result = createStateTransferResult(rc, state, null);
             }
+            catch (Exception e)
+            {
+               result = createStateTransferResult(rc, null, e);
+            }
+            finally
+            {
+               if (intr) Thread.currentThread().interrupt();
+            }
+            return result;
          }
-         catch (Exception e)
-         {
-            return createStateTransferResult(rc, null, e);
-         }
-         finally
-         {
-            if (intr) Thread.currentThread().interrupt();
-         }
-         
-         return createStateTransferResult(rc, state, null);
       }     
       
       protected abstract T createStateTransferResult(boolean gotState, V state, Exception exception);

Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java	2010-06-03 16:21:31 UTC (rev 105664)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java	2010-06-03 16:21:56 UTC (rev 105665)
@@ -35,7 +35,7 @@
 import org.jboss.ha.framework.interfaces.HAPartition;
 import org.jboss.ha.framework.interfaces.SerializableStateTransferResult;
 import org.jboss.ha.framework.interfaces.StateTransferProvider;
-//import org.jboss.ha.framework.server.spi.ManagedDistributedState;
+import org.jboss.ha.framework.server.spi.ManagedDistributedState;
 
 /**
  * Extends {@link CoreGroupCommunicationService} to add implemenation of the
@@ -172,6 +172,17 @@
       this.log.info("Initializing partition " + this.getPartitionName());
       this.logHistory ("Initializing partition " + this.getPartitionName());
       
+      if (this.distributedState instanceof ManagedDistributedState)
+      {
+         ((ManagedDistributedState) this.distributedState).createService();
+      }
+      
+      if (getChannelSource() == null && distributedState instanceof ChannelSource)
+      {
+         log.debug("Using " + distributedState + " as a " + ChannelSource.class.getSimpleName());
+         setChannelSource((ChannelSource) distributedState);
+      }
+      
       super.createService();
       if (this.replicantManager == null)
       {
@@ -182,11 +193,6 @@
       
       this.replicantManager.createService();
       
-//      if (this.distributedState instanceof ManagedDistributedState)
-//      {
-//         ((ManagedDistributedState) this.distributedState).createService();
-//      }
-      
       this.log.debug("done initializing partition "  + this.getPartitionName());
    }
    
@@ -195,16 +201,18 @@
    {
       this.logHistory ("Starting partition "  + this.getPartitionName());
       
+      // Start DS first, so it can start its cache which will want to
+      // do the channel connection stuff itself before we try
+      if (this.distributedState instanceof ManagedDistributedState)
+      {
+         ((ManagedDistributedState) this.distributedState).startService();
+      }      
+      
       super.startService();
       
       this.fetchInitialState();
       
       this.replicantManager.startService();
-      
-//      if (this.distributedState instanceof ManagedDistributedState)
-//      {
-//         ((ManagedDistributedState) this.distributedState).startService();
-//      }      
    }
 
    @Override
@@ -212,15 +220,15 @@
    {      
       this.logHistory ("Stopping partition");
       this.log.info("Stopping partition " + this.getPartitionName());
-      
-//      if (this.distributedState instanceof ManagedDistributedState)
-//      {
-//         ((ManagedDistributedState) this.distributedState).stopService();
-//      }
 
       this.replicantManager.stopService();      
 
       super.stopService();
+      
+      if (this.distributedState instanceof ManagedDistributedState)
+      {
+         ((ManagedDistributedState) this.distributedState).stopService();
+      }
 
       this.log.info("Partition " + this.getPartitionName() + " stopped.");
    }
@@ -230,30 +238,40 @@
    {
       this.log.debug("Destroying HAPartition: " + this.getPartitionName());
       
-      @SuppressWarnings("deprecation")
-      String svc = org.jboss.ha.framework.interfaces.DistributedState.class.getSimpleName();
       try
-      {         
-//         if (this.distributedState instanceof ManagedDistributedState)
-//         {
-//            ((ManagedDistributedState) this.distributedState).destroyService();
-//         }
-         svc = DistributedReplicantManager.class.getSimpleName();
+      { 
          this.replicantManager.destroyService();
 //       unregisterDRM();
       }
       catch (InterruptedException e)
       {
          Thread.currentThread().interrupt();
-         this.log.error("Destroying " + svc + " failed", e);
+         this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
       }
       catch (Exception e)
       {
-         this.log.error("Destroying " + svc + " failed", e);
+         this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
       }
       
       super.destroyService();
       
+      try
+      {
+         if (this.distributedState instanceof ManagedDistributedState)
+         {
+            ((ManagedDistributedState) this.distributedState).destroyService();
+         }
+      }
+      catch (InterruptedException e)
+      {
+         Thread.currentThread().interrupt();
+         this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
+      }
+      catch (Exception e)
+      {
+         this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
+      }
+      
       this.log.info("Partition " + this.getPartitionName() + " destroyed.");
    }
    




More information about the jboss-cvs-commits mailing list