[infinispan-commits] Infinispan SVN: r1777 - in trunk: core/src/main/java/org/infinispan/distribution and 6 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue May 11 08:56:42 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-05-11 08:56:39 -0400 (Tue, 11 May 2010)
New Revision: 1777
Modified:
trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Log:
[ISPN-425] (Stale data read when L1 invalidation happens while UnionConsistentHash is in use) Fixed by making sure the rehash is completed before startup has finished. As a result, Hot Rod has been enhanced so that all defined caches are started when the server starts. Requests to undefined caches will throw an error.
Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java 2010-05-11 12:56:39 UTC (rev 1777)
@@ -9,7 +9,11 @@
import org.infinispan.marshall.Marshallable;
import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import java.util.Arrays;
+
/**
* Invalidates an entry in a L1 cache (used with DIST mode)
*
@@ -19,6 +23,7 @@
@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.INVALIDATE_L1_COMMAND)
public class InvalidateL1Command extends InvalidateCommand {
public static final int COMMAND_ID = 7;
+ private static final Log log = LogFactory.getLog(InvalidateL1Command.class);
private DistributionManager dm;
private DataContainer dataContainer;
private Configuration config;
@@ -53,8 +58,10 @@
if (forRehash && config.isL1OnRehash()) {
for (Object k : getKeys()) {
InternalCacheEntry ice = dataContainer.get(k);
- if (ice != null)
+ if (ice != null) {
+ if (log.isTraceEnabled()) log.trace("Not removing, instead putting entry into L1.");
dataContainer.put(k, ice.getValue(), config.getL1Lifespan(), config.getExpirationMaxIdle());
+ }
}
} else {
for (Object k : getKeys()) {
@@ -121,4 +128,13 @@
result = 31 * result + (forRehash ? 1 : 0);
return result;
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ "keys=" + Arrays.toString(keys) +
+ ", forRehash=" + forRehash +
+ '}';
+ }
+
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-05-11 12:56:39 UTC (rev 1777)
@@ -37,6 +37,7 @@
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.MembershipArithmetic;
+import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.logging.Log;
@@ -107,6 +108,7 @@
@ManagedAttribute(description = "If true, the node has successfully joined the grid and is considered to hold state. If false, the join process is still in progress.")
@Metric(displayName = "Is join completed?", dataType = DataType.TRAIT)
volatile boolean joinComplete = false;
+ Future<Void> joinFuture;
final List<Address> leavers = new CopyOnWriteArrayList<Address>();
volatile Future<Void> leaveTaskFuture;
final ReclosableLatch startLatch = new ReclosableLatch(false);
@@ -135,13 +137,22 @@
join();
}
+ // To avoid blocking other components' start process, wait last, if necessary, for join to complete.
+ @Start(priority = 1000)
+ public void waitForJoinToComplete() throws Exception {
+ if (joinFuture != null)
+ joinFuture.get();
+ }
+
private void join() throws Exception {
startLatch.close();
- consistentHash = createConsistentHash(configuration, rpcManager.getTransport().getMembers());
- self = rpcManager.getTransport().getAddress();
- if (rpcManager.getTransport().getMembers().size() > 1) {
+ Transport t = rpcManager.getTransport();
+ List<Address> members = t.getMembers();
+ consistentHash = createConsistentHash(configuration, members);
+ self = t.getAddress();
+ if (members.size() > 1 && !t.getCoordinator().equals(self)) {
JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
- rehashExecutor.submit(joinTask);
+ joinFuture = rehashExecutor.submit(joinTask);
} else {
joinComplete = true;
}
@@ -278,7 +289,7 @@
return new LinkedList<Address>(consistentHash.getCaches());
} else {
if (trace)
- log.trace("Not allowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
+ log.trace("Not alowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
return null;
}
}
@@ -321,6 +332,7 @@
}
public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
+ if (trace) log.trace("Apply state with " + state);
for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
if (consistentHash.locate(e.getKey(), configuration.getNumOwners()).contains(self)) {
InternalCacheValue v = e.getValue();
Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-05-11 12:56:39 UTC (rev 1777)
@@ -73,6 +73,7 @@
protected void performRehash() throws Exception {
long start = System.currentTimeMillis();
+ boolean trace = log.isTraceEnabled();
if (log.isDebugEnabled()) log.debug("Commencing");
boolean unlocked = false;
try {
@@ -85,7 +86,7 @@
Random rand = new Random();
long giveupTime = System.currentTimeMillis() + maxWaitTime;
do {
- if (log.isTraceEnabled()) log.trace("Requesting old consistent hash from coordinator");
+ if (trace) log.trace("Requesting old consistent hash from coordinator");
List<Response> resp;
List<Address> addresses;
try {
@@ -103,7 +104,7 @@
if (addresses == null) {
long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
time = (time * 10) + minSleepTime;
- if (log.isTraceEnabled()) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
+ if (trace) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
Thread.sleep(time); // sleep for a while and retry
} else {
chOld = createConsistentHash(configuration, addresses);
@@ -147,6 +148,8 @@
// 8. Drain logs
dmi.drainLocalTransactionLog();
+ } else {
+ if (trace) log.trace("Rehash not enabled, so not pulling state.");
}
unlocked = true;
@@ -163,7 +166,7 @@
invalidateInvalidHolders(chOld, chNew);
}
- if (log.isInfoEnabled())
+ if (trace)
log.info("{0} completed join in {1}!", self, Util.prettyPrintTime(System.currentTimeMillis() - start));
} catch (Exception e) {
log.error("Caught exception!", e);
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -26,7 +26,10 @@
this.workerThreads = workerThreads
this.cacheManager = cacheManager
+ // Register rank calculator before starting any cache so that we can capture all view changes
cacheManager.addListener(getRankCalculatorListener)
+ // Start default cache
+ startDefaultCache
val address = new InetSocketAddress(host, port)
val encoder = getEncoder
val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
@@ -45,4 +48,5 @@
def getPort = port
+ def startDefaultCache = cacheManager.getCache
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -1,6 +1,5 @@
package org.infinispan.server.hotrod
-import org.infinispan.Cache
import org.infinispan.stats.Stats
import org.infinispan.server.core._
import transport._
@@ -10,13 +9,13 @@
import org.infinispan.server.hotrod.ProtocolFlag._
import org.infinispan.server.hotrod.OperationResponse._
import java.nio.channels.ClosedChannelException
+import org.infinispan.{CacheException, Cache}
/**
* // TODO: Document this
* @author Galder Zamarreño
- * @since
+ * @since 4.1
*/
-
class HotRodDecoder(cacheManager: CacheManager) extends AbstractProtocolDecoder[CacheKey, CacheValue] {
import HotRodDecoder._
@@ -65,9 +64,13 @@
}
override def getCache(header: HotRodHeader): Cache[CacheKey, CacheValue] = {
- // TODO: Document this in wiki
- if (header.cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
- else cacheManager.getCache(header.cacheName)
+ // TODO: Document DefaultCacheManager.DEFAULT_CACHE_NAME usage in wiki
+ val cacheName = header.cacheName
+ if (cacheName != DefaultCacheManager.DEFAULT_CACHE_NAME && !cacheManager.getCacheNames.contains(cacheName))
+ throw new CacheNotFoundException("Cache with name '" + cacheName + "' not found amongst the configured caches")
+
+ if (cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
+ else cacheManager.getCache(cacheName)
}
override def readKey(h: HotRodHeader, b: ChannelBuffer): CacheKey =
@@ -151,4 +154,6 @@
.append("messageId=").append(messageId)
.append("}").toString
}
-}
\ No newline at end of file
+}
+
+class CacheNotFoundException(msg: String) extends CacheException(msg)
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -19,7 +19,6 @@
* @author Galder Zamarreño
* @since 4.1
*/
-
class HotRodServer extends AbstractProtocolServer("HotRod") with Logging {
import HotRodServer._
private var isClustered: Boolean = _
@@ -34,6 +33,10 @@
override def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int) {
super.start(host, port, cacheManager, masterThreads, workerThreads, idleTimeout)
+ // Start defined caches to avoid issues with lazily started caches
+ for (cacheName <- asIterator(cacheManager.getCacheNames.iterator))
+ cacheManager.getCache(cacheName)
+
isClustered = cacheManager.getGlobalConfiguration.getTransportClass != null
// If clustered, set up a cache for topology information
if (isClustered)
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -11,6 +11,7 @@
import collection.mutable.ListBuffer
import org.infinispan.distribution.UnionConsistentHash
import org.infinispan.test.TestingUtil
+import org.infinispan.test.AbstractCacheTest._
/**
* // TODO: Document this
@@ -20,8 +21,6 @@
@Test(groups = Array("functional"), testName = "server.hotrod.HotRodDistributionTest")
class HotRodDistributionTest extends HotRodMultiNodeTest {
- import HotRodServer._
-
override protected def cacheName: String = "hotRodDistSync"
override protected def createCacheConfig: Configuration = getDefaultClusteredConfig(CacheMode.DIST_SYNC)
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -52,6 +52,11 @@
assertTrue(Arrays.equals(value.data, v(m)));
}
+// def testPutOnUndefinedCache(m: Method) {
+// val status = client.execute(0xA0, 0x01, "boomooo", k(m), 0, 0, v(m), 0, 1, 0).status
+// assertEquals(status, ServerError, "Status should have been 'ServerError' but instead was: " + status)
+// }
+
def testPutWithLifespan(m: Method) {
client.assertPut(m, 1, 0)
Thread.sleep(1100)
@@ -294,7 +299,7 @@
for (i <- 1 to 5) {
val key = k(m, "k" + i + "-");
val value = v(m, "v" + i + "-");
- assertStatus(client.put(key , 0, 0, value).status, Success)
+ assertStatus(client.put(key, 0, 0, value).status, Success)
assertStatus(client.containsKey(key, 0).status, Success)
}
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -7,13 +7,13 @@
import org.testng.Assert._
import org.testng.annotations.Test
import org.infinispan.test.TestingUtil
+import org.infinispan.test.AbstractCacheTest._
/**
* // TODO: Document this
* @author Galder Zamarreño
- * @since
+ * @since 4.1
*/
-
@Test(groups = Array("functional"), testName = "server.hotrod.HotRodReplicationTest")
class HotRodReplicationTest extends HotRodMultiNodeTest {
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -24,6 +24,7 @@
override def createCacheManager: CacheManager = {
val cacheManager = createTestCacheManager
+ cacheManager.defineConfiguration(cacheName, cacheManager.getDefaultConfiguration)
advancedCache = cacheManager.getCache[CacheKey, CacheValue](cacheName).getAdvancedCache
hotRodServer = createStartHotRodServer(cacheManager)
hotRodClient = connectClient
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -19,17 +19,15 @@
/**
* // TODO: Document this
* @author Galder Zamarreño
- * @since
+ * @since 4.1
*/
-
-class MemcachedDecoder(cacheManager: CacheManager, scheduler: ScheduledExecutorService)
+class MemcachedDecoder(cache: Cache[String, MemcachedValue], scheduler: ScheduledExecutorService)
extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
import RequestResolver._
type SuitableParameters = MemcachedParameters
type SuitableHeader = RequestHeader
- private lazy val cache = createCache
private lazy val isStatsEnabled = cache.getConfiguration.isExposeJmxStatistics
private final val incrMisses = new AtomicLong(0)
private final val incrHits = new AtomicLong(0)
@@ -155,8 +153,6 @@
override def getCache(h: RequestHeader): Cache[String, MemcachedValue] = cache
- protected def createCache: Cache[String, MemcachedValue] = cacheManager.getCache[String, MemcachedValue]
-
override def handleCustomRequest(h: RequestHeader, b: ChannelBuffer, cache: Cache[String, MemcachedValue]): AnyRef = {
h.op match {
case AppendRequest | PrependRequest => {
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -8,16 +8,15 @@
/**
* // TODO: Document this
* @author Galder Zamarreño
- * @since
+ * @since 4.1
*/
-
class MemcachedServer extends AbstractProtocolServer("Memcached") {
protected lazy val scheduler = Executors.newScheduledThreadPool(1)
override def getEncoder: Encoder = null
- override def getDecoder: Decoder = new MemcachedDecoder(getCacheManager, scheduler)
+ override def getDecoder: Decoder = new MemcachedDecoder(getCacheManager.getCache[String, MemcachedValue], scheduler)
override def stop {
super.stop
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -8,13 +8,13 @@
import java.util.concurrent.TimeUnit
import java.lang.reflect.Method
import net.spy.memcached.{CASResponse, MemcachedClient}
+import org.infinispan.test.AbstractCacheTest._
/**
* // TODO: Document this
* @author Galder Zamarreño
- * @since
+ * @since 4.1
*/
-
@Test(groups = Array("functional"), testName = "server.memcached.MemcachedReplicationTest")
class MemcachedReplicationTest extends MultipleCacheManagersTest with MemcachedTestingUtil {
private val cacheName = "MemcachedReplSync"
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala 2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala 2010-05-11 12:56:39 UTC (rev 1777)
@@ -12,9 +12,8 @@
/**
* // TODO: Document this
* @author Galder Zamarreño
- * @since
+ * @since 4.1
*/
-
trait MemcachedTestingUtil {
def host = "127.0.0.1"
@@ -48,11 +47,10 @@
def startMemcachedTextServer(cacheManager: CacheManager, port: Int, cacheName: String): MemcachedServer = {
val server = new MemcachedServer {
- override def getDecoder: Decoder = {
- new MemcachedDecoder(getCacheManager, scheduler) {
- override def createCache = getCacheManager.getCache[String, MemcachedValue](cacheName)
- }
- }
+ override def getDecoder: Decoder =
+ new MemcachedDecoder(getCacheManager.getCache[String, MemcachedValue](cacheName), scheduler)
+
+ override def startDefaultCache = getCacheManager.getCache(cacheName)
}
server.start(host, port, cacheManager, 0, 0, 0)
server
More information about the infinispan-commits
mailing list