[infinispan-commits] Infinispan SVN: r1670 - in trunk: core/src/test/java/org/infinispan/test and 6 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Apr 8 09:49:35 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-04-08 09:49:34 -0400 (Thu, 08 Apr 2010)
New Revision: 1670
Modified:
trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java
trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java
trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
Log:
[ISPN-388] (Cache view rank on a ViewChange listener for Hot Rod version generation) Done.
Modified: trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorFactory.java 2010-04-08 13:49:34 UTC (rev 1670)
@@ -18,7 +18,7 @@
* @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
* @since 4.0
*/
-@DefaultFactoryFor(classes = {InboundInvocationHandler.class, CacheManagerNotifier.class, RemoteCommandsFactory.class, TransactionTable.class, GlobalTransactionFactory.class})
+@DefaultFactoryFor(classes = {InboundInvocationHandler.class, RemoteCommandsFactory.class, TransactionTable.class, GlobalTransactionFactory.class})
@Scope(Scopes.GLOBAL)
public class EmptyConstructorFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
public <T> T construct(Class<T> componentType) {
Modified: trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/core/src/main/java/org/infinispan/factories/GlobalComponentRegistry.java 2010-04-08 13:49:34 UTC (rev 1670)
@@ -11,6 +11,8 @@
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.lifecycle.ModuleLifecycle;
import org.infinispan.manager.CacheManager;
+import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
+import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifierImpl;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -58,6 +60,7 @@
registerComponent(cacheManager, CacheManager.class);
registerComponent(configuration, GlobalConfiguration.class);
registerComponent(new CacheManagerJmxRegistration(), CacheManagerJmxRegistration.class);
+ registerComponent(new CacheManagerNotifierImpl(), CacheManagerNotifier.class);
}
catch (Exception e) {
throw new CacheException("Unable to construct a GlobalComponentRegistry!", e);
Modified: trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java 2010-04-08 13:49:34 UTC (rev 1670)
@@ -56,7 +56,7 @@
createCacheManagers();
} catch (Throwable th) {
th.printStackTrace();
- log.error("Error in test setup: " + th);
+ log.error("Error in test setup: ", th);
throw th;
}
}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -190,12 +190,7 @@
protected def generateVersion(cache: Cache[K, V]): Long = {
val rpcManager = cache.getAdvancedCache.getRpcManager
- if (rpcManager != null) {
- val transport = rpcManager.getTransport
- newVersion(Some(transport.getAddress), Some(transport.getMembers), transport.getViewId)
- } else {
- newVersion(None, None, 0)
- }
+ newVersion(rpcManager != null)
}
/**
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -4,6 +4,7 @@
import transport.netty.{EncoderAdapter, NettyTransport}
import transport.{Decoder, Encoder, Transport}
import org.infinispan.manager.CacheManager
+import org.infinispan.server.core.VersionGenerator._
/**
* // TODO: Document this
@@ -27,6 +28,7 @@
this.workerThreads = workerThreads
this.cacheManager = cacheManager
+ cacheManager.addListener(getRankCalculatorListener)
encoder = getEncoder
// TODO: add an IdleStateHandler so that idle connections are detected, this could help on malformed data
// TODO: ... requests such as when the lenght of data is bigger than the expected data itself.
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -1,11 +1,16 @@
package org.infinispan.server.core
import org.infinispan.remoting.transport.Address
-import org.infinispan.Cache
-import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.notifications.Listener
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import scala.collection.JavaConversions._
/**
- * // TODO: Document this
+ * This class generates version numbers to be stored with cache values whenever a value is created or modified.
+ * This version can later be queried by clients and used to guarantee that modifications are atomic.
+ *
* @author Galder Zamarreño
* @since 4.1
*/
@@ -13,21 +18,39 @@
private val versionCounter = new AtomicInteger
- def newVersion(address: Option[Address], members: Option[Iterable[Address]], viewId: Long): Long = {
+ private val versionPrefix = new AtomicLong
+
+ def newVersion(isClustered: Boolean): Long = {
+ if (isClustered && versionPrefix.get == 0)
+ throw new IllegalStateException("If clustered, Version prefix cannot be 0. Rank calculator probably not in use.")
val counter = versionCounter.incrementAndGet
- var version: Long = counter
- if (address != None && members != None) {
- // todo: perf tip: cache rank and viewId as a volatile and recalculate with each view change
- val rank: Long = findAddressRank(address.get, members.get, 1)
- version = (rank << 32) | version
- version = (viewId << 48) | version
- }
- version
+ if (isClustered) versionPrefix.get | counter else counter
}
+ def getRankCalculatorListener = RankCalculator
+
+ private[core] def resetCounter {
+ versionCounter.compareAndSet(versionCounter.get, 0)
+ }
+
private def findAddressRank(address: Address, members: Iterable[Address], rank: Int): Int = {
if (address.equals(members.head)) rank
else findAddressRank(address, members.tail, rank + 1)
}
-
-}
\ No newline at end of file
+
+ @Listener
+ object RankCalculator extends Logging {
+ @ViewChanged
+ def calculateRank(e: ViewChangedEvent) {
+ val rank = calculateRank(e.getLocalAddress, asIterable(e.getNewMembers), e.getViewId)
+ trace("Calculated rank based on view {0} and result was {1}", e, rank)
+ }
+
+ private[core] def calculateRank(address: Address, members: Iterable[Address], viewId: Long): Long = {
+ val rank: Long = findAddressRank(address, members, 1)
+ val newVersionPrefix = (viewId << 48) | (rank << 32)
+ versionPrefix.compareAndSet(versionPrefix.get, newVersionPrefix)
+ versionPrefix.get
+ }
+ }
+}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -1,15 +1,14 @@
package org.infinispan.server.core.transport.netty
import java.net.SocketAddress
-import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
+import org.jboss.netty.channel.group.DefaultChannelGroup
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import java.util.concurrent.atomic.AtomicInteger
-import org.jboss.netty.channel.{ChannelUpstreamHandler, ChannelDownstreamHandler, ChannelFactory, ChannelPipelineFactory}
+import org.jboss.netty.channel.ChannelDownstreamHandler
import org.jboss.netty.bootstrap.ServerBootstrap
-import java.util.concurrent.{TimeUnit, Executors, ThreadFactory, ExecutorService}
+import java.util.concurrent.{Executors, ThreadFactory}
import org.infinispan.server.core.transport.Transport
import scala.collection.JavaConversions._
-import org.infinispan.manager.CacheManager
import org.infinispan.server.core.{ProtocolServer, Logging}
import org.jboss.netty.util.{ThreadNameDeterminer, ThreadRenamingRunnable}
Modified: trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
===================================================================
--- trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -14,12 +14,17 @@
class VersionGeneratorTest {
def testGenerateVersion {
+ resetCounter
val addr1 = new TestAddress(1)
val addr2 = new TestAddress(2)
val addr3 = new TestAddress(1)
val members = List(addr1, addr2, addr3)
- assertEquals(newVersion(Some(addr2), Some(members), 1), 0x1000200000001L)
+ RankCalculator.calculateRank(addr2, members, 1)
+ assertEquals(newVersion(true), 0x1000200000001L)
+ assertEquals(newVersion(true), 0x1000200000002L)
+ assertEquals(newVersion(true), 0x1000200000003L)
}
+
}
class TestAddress(val addressNum: Int) extends Address {
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -22,8 +22,12 @@
@Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
override def createCacheManagers {
- var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
- createClusteredCaches(2, cacheName, replSync)
+ var config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+ config.setFetchInMemoryState(true)
+ for (i <- 0 until 2) {
+ val cm = addClusterEnabledCacheManager()
+ cm.defineConfiguration(cacheName, config)
+ }
servers = startHotRodServer(cacheManagers.get(0)) :: servers
servers = startHotRodServer(cacheManagers.get(1), servers.head.getPort + 50) :: servers
servers.foreach {s =>
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -287,17 +287,18 @@
t match {
case se: ServerException => {
se.getCause match {
- case uoe: UnknownOperationException => ERROR
- case cce: ClosedChannelException => null// no-op, only log
+ case u: UnknownOperationException => ERROR
+ case c: ClosedChannelException => null // no-op, only log
case _ => {
t match {
- case ioe: IOException => sb.append("CLIENT_ERROR ")
+ case i: IOException => sb.append("CLIENT_ERROR ")
case _ => sb.append("SERVER_ERROR ")
}
sb.append(t).append(CRLF)
}
}
}
+ case c: ClosedChannelException => null // no-op, only log
case _ => sb.append("SERVER_ERROR ").append(t).append(CRLF)
}
}
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala 2010-04-07 15:06:42 UTC (rev 1669)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala 2010-04-08 13:49:34 UTC (rev 1670)
@@ -24,8 +24,12 @@
@Test(enabled = false) // Disable explicitly to avoid TestNG thinking this is a test!!
override def createCacheManagers {
- var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
- createClusteredCaches(2, cacheName, replSync)
+ var config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+ config.setFetchInMemoryState(true)
+ for (i <- 0 until 2) {
+ val cm = addClusterEnabledCacheManager()
+ cm.defineConfiguration(cacheName, config)
+ }
servers = startMemcachedTextServer(cacheManagers.get(0), cacheName) :: servers
servers = startMemcachedTextServer(cacheManagers.get(1), servers.head.getPort + 50, cacheName) :: servers
servers.foreach(s => clients = createMemcachedClient(60000, s.getPort) :: clients)
More information about the infinispan-commits
mailing list