[infinispan-commits] Infinispan SVN: r1944 - in branches/4.1.x: cachestore/remote/src/test/java/org/infinispan/loaders/remote and 17 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Jul 1 03:31:38 EDT 2010
Author: mircea.markus
Date: 2010-07-01 03:31:37 -0400 (Thu, 01 Jul 2010)
New Revision: 1944
Added:
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java
Removed:
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
Modified:
branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java
branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java
branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java
branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java
branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java
branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java
branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java
branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
branches/4.1.x/core/src/test/resources/log4j.xml
Log:
implemented retry logic in hotrod client
Modified: branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java
===================================================================
--- branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -2,7 +2,7 @@
import org.infinispan.CacheException;
import org.infinispan.loaders.AbstractCacheStoreConfig;
-import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.CacheContainer;
import org.infinispan.util.FileLookup;
import java.io.IOException;
@@ -43,12 +43,12 @@
public void setUseDefaultRemoteCache(boolean useDefaultRemoteCache) {
if (useDefaultRemoteCache) {
- setRemoteCacheName(DefaultCacheManager.DEFAULT_CACHE_NAME);
+ setRemoteCacheName(CacheContainer.DEFAULT_CACHE_NAME);
}
}
public boolean isUseDefaultRemoteCache() {
- return DefaultCacheManager.DEFAULT_CACHE_NAME.equals(getRemoteCacheName());
+ return CacheContainer.DEFAULT_CACHE_NAME.equals(getRemoteCacheName());
}
@Override
Modified: branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java
===================================================================
--- branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -3,7 +3,7 @@
import org.infinispan.client.hotrod.TestHelper;
import org.infinispan.loaders.BaseCacheStoreFunctionalTest;
import org.infinispan.loaders.CacheStoreConfig;
-import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.fwk.TestCacheManagerFactory;
@@ -27,7 +27,7 @@
localCacheManager = TestCacheManagerFactory.createLocalCacheManager();
hrServer = TestHelper.startHotRodServer(localCacheManager);
- remoteCacheStoreConfig.setRemoteCacheName(DefaultCacheManager.DEFAULT_CACHE_NAME);
+ remoteCacheStoreConfig.setRemoteCacheName(CacheContainer.DEFAULT_CACHE_NAME);
Properties properties = new Properties();
properties.put("hotrod-servers", "localhost:"+ hrServer.getPort());
remoteCacheStoreConfig.setHotRodClientProperties(properties);
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -285,6 +285,7 @@
}
public <K, V> RemoteCache<K, V> getCache(boolean forceReturnValue) {
+ //As per the HotRod protocol specification, the default cache is identified by an empty string
return createRemoteCache("", forceReturnValue);
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -39,6 +39,9 @@
private volatile boolean forceReturnValue;
public RemoteCacheImpl(RemoteCacheManager rcm, String name, boolean forceReturnValue) {
+ if (log.isTraceEnabled()) {
+ log.trace("Creating remote cache: " + name);
+ }
this.name = name;
this.forceReturnValue = forceReturnValue;
this.remoteCacheManager = rcm;
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -14,6 +14,7 @@
/**
* Default marshaller implementation based on object serialization.
+ * todo - the marshaller should only be when writing to the actual transport
*
* @author Mircea.Markus at jboss.com
* @since 4.1
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -20,7 +20,7 @@
* @since 4.1
*/
public class HotRodOperationsHelper {
- static Log log = LogFactory.getLog(HotRodOperationsImpl.class);
+ static Log log = LogFactory.getLog(HotRodOperationsHelper.class);
static final AtomicLong MSG_ID = new AtomicLong();
final static byte CLIENT_INTELLIGENCE = HotRodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -36,7 +36,7 @@
public byte[] get(byte[] key, Flag[] flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
try {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -57,7 +57,7 @@
public byte[] remove(byte[] key, Flag[] flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -67,7 +67,7 @@
}
} catch (TransportException te) {
logErrorAndThrowExceptionIfNeeded(i, te);
- }finally {
+ } finally {
releaseTransport(transport);
}
}
@@ -76,7 +76,7 @@
public boolean containsKey(byte[] key, Flag... flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -96,7 +96,7 @@
public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -111,7 +111,7 @@
return new BinaryVersionedValue(version, value);
}
} catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(i, te);
+ logErrorAndThrowExceptionIfNeeded(i, te);
} finally {
releaseTransport(transport);
}
@@ -122,7 +122,7 @@
public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
if (status != NO_ERROR_STATUS) {
@@ -131,7 +131,7 @@
return returnPossiblePrevValue(transport, flags);
} catch (TransportException te) {
logErrorAndThrowExceptionIfNeeded(i, te);
- }finally {
+ } finally {
releaseTransport(transport);
}
}
@@ -140,7 +140,7 @@
public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
@@ -162,7 +162,7 @@
public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
@@ -185,7 +185,7 @@
*/
public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
// 1) write header
long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
@@ -212,7 +212,7 @@
*/
public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
for (int i = 0; i < transportFactory.getTransportCount(); i++) {
- Transport transport = transportFactory.getTransport(key);
+ Transport transport = getTransport(key, i == 0);
try {
// 1) write header
long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
@@ -354,4 +354,12 @@
log.trace(message + ":" + te);
}
}
+
+ private Transport getTransport(byte[] key, boolean hashAware) {
+ if (hashAware) {
+ return transportFactory.getTransport(key);
+ } else {
+ return transportFactory.getTransport();
+ }
+ }
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -85,7 +85,7 @@
if (!string.isEmpty()) {
writeArray(string.getBytes(CHARSET));
} else {
- writeVInt(0);
+ writeVInt(0);
}
}
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -22,8 +22,8 @@
*/
public abstract class HitsAwareCacheManagersTest extends MultipleCacheManagersTest {
- Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
- Map<InetSocketAddress, HotRodServer> addr2hrServer = new HashMap<InetSocketAddress, HotRodServer>();
+ protected Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
+ protected Map<InetSocketAddress, HotRodServer> addr2hrServer = new HashMap<InetSocketAddress, HotRodServer>();
@BeforeMethod
public void createBeforeMethod() throws Throwable {
Deleted: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -1,189 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
-import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
-import org.infinispan.config.Configuration;
-import org.infinispan.lifecycle.ComponentStatus;
-import org.infinispan.manager.CacheContainer;
-import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.net.InetSocketAddress;
-import java.util.Properties;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
- at Test (testName = "client.hotrod.ReplicationRetryTest", groups = "functional")
-public class ReplicationRetryTest extends HitsAwareCacheManagersTest {
- HotRodServer hotRodServer1;
- HotRodServer hotRodServer2;
- HotRodServer hotRodServer3;
-
- RemoteCache remoteCache;
- private RemoteCacheManager remoteCacheManager;
- private TcpTransportFactory tcpConnectionFactory;
- private Configuration config;
- private RoundRobinBalancingStrategy strategy;
-
- public ReplicationRetryTest() {
- cleanup = CleanupPhase.AFTER_METHOD;
- }
-
- @Override
- protected void assertSupportedConfig() {
- }
-
- @Override
- protected void createCacheManagers() throws Throwable {
-
- assert cleanup == CleanupPhase.AFTER_METHOD;
-
-
- config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
- CacheContainer cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
- CacheContainer cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
- CacheContainer cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
- registerCacheManager(cm1);
- registerCacheManager(cm2);
- registerCacheManager(cm3);
-
- hotRodServer1 = TestHelper.startHotRodServer(manager(0));
- hrServ2CacheManager.put(getAddress(hotRodServer1), cm1);
- hotRodServer2 = TestHelper.startHotRodServer(manager(1));
- hrServ2CacheManager.put(getAddress(hotRodServer2), cm2);
- hotRodServer3 = TestHelper.startHotRodServer(manager(2));
- hrServ2CacheManager.put(getAddress(hotRodServer3), cm3);
-
- manager(0).getCache();
- manager(1).getCache();
- manager(2).getCache();
-
- TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-
- Properties clientConfig = new Properties();
- clientConfig.put("hotrod-servers", "localhost:" + hotRodServer2.getPort());
- clientConfig.put("force-return-value", "true");
- clientConfig.put("maxActive",1); //this ensures that only one server is active at a time
-
- remoteCacheManager = new RemoteCacheManager(clientConfig);
- remoteCache = remoteCacheManager.getCache();
- tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
- strategy = (RoundRobinBalancingStrategy) tcpConnectionFactory.getBalancer();
- addInterceptors();
-
- assert super.cacheManagers.size() == 3;
-
- }
-
-
- public void testGet() {
- validateSequenceAndStopServer();
- //now make sure that next call won't fail
- resetStats();
- for (int i = 0; i < 100; i++) {
- assert remoteCache.get("k").equals("v");
- }
- }
-
- public void testPut() {
-
- validateSequenceAndStopServer();
- resetStats();
-
- assert "v".equals(remoteCache.put("k", "v0"));
- for (int i = 1; i < 100; i++) {
- assertEquals("v" + (i-1), remoteCache.put("k", "v"+i));
- }
- }
-
- public void testRemove() {
- validateSequenceAndStopServer();
- resetStats();
-
- assertEquals("v", remoteCache.remove("k"));
- }
-
- public void testContains() {
- validateSequenceAndStopServer();
- resetStats();
- assertEquals(true, remoteCache.containsKey("k"));
- }
-
- public void testGetWithVersion() {
- validateSequenceAndStopServer();
- resetStats();
- VersionedValue value = remoteCache.getVersioned("k");
- assertEquals("v", value.getValue());
- }
-
- public void testPutIfAbsent() {
- validateSequenceAndStopServer();
- resetStats();
- assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
- assertEquals("someValue", remoteCache.get("noSuchKey"));
- }
-
- public void testReplace() {
- validateSequenceAndStopServer();
- resetStats();
- assertEquals("v", remoteCache.replace("k", "v2"));
- }
-
- public void testReplaceIfUnmodified() {
- validateSequenceAndStopServer();
- resetStats();
- assertEquals(false, remoteCache.replaceWithVersion("k", "v2", 12));
- }
-
- public void testRemoveIfUnmodified() {
- validateSequenceAndStopServer();
- resetStats();
- assertEquals(false, remoteCache.removeWithVersion("k", 12));
- }
-
- public void testClear() {
- validateSequenceAndStopServer();
- resetStats();
- remoteCache.clear();
- assertEquals(false, remoteCache.containsKey("k"));
- }
-
- private void validateSequenceAndStopServer() {
- resetStats();
- assertNoHits();
- InetSocketAddress expectedServer = strategy.getServers()[strategy.getNextPosition()];
- assertNoHits();
- remoteCache.put("k","v");
-
- assert strategy.getServers().length == 3;
- assertOnlyServerHit(expectedServer);
-
- resetStats();
- expectedServer = strategy.getServers()[strategy.getNextPosition()];
- remoteCache.put("k2","v2");
- assertOnlyServerHit(expectedServer);
-
- resetStats();
- expectedServer = strategy.getServers()[strategy.getNextPosition()];
- remoteCache.put("k3","v3");
- assertOnlyServerHit(expectedServer);
-
- resetStats();
- expectedServer = strategy.getServers()[strategy.getNextPosition()];
- remoteCache.put("k","v");
- assertOnlyServerHit(expectedServer);
-
- //this would be the next server to be shutdown
- expectedServer = strategy.getServers()[strategy.getNextPosition()];
- HotRodServer toStop = addr2hrServer.get(expectedServer);
- toStop.stop();
- }
-}
Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -0,0 +1,93 @@
+package org.infinispan.client.hotrod.retry;
+
+import org.infinispan.client.hotrod.HitsAwareCacheManagersTest;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.TestHelper;
+import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
+import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+
+import java.util.Properties;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class AbstractRetryTest extends HitsAwareCacheManagersTest {
+
+ protected HotRodServer hotRodServer1;
+ protected HotRodServer hotRodServer2;
+ protected HotRodServer hotRodServer3;
+
+ RemoteCacheImpl remoteCache;
+ protected RemoteCacheManager remoteCacheManager;
+ protected TcpTransportFactory tcpConnectionFactory;
+ protected Configuration config;
+ protected RoundRobinBalancingStrategy strategy;
+
+ public AbstractRetryTest() {
+ cleanup = CleanupPhase.AFTER_METHOD;
+ }
+
+ @Override
+ protected void assertSupportedConfig() {
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+
+ assert cleanup == CleanupPhase.AFTER_METHOD;
+
+
+ config = getCacheConfig();
+ CacheContainer cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ CacheContainer cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ CacheContainer cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ registerCacheManager(cm1);
+ registerCacheManager(cm2);
+ registerCacheManager(cm3);
+
+ hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+ hrServ2CacheManager.put(getAddress(hotRodServer1), cm1);
+ hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+ hrServ2CacheManager.put(getAddress(hotRodServer2), cm2);
+ hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+ hrServ2CacheManager.put(getAddress(hotRodServer3), cm3);
+
+ manager(0).getCache();
+ manager(1).getCache();
+ manager(2).getCache();
+
+ waitForClusterToForm();
+
+ Properties clientConfig = new Properties();
+ clientConfig.put("hotrod-servers", "localhost:" + hotRodServer2.getPort());
+ clientConfig.put("force-return-value", "true");
+ clientConfig.put("maxActive",1); //this ensures that only one server is active at a time
+
+ remoteCacheManager = new RemoteCacheManager(clientConfig);
+ remoteCache = (RemoteCacheImpl) remoteCacheManager.getCache();
+ tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+ strategy = (RoundRobinBalancingStrategy) tcpConnectionFactory.getBalancer();
+ addInterceptors();
+
+ assert super.cacheManagers.size() == 3;
+
+ }
+
+ protected abstract Configuration getCacheConfig();
+
+ protected void waitForClusterToForm() {
+ TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -0,0 +1,148 @@
+package org.infinispan.client.hotrod.retry;
+
+import org.infinispan.Cache;
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.KeyGenerator;
+import org.infinispan.client.hotrod.VersionedValue;
+import org.infinispan.client.hotrod.impl.SerializationMarshaller;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransport;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Random;
+import java.util.concurrent.Executors;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test(testName = "hotrod.retry.DistributionRetryTest", groups = "functional")
+public class DistributionRetryTest extends AbstractRetryTest {
+
+ @Override
+ protected Configuration getCacheConfig() {
+ Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+ config.setNumOwners(1);
+ return config;
+ }
+
+ @Override
+ protected void waitForClusterToForm() {
+ super.waitForClusterToForm();
+ BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
+ }
+
+ public void testGet() {
+ log.info("Starting actual test");
+ Object key = generateKeyAndShutdownServer();
+ //now make sure that next call won't fail
+ resetStats();
+ assertEquals(remoteCache.get(key), "v");
+ }
+
+ public void testPut() {
+ Object key = generateKeyAndShutdownServer();
+ log.info("Here it starts");
+ assertEquals(remoteCache.put(key, "v0"), "v");
+ }
+
+ public void testRemove() {
+ Object key = generateKeyAndShutdownServer();
+ assertEquals("v", remoteCache.remove(key));
+ }
+
+ public void testContains() {
+ Object key = generateKeyAndShutdownServer();
+ resetStats();
+ assertEquals(true, remoteCache.containsKey(key));
+ }
+
+ public void testGetWithVersion() {
+ Object key = generateKeyAndShutdownServer();
+ resetStats();
+ VersionedValue value = remoteCache.getVersioned(key);
+ assertEquals("v", value.getValue());
+ }
+
+ public void testPutIfAbsent() {
+ Object key = generateKeyAndShutdownServer();
+ assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
+ assertEquals("someValue", remoteCache.get("noSuchKey"));
+ }
+
+ public void testReplace() {
+ Object key = generateKeyAndShutdownServer();
+ assertEquals("v", remoteCache.replace(key, "v2"));
+ }
+
+ public void testReplaceIfUnmodified() {
+ Object key = generateKeyAndShutdownServer();
+ assertEquals(false, remoteCache.replaceWithVersion(key, "v2", 12));
+ }
+
+ public void testRemoveIfUnmodified() {
+ Object key = generateKeyAndShutdownServer();
+ resetStats();
+ assertEquals(false, remoteCache.removeWithVersion(key, 12));
+ }
+
+ public void testClear() {
+ Object key = generateKeyAndShutdownServer();
+ resetStats();
+ remoteCache.clear();
+ assertEquals(false, remoteCache.containsKey(key));
+ }
+
+ private Object generateKeyAndShutdownServer() {
+ resetStats();
+ Cache<Object,Object> cache = manager(1).getCache();
+ KeyAffinityService kaf = KeyAffinityServiceFactory.newKeyAffinityService(cache, Executors.newSingleThreadExecutor(), new ByteKeyGenerator(), 2, true);
+ Address address = cache.getAdvancedCache().getRpcManager().getTransport().getAddress();
+ byte[] keyBytes = (byte[]) kaf.getKeyForAddress(address);
+ String key = ByteKeyGenerator.getStringObject(keyBytes);
+ kaf.stop();
+
+ remoteCache.put(key, "v");
+ assertOnlyServerHit(getAddress(hotRodServer2));
+ TcpTransportFactory tcpTp = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+
+ SerializationMarshaller sm = new SerializationMarshaller();
+ TcpTransport transport = (TcpTransport) tcpTp.getTransport(sm.marshallObject(key));
+ try {
+ assertEquals(transport.getServerAddress(), new InetSocketAddress("localhost", hotRodServer2.getPort()));
+ } finally {
+ tcpTp.releaseTransport(transport);
+ }
+
+
+ log.info("About to stop hotrod server 2");
+ hotRodServer2.stop();
+
+
+ return key;
+ }
+
+ static class ByteKeyGenerator implements KeyGenerator {
+ Random r = new Random();
+ @Override
+ public Object getKey() {
+ String result = String.valueOf(r.nextLong());
+ SerializationMarshaller sm = new SerializationMarshaller();
+ return sm.marshallObject(result);
+ }
+
+ static String getStringObject(byte[] bytes) {
+ SerializationMarshaller sm = new SerializationMarshaller();
+ return (String) sm.readObject(bytes);
+ }
+ }
+
+}
Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -0,0 +1,137 @@
+package org.infinispan.client.hotrod.retry;
+
+import org.infinispan.client.hotrod.HitsAwareCacheManagersTest;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.TestHelper;
+import org.infinispan.client.hotrod.VersionedValue;
+import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test (testName = "client.hotrod.ReplicationRetryTest", groups = "functional")
+public class ReplicationRetryTest extends AbstractRetryTest {
+
+ public void testGet() {
+ validateSequenceAndStopServer();
+ //now make sure that next call won't fail
+ resetStats();
+ for (int i = 0; i < 100; i++) {
+ assert remoteCache.get("k").equals("v");
+ }
+ }
+
+ public void testPut() {
+
+ validateSequenceAndStopServer();
+ resetStats();
+
+ assert "v".equals(remoteCache.put("k", "v0"));
+ for (int i = 1; i < 100; i++) {
+ assertEquals("v" + (i-1), remoteCache.put("k", "v"+i));
+ }
+ }
+
+ public void testRemove() {
+ validateSequenceAndStopServer();
+ resetStats();
+
+ assertEquals("v", remoteCache.remove("k"));
+ }
+
+ public void testContains() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(true, remoteCache.containsKey("k"));
+ }
+
+ public void testGetWithVersion() {
+ validateSequenceAndStopServer();
+ resetStats();
+ VersionedValue value = remoteCache.getVersioned("k");
+ assertEquals("v", value.getValue());
+ }
+
+ public void testPutIfAbsent() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
+ assertEquals("someValue", remoteCache.get("noSuchKey"));
+ }
+
+ public void testReplace() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals("v", remoteCache.replace("k", "v2"));
+ }
+
+ public void testReplaceIfUnmodified() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(false, remoteCache.replaceWithVersion("k", "v2", 12));
+ }
+
+ public void testRemoveIfUnmodified() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(false, remoteCache.removeWithVersion("k", 12));
+ }
+
+ public void testClear() {
+ validateSequenceAndStopServer();
+ resetStats();
+ remoteCache.clear();
+ assertEquals(false, remoteCache.containsKey("k"));
+ }
+
+ private void validateSequenceAndStopServer() {
+ resetStats();
+ assertNoHits();
+ InetSocketAddress expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ assertNoHits();
+ remoteCache.put("k","v");
+
+ assert strategy.getServers().length == 3;
+ assertOnlyServerHit(expectedServer);
+
+ resetStats();
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ remoteCache.put("k2","v2");
+ assertOnlyServerHit(expectedServer);
+
+ resetStats();
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ remoteCache.put("k3","v3");
+ assertOnlyServerHit(expectedServer);
+
+ resetStats();
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ remoteCache.put("k","v");
+ assertOnlyServerHit(expectedServer);
+
+ //this would be the next server to be shutdown
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ HotRodServer toStop = addr2hrServer.get(expectedServer);
+ toStop.stop();
+ }
+
+ @Override
+ protected Configuration getCacheConfig() {
+ return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
+ }
+}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -53,7 +53,7 @@
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.lifecycle.ComponentStatus;
-import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.MarshalledValue;
import org.infinispan.marshall.Marshaller;
@@ -368,7 +368,7 @@
@ManagedAttribute(description = "Returns the cache name")
@Metric(displayName = "Cache name", dataType = DataType.TRAIT, displayType = DisplayType.SUMMARY)
public String getCacheName() {
- return getName().equals(DefaultCacheManager.DEFAULT_CACHE_NAME) ? "Default Cache" : getName();
+ return getName().equals(CacheContainer.DEFAULT_CACHE_NAME) ? "Default Cache" : getName();
}
public String getVersion() {
Modified: branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -107,7 +107,7 @@
private InvocationContextContainer icc;
@ManagedAttribute(description = "If true, the node has successfully joined the grid and is considered to hold state. If false, the join process is still in progress.")
@Metric(displayName = "Is join completed?", dataType = DataType.TRAIT)
- volatile boolean joinComplete = false;
+ private volatile boolean joinComplete = false;
Future<Void> joinFuture;
final List<Address> leavers = new CopyOnWriteArrayList<Address>();
volatile Future<Void> leaveTaskFuture;
@@ -131,12 +131,19 @@
@Start(priority = 20)
public void start() throws Exception {
+ if (log.isTraceEnabled()) {
+ log.trace("Starting distribution manager on " + getMyAddress());
+ }
replCount = configuration.getNumOwners();
listener = new ViewChangeListener();
notifier.addListener(listener);
join();
}
+ private Address getMyAddress() {
+ return rpcManager != null? rpcManager.getAddress(): null;
+ }
+
// To avoid blocking other components' start process, wait last, if necessary, for join to complete.
@Start(priority = 1000)
public void waitForJoinToComplete() throws Throwable {
@@ -162,7 +169,7 @@
JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
joinFuture = rehashExecutor.submit(joinTask);
} else {
- joinComplete = true;
+ setJoinComplete(true);
}
startLatch.open();
}
@@ -171,7 +178,7 @@
public void stop() {
notifier.removeListener(listener);
rehashExecutor.shutdownNow();
- joinComplete = false;
+ setJoinComplete(false);
}
public void rehash(List<Address> newMembers, List<Address> oldMembers) {
@@ -368,7 +375,10 @@
public class ViewChangeListener {
@ViewChanged
public void handleViewChange(ViewChangedEvent e) {
- boolean started = false;
+ if (log.isTraceEnabled()) {
+ log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());
+ }
+ boolean started;
// how long do we wait for a startup?
if (e.isNeedsToRejoin()) {
try {
@@ -416,6 +426,13 @@
return joinComplete;
}
+ public void setJoinComplete(boolean joinComplete) {
+ if (log.isTraceEnabled()) {
+ log.trace("Setting joinComplete to " + joinComplete + " for node " + rpcManager.getAddress());
+ }
+ this.joinComplete = joinComplete;
+ }
+
void drainLocalTransactionLog() {
List<WriteCommand> c;
while (transactionLogger.shouldDrainWithoutLock()) {
Modified: branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -72,13 +72,15 @@
protected void performRehash() throws Exception {
long start = System.currentTimeMillis();
boolean trace = log.isTraceEnabled();
- if (log.isDebugEnabled()) log.debug("Commencing");
+ if (log.isDebugEnabled()) log.debug("Commencing rehash on node: " + getMyAddress() + ". Before start, dmi.joinComplete = " + dmi.isJoinComplete());
TransactionLogger transactionLogger = dmi.getTransactionLogger();
boolean unlocked = false;
ConsistentHash chOld;
ConsistentHash chNew;
try {
- dmi.joinComplete = false;
+ if (dmi.isJoinComplete()) {
+ throw new IllegalStateException("Join cannot be complete without rehash to finish (node " + getMyAddress() + " )");
+ }
// 1. Get chOld from coord.
chOld = retrieveOldCH(trace);
@@ -142,7 +144,7 @@
throw new CacheException("Unexpected exception", e);
} finally {
if (!unlocked) transactionLogger.unlockAndDisable();
- dmi.joinComplete = true;
+ dmi.setJoinComplete(true);
}
}
@@ -230,4 +232,8 @@
if (!l.contains(plusOne)) l.add(plusOne);
return l;
}
+
+ public Address getMyAddress() {
+ return rpcManager != null && rpcManager.getTransport() != null ? rpcManager.getTransport().getAddress() : null;
+ }
}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -27,6 +27,7 @@
import org.infinispan.config.Configuration;
import org.infinispan.config.ConfigurationException;
import org.infinispan.jmx.CacheJmxRegistration;
+import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
/**
@@ -68,7 +69,7 @@
}
public Cache<K, V> createDefaultCache(Configuration configuration) throws ConfigurationException {
- return createCache(configuration, null, DefaultCacheManager.DEFAULT_CACHE_NAME);
+ return createCache(configuration, null, CacheContainer.DEFAULT_CACHE_NAME);
}
protected AdvancedCache<K, V> createAndWire(Configuration configuration, GlobalComponentRegistry globalComponentRegistry, String cacheName) throws Exception {
Modified: branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -54,6 +54,7 @@
* @since 4.0
*/
public interface CacheContainer extends Lifecycle {
+ String DEFAULT_CACHE_NAME = "___defaultcache";
/**
* Retrieves the default cache associated with this cache container.
Modified: branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -106,7 +106,6 @@
@SurvivesRestarts
@MBean(objectName = DefaultCacheManager.OBJECT_NAME, description = "Component that acts as a manager, factory and container for caches in the system.")
public class DefaultCacheManager implements EmbeddedCacheManager, CacheManager {
- public static final String DEFAULT_CACHE_NAME = "___defaultcache";
public static final String OBJECT_NAME = "CacheManager";
private static final Log log = LogFactory.getLog(DefaultCacheManager.class);
protected final GlobalConfiguration globalConfiguration;
Modified: branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -207,4 +207,9 @@
* a null otherwise.
*/
Address getCurrentStateTransferSource();
+
+ /**
+ * Returns the address associated with this RpcManager or null if not part of the cluster.
+ */
+ public Address getAddress();
}
\ No newline at end of file
Modified: branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -322,7 +322,7 @@
@ManagedAttribute(description = "The network address associated with this instance")
@Metric(displayName = "Network address", dataType = DataType.TRAIT, displayType = DisplayType.SUMMARY)
- public String getAddress() {
+ public String getNodeAddress() {
if (t == null || !isStatisticsEnabled()) return "N/A";
Address address = t.getAddress();
return address == null ? "N/A" : address.toString();
@@ -383,5 +383,9 @@
// mainly for unit testing
public void setTransport(Transport t) {
this.t = t;
+ }@Override
+
+ public Address getAddress() {
+ return t != null ? t.getAddress() : null;
}
}
Modified: branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -15,6 +15,8 @@
import org.infinispan.test.TestingUtil;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.IsolationLevel;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
@@ -93,16 +95,21 @@
* This is a separate class because some tools try and run this method as a test
*/
public static class RehashWaiter {
+ private static Log log = LogFactory.getLog(RehashWaiter.class);
public static void waitForInitRehashToComplete(Cache... caches) {
int gracetime = 60000; // 60 seconds?
long giveup = System.currentTimeMillis() + gracetime;
for (Cache c : caches) {
DistributionManagerImpl dmi = (DistributionManagerImpl) TestingUtil.extractComponent(c, DistributionManager.class);
- while (!dmi.joinComplete) {
- if (System.currentTimeMillis() > giveup)
- throw new RuntimeException("Timed out waiting for initial join sequence to complete!");
+ while (!dmi.isJoinComplete()) {
+ if (System.currentTimeMillis() > giveup) {
+ String message = "Timed out waiting for initial join sequence to complete on node " + dmi.rpcManager.getAddress() + " !";
+ log.error(message);
+ throw new RuntimeException(message);
+ }
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}
+ log.trace("Node " + dmi.rpcManager.getAddress() + " finished rehash task.");
}
}
@@ -151,7 +158,7 @@
boolean allOK = true;
for (Cache c : joiners) {
DistributionManagerImpl dmi = (DistributionManagerImpl) getDistributionManager(c);
- allOK &= dmi.joinComplete;
+ allOK &= dmi.isJoinComplete();
}
if (allOK) return;
TestingUtil.sleepThread(100);
Modified: branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -38,7 +38,7 @@
joinerManager.defineConfiguration(cacheName, configuration);
Cache joiner = joinerManager.getCache(cacheName);
DistributionManager dmi = joiner.getAdvancedCache().getDistributionManager();
- assert dmi.isJoinComplete() == true;
+ assert dmi.isJoinComplete();
}
}
Modified: branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -31,7 +31,6 @@
import org.infinispan.CacheException;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.manager.CacheContainer;
-import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
@@ -53,7 +52,7 @@
}
public void testStartStopManagedOperations() throws Exception {
- ObjectName defaultOn = new ObjectName(JMX_DOMAIN + ":cache-name=" + DefaultCacheManager.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
+ ObjectName defaultOn = new ObjectName(JMX_DOMAIN + ":cache-name=" + CacheContainer.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
ObjectName managerON = new ObjectName(JMX_DOMAIN + ":cache-name=[global],jmx-resource=CacheManager");
server.invoke(managerON, "startCache", new Object[]{}, new String[]{});
assert ComponentStatus.RUNNING.toString().equals(server.getAttribute(defaultOn, "CacheStatus"));
@@ -83,7 +82,7 @@
public void testManagerStopRemovesCacheMBean(Method m) throws Exception {
final String otherJmxDomain = JMX_DOMAIN + '.' + m.getName();
- ObjectName defaultOn = new ObjectName(otherJmxDomain + ":cache-name=" + DefaultCacheManager.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
+ ObjectName defaultOn = new ObjectName(otherJmxDomain + ":cache-name=" + CacheContainer.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
ObjectName galderOn = new ObjectName(otherJmxDomain + ":cache-name=galder(local),jmx-resource=Cache");
ObjectName managerON = new ObjectName(otherJmxDomain + ":cache-name=[global],jmx-resource=CacheManager");
CacheContainer otherContainer = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(otherJmxDomain);
Modified: branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -21,10 +21,10 @@
try {
assert cm.getCache().getStatus() == ComponentStatus.RUNNING;
- assert cm.getCache().getName().equals(DefaultCacheManager.DEFAULT_CACHE_NAME);
+ assert cm.getCache().getName().equals(CacheContainer.DEFAULT_CACHE_NAME);
try {
- cm.defineConfiguration(DefaultCacheManager.DEFAULT_CACHE_NAME, new Configuration());
+ cm.defineConfiguration(CacheContainer.DEFAULT_CACHE_NAME, new Configuration());
assert false : "Should fail";
}
catch (IllegalArgumentException e) {
Modified: branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java 2010-07-01 07:31:37 UTC (rev 1944)
@@ -87,7 +87,7 @@
replicationLatch = new CountDownLatch(1);
controlledRpcManager1.setReplicationLatch(replicationLatch);
controlledRpcManager2.setReplicationLatch(replicationLatch);
- log.trace("_________________________ Here is beggins");
+ log.trace("_________________________ Here it begins");
}
@AfterMethod
@@ -310,5 +310,10 @@
public Address getCurrentStateTransferSource() {
return realOne.getCurrentStateTransferSource();
}
+
+ @Override
+ public Address getAddress() {
+ return null;
+ }
}
}
Modified: branches/4.1.x/core/src/test/resources/log4j.xml
===================================================================
--- branches/4.1.x/core/src/test/resources/log4j.xml 2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/resources/log4j.xml 2010-07-01 07:31:37 UTC (rev 1944)
@@ -8,7 +8,7 @@
<!-- A time/date based rolling appender -->
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="infinispan.log"/>
+ <param name="File" value="infinispan_test.log"/>
<param name="Append" value="false"/>
<!-- Rollover at midnight each day -->
@@ -65,7 +65,7 @@
<!-- ======================= -->
<root>
- <priority value="WARN"/>
+ <priority value="TRACE"/>
<!--<appender-ref ref="CONSOLE"/>-->
<appender-ref ref="FILE"/>
</root>
More information about the infinispan-commits
mailing list