Author: manik.surtani(a)jboss.com
Date: 2007-12-23 08:16:29 -0500 (Sun, 23 Dec 2007)
New Revision: 4917
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
Log:
Fixed failing unit tests
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2007-12-22 17:04:39 UTC (rev
4916)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2007-12-23 13:16:29 UTC (rev
4917)
@@ -25,8 +25,6 @@
public List callRemoteMethods(List<Address> recipients, Method method, Object[]
arguments, boolean synchronous, boolean excludeSelf, long timeout) throws Exception;
- public void setCache(CacheSPI c);
-
/**
* @return Returns the replication queue (if one is used), null otherwise.
*/
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2007-12-22 17:04:39 UTC
(rev 4916)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2007-12-23 13:16:29 UTC
(rev 4917)
@@ -77,11 +77,6 @@
return c.callRemoteMethods(recipients, method, arguments, synchronous, excludeSelf,
timeout);
}
- public void setCache(CacheSPI c)
- {
- this.c = (CacheImpl) c;
- }
-
/**
* @return Returns the replication queue (if one is used), null otherwise.
*/
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2007-12-22
17:04:39 UTC (rev 4916)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2007-12-23
13:16:29 UTC (rev 4917)
@@ -116,12 +116,26 @@
public void registerComponent(String name, Object component)
{
// this will make sure all dependent components are stopped or set to CONSTRUCTED
so they can be re-wired later.
- unregisterComponent(name);
+ Component c = new Component(name, component);
+ Component old = componentLookup.get(name);
+ if (old != null)
+ {
+ // unregister the old component so that components that depend on it can be
stopped if necessary
+ unregisterComponent(name);
+ // components that depended on the old component should now depend on the new
one.
+ c.dependencyFor.addAll(old.dependencyFor);
+ }
+ addComponentDependencies(c);
- Component c = new Component(name, component);
componentLookup.put(name, c);
- addComponentDependencies(c);
- c.changeState(overallState == null ? CONSTRUCTED : overallState);
+ State stateToMoveTo = overallState == null ? CONSTRUCTED : overallState;
+ c.changeState(stateToMoveTo);
+
+ // make sure any other components that have inadvertently been stopped are now
restarted.
+ for (Component comp : componentLookup.values())
+ {
+ if (comp.state != stateToMoveTo) comp.changeState(stateToMoveTo);
+ }
}
protected void addComponentDependencies(Component c)
@@ -130,6 +144,8 @@
for (Dependency d : c.dependencies)
{
getOrCreateComponent(d.name, d.type);
+ Component dependencyComponent = componentLookup.get(d.name);
+ if (dependencyComponent != null)
dependencyComponent.dependencyFor.add(c.asDependency());
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2007-12-22
17:04:39 UTC (rev 4916)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2007-12-23
13:16:29 UTC (rev 4917)
@@ -5,9 +5,11 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.RPCManager;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.Option;
+import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
@@ -26,16 +28,22 @@
*/
public abstract class BaseRpcInterceptor extends MethodDispacherInterceptor
{
-
private BuddyManager buddyManager;
+ private RPCManager rpcManager;
private boolean usingBuddyReplication;
protected boolean defaultSynchronous;
+ @Inject
+ private void injectComponents(RPCManager rpcManager, BuddyManager buddyManager)
+ {
+ this.rpcManager = rpcManager;
+ this.buddyManager = buddyManager;
+ usingBuddyReplication = buddyManager != null && buddyManager.isEnabled();
+ }
+
public void setCache(CacheSPI cache)
{
super.setCache(cache);
- buddyManager = cache.getBuddyManager();
- usingBuddyReplication = buddyManager != null;
CacheMode mode = cache.getConfiguration().getCacheMode();
defaultSynchronous = (mode == CacheMode.REPL_SYNC || mode ==
CacheMode.INVALIDATION_SYNC);
}
@@ -81,7 +89,7 @@
else if (te.isForceSyncReplication()) sync = true;
}
}
- if (!sync && cache.getRPCManager().getReplicationQueue() != null &&
!usingBuddyReplication)
+ if (!sync && rpcManager.getReplicationQueue() != null &&
!usingBuddyReplication)
{
putCallOnAsyncReplicationQueue(call);
}
@@ -100,7 +108,7 @@
long syncReplTimeout = o.getSyncReplTimeout();
if (syncReplTimeout < 0) syncReplTimeout =
configuration.getSyncReplTimeout();
- List rsps = cache.getRPCManager().callRemoteMethods(callRecipients,
+ List rsps = rpcManager.callRemoteMethods(callRecipients,
MethodDeclarations.replicateMethod,
new Object[]{call},
sync, // is synchronised?
@@ -118,7 +126,7 @@
protected void putCallOnAsyncReplicationQueue(MethodCall call)
{
if (log.isDebugEnabled()) log.debug("Putting call " + call + " on
the replication queue.");
-
cache.getRPCManager().getReplicationQueue().add(MethodCallFactory.create(MethodDeclarations.replicateMethod,
call));
+
rpcManager.getReplicationQueue().add(MethodCallFactory.create(MethodDeclarations.replicateMethod,
call));
}
//todo info expt for this is InvocationContext, move method there
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2007-12-22
17:04:39 UTC (rev 4916)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2007-12-23
13:16:29 UTC (rev 4917)
@@ -6,6 +6,7 @@
*/
package org.jboss.cache.interceptors;
+import org.apache.commons.logging.Log;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
@@ -21,13 +22,12 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
-import org.apache.commons.logging.Log;
import org.jgroups.Address;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.Map;
/**
* Replication interceptor for the optimistically locked interceptor chain. Responsible
for replicating
@@ -53,6 +53,7 @@
return log;
}
+ @Override
protected boolean skipMethodCall(InvocationContext ctx)
{
// bypass for buddy group org method calls.
@@ -68,6 +69,7 @@
return false;
}
+ @Override
protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
{
// pass up the chain.
@@ -82,6 +84,7 @@
return retval;
}
+ @Override
protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
{
//lets broadcast the commit first
@@ -109,6 +112,7 @@
return retval;
}
+ @Override
protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
{
// lets broadcast the rollback first
@@ -136,13 +140,21 @@
return retval;
}
+ @Override
protected Object handlePutForExternalReadVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, DataVersion dv) throws
Throwable
{
+ return handlePutForExternalReadMethod(ctx, gtx, fqn, key, value);
+ }
+
+ @Override
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value) throws Throwable
+ {
gtx = getGlobalTransaction(ctx);
cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
return nextInterceptor(ctx);
}
+
private GlobalTransaction getGlobalTransaction(InvocationContext ctx)
{
// get the current gtx
@@ -174,8 +186,8 @@
if (log.isDebugEnabled())
{
log.debug("(" + cache.getLocalAddress()
- + "): broadcasting prepare for " + gtx
- + " (" + num_mods + " modifications");
+ + "): broadcasting prepare for " + gtx
+ + " (" + num_mods + " modifications");
}
replicateCall(toBroadcast, remoteCallSync, ctx.getOptionOverrides());
@@ -186,7 +198,7 @@
if (log.isDebugEnabled())
{
log.debug("(" + cache.getLocalAddress()
- + "):not broadcasting prepare as members are " +
cache.getMembers());
+ + "):not broadcasting prepare as members are " +
cache.getMembers());
}
}
}
Modified:
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2007-12-22
17:04:39 UTC (rev 4916)
+++
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2007-12-23
13:16:29 UTC (rev 4917)
@@ -170,7 +170,8 @@
RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
// inject a mock RPC manager so that we can test whether calls made are sync or
async.
- cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
+ //cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
+
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
rpcManager);
// invalidations will not trigger any rpc calls for PFER
if (!isUsingInvalidation())
@@ -186,8 +187,9 @@
// now try a simple replication. Since the RPCManager is a mock object it will not
actually replicate anything.
cache1.putForExternalRead(fqn, key, value);
verify(rpcManager);
+
// cleanup
- cache1.getConfiguration().getRuntimeConfig().setRPCManager(originalRpcManager);
+
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
originalRpcManager);
cache1.removeNode(fqn);
}
@@ -241,7 +243,7 @@
}
};
- barfingRpcManager.setCache((CacheSPI<String, String>) cache1);
+
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
barfingRpcManager);
cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
try