[jbosscache-commits] JBoss Cache SVN: r4520 - core/trunk/src/main/java/org/jboss/cache.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Oct 1 05:24:57 EDT 2007


Author: vblagojevic at jboss.com
Date: 2007-10-01 05:24:57 -0400 (Mon, 01 Oct 2007)
New Revision: 4520

Modified:
   core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
Log:
[JBCACHE-1189] - Implement JGroups connect + state transfer

Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2007-09-28 17:59:43 UTC (rev 4519)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2007-10-01 09:24:57 UTC (rev 4520)
@@ -58,6 +58,7 @@
 import org.jgroups.ExtendedMessageListener;
 import org.jgroups.JChannel;
 import org.jgroups.Message;
+import org.jgroups.StateTransferException;
 import org.jgroups.View;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RpcDispatcher;
@@ -132,7 +133,7 @@
    /**
     * True if this CacheImpl is the coordinator.
     */
-   private boolean coordinator = false;
+   private volatile boolean coordinator = false;
 
    /**
     * List of cluster group members.
@@ -748,34 +749,56 @@
             if (log.isDebugEnabled()) log.debug("cache mode is " + configuration.getCacheMode());
             initialiseChannelAndRpcDispatcher();
 
-            try
-            {
-               channel.connect(configuration.getClusterName());
-            }
-            catch (ChannelException e)
-            {
-               throw new CacheException("Unable to connect to JGroups channel", e);
-            }
-
-            if (log.isInfoEnabled())
-            {
-               log.info("CacheImpl local address is " + channel.getLocalAddress());
-            }
+            //connect and transfer state
             if (shouldFetchStateOnStartup())
             {
                try
-               {
-                  fetchStateOnStartup();
+               {                                     
+                  long start = System.currentTimeMillis();
+                  channel.connect(configuration.getClusterName(),null, null, configuration.getStateRetrievalTimeout());
+                  if(!isCoordinator())
+                  {
+                     ml.waitForState();
+                  }
+                   
+                  if (log.isDebugEnabled())
+                  {
+                     log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start)
+                                  + " milliseconds)");
+                  }
                }
-               catch (Exception e)
+               catch (StateTransferException ste)
                {
                   // make sure we disconnect from the channel before we throw this exception!
                   // JBCACHE-761
                   channel.disconnect();
                   channel.close();
-                  throw new CacheException("Unable to fetch state on startup", e);
+                  throw new CacheException("Unable to fetch state on startup", ste);
                }
+               catch (ChannelException e)
+               {
+                  throw new CacheException("Unable to connect to JGroups channel", e);
+               }  
+               catch (Exception ex){
+                   throw new CacheException("Unable to fetch state on startup", ex);
+               }
             }
+            //otherwise just connect
+            else
+            {
+               try
+               {
+                   channel.connect(configuration.getClusterName());
+               }
+               catch (ChannelException e)
+               {
+                   throw new CacheException("Unable to connect to JGroups channel", e);
+               }
+            }
+            if (log.isInfoEnabled())
+            {
+               log.info("CacheImpl local address is " + channel.getLocalAddress());
+            }  
             if (buddyManager != null)
             {
                buddyManager.init(this);
@@ -795,11 +818,8 @@
       if (cacheLoaderManager != null)
       {
          cacheLoaderManager.preloadCache();
-      }
+      }     
 
-      // Find out if we are coordinator (blocks until view is received)
-      determineCoordinator();
-
       // start any eviction threads.
       if (regionManager.isUsingEvictions())
       {
@@ -1072,16 +1092,6 @@
       }
    }
 
-   private void determineCoordinator()
-   {
-      // Synchronize on members to make the answer atomic for the current view
-      synchronized (members)
-      {
-         Address coord = getCoordinator();
-         coordinator = (coord == null ? false : coord.equals(getLocalAddress()));
-      }
-   }
-
    /**
     * Returns the address of the coordinator or null if there is no
     * coordinator.
@@ -1265,41 +1275,6 @@
       return result;
    }
 
-   protected void fetchStateOnStartup() throws Exception
-   {
-      long start, stop;
-      isStateSet = false;
-      start = System.currentTimeMillis();
-      boolean rc = channel.getState(null, configuration.getStateRetrievalTimeout());
-      if (rc)
-      {
-         ml.waitForState();
-         stop = System.currentTimeMillis();
-         if (log.isDebugEnabled())
-         {
-            log.debug("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
-         }
-      }
-      else
-      {
-         // No one provided us with state. We need to find out if that's because
-         // we are the coordinator. But we don't know if the viewAccepted() callback
-         // has been invoked, so call determineCoordinator(), which will block until
-         // viewAccepted() is called at least once
-         determineCoordinator();
-
-         if (isCoordinator())
-         {
-            log.debug("State could not be retrieved (we are the first member in group)");
-         }
-         else
-         {
-            throw new CacheException("Initial state transfer failed: " +
-                    "Channel.getState() returned false");
-         }
-      }
-   }
-
    // -----------  End Marshalling and State Transfer -----------------------
 
    /**




More information about the jbosscache-commits mailing list