Author: manik.surtani(a)jboss.com
Date: 2008-01-02 14:32:44 -0500 (Wed, 02 Jan 2008)
New Revision: 4946
Modified:
core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java
Log:
Updated the state transfer and buddy replication (BR) codebases
Modified: core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/InvocationContext.java 2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/InvocationContext.java 2008-01-02 19:32:44 UTC (rev 4946)
@@ -88,6 +88,11 @@
return optionOverrides;
}
+ public boolean isOptionsUninitialised()
+ {
+ return optionOverrides == null;
+ }
+
/**
* Sets the option overrides associated with this invocation
*
@@ -122,13 +127,13 @@
public String toString()
{
return "InvocationContext{" +
- "methodCall=" + methodCall +
- "transaction=" + transaction +
- ", globalTransaction=" + globalTransaction +
- ", optionOverrides=" + optionOverrides +
- ", originLocal=" + originLocal +
- ", txHasMods=" + txHasMods +
- '}';
+ "methodCall=" + methodCall +
+ "transaction=" + transaction +
+ ", globalTransaction=" + globalTransaction +
+ ", optionOverrides=" + optionOverrides +
+ ", originLocal=" + originLocal +
+ ", txHasMods=" + txHasMods +
+ '}';
}
public boolean isTxHasMods()
@@ -255,10 +260,11 @@
* If the acq timeout if overwritten for current call, then return that one.
* If not overwritten return default value.
*/
- public long getContextLockAcquisitionTimeout(long defaultFalue) {
+ public long getContextLockAcquisitionTimeout(long defaultFalue)
+ {
long timeout = defaultFalue;
if (getOptionOverrides() != null
- && getOptionOverrides().getLockAcquisitionTimeout() >= 0)
+ && getOptionOverrides().getLockAcquisitionTimeout() >= 0)
{
timeout = getOptionOverrides().getLockAcquisitionTimeout();
}
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-02 19:32:44 UTC (rev 4946)
@@ -11,12 +11,14 @@
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
import org.jboss.cache.RPCManager;
import org.jboss.cache.Region;
import org.jboss.cache.RegionManager;
import org.jboss.cache.config.BuddyReplicationConfig;
import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Option;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Stop;
import org.jboss.cache.lock.TimeoutException;
@@ -251,6 +253,8 @@
broadcastBuddyPoolMembership();
+ if (!cache.exists(BUDDY_BACKUP_SUBTREE_FQN))
cache.getRoot().addChildDirect(BUDDY_BACKUP_SUBTREE_FQN);
+
// allow waiting threads to process.
initialisationLatch.countDown();
@@ -519,43 +523,60 @@
Fqn integrationBase = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN,
newGroup.getGroupName());
- for (Map.Entry<Fqn, byte[]> entry : state.entrySet())
+
+ if (state.isEmpty())
{
- Fqn fqn = entry.getKey();
- if (!regionManager.isInactive(fqn))
+ if (configuredToFetchState())
+ log.info("Data owner has no state to set, even though buddy is configured to accept state. Assuming there is no data on the data owner.");
+ // create the backup region anyway
+ Option o = cache.getInvocationContext().getOptionOverrides();
+ o.setSkipCacheStatusCheck(true);
+ Node root = cache.getRoot();
+ o = cache.getInvocationContext().getOptionOverrides();
+ o.setCacheModeLocal(true);
+ o.setSkipCacheStatusCheck(true);
+ root.addChild(new Fqn<String>(BUDDY_BACKUP_SUBTREE, newGroup.getGroupName()));
+ }
+ else
+ {
+ for (Map.Entry<Fqn, byte[]> entry : state.entrySet())
{
- //ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
- Fqn integrationRoot = new Fqn(integrationBase, fqn);
+ Fqn fqn = entry.getKey();
+ if (!regionManager.isInactive(fqn))
+ {
+ //ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
+ Fqn integrationRoot = new Fqn(integrationBase, fqn);
- byte[] stateBuffer = entry.getValue();
- MarshalledValueInputStream in = null;
- try
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
- in = new MarshalledValueInputStream(bais);
- //stateMgr.setState(in, integrationRoot, cl);
- stateTransferManager.setState(in, integrationRoot);
- }
- catch (Throwable t)
- {
- if (t instanceof CacheException)
+ byte[] stateBuffer = entry.getValue();
+ MarshalledValueInputStream in = null;
+ try
{
- //excepected/common and can happen due to inactive regions and so on
- log.debug(t);
+ ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
+ in = new MarshalledValueInputStream(bais);
+ //stateMgr.setState(in, integrationRoot, cl);
+ stateTransferManager.setState(in, integrationRoot);
}
- else
+ catch (Throwable t)
{
- //something has gone wrong
- log.error("State for fqn " + fqn
- + " could not be transferred to a buddy at "
- + cache.getLocalAddress(), t);
+ if (t instanceof CacheException)
+ {
+ //excepected/common and can happen due to inactive regions and so on
+ log.debug(t);
+ }
+ else
+ {
+ //something has gone wrong
+ log.error("State for fqn " + fqn
+ + " could not be transferred to a buddy at "
+ + cache.getLocalAddress(), t);
+ }
}
- }
- finally
- {
- if (in != null)
+ finally
{
- in.close();
+ if (in != null)
+ {
+ in.close();
+ }
}
}
}
@@ -746,25 +767,36 @@
// Create the state transfer map
Map<Fqn, byte[]> stateMap = new HashMap<Fqn, byte[]>();
- byte[] state;
- if (configuration.isUseRegionBasedMarshalling())
+ if (configuredToFetchState())
{
- Collection<Region> regions = regionManager.getAllRegions(Region.Type.MARSHALLING);
- if (regions.size() > 0)
+ byte[] state;
+ if (configuration.isUseRegionBasedMarshalling())
{
- for (Region r : regions)
+ Collection<Region> regions = regionManager.getAllRegions(Region.Type.MARSHALLING);
+ if (regions.size() > 0)
{
- Fqn f = r.getFqn();
- state = acquireState(f);
+ for (Region r : regions)
+ {
+ Fqn f = r.getFqn();
+ state = acquireState(f);
+ if (state != null)
+ {
+ stateMap.put(f, state);
+ }
+ }
+ }
+ else if (!configuration.isInactiveOnStartup())
+ {
+ // No regions defined; try the root
+ state = acquireState(Fqn.ROOT);
if (state != null)
{
- stateMap.put(f, state);
+ stateMap.put(Fqn.ROOT, state);
}
}
}
- else if (!configuration.isInactiveOnStartup())
+ else
{
- // No regions defined; try the root
state = acquireState(Fqn.ROOT);
if (state != null)
{
@@ -772,14 +804,6 @@
}
}
}
- else
- {
- state = acquireState(Fqn.ROOT);
- if (state != null)
- {
- stateMap.put(Fqn.ROOT, state);
- }
- }
// now broadcast a message to the newly assigned buddies.
MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAssignToBuddyGroupMethod, buddyGroup, stateMap);
@@ -827,6 +851,11 @@
log.trace("addToGroup notification complete");
}
+ private boolean configuredToFetchState()
+ {
+ return configuration.isFetchInMemoryState() || (cache.getCacheLoaderManager() != null && cache.getCacheLoaderManager().isFetchPersistentState());
+ }
+
private byte[] acquireState(Fqn fqn) throws CacheException
{
// Call _getState with progressively longer timeouts until we
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2008-01-02 19:32:44 UTC (rev 4946)
@@ -37,6 +37,7 @@
{
Option txScopeOption = new Option();
txScopeOption.setCacheModeLocal(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal());
+ txScopeOption.setSkipCacheStatusCheck(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isSkipCacheStatusCheck());
entry.setOption(txScopeOption);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-01-02 19:32:44 UTC (rev 4946)
@@ -1170,8 +1170,6 @@
public void beforeCompletion()
{
- if (!cache.getCacheStatus().allowInvocations()) throw new IllegalStateException("Cache not in STARTED state!");
-
if (log.isTraceEnabled()) log.trace("Running beforeCompletion on gtx " + gtx);
entry = txTable.get(gtx);
if (entry == null)
@@ -1183,6 +1181,7 @@
modifications = entry.getModifications();
ctx = cache.getInvocationContext();
+ if (ctx.isOptionsUninitialised() && entry.getOption() != null) ctx.setOptionOverrides(entry.getOption());
ctx.setOriginLocal(false);
}
@@ -1190,11 +1189,21 @@
// it is supposed to be post commit not actually run the commit
public void afterCompletion(int status)
{
- if (!cache.getCacheStatus().allowInvocations()) throw new IllegalStateException("Cache not in STARTED state!");
+ // could happen if a rollback is called and beforeCompletion() doesn't get called.
+ if (ctx == null)
+ {
+ ctx = cache.getInvocationContext();
+ }
+
+ if (ctx.isOptionsUninitialised() && entry.getOption() != null)
+ {
+ // use the options from the transaction entry instead
+ ctx.setOptionOverrides(entry.getOption());
+ }
+
try
{
- // could happen if a rollback is called and beforeCompletion() doesn't get called.
- if (ctx == null) ctx = cache.getInvocationContext();
+ assertCanContinue();
setTransactionalContext(tx, gtx, ctx);
try
@@ -1249,6 +1258,12 @@
}
}
+ private void assertCanContinue()
+ {
+ if (!cache.getCacheStatus().allowInvocations() && (ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSkipCacheStatusCheck()))
+ throw new IllegalStateException("Cache not in STARTED state!");
+ }
+
/**
* Cleans out (nullifies) member variables held by the sync object for easier gc. Could be (falsely) seen as a mem
* leak if the TM implementation hangs on to the synchronizations for an unnecessarily long time even after the tx
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2008-01-02 19:32:44 UTC (rev 4946)
@@ -481,6 +481,7 @@
if (peek(fqn, false, false) == null)
{
getInvocationContext().getOptionOverrides().setFailSilently(true);
+ getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
//GlobalTransaction tx = cache.getCurrentTransaction();
MethodCall m = MethodCallFactory.create(MethodDeclarations.putForExternalReadMethodLocal, null, fqn, key, value);
invoke(m);
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2008-01-02 19:32:44 UTC (rev 4946)
@@ -381,6 +381,7 @@
private Node getInternalNode(Node parent, Fqn internalFqn)
{
Object name = internalFqn.get(parent.getFqn().size());
+ cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
Node result = parent.getChild(new Fqn(name));
if (result != null)
{
Modified: core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java 2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java 2008-01-02 19:32:44 UTC (rev 4946)
@@ -51,7 +51,7 @@
try
{
// only scan the current ClassPath location that contains this file. Could be a directory or a JAR file.
- URL url = getURLPathFromClassLoader("org/jboss/cache/Version.class");
+ URL url = getURLPathFromClassLoader("org/jboss/cache/Cache.class");
String urlPath = url.getFile();
if (urlPath.endsWith("/"))
{