[infinispan-commits] Infinispan SVN: r2476 - in branches/4.2.x/core/src/main/java/org/infinispan: distribution and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Sun Oct 3 14:43:10 EDT 2010
Author: vblagojevic at jboss.com
Date: 2010-10-03 14:43:09 -0400 (Sun, 03 Oct 2010)
New Revision: 2476
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
Log:
[ISPN-634] - Merging on distributed caches can lead to rehash failure
[ISPN-668] - Node fails to join a cluster after a shutdown or crash of a node in the cluster. The joining node can be either a restarted node or a new one (on a different port).
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-10-03 09:53:56 UTC (rev 2475)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-10-03 18:43:09 UTC (rev 2476)
@@ -45,7 +45,7 @@
public static final int COMMAND_ID = 17;
public enum Type {
- JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE_JOIN, PULL_STATE_LEAVE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
+ JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_ABORT, PULL_STATE_JOIN, PULL_STATE_LEAVE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
}
Type type;
@@ -120,9 +120,6 @@
case JOIN_REHASH_END:
distributionManager.informRehashOnJoin(sender, false);
return null;
- case JOIN_COMPLETE:
- distributionManager.notifyJoinComplete(sender);
- return null;
case PULL_STATE_JOIN:
return pullStateForJoin();
case PULL_STATE_LEAVE:
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2010-10-03 09:53:56 UTC (rev 2475)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2010-10-03 18:43:09 UTC (rev 2476)
@@ -110,13 +110,6 @@
List<Address> requestPermissionToJoin(Address joiner);
/**
- * Notifies a coordinator when a join completes
- *
- * @param joiner joiner who has completed.
- */
- void notifyJoinComplete(Address joiner);
-
- /**
* This will cause all nodes to add the joiner to their consistent hash instance (usually by creating a {@link org.infinispan.distribution.UnionConsistentHash}
*
* @param joiner address of joiner
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-10-03 09:53:56 UTC (rev 2475)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-10-03 18:43:09 UTC (rev 2476)
@@ -69,50 +69,74 @@
/**
* The default distribution manager implementation
*
- * @author Manik Surtani
+ * @author Manik Surtani, Vladimir Blagojevic
* @since 4.0
*/
@MBean(objectName = "DistributionManager", description = "Component that handles distribution of content across a cluster")
public class DistributionManagerImpl implements DistributionManager {
private final Log log = LogFactory.getLog(DistributionManagerImpl.class);
- private final boolean trace = log.isTraceEnabled();
- Configuration configuration;
- volatile ConsistentHash consistentHash, oldConsistentHash;
- Address self;
- CacheLoaderManager cacheLoaderManager;
+
+ private Configuration configuration;
+ private volatile ConsistentHash consistentHash, oldConsistentHash;
+ private Address self;
+ private CacheLoaderManager cacheLoaderManager;
RpcManager rpcManager;
- CacheManagerNotifier notifier;
- int replCount;
- ViewChangeListener listener;
- CommandsFactory cf;
- LinkedBlockingQueue<Runnable> rehashQueue = new LinkedBlockingQueue<Runnable>();
- ThreadFactory tf = new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setDaemon(true);
- t.setPriority(Thread.MIN_PRIORITY);
- t.setName("Rehasher-" + rpcManager.getTransport().getAddress());
- return t;
- }
- };
- ExecutorService rehashExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, rehashQueue, tf);
+ private CacheManagerNotifier notifier;
- TransactionLogger transactionLogger = new TransactionLoggerImpl();
+ private ViewChangeListener listener;
+ private CommandsFactory cf;
+
+ private final ExecutorService rehashExecutor;
+
+ private final TransactionLogger transactionLogger = new TransactionLoggerImpl();
+
+ /**
+ * Rehash flag set by a rehash task associated with this DistributionManager
+ */
volatile boolean rehashInProgress = false;
- volatile Address joiner;
- static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS =
+
+
+ /**
+ * Address of a joiner node requesting to join Infinispan cluster. Each node in the cluster is
+ * aware of joiner's identity. After joiner successfully joins (or fails to join), joiner field is nullified
+ */
+ private volatile Address joiner;
+
+ private static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS =
AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
+
private DataContainer dataContainer;
private InterceptorChain interceptorChain;
private InvocationContextContainer icc;
+
@ManagedAttribute(description = "If true, the node has successfully joined the grid and is considered to hold state. If false, the join process is still in progress.")
@Metric(displayName = "Is join completed?", dataType = DataType.TRAIT)
private volatile boolean joinComplete = false;
- Future<Void> joinFuture;
- final List<Address> leavers = new CopyOnWriteArrayList<Address>();
- volatile Future<Void> leaveTaskFuture;
- final ReclosableLatch startLatch = new ReclosableLatch(false);
+
+ private Future<Void> joinFuture;
+ private final List<Address> leavers = new CopyOnWriteArrayList<Address>();
+ private volatile Future<Void> leaveTaskFuture;
+ private final ReclosableLatch startLatch = new ReclosableLatch(false);
+
+ /**
+ * Default constructor
+ */
+ public DistributionManagerImpl() {
+ super();
+ LinkedBlockingQueue<Runnable> rehashQueue = new LinkedBlockingQueue<Runnable>();
+ ThreadFactory tf = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setPriority(Thread.MIN_PRIORITY);
+ t.setName("Rehasher-" + rpcManager.getTransport().getAddress());
+ return t;
+ }
+ };
+ rehashExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, rehashQueue, tf);
+ }
+
@Inject
public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc,
@@ -131,14 +155,15 @@
@Start(priority = 20)
public void start() throws Exception {
- if (log.isTraceEnabled()) {
- log.trace("Starting distribution manager on " + getMyAddress());
- }
- replCount = configuration.getNumOwners();
+ log.trace("Starting distribution manager on " + getMyAddress());
listener = new ViewChangeListener();
notifier.addListener(listener);
join();
}
+
+ private int getReplCount(){
+ return configuration.getNumOwners();
+ }
private Address getMyAddress() {
return rpcManager != null ? rpcManager.getAddress() : null;
@@ -162,6 +187,7 @@
private void join() throws Exception {
startLatch.close();
+ setJoinComplete(false);
Transport t = rpcManager.getTransport();
List<Address> members = t.getMembers();
consistentHash = createConsistentHash(configuration, members);
@@ -169,6 +195,7 @@
if (members.size() > 1 && !t.getCoordinator().equals(self)) {
JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
joinFuture = rehashExecutor.submit(joinTask);
+ //task will set joinComplete flag
} else {
setJoinComplete(true);
}
@@ -246,20 +273,20 @@
boolean willReceiveLeaverState(Address leaver) {
ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash : consistentHash;
int dist = ch.getDistance(leaver, self);
- return dist >= 0 && dist <= replCount;
+ return dist >= 0 && dist <= getReplCount();
}
public boolean isLocal(Object key) {
- return consistentHash == null || consistentHash.isKeyLocalToAddress(self, key, replCount);
+ return consistentHash == null || consistentHash.isKeyLocalToAddress(self, key, getReplCount());
}
public List<Address> locate(Object key) {
if (consistentHash == null) return Collections.singletonList(self);
- return consistentHash.locate(key, replCount);
+ return consistentHash.locate(key, getReplCount());
}
public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
- return locateAll(keys, replCount);
+ return locateAll(keys, getReplCount());
}
public Map<Object, List<Address>> locateAll(Collection<Object> keys, int numOwners) {
@@ -308,51 +335,41 @@
@ManagedOperation(description = "Determines whether a given key is affected by an ongoing rehash, if any.")
@Operation(displayName = "Could key be affected by rehash?")
public boolean isAffectedByRehash(@Parameter(name = "key", description = "Key to check") Object key) {
- return transactionLogger.isEnabled() && oldConsistentHash != null && !oldConsistentHash.locate(key, replCount).contains(self);
+ return transactionLogger.isEnabled() && oldConsistentHash != null && !oldConsistentHash.locate(key, getReplCount()).contains(self);
}
public TransactionLogger getTransactionLogger() {
return transactionLogger;
}
- public List<Address> requestPermissionToJoin(Address joiner) {
- if (JOINER_CAS.compareAndSet(this, null, joiner)) {
- if (trace) log.trace("Allowing {0} to join", joiner);
+ public List<Address> requestPermissionToJoin(Address a) {
+ if (JOINER_CAS.compareAndSet(this, null, a)) {
+ log.trace("Allowing {0} to join", a);
return new LinkedList<Address>(consistentHash.getCaches());
} else {
- if (trace)
- log.trace("Not alowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
+ log.trace("Not alowing {0} to join since there is a join already in progress for node {1}", a, joiner);
return null;
}
}
- public void notifyJoinComplete(Address joiner) {
- log.trace("Received notification that {0} has completed a join. Current 'joiner' flag is {1}, setting this to null.", joiner, this.joiner);
- if (this.joiner != null) {
- if (this.joiner.equals(joiner)) this.joiner = null;
- }
- }
-
- public void informRehashOnJoin(Address joiner, boolean starting) {
- log.trace("Informed of a JOIN by {0}. Starting? {1}", joiner, starting);
+ public void informRehashOnJoin(Address a, boolean starting) {
+ log.trace("Informed of a JOIN by {0}. Starting? {1}", a, starting);
if (!starting) {
if (consistentHash instanceof UnionConsistentHash) {
UnionConsistentHash uch = (UnionConsistentHash) consistentHash;
consistentHash = uch.getNewConsistentHash();
oldConsistentHash = null;
}
- rehashInProgress = false;
+ joiner = null;
} else {
ConsistentHash chOld = consistentHash;
if (chOld instanceof UnionConsistentHash) throw new RuntimeException("Not expecting a union CH!");
oldConsistentHash = chOld;
- this.joiner = joiner;
- rehashInProgress = true;
-
+ joiner = a;
ConsistentHash chNew;
chNew = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
List<Address> newAddresses = new LinkedList<Address>(chOld.getCaches());
- newAddresses.add(joiner);
+ newAddresses.add(a);
chNew.setCaches(newAddresses);
consistentHash = new UnionConsistentHash(chOld, chNew);
}
@@ -360,7 +377,7 @@
}
public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
- if (trace) log.trace("Apply state with " + state);
+ log.trace("Apply state with " + state);
for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
if (consistentHash.locate(e.getKey(), configuration.getNumOwners()).contains(self)) {
InternalCacheValue v = e.getValue();
@@ -376,9 +393,7 @@
public class ViewChangeListener {
@ViewChanged
public void handleViewChange(ViewChangedEvent e) {
- if (log.isTraceEnabled()) {
- log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());
- }
+ log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());
boolean started;
// how long do we wait for a startup?
if (e.isNeedsToRejoin()) {
@@ -428,9 +443,7 @@
}
public void setJoinComplete(boolean joinComplete) {
- if (log.isTraceEnabled()) {
- log.trace("Setting joinComplete to " + joinComplete + " for node " + rpcManager.getAddress());
- }
+ log.debug("Setting joinComplete to " + joinComplete + " for node " + rpcManager.getAddress());
this.joinComplete = joinComplete;
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-10-03 09:53:56 UTC (rev 2475)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-10-03 18:43:09 UTC (rev 2476)
@@ -50,8 +50,7 @@
public JoinTask(RpcManager rpcManager, CommandsFactory commandsFactory, Configuration conf,
DataContainer dataContainer, DistributionManagerImpl dmi) {
- super(dmi, rpcManager, conf, commandsFactory, dataContainer);
- this.dataContainer = dataContainer;
+ super(dmi, rpcManager, conf, commandsFactory, dataContainer);
this.self = rpcManager.getTransport().getAddress();
}
@@ -91,52 +90,49 @@
chNew = createConsistentHash(configuration, chOld.getCaches(), self);
dmi.setConsistentHash(chNew);
-
- if (configuration.isRehashEnabled()) {
- // 3. Enable TX logging
- transactionLogger.enable();
-
- // 4. Broadcast new temp CH
- rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
-
- // 5. txLogger being enabled will cause ClusteredGetCommands to return uncertain responses.
-
- // 6. pull state from everyone.
- Address myAddress = rpcManager.getTransport().getAddress();
-
- RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_JOIN, myAddress, null, chOld, chNew,null);
- // TODO I should be able to process state chunks from different nodes simultaneously!!
- List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(chNew, configuration.getNumOwners());
- List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
-
- // 7. Apply state
- for (Response r : resps) {
- if (r instanceof SuccessfulResponse) {
- Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
- dmi.applyState(chNew, state);
+ try {
+ if (configuration.isRehashEnabled()) {
+ // 3. Enable TX logging
+ transactionLogger.enable();
+
+ // 4. Broadcast new temp CH
+ rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
+
+ // 5. txLogger being enabled will cause ClusteredGetCommands to return uncertain responses.
+
+ // 6. pull state from everyone.
+ Address myAddress = rpcManager.getTransport().getAddress();
+
+ RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_JOIN, myAddress, null, chOld, chNew,null);
+ // TODO I should be able to process state chunks from different nodes simultaneously!!
+ List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(chNew, configuration.getNumOwners());
+ List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
+
+ // 7. Apply state
+ for (Response r : resps) {
+ if (r instanceof SuccessfulResponse) {
+ Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
+ dmi.applyState(chNew, state);
+ }
}
+
+ // 8. Drain logs
+ dmi.drainLocalTransactionLog();
+ unlocked = true;
+ } else {
+ rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
+ if (trace) log.trace("Rehash not enabled, so not pulling state.");
+ }
+ } finally {
+ // 10.
+ rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
+
+ if (configuration.isRehashEnabled()) {
+ // 11.
+ invalidateInvalidHolders(chOld, chNew);
}
-
- // 8. Drain logs
- dmi.drainLocalTransactionLog();
- } else {
- if (trace) log.trace("Rehash not enabled, so not pulling state.");
}
- unlocked = true;
- if (!configuration.isRehashEnabled()) {
- rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
- }
- // 10.
- rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
- rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
- configuration.getRehashRpcTimeout(), true);
-
- if (configuration.isRehashEnabled()) {
- // 11.
- invalidateInvalidHolders(chOld, chNew);
- }
-
} catch (Exception e) {
log.error("Caught exception!", e);
throw new CacheException("Unexpected exception", e);
More information about the infinispan-commits mailing list