[jboss-cvs] JBossAS SVN: r65341 - trunk/cluster/src/main/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 12 13:32:06 EDT 2007
Author: vblagojevic at jboss.com
Date: 2007-09-12 13:32:05 -0400 (Wed, 12 Sep 2007)
New Revision: 65341
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
Log:
[JBAS-3599]Implement MessageListener.block() in HAPartitionImpl
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2007-09-12 16:24:57 UTC (rev 65340)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2007-09-12 17:32:05 UTC (rev 65341)
@@ -63,6 +63,7 @@
import org.jgroups.Event;
import org.jgroups.ExtendedMessageListener;
//import org.jgroups.JChannel;
+import org.jgroups.ExtendedMembershipListener;
import org.jgroups.MembershipListener;
import org.jgroups.MergeView;
import org.jgroups.Message;
@@ -89,7 +90,7 @@
*/
public class ClusterPartition
extends ServiceMBeanSupport
- implements MembershipListener, HAPartition,
+ implements ExtendedMembershipListener, HAPartition,
AsynchEventHandler.AsynchEventProcessor,
ClusterPartitionMBean
{
@@ -185,6 +186,8 @@
/** Whether to bind the partition into JNDI */
protected boolean bindIntoJndi = true;
+ private final ThreadGate flushBlockGate = new ThreadGate();
+
private RpcDispatcher dispatcher = null;
/**
@@ -615,8 +618,18 @@
log.info("Suspected member: " + suspected_mbr);
}
- public void block() {}
+ public void block() // Closes the flush gate; presumably invoked by JGroups when a FLUSH begins - TODO confirm
+ {
+ flushBlockGate.close(); // threads calling flushBlockGate.await(...) before a cluster RPC are held back while the gate is closed
+ log.debug("Block processed at " + me);
+ }
+ public void unblock() // Reopens the flush gate that block() closed
+ {
+ flushBlockGate.open(); // open() does a notifyAll(), releasing threads parked in flushBlockGate.await(...)
+ log.debug("Unblock processed at " + me);
+ }
+
/** Notification of a cluster view change. This is done from the JG protocol
* handler thread and we must be careful to not unduly block this thread.
* Because of this there are two types of listeners, synchronous and
@@ -804,6 +817,10 @@
MethodCall m = new MethodCall(objName + "." + methodName, args, types);
+ if(channel.flushSupported())
+ {
+ flushBlockGate.await(getStateTransferTimeout());
+ }
if (excludeSelf)
{
if( trace )
@@ -928,6 +945,10 @@
MethodCall m = new MethodCall(objName + "." + methodName, args, types);
+ if(channel.flushSupported())
+ {
+ flushBlockGate.await(getStateTransferTimeout());
+ }
if (excludeSelf)
{
if( trace )
@@ -1798,6 +1819,51 @@
}
}
+ /**
+ * Copyright (c) 2005 Brian Goetz and Tim Peierls
+ * Released under the Creative Commons Attribution License
+ * (http://creativecommons.org/licenses/by/2.5)
+ * Official home: http://www.jcip.net
+ *
+ * ThreadGate <p/> Recloseable gate using wait and notifyAll
+ *
+ * @author Brian Goetz and Tim Peierls
+ */
+
+ private static class ThreadGate {
+ // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
+ private boolean isOpen;
+
+ private int generation;
+
+ public synchronized void close()
+ {
+ isOpen = false;
+ }
+
+ public synchronized void open()
+ {
+ ++generation;
+ isOpen = true;
+ notifyAll();
+ }
+
+ // BLOCKS-UNTIL: opened-since(generation on entry)
+ public synchronized void await() throws InterruptedException
+ {
+ int arrivalGeneration = generation;
+ while(!isOpen && arrivalGeneration == generation)
+ wait();
+ }
+
+ // BLOCKS-UNTIL: opened-since(generation on entry)
+ public synchronized void await(long timeout) throws InterruptedException
+ {
+ int arrivalGeneration = generation;
+ while(!isOpen && arrivalGeneration == generation)
+ wait(timeout);
+ }
+ }
private void setupLoggers(String partitionName)
{
More information about the jboss-cvs-commits
mailing list