About ISPN-200 Distributed Queries
by Israel Lacerra
I'm studying ISPN-200 because I'm thinking about tackling this issue as my M.Sc.
topic. With that in mind, I'd like to ask a couple of questions (and maybe they
don't make sense):
- Currently, if we have "-Dinfinispan.query.indexLocalOnly=true", the indexes
are local only, right? And if "-Dinfinispan.query.indexLocalOnly=false", the
index is globally shared. Am I right?
- So, how will ISPN-200 work with these two possibilities?
thanks!
Israel Lacerra
14 years, 10 months
Latest jclouds release hardens cloud provisioning and adds clojure support
by Adrian Cole
The jclouds ecosystem has been extremely busy over the last 3 months. We have a
lot to talk about with our latest beta
(1.0-beta-5<http://code.google.com/p/jclouds/downloads/list>)
of jclouds. Besides performance improvements and bug fixes, there are a lot of
new features.
- Clojure support, compatible with 1.1 and 1.2 runtimes.
- Refined ComputeService API, including better errors, links between
nodes and their images, and support for running scripts across your nodes.
- Location compatibility across BlobStore and ComputeService. This allows
you to designate and query geo-collocation of resources, which saves time
and bandwidth.
- More services such as full support for GoGrid and experimental support
for BlueLock vCloud.
Coming next, we'll support portable volume management across at least EBS and
vCloud, as well as features you vote
on<http://code.google.com/p/jclouds/issues/list>.
Please see our blog for a more detailed review of
beta-5<http://anyweight.blogspot.com/2010/05/new-jclouds-hardens-cloud-provision...>
Many thanks to the growing community of beta testers and collaborators!
All the best!
-Adrian
founder jclouds
14 years, 10 months
Re: [infinispan-dev] Fwd: Stale data read when L1 invalidation happens while UnionConsistentHash is in use
by galder@jboss.org
See below:
----- "Manik Surtani" <manik(a)jboss.org> wrote:
> On 3 May 2010, at 08:51, Galder Zamarreno wrote:
>
> > Resending without log until the message is approved.
> >
> > --
> > Galder Zamarreño
> > Sr. Software Engineer
> > Infinispan, JBoss Cache
> >
> > ----- Forwarded Message -----
> > From: galder(a)redhat.com
> > To: "infinispan -Dev List" <infinispan-dev(a)lists.jboss.org>
> > Sent: Friday, April 30, 2010 6:30:05 PM GMT +01:00 Amsterdam /
> Berlin / Bern / Rome / Stockholm / Vienna
> > Subject: Stale data read when L1 invalidation happens while
> UnionConsistentHash is in use
> >
> > Hi,
> >
> > I've spent all day chasing down a random Hot Rod testsuite failure
> related to distribution. This is the last hurdle to close
> https://jira.jboss.org/jira/browse/ISPN-411. In
> HotRodDistributionTest, which is still to be committed, I test adding
> a new node, doing a put on this node, and then doing a get in a
> different node and making sure that I get what was put. The test
> randomly fails saying that the get returns the old value. The failure
> is nothing to do with Hot Rod itself but rather a race condition where
> union consistent hash is used. Let me explain:
> >
> > 1. An earlier operation had set
> "k-testDistributedPutWithTopologyChanges" key to
> "v5-testDistributedPutWithTopologyChanges".
> > 2. Start a new hot rod server in eq-7969.
> > 2. eq-7969 node calls a put on that key with
> "v6-testDistributedPutWithTopologyChanges". Recipients for the put
> are: eq-7969 and eq-61332.
> > 3. eq-7969 sends an invalidate L1 to all, including eq-13415
> > 4. eq-13415 should invalidate
> "k-testDistributedPutWithTopologyChanges" but it doesn't, since it
> considers that "k-testDistributedPutWithTopologyChanges" is local to
> eq-13415:
> >
> > 2010-04-30 18:02:19,907 6046 TRACE
> [org.infinispan.distribution.DefaultConsistentHash]
> (OOB-2,Infinispan-Cluster,eq-13415:) Hash code for key
> CacheKey{data=ByteArray{size=39, hashCode=17b1683, array=[107, 45,
> 116, 101, 115, 116, 68, 105, 115, 116, ..]}} is 344897059
> > 2010-04-30 18:02:19,907 6046 TRACE
> [org.infinispan.distribution.DefaultConsistentHash]
> (OOB-2,Infinispan-Cluster,eq-13415:) Candidates for key
> CacheKey{data=ByteArray{size=39, hashCode=17b1683, array=[107, 45,
> 116, 101, 115, 116, 68, 105, 115, 116, ..]}} are {5458=eq-7969,
> 6831=eq-61332}
> > 2010-04-30 18:02:19,907 6046 TRACE
> [org.infinispan.distribution.DistributionManagerImpl]
> (OOB-2,Infinispan-Cluster,eq-13415:) Is local
> CacheKey{data=ByteArray{size=39, hashCode=17b1683, array=[107, 45,
> 116, 101, 115, 116, 68, 105, 115, 116, ..]}} to eq-13415 query returns
> true and consistentHash is
> org.infinispan.distribution.UnionConsistentHash@10747b4
> >
> > This is a log with log messages that I added to debug it. The key
> factor here is that UnionConsistentHash is in use, probably due to
> rehashing not having fully finished.
> >
> > 5. The end result is that a read of
> "k-testDistributedPutWithTopologyChanges" in eq-13415 returns
> "v5-testDistributedPutWithTopologyChanges".
> >
> > I thought that maybe we could be more conservative here and if
> rehashing is in progress (or UnionConsistentHash is in use) invalidate
> regardless. Assuming that a put always follows an invalidation in
> distribution and not viceversa, that would be fine. The only downside
> is that you'd be invalidating too much but put would replace the data
> in the node where invalidation should not have happened but it did, so
> not a problem.
> >
> > Thoughts? Alternatively, maybe I need to shape my test so that I
> wait for rehashing to finish, but the problem would still be there.
>
> Yes, this seems to be a bug with concurrent rehashing and invalidation
> rather than HotRod.
>
> Could you modify your test to do the following:
>
> 1. start 2 caches C1 and C2.
> 2. put a key K such that K maps on to C1 and C2
> 3. add a new node, C3. K should now map to C1 and C3.
> 4. Modify the value on C1 *before* rehashing completes.
> 5. See if we see the stale value on C2.
>
> To do this you would need a custom object for K that hashes the way
> you would expect (this could be hardcoded) and a value which blocks
> when serializing so we can control how long rehashing takes.
Since logical addresses are used underneath, and these change from one run to the other, I'm not sure how I can generate such a key programmatically. It's even more complicated to figure out a key that will later, when C3 starts, map to C1 and C3. Without having these addresses locked down somehow, or their hash codes, I can't see how this is doable. IOW, to be able to do this, I need to mock these addresses into giving fixed hash codes. I'll dig further into this.
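For the record, the kind of helpers you describe would look roughly along these lines (just a sketch with made-up names, not existing test code): a key with a hardcoded hash code so its owners are predictable, and a value whose serialization blocks so the test controls how long rehashing takes:

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

// Sketch only: a key whose hash code is fixed, so where it lands on the hash wheel is predictable.
class FixedHashKey implements Serializable {
   private final String name;
   private final int hash;
   FixedHashKey(String name, int hash) { this.name = name; this.hash = hash; }
   @Override public int hashCode() { return hash; } // hardcoded on purpose
   @Override public boolean equals(Object o) {
      return o instanceof FixedHashKey && ((FixedHashKey) o).name.equals(name);
   }
}

// Sketch only: a value that blocks while being serialized, so state transfer/rehashing
// is held up until the test releases the latch.
class BlockingValue implements Serializable {
   static final CountDownLatch RELEASE = new CountDownLatch(1);
   private void writeObject(ObjectOutputStream out) throws IOException {
      try {
         RELEASE.await();
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
      out.defaultWriteObject();
   }
}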
>
> I never promised the test would be simple! :)
>
> Cheers
> Manik
> --
> Manik Surtani
> manik(a)jboss.org
> Lead, Infinispan
> Lead, JBoss Cache
> http://www.infinispan.org
> http://www.jbosscache.org
>
>
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev(a)lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
14 years, 10 months
AtomicMapCache in transactional mode returning null
by galder@redhat.com
Hi,
This is related to https://community.jboss.org/message/541023#541023 thread. Once https://jira.jboss.org/jira/browse/ISPN-418 fails, another issue appears whereby a null delta is written to the cache store. The reason for this is that the code in testRestoreTransactionalAtomicMap actually generates two modifications in the transaction.
The first one comes from the putIfAbsent below that can be found in AtomicHashMap:
public static AtomicHashMap newInstance(Cache cache, Object cacheKey) {
   AtomicHashMap value = new AtomicHashMap();
   Object oldValue = cache.putIfAbsent(cacheKey, value);
   if (oldValue != null) value = (AtomicHashMap) oldValue;
   return value;
}
And the second modification comes from the AtomicHashMapProxy.getDeltaMapForWrite call made by the code below, which is the map's put call:
public V put(K key, V value) {
   try {
      startAtomic();
      InvocationContext ctx = icc.createInvocationContext();
      AtomicHashMap<K, V> deltaMapForWrite = getDeltaMapForWrite(ctx);
      return deltaMapForWrite.put(key, value);
   } finally {
      endAtomic();
   }
}
On one side, these two modifications do exactly the same thing, a put of the atomic hash map, so it's not very efficient; one put would be enough for the test to work. However, the reason the test fails is that, after writing the first modification via:
public void writeObject(ObjectOutput output, Object subject) throws IOException {
   DeltaAware dw = (DeltaAware) subject;
   output.writeObject(dw.delta());
}
the dw.delta() call resets the delta, setting it to null. So, when the second modification is written, the delta is null and that's the final value written. Obviously, when the data is read back, a null delta is retrieved and hence the test fails.
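To make that concrete, here's a tiny toy illustration (made-up class names, not the real DeltaAware/AtomicHashMap code):

// Toy illustration only: the first write consumes the delta, so the second write
// in the same transaction sees null - which is exactly what ends up in the store.
class ToyDeltaAware {
   private StringBuilder delta = new StringBuilder("put(k1, v1)");

   // Mirrors the delta() behaviour described above: return the pending changes and reset them.
   Object delta() {
      Object pending = delta;
      delta = null;
      return pending;
   }
}

public class DoubleWriteDemo {
   public static void main(String[] args) {
      ToyDeltaAware dw = new ToyDeltaAware();
      System.out.println(dw.delta()); // first modification written: put(k1, v1)
      System.out.println(dw.delta()); // second modification written: null
   }
}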
So IMO, one of the two modifications must go, and my vote is for the one in newInstance. I don't really understand why we should do a put when creating a new instance of AtomicHashMap. It makes more sense to do the put when you actually modify the map, via a put call for example. Manik, any idea why you added the putIfAbsent to newInstance?
As a side note, and this might be related, the javadoc of newInstance needs updating since AtomicMapCache does not exist. It should probably refer to AtomicMapLookup instead of AtomicMapCache.
/**
* Construction only allowed through this factory method. This factory is intended for use internally by the
* CacheDelegate. User code should use {@link org.infinispan.atomic.AtomicMapCache#getAtomicMap(Object)}.
*/
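For reference, user code would then obtain the map through the lookup class along these lines (just a sketch; the cache and key are placeholders):

import org.infinispan.Cache;
import org.infinispan.atomic.AtomicMap;
import org.infinispan.atomic.AtomicMapLookup;

final class AtomicMapUsageSketch {
   // Obtain (or create) the atomic map associated with the given key, then modify it.
   static void example(Cache<String, Object> cache) {
      AtomicMap<String, String> map = AtomicMapLookup.getAtomicMap(cache, "someKey");
      map.put("name", "value");
   }
}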
Cheers,
--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache
14 years, 10 months
Re: [infinispan-dev] Infinispan within JBossAS
by galder@jboss.org
Hi Dimitris,
See inline:
----- "Manik Surtani" <manik(a)jboss.org> wrote:
> On 29 Apr 2010, at 19:00, Dimitris Andreadis wrote:
>
> > Hi guys,
> >
> > I'll be doing a short talk about JBoss 6 & Infinispan at a local
> conference:
> >
> > "This presentation introduces Infinispan, the new underlying data
> caching and replication
> > infrastructure included with the upcoming JBoss AS 6. Infinispan is
> an Open Source library
> > that can be used independently of JBoss AS to let you build dynamic
> and highly available
> > data grids that scale to the order of thousands, while offering a
> large number of enterprise
> > features."
> >
> > I need some help to identify the key benefits of using Infinispan in
> the context of AS.
> >
> > - Why is it going to be better from JBoss Cache?
> > - What are the major use cases we are addressing?
>
> From a JBossAS perspective (for now at least) I believe it is all of
> the same stuff JBoss Cache did: EJB and HTTP session replication, JPA
> 2nd Level Cache.
>
> > The obvious ones are
> >
> > - smaller memory footprint
> > - faster
>
> Both of the above, but also greater scalability (larger clusters),
> lower overhead when new nodes join existing clusters (--> faster
> startup)
>
> > But I need more, in terms of how it will affect
> >
> > - session/sfsb replication
> > - jpa/entity caching invalidation, 2nd level caching
>
> Nothing new in terms of "features" (over what JBoss Cache offered),
> except perhaps finer grained control over eviction settings for the
> JPA 2nd Level Cache. Galder can comment more here.
The biggest internal change to 2nd level caching is that each entity/collection type is stored in its own lightweight cache instance. Before, with JBoss Cache, all entity/collection types shared the same cache instance. As Manik hinted, the most visible change from a user perspective is that you can define eviction settings on a per entity/collection type basis much more easily than you could with the JBoss Cache based 2nd level cache. Before, you needed to know the internal JBoss Cache tree structure to be able to define these correctly, and you needed to modify the actual JBoss Cache configuration file. With Infinispan, there's no need to modify the Infinispan configuration any more: you can define evictions directly in your hibernate/ejb3 descriptor, and the actual configuration is much more intuitive:
http://community.jboss.org/wiki/usinginfinispanasjpahibernatesecondlevelc...
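For example, eviction for a particular entity type can be tuned with Hibernate properties along these lines (sketching from memory of that wiki page; com.acme.Person is just an example entity and the values are arbitrary):

hibernate.cache.region.factory_class=org.hibernate.cache.infinispan.InfinispanRegionFactory
# Evict Person entries with LRU, cap the region at 5000 entries, expire entries after 60 seconds
hibernate.cache.infinispan.com.acme.Person.eviction.strategy=LRU
hibernate.cache.infinispan.com.acme.Person.eviction.max_entries=5000
hibernate.cache.infinispan.com.acme.Person.expiration.lifespan=60000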
>
> > - other?
>
> The ability for people to use a data grid directly, in their
> applications, as an alternative data store.
>
> > Beyond the "standard" usage of Infinispan replacing JBossCache in
> AS, I need usecases of
> > datagrids used in the context of AS deployments. Is it going to be
> used primarily as a
> > read-mostly cache? Do you have some examples?
>
> Some typical scenarios would be to front "expensive" data stores, web
> services, etc., in which case you are looking at a read-mostly cache.
> But other use cases also include a complete datastore replacement.
> Fast, scalable, transactional, in-memory durable datastore.
>
> > I'm trying to imagine applications that would really benefit by the
> JBossAS/Infinispan
> > combination when deployed in the cloud in really large numbers (e.g.
> hundred of AS
> > instances). Would a special architecture design would need to be
> used in this case?
>
> Depends on the level if durability expected. For example, tuning your
> cache store for overflow/persistence is one area where you can
> exercise the tradeoff between performance and durability. Sync or
> async network communications come into play as well.
>
> > Finally, do you have some standalone Infinispan usage examples?
>
> Yes - a lot of what I have seen involve grid-based applications that
> farm out tasks to a grid of processor nodes that also need access to
> data. Traditional data stores become bottlenecks very quickly in such
> cases, so storing data in a distributed fashion is key. Having data
> locality and storing stuff in memory (low latency) are added pluses.
>
>
> > I think it will be great if we can associate/match Infinispan
> (within AS) to real people
> > problems and usecases.
>
> It will primarily centre around scalability/fast, low-latency data
> access. There will also be the ability to stack "tiers" of data grids
> with the client/server stuff we're doing. So this gives you access to
> large amounts of storage space.
>
> And cloud-scale deployments, where you need fast startup regardless of
> cluster size, elastic scale out/scale in, etc. Here is a quick
> diagram I pieced together some time back, for a "stateless" app server
> (which will be necessary to achieve app server elasticity). Note that
> this is just a concept, hasn't been decided upon that this will happen
> in JBoss AS yet, etc.
>
>
>
> [image/png:StatelessAppServer.png]
>
>
>
> HTH
> Manik
>
> >
> > Thanks for the help!
> >
> > /Dimitris
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev(a)lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
> --
> Manik Surtani
> manik(a)jboss.org
> Lead, Infinispan
> Lead, JBoss Cache
> http://www.infinispan.org
> http://www.jbosscache.org
>
>
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev(a)lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
14 years, 10 months
Fwd: Stale data read when L1 invalidation happens while UnionConsistentHash is in use
by Galder Zamarreno
Resending without log until the message is approved.
--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache
----- Forwarded Message -----
From: galder(a)redhat.com
To: "infinispan -Dev List" <infinispan-dev(a)lists.jboss.org>
Sent: Friday, April 30, 2010 6:30:05 PM GMT +01:00 Amsterdam / Berlin / Bern / Rome / Stockholm / Vienna
Subject: Stale data read when L1 invalidation happens while UnionConsistentHash is in use
Hi,
I've spent all day chasing down a random Hot Rod testsuite failure related to distribution. This is the last hurdle to close https://jira.jboss.org/jira/browse/ISPN-411. In HotRodDistributionTest, which is still to be committed, I test adding a new node, doing a put on that node, and then doing a get on a different node, making sure that I get what was put. The test randomly fails, with the get returning the old value. The failure has nothing to do with Hot Rod itself; it's rather a race condition that occurs while union consistent hash is in use. Let me explain:
1. An earlier operation had set the "k-testDistributedPutWithTopologyChanges" key to "v5-testDistributedPutWithTopologyChanges".
2. Start a new Hot Rod server in eq-7969.
3. The eq-7969 node calls a put on that key with "v6-testDistributedPutWithTopologyChanges". Recipients for the put are: eq-7969 and eq-61332.
4. eq-7969 sends an L1 invalidation to all, including eq-13415.
5. eq-13415 should invalidate "k-testDistributedPutWithTopologyChanges" but it doesn't, since it considers "k-testDistributedPutWithTopologyChanges" to be local to eq-13415:
2010-04-30 18:02:19,907 6046 TRACE [org.infinispan.distribution.DefaultConsistentHash] (OOB-2,Infinispan-Cluster,eq-13415:) Hash code for key CacheKey{data=ByteArray{size=39, hashCode=17b1683, array=[107, 45, 116, 101, 115, 116, 68, 105, 115, 116, ..]}} is 344897059
2010-04-30 18:02:19,907 6046 TRACE [org.infinispan.distribution.DefaultConsistentHash] (OOB-2,Infinispan-Cluster,eq-13415:) Candidates for key CacheKey{data=ByteArray{size=39, hashCode=17b1683, array=[107, 45, 116, 101, 115, 116, 68, 105, 115, 116, ..]}} are {5458=eq-7969, 6831=eq-61332}
2010-04-30 18:02:19,907 6046 TRACE [org.infinispan.distribution.DistributionManagerImpl] (OOB-2,Infinispan-Cluster,eq-13415:) Is local CacheKey{data=ByteArray{size=39, hashCode=17b1683, array=[107, 45, 116, 101, 115, 116, 68, 105, 115, 116, ..]}} to eq-13415 query returns true and consistentHash is org.infinispan.distribution.UnionConsistentHash@10747b4
These are log messages that I added while debugging. The key factor here is that UnionConsistentHash is in use, probably because rehashing has not fully finished.
6. The end result is that a read of "k-testDistributedPutWithTopologyChanges" in eq-13415 returns "v5-testDistributedPutWithTopologyChanges".
I thought that maybe we could be more conservative here and, if rehashing is in progress (or UnionConsistentHash is in use), invalidate regardless. Assuming that a put always follows an invalidation in distribution, and not vice versa, that would be fine. The only downside is that you'd be invalidating too much, but the put would replace the data in the node where the invalidation should not have happened (but did), so it's not a problem.
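Roughly, what I have in mind would look something like this (just a sketch, not actual interceptor code; isKeyLocalToAddress and UnionConsistentHash are the existing types):

import org.infinispan.distribution.ConsistentHash;
import org.infinispan.distribution.UnionConsistentHash;
import org.infinispan.remoting.transport.Address;

// Sketch only: decide whether this node should apply an incoming L1 invalidation.
// While rehashing is in progress the union hash can't be trusted for locality
// decisions, so err on the side of invalidating.
final class L1InvalidationPolicy {
   boolean shouldInvalidate(ConsistentHash ch, Address self, Object key, int replCount) {
      if (ch instanceof UnionConsistentHash) {
         return true; // rehash in progress: invalidate regardless
      }
      return !ch.isKeyLocalToAddress(self, key, replCount);
   }
}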
Thoughts? Alternatively, maybe I need to shape my test so that I wait for rehashing to finish, but the problem would still be there.
Cheers,
--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache
14 years, 10 months
Fwd: Infinispan SVN: r1733 - in trunk: server/core/src/main/scala/org/infinispan/server/core/transport/netty and 3 other directories.
by Galder Zamarreno
Guys,
I made a mistake in the commit message of this commit. It should refer to https://jira.jboss.org/jira/browse/ISPN-411 and not https://jira.jboss.org/jira/browse/ISPN-384.
Cheers,
--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache
----- Forwarded Message -----
From: infinispan-commits(a)lists.jboss.org
To: infinispan-commits(a)lists.jboss.org
Sent: Monday, May 3, 2010 11:20:24 AM GMT +01:00 Amsterdam / Berlin / Bern / Rome / Stockholm / Vienna
Subject: [infinispan-commits] Infinispan SVN: r1733 - in trunk: server/core/src/main/scala/org/infinispan/server/core/transport/netty and 3 other directories.
Author: galder.zamarreno(a)jboss.com
Date: 2010-05-03 05:20:23 -0400 (Mon, 03 May 2010)
New Revision: 1733
Modified:
trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java
trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
Log:
[ISPN-384] (Implement topology and hash distribution headers in Hot Rod) Implemented hash distribution headers and added methods to ConsistentHash interface to enable clients to retrieve an address' hash id and the hash space.
Modified: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java 2010-05-03 09:20:23 UTC (rev 1733)
@@ -81,4 +81,20 @@
* @return true if the key is mapped to the address; false otherwise
*/
boolean isKeyLocalToAddress(Address a, Object key, int replCount);
+
+ /**
+ * Returns the value between 0 and the hash space limit, or hash id, for a particular address. If there's no such
+ * value for an address, this method will return -1.
+ *
+ * @return An int between 0 and hash space if the address is present in the hash wheel, otherwise it returns -1.
+ */
+ int getHashId(Address a);
+
+ /**
+ * Returns the hash space constant for this consistent hash algorithm class. This integer is often used as modulus
+ * for arithmetic operations within the algorithm, for example, limiting the range of possible hash values.
+ *
+ * @return A positive integer containing the hash space constant or 0 is not supported by implementation.
+ */
+ int getHashSpace();
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2010-05-03 09:20:23 UTC (rev 1733)
@@ -3,12 +3,16 @@
import org.infinispan.marshall.Ids;
import org.infinispan.marshall.Marshallable;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -20,6 +24,8 @@
// make sure all threads see the current list
ArrayList<Address> addresses;
SortedMap<Integer, Address> positions;
+ // TODO: Maybe address and addressToHashIds can be combined in a LinkedHashMap?
+ Map<Address, Integer> addressToHashIds;
final static int HASH_SPACE = 10240; // no more than 10k nodes?
@@ -43,12 +49,17 @@
addresses.trimToSize();
positions = new TreeMap<Integer, Address>();
+ addressToHashIds = new HashMap<Address, Integer>();
for (Address a : addresses) {
int positionIndex = Math.abs(hash(a.hashCode())) % HASH_SPACE;
// this is deterministic since the address list is ordered and the order is consistent across the grid
while (positions.containsKey(positionIndex)) positionIndex = positionIndex + 1 % HASH_SPACE;
positions.put(positionIndex, a);
+ // If address appears several times, take the lowest value to guarantee that
+ // at least the initial value and subsequent +1 values would end up in the same node
+ if (!addressToHashIds.containsKey(a))
+ addressToHashIds.put(a, positionIndex);
}
addresses.clear();
@@ -153,6 +164,20 @@
}
@Override
+ public int getHashId(Address a) {
+ Integer hashId = addressToHashIds.get(a);
+ if (hashId == null)
+ return -1;
+ else
+ return hashId.intValue();
+ }
+
+ @Override
+ public int getHashSpace() {
+ return HASH_SPACE;
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Modified: trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java 2010-05-03 09:20:23 UTC (rev 1733)
@@ -262,6 +262,16 @@
return hash;
}
+ @Override
+ public int getHashId(Address a) {
+ throw new RuntimeException("Not yet implemented");
+ }
+
+ @Override
+ public int getHashSpace() {
+ return Integer.MAX_VALUE; // Entire positive integer range
+ }
+
/**
* @return A String representing the object pool.
*/
Modified: trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java 2010-05-03 09:20:23 UTC (rev 1733)
@@ -56,6 +56,19 @@
throw new UnsupportedOperationException("Unsupported!");
}
+ @Override
+ public int getHashId(Address a) {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ @Override
+ public int getHashSpace() {
+ int oldHashSpace = oldCH.getHashSpace();
+ int newHashSpace = newCH.getHashSpace();
+ // In a union, the hash space is the biggest of the hash spaces.
+ return oldHashSpace > newHashSpace ? oldHashSpace : newHashSpace;
+ }
+
public ConsistentHash getNewConsistentHash() {
return newCH;
}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -56,8 +56,15 @@
ThreadRenamingRunnable.setThreadNameDeterminer(new ThreadNameDeterminer {
override def determineThreadName(currentThreadName: String, proposedThreadName: String): String = {
val index = proposedThreadName.findIndexOf(_ == '#')
- val typeInFix = if (proposedThreadName.contains("boss")) "Master-" else "Worker-"
- threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
+ val typeInFix =
+ if (proposedThreadName contains "server worker") "ServerWorker-"
+ else if (proposedThreadName contains "server boss") "ServerMaster-"
+ else if (proposedThreadName contains "client worker") "ClientWorker-"
+ else "ClientMaster-"
+ val name = threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
+ trace("Thread name will be {0}, with current thread name being {1} and proposed name being '{2}'",
+ name, currentThread, proposedThreadName)
+ name
}
})
val bootstrap = new ServerBootstrap(factory);
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -4,6 +4,7 @@
import java.util.Arrays
import org.infinispan.marshall.Marshallable
import java.io.{ObjectInput, ObjectOutput}
+import org.infinispan.server.core.Logging
/**
* // TODO: Document this
@@ -23,7 +24,9 @@
}
}
- override def hashCode: Int = 41 + Arrays.hashCode(data)
+ override def hashCode: Int = {
+ 41 + Arrays.hashCode(data)
+ }
override def toString = {
new StringBuilder().append("CacheKey").append("{")
@@ -33,8 +36,8 @@
}
-object CacheKey {
- class Externalizer extends org.infinispan.marshall.Externalizer {
+object CacheKey extends Logging {
+ class Externalizer extends org.infinispan.marshall.Externalizer {
override def writeObject(output: ObjectOutput, obj: AnyRef) {
val cacheKey = obj.asInstanceOf[CacheKey]
output.write(cacheKey.data.length)
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -12,6 +12,7 @@
import collection.immutable
import org.infinispan.util.concurrent.TimeoutException
import java.io.IOException
+import org.infinispan.distribution.DefaultConsistentHash
/**
* // TODO: Document this
@@ -27,6 +28,10 @@
import HotRodServer._
type SuitableHeader = HotRodHeader
+ private lazy val isClustered: Boolean = cacheManager.getGlobalConfiguration.getTransportClass != null
+ private lazy val topologyCache: Cache[String, TopologyView] =
+ if (isClustered) cacheManager.getCache(TopologyCacheName) else null
+
override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
val streamOp = buffer.readUnsignedByte
val op = toRequest(streamOp)
@@ -82,25 +87,27 @@
private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
val topologyResponse = getTopologyResponse(h)
if (h.flag == ForceReturnPreviousValue)
- new ResponseWithPrevious(h.messageId, op, st, topologyResponse, if (prev == null) None else Some(prev.data))
+ new ResponseWithPrevious(h.messageId, h.cacheName, h.clientIntel, op, st, topologyResponse,
+ if (prev == null) None else Some(prev.data))
else
- new Response(h.messageId, op, st, topologyResponse)
+ new Response(h.messageId, h.cacheName, h.clientIntel, op, st, topologyResponse)
}
override def createGetResponse(h: HotRodHeader, v: CacheValue, op: Enumeration#Value): AnyRef = {
val topologyResponse = getTopologyResponse(h)
if (v != null && op == GetRequest)
- new GetResponse(h.messageId, GetResponse, Success, topologyResponse, Some(v.data))
+ new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, Success, topologyResponse, Some(v.data))
else if (v != null && op == GetWithVersionRequest)
- new GetWithVersionResponse(h.messageId, GetWithVersionResponse, Success, topologyResponse, Some(v.data), v.version)
+ new GetWithVersionResponse(h.messageId, h.cacheName, h.clientIntel, GetWithVersionResponse, Success,
+ topologyResponse, Some(v.data), v.version)
else if (op == GetRequest)
- new GetResponse(h.messageId, GetResponse, KeyDoesNotExist, topologyResponse, None)
+ new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, KeyDoesNotExist, topologyResponse, None)
else
- new GetWithVersionResponse(h.messageId, GetWithVersionResponse, KeyDoesNotExist, topologyResponse, None, 0)
+ new GetWithVersionResponse(h.messageId, h.cacheName, h.clientIntel, GetWithVersionResponse, KeyDoesNotExist,
+ topologyResponse, None, 0)
}
override def handleCustomRequest(h: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
- val messageId = h.messageId
h.op match {
case RemoveIfUnmodifiedRequest => {
val k = readKey(buffer)
@@ -124,18 +131,18 @@
val topologyResponse = getTopologyResponse(h)
val k = readKey(buffer)
if (cache.containsKey(k))
- new Response(messageId, ContainsKeyResponse, Success, topologyResponse)
+ new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, Success, topologyResponse)
else
- new Response(messageId, ContainsKeyResponse, KeyDoesNotExist, topologyResponse)
+ new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, KeyDoesNotExist, topologyResponse)
}
case ClearRequest => {
val topologyResponse = getTopologyResponse(h)
cache.clear
- new Response(messageId, ClearResponse, Success, topologyResponse)
+ new Response(h.messageId, h.cacheName, h.clientIntel, ClearResponse, Success, topologyResponse)
}
case PingRequest => {
val topologyResponse = getTopologyResponse(h)
- new Response(messageId, PingResponse, Success, topologyResponse)
+ new Response(h.messageId, h.cacheName, h.clientIntel, PingResponse, Success, topologyResponse)
}
}
}
@@ -152,33 +159,36 @@
stats += ("removeHits" -> cacheStats.getRemoveHits.toString)
stats += ("removeMisses" -> cacheStats.getRemoveMisses.toString)
val topologyResponse = getTopologyResponse(h)
- new StatsResponse(h.messageId, immutable.Map[String, String]() ++ stats, topologyResponse)
+ new StatsResponse(h.messageId, h.cacheName, h.clientIntel, immutable.Map[String, String]() ++ stats, topologyResponse)
}
override def createErrorResponse(h: HotRodHeader, t: Throwable): AnyRef = {
t match {
case i: IOException =>
- new ErrorResponse(h.messageId, ParseError, getTopologyResponse(h), i.toString)
+ new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ParseError, getTopologyResponse(h), i.toString)
case t: TimeoutException =>
- new ErrorResponse(h.messageId, OperationTimedOut, getTopologyResponse(h), t.toString)
+ new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, OperationTimedOut, getTopologyResponse(h), t.toString)
case t: Throwable =>
- new ErrorResponse(h.messageId, ServerError, getTopologyResponse(h), t.toString)
+ new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ServerError, getTopologyResponse(h), t.toString)
}
}
private def getTopologyResponse(h: HotRodHeader): Option[AbstractTopologyResponse] = {
// If clustered, set up a cache for topology information
- if (cacheManager.getGlobalConfiguration.getTransportClass != null) {
- val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
- h.clientIntelligence match {
+ if (isClustered) {
+ h.clientIntel match {
case 2 | 3 => {
val currentTopologyView = topologyCache.get("view")
if (h.topologyId != currentTopologyView.topologyId) {
- if (h.clientIntelligence == 2) {
+ val cache = cacheManager.getCache(h.cacheName)
+ val config = cache.getConfiguration
+ if (h.clientIntel == 2 || !config.getCacheMode.isDistributed) {
Some(TopologyAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members)))
- } else { // Must be 3
- // TODO: Implement hash-distribution-aware reply
- None
+ } else { // Must be 3 and distributed
+ // TODO: Retrieve hash function when we have specified functions
+ val hashSpace = cache.getAdvancedCache.getDistributionManager.getConsistentHash.getHashSpace
+ Some(HashDistAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members),
+ config.getNumOwners, 1, hashSpace))
}
} else None
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -108,14 +108,14 @@
case se: ServerException => {
val h = se.header.asInstanceOf[HotRodHeader]
se.getCause match {
- case i: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, None, i.toString)
- case u: UnknownOperationException => new ErrorResponse(h.messageId, UnknownOperation, None, u.toString)
- case u: UnknownVersionException => new ErrorResponse(h.messageId, UnknownVersion, None, u.toString)
+ case i: InvalidMagicIdException => new ErrorResponse(0, "", 1, InvalidMagicOrMsgId, None, i.toString)
+ case u: UnknownOperationException => new ErrorResponse(h.messageId, "", 1, UnknownOperation, None, u.toString)
+ case u: UnknownVersionException => new ErrorResponse(h.messageId, "", 1, UnknownVersion, None, u.toString)
case t: Throwable => h.decoder.createErrorResponse(h, t)
}
}
case c: ClosedChannelException => null
- case t: Throwable => new ErrorResponse(0, ServerError, None, t.toString)
+ case t: Throwable => new ErrorResponse(0, "", 1, ServerError, None, t.toString)
}
}
@@ -131,7 +131,7 @@
class InvalidMagicIdException(reason: String) extends StreamCorruptedException(reason)
class HotRodHeader(override val op: Enumeration#Value, val messageId: Long, val cacheName: String,
- val flag: ProtocolFlag, val clientIntelligence: Short, val topologyId: Int,
+ val flag: ProtocolFlag, val clientIntel: Short, val topologyId: Int,
val decoder: AbstractVersionedDecoder) extends RequestHeader(op) {
override def toString = {
new StringBuilder().append("HotRodHeader").append("{")
@@ -139,7 +139,7 @@
.append(", messageId=").append(messageId)
.append(", cacheName=").append(cacheName)
.append(", flag=").append(flag)
- .append(", clientIntelligence=").append(clientIntelligence)
+ .append(", clientIntelligence=").append(clientIntel)
.append(", topologyId=").append(topologyId)
.append("}").toString
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -4,6 +4,9 @@
import org.infinispan.server.core.transport.{ChannelBuffer, ChannelHandlerContext, Channel, Encoder}
import OperationStatus._
import org.infinispan.server.core.transport.ChannelBuffers._
+import org.infinispan.manager.CacheManager
+import org.infinispan.Cache
+import collection.mutable.ListBuffer
/**
* // TODO: Document this
@@ -11,8 +14,10 @@
* @since
*/
-class HotRodEncoder extends Encoder {
+class HotRodEncoder(cacheManager: CacheManager) extends Encoder {
import HotRodEncoder._
+ import HotRodServer._
+ private lazy val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
trace("Encode msg {0}", msg)
@@ -56,23 +61,76 @@
buffer.writeByte(1) // Topology changed
r.topologyResponse.get match {
case t: TopologyAwareResponse => {
- buffer.writeUnsignedInt(t.view.topologyId)
- buffer.writeUnsignedInt(t.view.members.size)
- t.view.members.foreach{address =>
- buffer.writeString(address.host)
- buffer.writeUnsignedShort(address.port)
- }
+ if (r.clientIntel == 2)
+ writeTopologyHeader(t, buffer)
+ else
+ writeHashTopologyHeader(t, buffer)
}
- case h: HashDistAwareResponse => {
- // TODO: Implement reply to hash dist responses
- }
+ case h: HashDistAwareResponse => writeHashTopologyHeader(h, buffer, r)
}
} else {
buffer.writeByte(0) // No topology change
}
buffer
}
-
+
+ private def writeTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
+ buffer.writeUnsignedInt(t.view.topologyId)
+ buffer.writeUnsignedInt(t.view.members.size)
+ t.view.members.foreach{address =>
+ buffer.writeString(address.host)
+ buffer.writeUnsignedShort(address.port)
+ }
+ }
+
+ // TODO: Spec values when client intel is 3 but cache is not configured with distribution
+ private def writeHashTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
+ buffer.writeUnsignedInt(t.view.topologyId)
+ buffer.writeUnsignedShort(0) // Num key owners
+ buffer.writeByte(0) // Hash function
+ buffer.writeUnsignedInt(0) // Hash space
+ buffer.writeUnsignedInt(t.view.members.size)
+ t.view.members.foreach{address =>
+ buffer.writeString(address.host)
+ buffer.writeUnsignedShort(address.port)
+ buffer.writeUnsignedInt(0) // Address' hash id
+ }
+ }
+
+ private def writeHashTopologyHeader(h: HashDistAwareResponse, buffer: ChannelBuffer, r: Response) {
+ buffer.writeUnsignedInt(h.view.topologyId)
+ buffer.writeUnsignedShort(h.numOwners) // Num key owners
+ buffer.writeByte(h.hashFunction) // Hash function
+ buffer.writeUnsignedInt(h.hashSpace) // Hash space
+ buffer.writeUnsignedInt(h.view.members.size)
+ var hashIdUpdateRequired = false
+ // If we reached here, we know for sure that this is a cache configured with distribution
+ val consistentHash = cacheManager.getCache(r.cacheName).getAdvancedCache.getDistributionManager.getConsistentHash
+ val updateMembers = new ListBuffer[TopologyAddress]
+ h.view.members.foreach{address =>
+ buffer.writeString(address.host)
+ buffer.writeUnsignedShort(address.port)
+ val cachedHashId = address.hashIds.get(r.cacheName)
+ val hashId = consistentHash.getHashId(address.clusterAddress)
+ val newAddress =
+ // If distinct or not present, cached hash id needs updating
+ if (cachedHashId == None || cachedHashId.get != hashId) {
+ if (!hashIdUpdateRequired) hashIdUpdateRequired = true
+ val newHashIds = address.hashIds + (r.cacheName -> hashId)
+ address.copy(hashIds = newHashIds)
+ } else {
+ address
+ }
+ updateMembers += newAddress
+ buffer.writeUnsignedInt(hashId) // Address' hash id
+ }
+ // At least a hash id had to be updated in the view. Take the view copy and distribute it around the cluster
+ if (hashIdUpdateRequired) {
+ val viewCopy = h.view.copy(members = updateMembers.toList)
+ topologyCache.replace("view", h.view, viewCopy)
+ }
+ }
+
}
object HotRodEncoder extends Logging {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -20,7 +20,7 @@
* @since 4.1
*/
-class HotRodServer extends AbstractProtocolServer("HotRod") {
+class HotRodServer extends AbstractProtocolServer("HotRod") with Logging {
import HotRodServer._
private var isClustered: Boolean = _
private var address: TopologyAddress = _
@@ -28,7 +28,7 @@
def getAddress: TopologyAddress = address
- override def getEncoder: Encoder = new HotRodEncoder
+ override def getEncoder: Encoder = new HotRodEncoder(getCacheManager)
override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
@@ -43,10 +43,10 @@
private def addSelfToTopologyView(host: String, port: Int, cacheManager: CacheManager) {
defineTopologyCacheConfig(cacheManager)
topologyCache = cacheManager.getCache(TopologyCacheName)
- address = TopologyAddress(host, port, 0, cacheManager.getAddress)
+ address = TopologyAddress(host, port, Map.empty, cacheManager.getAddress)
+ debug("Local topology address is {0}", address)
cacheManager.addListener(new CrashedMemberDetectorListener)
val currentView = topologyCache.get("view")
- // TODO: If distribution configured, add hashcode of this address
if (currentView != null) {
val newMembers = currentView.members ::: List(address)
val newView = TopologyView(currentView.topologyId + 1, newMembers)
@@ -77,14 +77,18 @@
protected def removeSelfFromTopologyView {
// Graceful shutdown, remove this node as member and install new view
val currentView = topologyCache.get("view")
- // TODO: If distribution configured, add hashcode of this address
- val newMembers = currentView.members.filterNot(_ == address)
- val newView = TopologyView(currentView.topologyId + 1, newMembers)
- val replaced = topologyCache.replace("view", currentView, newView)
- if (!replaced) {
- // TODO: There was a concurrent view modification. Just give up, logic to deal with crashed/stalled members will deal with this
+ // Comparing cluster address should be enough. Full object comparison could fail if hash id map has changed.
+ val newMembers = currentView.members.filterNot(_.clusterAddress == address.clusterAddress)
+ if (newMembers.length != (currentView.members.length - 1)) {
+ debug("Cluster member {0} was not filtered out of the current view {1}", address, currentView)
} else {
- debug("Removed {0} from topology view, new view is {1}", address, newView)
+ val newView = TopologyView(currentView.topologyId + 1, newMembers)
+ val replaced = topologyCache.replace("view", currentView, newView)
+ if (!replaced) {
+ // TODO: There was a concurrent view modification. Just give up, logic to deal with crashed/stalled members will deal with this
+ } else {
+ debug("Removed {0} from topology view, new view is {1}", address, newView)
+ }
}
}
@@ -97,8 +101,7 @@
}
@Listener
- class CrashedMemberDetectorListener {
- import HotRodServer._
+ private class CrashedMemberDetectorListener {
private val executor = Executors.newCachedThreadPool(new ThreadFactory(){
val threadCounter = new AtomicInteger
@@ -117,48 +120,12 @@
// This is to avoid all nodes trying to make the same modification which would be wasteful and lead to deadlocks.
if (cacheManager.isCoordinator) {
// Use a separate thread to avoid blocking the view handler thread
- val callable = new Callable[Void] {
- override def call = {
- try {
- val newMembers = e.getNewMembers
- val oldMembers = e.getOldMembers
- // Someone left the cluster, verify whether it did it gracefully or crashed.
- if (oldMembers.size > newMembers.size) {
- val newMembersList = asBuffer(newMembers).toList
- val oldMembersList = asBuffer(oldMembers).toList
- val goneMembers = oldMembersList -- newMembersList
- val currentView = topologyCache.get("view")
- var tmpMembers = currentView.members
- for (goneMember <- goneMembers) {
- trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
- // If old memmber is in topology, it means that it had an abnormal ending
- val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
- if (isCrashed) {
- trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
- "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
- tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
- trace("After removal, new Hot Rod topology is {0}", tmpMembers)
- }
- }
- if (tmpMembers.size < currentView.members.size) {
- val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
- val replaced = topologyCache.replace("view", currentView, newView)
- if (!replaced) {
- // TODO: How to deal with concurrent failures at this point?
- }
- }
- }
- } catch {
- case t: Throwable => error("Error detecting crashed member", t)
- }
- null
- }
- }
- executor.submit(callable);
+ executor.submit(new CrashedMemberDetectorCallable(e));
}
}
private def isOldMemberInTopology(oldMember: Address, view: TopologyView): (Boolean, TopologyAddress) = {
+ // TODO: If members was stored as a map, this would be more efficient
for (member <- view.members) {
if (member.clusterAddress == oldMember) {
return (true, member)
@@ -166,11 +133,51 @@
}
(false, null)
}
+
+ private class CrashedMemberDetectorCallable(e: ViewChangedEvent) extends Callable[Void] {
+ override def call = {
+ try {
+ val newMembers = e.getNewMembers
+ val oldMembers = e.getOldMembers
+ // Someone left the cluster, verify whether it did it gracefully or crashed.
+ if (oldMembers.size > newMembers.size) {
+ val newMembersList = asBuffer(newMembers).toList
+ val oldMembersList = asBuffer(oldMembers).toList
+ val goneMembers = oldMembersList.filterNot(newMembersList contains)
+ val currentView = topologyCache.get("view")
+ if (currentView != null) {
+ var tmpMembers = currentView.members
+ for (goneMember <- goneMembers) {
+ trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
+ // If old memmber is in topology, it means that it had an abnormal ending
+ val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
+ if (isCrashed) {
+ trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
+ "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
+ tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
+ trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+ }
+ }
+ if (tmpMembers.size < currentView.members.size) {
+ val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
+ val replaced = topologyCache.replace("view", currentView, newView)
+ if (!replaced) {
+ // TODO: How to deal with concurrent failures at this point?
+ }
+ }
+ }
+ }
+ } catch {
+ case t: Throwable => error("Error detecting crashed member", t)
+ }
+ null
+ }
+ }
}
}
-object HotRodServer extends Logging {
+object HotRodServer {
val TopologyCacheName = "___hotRodTopologyCache"
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -9,9 +9,8 @@
* @author Galder Zamarreño
* @since 4.1
*/
-// TODO: Maybe add clientIntelligence to response to decide what information to send back
-class Response(val messageId: Long, val operation: OperationResponse, val status: OperationStatus,
- val topologyResponse: Option[AbstractTopologyResponse]) {
+class Response(val messageId: Long, val cacheName: String, val clientIntel: Short, val operation: OperationResponse,
+ val status: OperationStatus, val topologyResponse: Option[AbstractTopologyResponse]) {
override def toString = {
new StringBuilder().append("Response").append("{")
.append("messageId=").append(messageId)
@@ -21,11 +20,12 @@
}
}
-class ResponseWithPrevious(override val messageId: Long, override val operation: OperationResponse,
+class ResponseWithPrevious(override val messageId: Long, override val cacheName: String,
+ override val clientIntel: Short, override val operation: OperationResponse,
override val status: OperationStatus,
override val topologyResponse: Option[AbstractTopologyResponse],
val previous: Option[Array[Byte]])
- extends Response(messageId, operation, status, topologyResponse) {
+ extends Response(messageId, cacheName, clientIntel, operation, status, topologyResponse) {
override def toString = {
new StringBuilder().append("ResponseWithPrevious").append("{")
.append("messageId=").append(messageId)
@@ -36,10 +36,11 @@
}
}
-class GetResponse(override val messageId: Long, override val operation: OperationResponse,
- override val status: OperationStatus, override val topologyResponse: Option[AbstractTopologyResponse],
+class GetResponse(override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
+ override val operation: OperationResponse, override val status: OperationStatus,
+ override val topologyResponse: Option[AbstractTopologyResponse],
val data: Option[Array[Byte]])
- extends Response(messageId, operation, status, topologyResponse) {
+ extends Response(messageId, cacheName, clientIntel, operation, status, topologyResponse) {
override def toString = {
new StringBuilder().append("GetResponse").append("{")
.append("messageId=").append(messageId)
@@ -50,11 +51,12 @@
}
}
-class GetWithVersionResponse(override val messageId: Long, override val operation: OperationResponse,
+class GetWithVersionResponse(override val messageId: Long, override val cacheName: String,
+ override val clientIntel: Short, override val operation: OperationResponse,
override val status: OperationStatus,
override val topologyResponse: Option[AbstractTopologyResponse],
override val data: Option[Array[Byte]], val version: Long)
- extends GetResponse(messageId, operation, status, topologyResponse, data) {
+ extends GetResponse(messageId, cacheName, clientIntel, operation, status, topologyResponse, data) {
override def toString = {
new StringBuilder().append("GetWithVersionResponse").append("{")
.append("messageId=").append(messageId)
@@ -66,9 +68,10 @@
}
}
-class ErrorResponse(override val messageId: Long, override val status: OperationStatus,
+class ErrorResponse(override val messageId: Long, override val cacheName: String,
+ override val clientIntel: Short, override val status: OperationStatus,
override val topologyResponse: Option[AbstractTopologyResponse], val msg: String)
- extends Response(messageId, ErrorResponse, status, topologyResponse) {
+ extends Response(messageId, cacheName, clientIntel, ErrorResponse, status, topologyResponse) {
override def toString = {
new StringBuilder().append("ErrorResponse").append("{")
.append("messageId=").append(messageId)
@@ -79,9 +82,10 @@
}
}
-class StatsResponse(override val messageId: Long, val stats: Map[String, String],
+class StatsResponse(override val messageId: Long, override val cacheName: String,
+ override val clientIntel: Short, val stats: Map[String, String],
override val topologyResponse: Option[AbstractTopologyResponse])
- extends Response(messageId, StatsResponse, Success, topologyResponse) {
+ extends Response(messageId, cacheName, clientIntel, StatsResponse, Success, topologyResponse) {
override def toString = {
new StringBuilder().append("StatsResponse").append("{")
.append("messageId=").append(messageId)
@@ -95,5 +99,5 @@
case class TopologyAwareResponse(override val view: TopologyView)
extends AbstractTopologyResponse(view)
-case class HashDistAwareResponse(override val view: TopologyView, numKeyOwners: Short, hashFunction: Byte, hashSpaceSize: Int)
+case class HashDistAwareResponse(override val view: TopologyView, numOwners: Int, hashFunction: Byte, hashSpace: Int)
extends AbstractTopologyResponse(view)
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -5,12 +5,16 @@
import org.infinispan.remoting.transport.Address
/**
- * // TODO: Document this
+ * A Hot Rod topology address represents a Hot Rod endpoint that belongs to a Hot Rod cluster. It contains host/port
+ * information where the Hot Rod endpoint is listening. To be able to detect crashed members in the cluster and update
+ * the Hot Rod topology accordingly, it also contains the corresponding cluster address. Finally, since each cache
+ * could potentially be configured with a different hash algorithm, a topology address also contains per cache hash id.
+ *
* @author Galder Zamarreño
* @since // TODO
*/
@Marshallable(externalizer = classOf[TopologyAddress.Externalizer], id = 58)
-case class TopologyAddress(val host: String, val port: Int, val hostHashCode: Int, val clusterAddress: Address)
+case class TopologyAddress(val host: String, val port: Int, val hashIds: Map[String, Int], val clusterAddress: Address)
object TopologyAddress {
class Externalizer extends org.infinispan.marshall.Externalizer {
@@ -18,16 +22,16 @@
val topologyAddress = obj.asInstanceOf[TopologyAddress]
output.writeObject(topologyAddress.host)
output.writeInt(topologyAddress.port)
- output.writeInt(topologyAddress.hostHashCode)
+ output.writeObject(topologyAddress.hashIds)
output.writeObject(topologyAddress.clusterAddress)
}
override def readObject(input: ObjectInput): AnyRef = {
val host = input.readObject.asInstanceOf[String]
val port = input.readInt
- val hostHashCode = input.readInt
+ val hashIds = input.readObject.asInstanceOf[Map[String, Int]]
val clusterAddress = input.readObject.asInstanceOf[Address]
- TopologyAddress(host, port, hostHashCode, clusterAddress)
+ TopologyAddress(host, port, hashIds, clusterAddress)
}
}
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -11,7 +11,10 @@
@Marshallable(externalizer = classOf[TopologyView.Externalizer], id = 59)
case class TopologyView(val topologyId: Int, val members: List[TopologyAddress])
// TODO: TopologyView could maintain a Map[Address, TopologyAddress] rather than pushing Address into each TopologyAddress.
-// TODO: That would make crash detection more efficient at the expense of some extra space.
+// TODO: That would make crash detection more efficient at the expense of some extra space.
+// TODO: In fact, it might allow more concurrency and make replication more efficient if the topology cache stored entries
+// TODO: in [Address, TopologyAddress] and either keep the topology id as an entry in that same cache or in a separate one.
+// TODO: The downside here is that you'd need to make multiple cache calls atomic via txs or similar.
object TopologyView {
class Externalizer extends org.infinispan.marshall.Externalizer {
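The TODOs above suggest an alternative topology cache layout: one entry per member keyed by its cluster Address, with the topology id kept separately. A hypothetical sketch of that idea, not what this commit implements (the cache parameter and the "topologyId" key are invented, and as the TODO notes the two writes would need to be made atomic, for example inside a transaction):
   // Hypothetical alternative layout, shown only to illustrate the TODO above.
   def addMember(topologyCache: org.infinispan.Cache[AnyRef, AnyRef],
                 member: TopologyAddress, newTopologyId: Int) {
      topologyCache.put(member.clusterAddress, member)                 // one entry per node
      topologyCache.put("topologyId", Integer.valueOf(newTopologyId))  // shared topology counter
   }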
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -2,13 +2,11 @@
import org.infinispan.config.Configuration
import java.lang.reflect.Method
-import test.HotRodClient
import test.HotRodTestingUtil._
import org.infinispan.server.hotrod.OperationStatus._
-import org.infinispan.config.Configuration.CacheMode
import org.testng.Assert._
-import org.testng.annotations.{AfterMethod, AfterClass, Test}
-import org.infinispan.test.{TestingUtil, MultipleCacheManagersTest}
+import org.testng.annotations.Test
+import org.infinispan.test.TestingUtil
/**
* // TODO: Document this
@@ -17,57 +15,18 @@
*/
@Test(groups = Array("functional"), testName = "server.hotrod.HotRodReplicationTest")
-class HotRodReplicationTest extends MultipleCacheManagersTest {
+class HotRodReplicationTest extends HotRodMultiNodeTest {
import HotRodServer._
- private val cacheName = "hotRodReplSync"
- private[this] var servers: List[HotRodServer] = List()
- private[this] var clients: List[HotRodClient] = List()
+ override protected def cacheName: String = "hotRodReplSync"
- @Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
- override def createCacheManagers {
- for (i <- 0 until 2) {
- val cm = addClusterEnabledCacheManager()
- cm.defineConfiguration(cacheName, createCacheConfig)
- cm.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
- }
- servers = servers ::: List(startHotRodServer(cacheManagers.get(0)))
- servers = servers ::: List(startHotRodServer(cacheManagers.get(1), servers.head.getPort + 50))
- servers.foreach {s =>
- clients = new HotRodClient("127.0.0.1", s.getPort, cacheName, 60) :: clients
- }
- }
-
- @AfterClass(alwaysRun = true)
- override def destroy {
- log.debug("Test finished, close Hot Rod server", null)
- clients.foreach(_.stop)
- servers.foreach(_.stop)
- super.destroy // Stop the caches last so that at stoppage time topology cache can be updated properly
- }
-
- @AfterMethod(alwaysRun=true)
- override def clearContent() {
- // Do not clear cache between methods so that topology cache does not get cleared
- }
-
- private def createCacheConfig: Configuration = {
+ override protected def createCacheConfig: Configuration = {
val config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
config.setFetchInMemoryState(true)
config
}
- private def createTopologyCacheConfig: Configuration = {
- val topologyCacheConfig = new Configuration
- topologyCacheConfig.setCacheMode(CacheMode.REPL_SYNC)
- topologyCacheConfig.setSyncReplTimeout(10000) // Milliseconds
- topologyCacheConfig.setFetchInMemoryState(true) // State transfer required
- topologyCacheConfig.setSyncCommitPhase(true) // Only for testing, so that asserts work fine.
- topologyCacheConfig.setSyncRollbackPhase(true) // Only for testing, so that asserts work fine.
- topologyCacheConfig
- }
-
def testReplicatedPut(m: Method) {
val putSt = clients.head.put(k(m) , 0, 0, v(m)).status
assertStatus(putSt, Success)
@@ -106,22 +65,15 @@
assertEquals(resp.topologyResponse, None)
resp = clients.head.ping(2, 0)
assertStatus(resp.status, Success)
- assertTopologyReceived(resp.topologyResponse.get)
+ assertTopologyReceived(resp.topologyResponse.get, servers)
resp = clients.tail.head.ping(2, 1)
assertStatus(resp.status, Success)
- assertTopologyReceived(resp.topologyResponse.get)
+ assertTopologyReceived(resp.topologyResponse.get, servers)
resp = clients.tail.head.ping(2, 2)
assertStatus(resp.status, Success)
assertEquals(resp.topologyResponse, None)
}
- private def assertTopologyReceived(topologyResp: AbstractTopologyResponse) {
- assertEquals(topologyResp.view.topologyId, 2)
- assertEquals(topologyResp.view.members.size, 2)
- assertAddressEquals(topologyResp.view.members.head, servers.head.getAddress)
- assertAddressEquals(topologyResp.view.members.tail.head, servers.tail.head.getAddress)
- }
-
def testReplicatedPutWithTopologyChanges(m: Method) {
var resp = clients.head.put(k(m) , 0, 0, v(m), 1, 0)
assertStatus(resp.status, Success)
@@ -129,10 +81,10 @@
assertSuccess(clients.tail.head.get(k(m), 0), v(m))
resp = clients.head.put(k(m) , 0, 0, v(m, "v1-"), 2, 0)
assertStatus(resp.status, Success)
- assertTopologyReceived(resp.topologyResponse.get)
+ assertTopologyReceived(resp.topologyResponse.get, servers)
resp = clients.tail.head.put(k(m) , 0, 0, v(m, "v2-"), 2, 1)
assertStatus(resp.status, Success)
- assertTopologyReceived(resp.topologyResponse.get)
+ assertTopologyReceived(resp.topologyResponse.get, servers)
resp = clients.head.put(k(m) , 0, 0, v(m, "v3-"), 2, 2)
assertStatus(resp.status, Success)
assertEquals(resp.topologyResponse, None)
@@ -142,21 +94,21 @@
cm.defineConfiguration(cacheName, createCacheConfig)
cm.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
val newServer = startHotRodServer(cm, servers.tail.head.getPort + 25)
- servers = servers ::: List(newServer)
- resp = clients.head.put(k(m) , 0, 0, v(m, "v4-"), 2, 2)
- assertStatus(resp.status, Success)
- assertEquals(resp.topologyResponse.get.view.topologyId, 3)
- assertEquals(resp.topologyResponse.get.view.members.size, 3)
- assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
- assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
- assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, servers.tail.tail.head.getAddress)
- assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v4-"))
+ try {
+ val resp = clients.head.put(k(m) , 0, 0, v(m, "v4-"), 2, 2)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse.get.view.topologyId, 3)
+ assertEquals(resp.topologyResponse.get.view.members.size, 3)
+ assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
+ assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
+ assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, newServer.getAddress)
+ assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v4-"))
+ } finally {
+ newServer.stop
+ cm.stop
+ }
- servers.tail.tail.head.stop
- servers = servers.filterNot(_ == newServer)
- cm.stop
-
resp = clients.head.put(k(m) , 0, 0, v(m, "v5-"), 2, 3)
assertStatus(resp.status, Success)
assertEquals(resp.topologyResponse.get.view.topologyId, 4)
@@ -169,20 +121,21 @@
cm.defineConfiguration(cacheName, createCacheConfig)
cm.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
val crashingServer = startCrashingHotRodServer(cm, servers.tail.head.getPort + 11)
- servers = servers ::: List(crashingServer)
- resp = clients.head.put(k(m) , 0, 0, v(m, "v6-"), 2, 4)
- assertStatus(resp.status, Success)
- assertEquals(resp.topologyResponse.get.view.topologyId, 5)
- assertEquals(resp.topologyResponse.get.view.members.size, 3)
- assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
- assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
- assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, servers.tail.tail.head.getAddress)
- assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v6-"))
+ try {
+ val resp = clients.head.put(k(m) , 0, 0, v(m, "v6-"), 2, 4)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse.get.view.topologyId, 5)
+ assertEquals(resp.topologyResponse.get.view.members.size, 3)
+ assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
+ assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
+ assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, crashingServer.getAddress)
+ assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v6-"))
+ } finally {
+ crashingServer.stop
+ cm.stop
+ }
- crashingServer.stop
- servers = servers.filterNot(_ == crashingServer)
- cm.stop
TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1))
resp = clients.head.put(k(m) , 0, 0, v(m, "v7-"), 2, 5)
@@ -192,11 +145,18 @@
assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v7-"))
- }
- private def assertAddressEquals(actual: TopologyAddress, expected: TopologyAddress) {
- assertEquals(actual.host, expected.host)
- assertEquals(actual.port, expected.port)
- assertEquals(actual.hostHashCode, expected.hostHashCode)
+ resp = clients.head.put(k(m) , 0, 0, v(m, "v8-"), 3, 1)
+ assertStatus(resp.status, Success)
+ val hashTopologyResp = resp.topologyResponse.get.asInstanceOf[HashDistAwareResponse]
+ assertEquals(hashTopologyResp.view.topologyId, 6)
+ assertEquals(hashTopologyResp.view.members.size, 2)
+ assertAddressEquals(hashTopologyResp.view.members.head, servers.head.getAddress, Map(cacheName -> 0))
+ assertAddressEquals(hashTopologyResp.view.members.tail.head, servers.tail.head.getAddress, Map(cacheName -> 0))
+ assertEquals(hashTopologyResp.numOwners, 0)
+ assertEquals(hashTopologyResp.hashFunction, 0)
+ assertEquals(hashTopologyResp.hashSpace, 0)
+ assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v8-"))
}
+
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -22,6 +22,7 @@
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.{AtomicLong}
import org.infinispan.test.TestingUtil
+import org.infinispan.util.Util
/**
 * A very simple Hot Rod client for testing purposes
@@ -229,7 +230,7 @@
buffer.writeByte(op.code) // opcode
buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
buffer.writeUnsignedInt(op.flags) // flags
- buffer.writeByte(op.clientIntelligence) // client intelligence
+ buffer.writeByte(op.clientIntel) // client intelligence
buffer.writeUnsignedInt(op.topologyId) // topology id
if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
buffer.writeRangedBytes(op.key) // key length + key
@@ -271,17 +272,28 @@
val topologyChangeResponse =
if (topologyChangeMarker == 1) {
val topologyId = buf.readUnsignedInt
- if (op.clientIntelligence == 2) {
+ if (op.clientIntel == 2) {
val numberClusterMembers = buf.readUnsignedInt
val viewArray = new Array[TopologyAddress](numberClusterMembers)
for (i <- 0 until numberClusterMembers) {
val host = buf.readString
val port = buf.readUnsignedShort
- viewArray(i) = TopologyAddress(host, port, 0, null)
+ viewArray(i) = TopologyAddress(host, port, Map.empty, null)
}
Some(TopologyAwareResponse(TopologyView(topologyId, viewArray.toList)))
- } else if (op.clientIntelligence == 3) {
- None // TODO: Parse hash distribution aware
+ } else if (op.clientIntel == 3) {
+ val numOwners = buf.readUnsignedShort
+ val hashFunction = buf.readByte
+ val hashSpace = buf.readUnsignedInt
+ val numberClusterMembers = buf.readUnsignedInt
+ val viewArray = new Array[TopologyAddress](numberClusterMembers)
+ for (i <- 0 until numberClusterMembers) {
+ val host = buf.readString
+ val port = buf.readUnsignedShort
+ val hashId = buf.readUnsignedInt
+ viewArray(i) = TopologyAddress(host, port, Map(op.cacheName -> hashId), null)
+ }
+ Some(HashDistAwareResponse(TopologyView(topologyId, viewArray.toList), numOwners, hashFunction, hashSpace))
} else {
None // Is it possible?
}
@@ -295,40 +307,52 @@
for (i <- 1 to size) {
stats += (buf.readString -> buf.readString)
}
- new StatsResponse(id, immutable.Map[String, String]() ++ stats, topologyChangeResponse)
+ new StatsResponse(id, op.cacheName, op.clientIntel, immutable.Map[String, String]() ++ stats,
+ topologyChangeResponse)
}
case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
| RemoveResponse | RemoveIfUnmodifiedResponse => {
if (op.flags == 1) {
val length = buf.readUnsignedInt
if (length == 0) {
- new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, None)
+ new ResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
+ topologyChangeResponse, None)
} else {
val previous = new Array[Byte](length)
buf.readBytes(previous)
- new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, Some(previous))
+ new ResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
+ topologyChangeResponse, Some(previous))
}
- } else new Response(id, opCode, status, topologyChangeResponse)
+ } else new Response(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse)
}
- case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status, topologyChangeResponse)
+ case ContainsKeyResponse | ClearResponse | PingResponse =>
+ new Response(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse)
case GetWithVersionResponse => {
if (status == Success) {
val version = buf.readLong
val data = Some(buf.readRangedBytes)
- new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, data, version)
+ new GetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
+ topologyChangeResponse, data, version)
} else{
- new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, None, 0)
+ new GetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
+ topologyChangeResponse, None, 0)
}
}
case GetResponse => {
if (status == Success) {
val data = Some(buf.readRangedBytes)
- new GetResponse(id, opCode, status, topologyChangeResponse, data)
+ new GetResponse(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse, data)
} else{
- new GetResponse(id, opCode, status, topologyChangeResponse, None)
+ new GetResponse(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse, None)
}
}
- case ErrorResponse => new ErrorResponse(id, status, topologyChangeResponse, buf.readString)
+ case ErrorResponse => {
+ if (op == null)
+ new ErrorResponse(id, "", 0, status, topologyChangeResponse, buf.readString)
+ else
+ new ErrorResponse(id, op.cacheName, op.clientIntel, status, topologyChangeResponse, buf.readString)
+ }
+
}
trace("Got response from server: {0}", resp)
resp
@@ -366,18 +390,35 @@
}
-case class Op(val magic: Int,
- val code: Byte,
- val cacheName: String,
- val key: Array[Byte],
- val lifespan: Int,
- val maxIdle: Int,
- val value: Array[Byte],
- val flags: Int,
- val version: Long,
- val clientIntelligence: Byte,
- val topologyId: Int) {
+class Op(val magic: Int,
+ val code: Byte,
+ val cacheName: String,
+ val key: Array[Byte],
+ val lifespan: Int,
+ val maxIdle: Int,
+ val value: Array[Byte],
+ val flags: Int,
+ val version: Long,
+ val clientIntel: Byte,
+ val topologyId: Int) {
lazy val id = HotRodClient.idCounter.incrementAndGet
+ override def toString = {
+ new StringBuilder().append("Op").append("(")
+ .append(id).append(',')
+ .append(magic).append(',')
+ .append(code).append(',')
+ .append(cacheName).append(',')
+ .append(if (key == null) "null" else Util.printArray(key, true)).append(',')
+ .append(maxIdle).append(',')
+ .append(lifespan).append(',')
+ .append(if (value == null) "null" else Util.printArray(value, true)).append(',')
+ .append(flags).append(',')
+ .append(version).append(',')
+ .append(clientIntel).append(',')
+ .append(topologyId).append(')')
+ .toString
+ }
+
}
class PartialOp(override val magic: Int,
@@ -389,14 +430,14 @@
override val value: Array[Byte],
override val flags: Int,
override val version: Long,
- override val clientIntelligence: Byte,
+ override val clientIntel: Byte,
override val topologyId: Int)
- extends Op(magic, code, cacheName, key, lifespan, maxIdle, value, flags, version, clientIntelligence, topologyId) {
+ extends Op(magic, code, cacheName, key, lifespan, maxIdle, value, flags, version, clientIntel, topologyId) {
}
class StatsOp(override val magic: Int,
override val code: Byte,
override val cacheName: String,
- override val clientIntelligence: Byte,
+ override val clientIntel: Byte,
override val topologyId: Int,
- val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntelligence, topologyId)
\ No newline at end of file
+ val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
\ No newline at end of file
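With the client-intelligence-3 parsing above in place, a test could request a hash-distribution-aware topology update by sending intelligence level 3 together with a stale topology id, roughly as follows (the host, port and cache name are placeholders, and the numbers returned depend on how the cache is configured):
   // Illustrative only: exercises the hash-topology parsing added above.
   val client = new HotRodClient("127.0.0.1", 11311, "hotRodDistSync", 60)
   val resp = client.put("k".getBytes, 0, 0, "v".getBytes, 3, 0) // intelligence 3, stale topology id 0
   resp.topologyResponse match {
      case Some(hash: HashDistAwareResponse) =>
         println("numOwners=" + hash.numOwners + ", hashFunction=" + hash.hashFunction + ", hashSpace=" + hash.hashSpace)
      case other =>
         println("No hash topology update received: " + other)
   }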
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala 2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala 2010-05-03 09:20:23 UTC (rev 1733)
@@ -8,7 +8,7 @@
import org.infinispan.server.hotrod.OperationStatus._
import org.testng.Assert._
import org.infinispan.util.Util
-import org.infinispan.server.hotrod.{ResponseWithPrevious, GetWithVersionResponse, GetResponse, HotRodServer}
+import org.infinispan.server.hotrod._
/**
* // TODO: Document this
@@ -72,9 +72,10 @@
def assertSuccess(resp: GetResponse, expected: Array[Byte]): Boolean = {
assertStatus(resp.status, Success)
- val isSuccess = Arrays.equals(expected, resp.data.get)
- assertTrue(isSuccess)
- isSuccess
+ val isArrayEquals = Arrays.equals(expected, resp.data.get)
+ assertTrue(isArrayEquals, "Retrieved data should have contained " + Util.printArray(expected, true)
+ + " (" + new String(expected) + "), but instead we received " + Util.printArray(resp.data.get, true) + " (" + new String(resp.data.get) +")")
+ isArrayEquals
}
def assertSuccess(resp: GetWithVersionResponse, expected: Array[Byte], expectedVersion: Int): Boolean = {
@@ -96,6 +97,46 @@
status == KeyDoesNotExist
}
+ def assertTopologyReceived(topoResp: AbstractTopologyResponse, servers: List[HotRodServer]) {
+ assertEquals(topoResp.view.topologyId, 2)
+ assertEquals(topoResp.view.members.size, 2)
+ assertAddressEquals(topoResp.view.members.head, servers.head.getAddress)
+ assertAddressEquals(topoResp.view.members.tail.head, servers.tail.head.getAddress)
+ }
+
+ def assertAddressEquals(actual: TopologyAddress, expected: TopologyAddress) {
+ assertEquals(actual.host, expected.host)
+ assertEquals(actual.port, expected.port)
+ }
+
+ def assertHashTopologyReceived(topoResp: AbstractTopologyResponse, servers: List[HotRodServer], hashIds: List[Map[String, Int]]) {
+ val hashTopologyResp = topoResp.asInstanceOf[HashDistAwareResponse]
+ assertEquals(hashTopologyResp.view.topologyId, 2)
+ assertEquals(hashTopologyResp.view.members.size, 2)
+ assertAddressEquals(hashTopologyResp.view.members.head, servers.head.getAddress, hashIds.head)
+ assertAddressEquals(hashTopologyResp.view.members.tail.head, servers.tail.head.getAddress, hashIds.tail.head)
+ assertEquals(hashTopologyResp.numOwners, 2)
+ assertEquals(hashTopologyResp.hashFunction, 1)
+ assertEquals(hashTopologyResp.hashSpace, 10240)
+ }
+
+ def assertNoHashTopologyReceived(topoResp: AbstractTopologyResponse, servers: List[HotRodServer], hashIds: List[Map[String, Int]]) {
+ val hashTopologyResp = topoResp.asInstanceOf[HashDistAwareResponse]
+ assertEquals(hashTopologyResp.view.topologyId, 2)
+ assertEquals(hashTopologyResp.view.members.size, 2)
+ assertAddressEquals(hashTopologyResp.view.members.head, servers.head.getAddress, hashIds.head)
+ assertAddressEquals(hashTopologyResp.view.members.tail.head, servers.tail.head.getAddress, hashIds.tail.head)
+ assertEquals(hashTopologyResp.numOwners, 0)
+ assertEquals(hashTopologyResp.hashFunction, 0)
+ assertEquals(hashTopologyResp.hashSpace, 0)
+ }
+
+ def assertAddressEquals(actual: TopologyAddress, expected: TopologyAddress, expectedHashIds: Map[String, Int]) {
+ assertEquals(actual.host, expected.host)
+ assertEquals(actual.port, expected.port)
+ assertEquals(actual.hashIds, expectedHashIds)
+ }
+
}
object UniquePortThreadLocal extends ThreadLocal[Int] {
14 years, 10 months