[infinispan-commits] Infinispan SVN: r1670 - in trunk: core/src/test/java/org/infinispan/test and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Apr 8 09:49:35 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-04-08 09:49:34 -0400 (Thu, 08 Apr 2010)
New Revision: 1670

Modified:
   trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java
   trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java
   trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
Log:
[ISPN-388] (Cache view rank on a ViewChange listener for Hot Rod version generation) Done.

Modified: trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java	2010-04-08 13:49:34 UTC (rev 1670)
@@ -18,7 +18,7 @@
  * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
  * @since 4.0
  */
-@DefaultFactoryFor(classes = {InboundInvocationHandler.class, CacheManagerNotifier.class, RemoteCommandsFactory.class, TransactionTable.class, GlobalTransactionFactory.class})
+@DefaultFactoryFor(classes = {InboundInvocationHandler.class, RemoteCommandsFactory.class, TransactionTable.class, GlobalTransactionFactory.class})
 @Scope(Scopes.GLOBAL)
 public class EmptyConstructorFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
    public <T> T construct(Class<T> componentType) {

Modified: trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java	2010-04-08 13:49:34 UTC (rev 1670)
@@ -11,6 +11,8 @@
 import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.lifecycle.ModuleLifecycle;
 import org.infinispan.manager.CacheManager;
+import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
+import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifierImpl;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
@@ -58,6 +60,7 @@
          registerComponent(cacheManager, CacheManager.class);
          registerComponent(configuration, GlobalConfiguration.class);
          registerComponent(new CacheManagerJmxRegistration(), CacheManagerJmxRegistration.class);
+         registerComponent(new CacheManagerNotifierImpl(), CacheManagerNotifier.class);
       }
       catch (Exception e) {
          throw new CacheException("Unable to construct a GlobalComponentRegistry!", e);

Modified: trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2010-04-08 13:49:34 UTC (rev 1670)
@@ -56,7 +56,7 @@
          createCacheManagers();
       } catch (Throwable th) {
          th.printStackTrace();
-         log.error("Error in test setup: " + th);
+         log.error("Error in test setup: ", th);
          throw th;
       }
    }

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -190,12 +190,7 @@
 
    protected def generateVersion(cache: Cache[K, V]): Long = {
       val rpcManager = cache.getAdvancedCache.getRpcManager
-      if (rpcManager != null) {
-         val transport = rpcManager.getTransport
-         newVersion(Some(transport.getAddress), Some(transport.getMembers), transport.getViewId)
-      } else {
-         newVersion(None, None, 0)
-      }
+      newVersion(rpcManager != null)
    }
 
    /**

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -4,6 +4,7 @@
 import transport.netty.{EncoderAdapter, NettyTransport}
 import transport.{Decoder, Encoder, Transport}
 import org.infinispan.manager.CacheManager
+import org.infinispan.server.core.VersionGenerator._
 
 /**
  * // TODO: Document this
@@ -27,6 +28,7 @@
       this.workerThreads = workerThreads
       this.cacheManager = cacheManager
 
+      cacheManager.addListener(getRankCalculatorListener)
       encoder = getEncoder
       // TODO: add an IdleStateHandler so that idle connections are detected, this could help on malformed data
       // TODO: ... requests such as when the lenght of data is bigger than the expected data itself.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -1,11 +1,16 @@
 package org.infinispan.server.core
 
 import org.infinispan.remoting.transport.Address
-import org.infinispan.Cache
-import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.notifications.Listener
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import scala.collection.JavaConversions._
 
 /**
- * // TODO: Document this
+ * This class generates version numbers to be stored with cache values whenever a value is created or modified.
+ * This version can later be queried by clients and used to guarantee that modifications are atomic.
+ *
  * @author Galder Zamarreño
  * @since 4.1
  */
@@ -13,21 +18,39 @@
 
    private val versionCounter = new AtomicInteger
 
-   def newVersion(address: Option[Address], members: Option[Iterable[Address]], viewId: Long): Long = {
+   private val versionPrefix = new AtomicLong
+
+   def newVersion(isClustered: Boolean): Long = {
+      if (isClustered && versionPrefix.get == 0)
+         throw new IllegalStateException("If clustered, Version prefix cannot be 0. Rank calculator probably not in use.")
       val counter = versionCounter.incrementAndGet
-      var version: Long = counter
-      if (address != None && members != None) {
-         // todo: perf tip: cache rank and viewId as a volatile and recalculate with each view change
-         val rank: Long = findAddressRank(address.get, members.get, 1)
-         version = (rank << 32) | version
-         version = (viewId << 48) | version
-      }
-      version
+      if (isClustered) versionPrefix.get | counter else counter
    }
 
+   def getRankCalculatorListener = RankCalculator
+
+   private[core] def resetCounter {
+      versionCounter.compareAndSet(versionCounter.get, 0)
+   }
+
    private def findAddressRank(address: Address, members: Iterable[Address], rank: Int): Int = {
       if (address.equals(members.head)) rank
       else findAddressRank(address, members.tail, rank + 1)
    }
-   
-}
\ No newline at end of file
+
+   @Listener
+   object RankCalculator extends Logging {
+      @ViewChanged
+      def calculateRank(e: ViewChangedEvent) {
+         val rank = calculateRank(e.getLocalAddress, asIterable(e.getNewMembers), e.getViewId)
+         trace("Calculated rank based on view {0} and result was {1}", e, rank)
+      }
+
+      private[core] def calculateRank(address: Address, members: Iterable[Address], viewId: Long): Long = {
+         val rank: Long = findAddressRank(address, members, 1)
+         val newVersionPrefix = (viewId << 48) | (rank << 32)
+         versionPrefix.compareAndSet(versionPrefix.get, newVersionPrefix)
+         versionPrefix.get
+      }
+   }
+}

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -1,15 +1,14 @@
 package org.infinispan.server.core.transport.netty
 
 import java.net.SocketAddress
-import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
+import org.jboss.netty.channel.group.DefaultChannelGroup
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import java.util.concurrent.atomic.AtomicInteger
-import org.jboss.netty.channel.{ChannelUpstreamHandler, ChannelDownstreamHandler, ChannelFactory, ChannelPipelineFactory}
+import org.jboss.netty.channel.ChannelDownstreamHandler
 import org.jboss.netty.bootstrap.ServerBootstrap
-import java.util.concurrent.{TimeUnit, Executors, ThreadFactory, ExecutorService}
+import java.util.concurrent.{Executors, ThreadFactory}
 import org.infinispan.server.core.transport.Transport
 import scala.collection.JavaConversions._
-import org.infinispan.manager.CacheManager
 import org.infinispan.server.core.{ProtocolServer, Logging}
 import org.jboss.netty.util.{ThreadNameDeterminer, ThreadRenamingRunnable}
 

Modified: trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
===================================================================
--- trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -14,12 +14,17 @@
 class VersionGeneratorTest {
 
    def testGenerateVersion {
+      resetCounter
       val addr1 = new TestAddress(1)
       val addr2 = new TestAddress(2)
       val addr3 = new TestAddress(1)
       val members = List(addr1, addr2, addr3)
-      assertEquals(newVersion(Some(addr2), Some(members), 1), 0x1000200000001L)
+      RankCalculator.calculateRank(addr2, members, 1)
+      assertEquals(newVersion(true), 0x1000200000001L)
+      assertEquals(newVersion(true), 0x1000200000002L)
+      assertEquals(newVersion(true), 0x1000200000003L)
    }
+
 }
 
 class TestAddress(val addressNum: Int) extends Address {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -22,8 +22,12 @@
 
    @Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
    override def createCacheManagers {
-      var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
-      createClusteredCaches(2, cacheName, replSync)
+      var config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+      config.setFetchInMemoryState(true)
+      for (i <- 0 until 2) {
+         val cm = addClusterEnabledCacheManager()
+         cm.defineConfiguration(cacheName, config)
+      }
       servers = startHotRodServer(cacheManagers.get(0)) :: servers
       servers = startHotRodServer(cacheManagers.get(1), servers.head.getPort + 50) :: servers
       servers.foreach {s =>

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -287,17 +287,18 @@
       t match {
          case se: ServerException => {
             se.getCause match {
-               case uoe: UnknownOperationException => ERROR
-               case cce: ClosedChannelException => null// no-op, only log
+               case u: UnknownOperationException => ERROR
+               case c: ClosedChannelException => null // no-op, only log
                case _ => {
                   t match {
-                     case ioe: IOException => sb.append("CLIENT_ERROR ")
+                     case i: IOException => sb.append("CLIENT_ERROR ")
                      case _ => sb.append("SERVER_ERROR ")
                   }
                   sb.append(t).append(CRLF)
                }
             }
          }
+         case c: ClosedChannelException => null // no-op, only log
          case _ => sb.append("SERVER_ERROR ").append(t).append(CRLF)
       }
    }

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	2010-04-08 13:49:34 UTC (rev 1670)
@@ -24,8 +24,12 @@
 
    @Test(enabled = false) // Disable explicitly to avoid TestNG thinking this is a test!!
    override def createCacheManagers {
-      var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
-      createClusteredCaches(2, cacheName, replSync)
+      var config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+      config.setFetchInMemoryState(true)
+      for (i <- 0 until 2) {
+         val cm = addClusterEnabledCacheManager()
+         cm.defineConfiguration(cacheName, config)
+      }
       servers = startMemcachedTextServer(cacheManagers.get(0), cacheName) :: servers
       servers = startMemcachedTextServer(cacheManagers.get(1), servers.head.getPort + 50, cacheName) :: servers
       servers.foreach(s => clients = createMemcachedClient(60000, s.getPort) :: clients)



More information about the infinispan-commits mailing list