Author: vblagojevic(a)jboss.com
Date: 2007-10-01 05:24:57 -0400 (Mon, 01 Oct 2007)
New Revision: 4520
Modified:
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
Log:
[JBCACHE-1189] - Implement JGroups connect + state transfer
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-09-28 17:59:43 UTC (rev 4519)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-10-01 09:24:57 UTC (rev 4520)
@@ -58,6 +58,7 @@
import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
import org.jgroups.Message;
+import org.jgroups.StateTransferException;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
@@ -132,7 +133,7 @@
/**
* True if this CacheImpl is the coordinator.
*/
- private boolean coordinator = false;
+ private volatile boolean coordinator = false;
/**
* List of cluster group members.
@@ -748,34 +749,56 @@
if (log.isDebugEnabled()) log.debug("cache mode is " + configuration.getCacheMode());
initialiseChannelAndRpcDispatcher();
- try
- {
- channel.connect(configuration.getClusterName());
- }
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups channel", e);
- }
-
- if (log.isInfoEnabled())
- {
- log.info("CacheImpl local address is " + channel.getLocalAddress());
- }
+ //connect and transfer state
if (shouldFetchStateOnStartup())
{
try
- {
- fetchStateOnStartup();
+ {
+ long start = System.currentTimeMillis();
+ channel.connect(configuration.getClusterName(),null, null, configuration.getStateRetrievalTimeout());
+ if(!isCoordinator())
+ {
+ ml.waitForState();
+ }
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start)
+ + " milliseconds)");
+ }
}
- catch (Exception e)
+ catch (StateTransferException ste)
{
// make sure we disconnect from the channel before we throw this exception!
// JBCACHE-761
channel.disconnect();
channel.close();
- throw new CacheException("Unable to fetch state on startup", e);
+ throw new CacheException("Unable to fetch state on startup", ste);
}
+ catch (ChannelException e)
+ {
+ throw new CacheException("Unable to connect to JGroups channel", e);
+ }
+ catch (Exception ex){
+ throw new CacheException("Unable to fetch state on startup", ex);
+ }
}
+ //otherwise just connect
+ else
+ {
+ try
+ {
+ channel.connect(configuration.getClusterName());
+ }
+ catch (ChannelException e)
+ {
+ throw new CacheException("Unable to connect to JGroups channel", e);
+ }
+ }
+ if (log.isInfoEnabled())
+ {
+ log.info("CacheImpl local address is " + channel.getLocalAddress());
+ }
if (buddyManager != null)
{
buddyManager.init(this);
@@ -795,11 +818,8 @@
if (cacheLoaderManager != null)
{
cacheLoaderManager.preloadCache();
- }
+ }
- // Find out if we are coordinator (blocks until view is received)
- determineCoordinator();
-
// start any eviction threads.
if (regionManager.isUsingEvictions())
{
@@ -1072,16 +1092,6 @@
}
}
- private void determineCoordinator()
- {
- // Synchronize on members to make the answer atomic for the current view
- synchronized (members)
- {
- Address coord = getCoordinator();
- coordinator = (coord == null ? false : coord.equals(getLocalAddress()));
- }
- }
-
/**
* Returns the address of the coordinator or null if there is no
* coordinator.
@@ -1265,41 +1275,6 @@
return result;
}
- protected void fetchStateOnStartup() throws Exception
- {
- long start, stop;
- isStateSet = false;
- start = System.currentTimeMillis();
- boolean rc = channel.getState(null, configuration.getStateRetrievalTimeout());
- if (rc)
- {
- ml.waitForState();
- stop = System.currentTimeMillis();
- if (log.isDebugEnabled())
- {
- log.debug("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
- }
- }
- else
- {
- // No one provided us with state. We need to find out if that's because
- // we are the coordinator. But we don't know if the viewAccepted() callback
- // has been invoked, so call determineCoordinator(), which will block until
- // viewAccepted() is called at least once
- determineCoordinator();
-
- if (isCoordinator())
- {
- log.debug("State could not be retrieved (we are the first member in group)");
- }
- else
- {
- throw new CacheException("Initial state transfer failed: " +
- "Channel.getState() returned false");
- }
- }
- }
-
// ----------- End Marshalling and State Transfer -----------------------
/**
Show replies by date