[infinispan-commits] Infinispan SVN: r1777 - in trunk: core/src/main/java/org/infinispan/distribution and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue May 11 08:56:42 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-05-11 08:56:39 -0400 (Tue, 11 May 2010)
New Revision: 1777

Modified:
   trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Log:
[ISPN-425] (Stale data read when L1 invalidation happens while UnionConsistentHash is in use) Fixed by making sure rehash is completed before startup has finished. As a result, Hot Rod has been enhanced so that all defined caches are started up upon startup. Request to undefined caches will throw error.

Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java	2010-05-11 12:56:39 UTC (rev 1777)
@@ -9,7 +9,11 @@
 import org.infinispan.marshall.Marshallable;
 import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
 import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
+import java.util.Arrays;
+
 /**
  * Invalidates an entry in a L1 cache (used with DIST mode)
  *
@@ -19,6 +23,7 @@
 @Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.INVALIDATE_L1_COMMAND)
 public class InvalidateL1Command extends InvalidateCommand {
    public static final int COMMAND_ID = 7;
+   private static final Log log = LogFactory.getLog(InvalidateL1Command.class);
    private DistributionManager dm;
    private DataContainer dataContainer;
    private Configuration config;
@@ -53,8 +58,10 @@
       if (forRehash && config.isL1OnRehash()) {
          for (Object k : getKeys()) {
             InternalCacheEntry ice = dataContainer.get(k);
-            if (ice != null)
+            if (ice != null) {
+               if (log.isTraceEnabled()) log.trace("Not removing, instead putting entry into L1.");
                dataContainer.put(k, ice.getValue(), config.getL1Lifespan(), config.getExpirationMaxIdle());
+            }
          }
       } else {
          for (Object k : getKeys()) {
@@ -121,4 +128,13 @@
       result = 31 * result + (forRehash ? 1 : 0);
       return result;
    }
+
+   @Override
+   public String toString() {
+      return getClass().getSimpleName() + "{" +
+            "keys=" + Arrays.toString(keys) +
+            ", forRehash=" + forRehash +
+            '}';
+   }
+
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-05-11 12:56:39 UTC (rev 1777)
@@ -37,6 +37,7 @@
 import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.MembershipArithmetic;
+import org.infinispan.remoting.transport.Transport;
 import org.infinispan.util.Util;
 import org.infinispan.util.concurrent.ReclosableLatch;
 import org.infinispan.util.logging.Log;
@@ -107,6 +108,7 @@
    @ManagedAttribute(description = "If true, the node has successfully joined the grid and is considered to hold state.  If false, the join process is still in progress.")
    @Metric(displayName = "Is join completed?", dataType = DataType.TRAIT)
    volatile boolean joinComplete = false;
+   Future<Void> joinFuture;
    final List<Address> leavers = new CopyOnWriteArrayList<Address>();
    volatile Future<Void> leaveTaskFuture;
    final ReclosableLatch startLatch = new ReclosableLatch(false);
@@ -135,13 +137,22 @@
       join();
    }
 
+   // To avoid blocking other components' start process, wait last, if necessary, for join to complete.
+   @Start(priority = 1000)
+   public void waitForJoinToComplete() throws Exception {
+      if (joinFuture != null)
+         joinFuture.get();
+   }
+
    private void join() throws Exception {
       startLatch.close();
-      consistentHash = createConsistentHash(configuration, rpcManager.getTransport().getMembers());
-      self = rpcManager.getTransport().getAddress();
-      if (rpcManager.getTransport().getMembers().size() > 1) {
+      Transport t = rpcManager.getTransport();
+      List<Address> members = t.getMembers();
+      consistentHash = createConsistentHash(configuration, members);
+      self = t.getAddress();
+      if (members.size() > 1 && !t.getCoordinator().equals(self)) {
          JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
-         rehashExecutor.submit(joinTask);
+         joinFuture = rehashExecutor.submit(joinTask);
       } else {
          joinComplete = true;
       }
@@ -278,7 +289,7 @@
          return new LinkedList<Address>(consistentHash.getCaches());
       } else {
          if (trace)
-            log.trace("Not allowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
+            log.trace("Not alowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
          return null;
       }
    }
@@ -321,6 +332,7 @@
    }
 
    public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
+      if (trace) log.trace("Apply state with " + state);
       for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
          if (consistentHash.locate(e.getKey(), configuration.getNumOwners()).contains(self)) {
             InternalCacheValue v = e.getValue();

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-05-11 12:56:39 UTC (rev 1777)
@@ -73,6 +73,7 @@
 
    protected void performRehash() throws Exception {
       long start = System.currentTimeMillis();
+      boolean trace = log.isTraceEnabled();
       if (log.isDebugEnabled()) log.debug("Commencing");
       boolean unlocked = false;
       try {
@@ -85,7 +86,7 @@
          Random rand = new Random();
          long giveupTime = System.currentTimeMillis() + maxWaitTime;
          do {
-            if (log.isTraceEnabled()) log.trace("Requesting old consistent hash from coordinator");
+            if (trace) log.trace("Requesting old consistent hash from coordinator");
             List<Response> resp;
             List<Address> addresses;
             try {
@@ -103,7 +104,7 @@
             if (addresses == null) {
                long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
                time = (time * 10) + minSleepTime;
-               if (log.isTraceEnabled()) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
+               if (trace) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
                Thread.sleep(time); // sleep for a while and retry
             } else {
                chOld = createConsistentHash(configuration, addresses);
@@ -147,6 +148,8 @@
 
             // 8.  Drain logs
             dmi.drainLocalTransactionLog();
+         } else {
+            if (trace) log.trace("Rehash not enabled, so not pulling state.");
          }
          unlocked = true;
 
@@ -163,7 +166,7 @@
             invalidateInvalidHolders(chOld, chNew);
          }
 
-         if (log.isInfoEnabled())
+         if (trace)
             log.info("{0} completed join in {1}!", self, Util.prettyPrintTime(System.currentTimeMillis() - start));
       } catch (Exception e) {
          log.error("Caught exception!", e);

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -26,7 +26,10 @@
       this.workerThreads = workerThreads
       this.cacheManager = cacheManager
 
+      // Register rank calculator before starting any cache so that we can capture all view changes
       cacheManager.addListener(getRankCalculatorListener)
+      // Start default cache
+      startDefaultCache
       val address =  new InetSocketAddress(host, port)
       val encoder = getEncoder
       val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
@@ -45,4 +48,5 @@
 
    def getPort = port
 
+   def startDefaultCache = cacheManager.getCache
 }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -1,6 +1,5 @@
 package org.infinispan.server.hotrod
 
-import org.infinispan.Cache
 import org.infinispan.stats.Stats
 import org.infinispan.server.core._
 import transport._
@@ -10,13 +9,13 @@
 import org.infinispan.server.hotrod.ProtocolFlag._
 import org.infinispan.server.hotrod.OperationResponse._
 import java.nio.channels.ClosedChannelException
+import org.infinispan.{CacheException, Cache}
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
-
 class HotRodDecoder(cacheManager: CacheManager) extends AbstractProtocolDecoder[CacheKey, CacheValue] {
    import HotRodDecoder._
    
@@ -65,9 +64,13 @@
    }
 
    override def getCache(header: HotRodHeader): Cache[CacheKey, CacheValue] = {
-      // TODO: Document this in wiki
-      if (header.cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
-      else cacheManager.getCache(header.cacheName)
+      // TODO: Document DefaultCacheManager.DEFAULT_CACHE_NAME usage in wiki
+      val cacheName = header.cacheName
+      if (cacheName != DefaultCacheManager.DEFAULT_CACHE_NAME && !cacheManager.getCacheNames.contains(cacheName))
+         throw new CacheNotFoundException("Cache with name '" + cacheName + "' not found amongst the configured caches")
+      
+      if (cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
+      else cacheManager.getCache(cacheName)
    }
 
    override def readKey(h: HotRodHeader, b: ChannelBuffer): CacheKey =
@@ -151,4 +154,6 @@
          .append("messageId=").append(messageId)
          .append("}").toString
    }
-}
\ No newline at end of file
+}
+
+class CacheNotFoundException(msg: String) extends CacheException(msg)
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -19,7 +19,6 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-
 class HotRodServer extends AbstractProtocolServer("HotRod") with Logging {
    import HotRodServer._
    private var isClustered: Boolean = _
@@ -34,6 +33,10 @@
 
    override def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int) {
       super.start(host, port, cacheManager, masterThreads, workerThreads, idleTimeout)
+      // Start defined caches to avoid issues with lazily started caches
+      for (cacheName <- asIterator(cacheManager.getCacheNames.iterator))
+         cacheManager.getCache(cacheName)
+
       isClustered = cacheManager.getGlobalConfiguration.getTransportClass != null
       // If clustered, set up a cache for topology information
       if (isClustered)

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodDistributionTest.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -11,6 +11,7 @@
 import collection.mutable.ListBuffer
 import org.infinispan.distribution.UnionConsistentHash
 import org.infinispan.test.TestingUtil
+import org.infinispan.test.AbstractCacheTest._
 
 /**
  * // TODO: Document this
@@ -20,8 +21,6 @@
 @Test(groups = Array("functional"), testName = "server.hotrod.HotRodDistributionTest")
 class HotRodDistributionTest extends HotRodMultiNodeTest {
 
-   import HotRodServer._
-
    override protected def cacheName: String = "hotRodDistSync"
 
    override protected def createCacheConfig: Configuration = getDefaultClusteredConfig(CacheMode.DIST_SYNC)

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -52,6 +52,11 @@
       assertTrue(Arrays.equals(value.data, v(m)));
    }
 
+//   def testPutOnUndefinedCache(m: Method) {
+//      val status = client.execute(0xA0, 0x01, "boomooo", k(m), 0, 0, v(m), 0, 1, 0).status
+//      assertEquals(status, ServerError, "Status should have been 'ServerError' but instead was: " + status)
+//   }
+
    def testPutWithLifespan(m: Method) {
       client.assertPut(m, 1, 0)
       Thread.sleep(1100)
@@ -294,7 +299,7 @@
       for (i <- 1 to 5) {
          val key = k(m, "k" + i + "-");
          val value = v(m, "v" + i + "-");
-         assertStatus(client.put(key , 0, 0, value).status, Success)
+         assertStatus(client.put(key, 0, 0, value).status, Success)
          assertStatus(client.containsKey(key, 0).status, Success)
       }
 

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -7,13 +7,13 @@
 import org.testng.Assert._
 import org.testng.annotations.Test
 import org.infinispan.test.TestingUtil
+import org.infinispan.test.AbstractCacheTest._
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
-
 @Test(groups = Array("functional"), testName = "server.hotrod.HotRodReplicationTest")
 class HotRodReplicationTest extends HotRodMultiNodeTest {
 

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -24,6 +24,7 @@
    
    override def createCacheManager: CacheManager = {
       val cacheManager = createTestCacheManager
+      cacheManager.defineConfiguration(cacheName, cacheManager.getDefaultConfiguration)
       advancedCache = cacheManager.getCache[CacheKey, CacheValue](cacheName).getAdvancedCache
       hotRodServer = createStartHotRodServer(cacheManager)
       hotRodClient = connectClient

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -19,17 +19,15 @@
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
-
-class MemcachedDecoder(cacheManager: CacheManager, scheduler: ScheduledExecutorService)
+class MemcachedDecoder(cache: Cache[String, MemcachedValue], scheduler: ScheduledExecutorService)
       extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
    import RequestResolver._
 
    type SuitableParameters = MemcachedParameters
    type SuitableHeader = RequestHeader
 
-   private lazy val cache = createCache
    private lazy val isStatsEnabled = cache.getConfiguration.isExposeJmxStatistics
    private final val incrMisses = new AtomicLong(0)
    private final val incrHits = new AtomicLong(0)
@@ -155,8 +153,6 @@
 
    override def getCache(h: RequestHeader): Cache[String, MemcachedValue] = cache
 
-   protected def createCache: Cache[String, MemcachedValue] = cacheManager.getCache[String, MemcachedValue]
-
    override def handleCustomRequest(h: RequestHeader, b: ChannelBuffer, cache: Cache[String, MemcachedValue]): AnyRef = {
       h.op match {
          case AppendRequest | PrependRequest => {

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -8,16 +8,15 @@
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
-
 class MemcachedServer extends AbstractProtocolServer("Memcached") {
 
    protected lazy val scheduler = Executors.newScheduledThreadPool(1)
 
    override def getEncoder: Encoder = null
 
-   override def getDecoder: Decoder = new MemcachedDecoder(getCacheManager, scheduler)
+   override def getDecoder: Decoder = new MemcachedDecoder(getCacheManager.getCache[String, MemcachedValue], scheduler)
 
    override def stop {
       super.stop

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -8,13 +8,13 @@
 import java.util.concurrent.TimeUnit
 import java.lang.reflect.Method
 import net.spy.memcached.{CASResponse, MemcachedClient}
+import org.infinispan.test.AbstractCacheTest._
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
-
 @Test(groups = Array("functional"), testName = "server.memcached.MemcachedReplicationTest")
 class MemcachedReplicationTest extends MultipleCacheManagersTest with MemcachedTestingUtil {
    private val cacheName = "MemcachedReplSync"

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-05-11 12:53:28 UTC (rev 1776)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-05-11 12:56:39 UTC (rev 1777)
@@ -12,9 +12,8 @@
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
-
 trait MemcachedTestingUtil {
    def host = "127.0.0.1"
 
@@ -48,11 +47,10 @@
 
    def startMemcachedTextServer(cacheManager: CacheManager, port: Int, cacheName: String): MemcachedServer = {
       val server = new MemcachedServer {
-         override def getDecoder: Decoder = {
-            new MemcachedDecoder(getCacheManager, scheduler) {
-               override def createCache = getCacheManager.getCache[String, MemcachedValue](cacheName)
-            }
-         }
+         override def getDecoder: Decoder =
+            new MemcachedDecoder(getCacheManager.getCache[String, MemcachedValue](cacheName), scheduler)
+
+         override def startDefaultCache = getCacheManager.getCache(cacheName)
       }
       server.start(host, port, cacheManager, 0, 0, 0)
       server



More information about the infinispan-commits mailing list