[jboss-cvs] JBoss Messaging SVN: r1908 - in trunk/src/main/org/jboss/messaging/core/plugin: . postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 5 22:36:52 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-05 22:36:50 -0500 (Fri, 05 Jan 2007)
New Revision: 1908
Modified:
trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
Log:
Prevent transitory failures at shutdown, which occurred because listeners were not unregistered.
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java 2007-01-05 23:22:23 UTC (rev 1907)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java 2007-01-06 03:36:50 UTC (rev 1908)
@@ -152,7 +152,6 @@
{
log.debug(this + " stopped");
}
-
// Protected ----------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-05 23:22:23 UTC (rev 1907)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-06 03:36:50 UTC (rev 1908)
@@ -100,7 +100,7 @@
public class DefaultClusteredPostOffice extends DefaultPostOffice
implements ClusteredPostOffice, PostOfficeInternal, Replicator
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
@@ -110,7 +110,7 @@
// Key for looking up node id -> failed over for node id mapping from replicated data
public static final String FAILED_OVER_FOR_KEY = "FAILED_OVER_FOR";
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
/**
* @param map - Map<Integer(nodeID)-Integer(failoverNodeID)>
@@ -146,7 +146,7 @@
return sb.toString();
}
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
// Used for failure testing
@@ -160,7 +160,8 @@
private String groupName;
- private boolean started;
+ private volatile boolean started;
+ private volatile boolean stopping;
private ChannelFactory channelFactory;
@@ -212,7 +213,7 @@
private QueuedExecutor viewExecutor;
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
/*
* Constructor using Element for configuration
@@ -276,7 +277,7 @@
this.channelFactory = channelFactory;
}
- // MessagingComponent overrides ----------------------------------
+ // MessagingComponent overrides -----------------------------------------------------------------
public synchronized void start() throws Exception
{
@@ -299,7 +300,7 @@
MembershipListener ml = new ControlMembershipListener();
RequestHandler rh = new PostOfficeRequestHandler();
- //Register as a listener for nodeid-adress mapping events
+ // register as a listener for nodeid-adress mapping events
nodeAddressMapListener = new NodeAddressMapListener();
registerListener(nodeAddressMapListener);
@@ -307,21 +308,16 @@
this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
Receiver r = new DataReceiver();
-
asyncChannel.setReceiver(r);
syncChannel.connect(groupName);
-
asyncChannel.connect(groupName);
super.start();
Address syncAddress = syncChannel.getLocalAddress();
-
Address asyncAddress = asyncChannel.getLocalAddress();
-
PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
-
put(ADDRESS_INFO_KEY, info);
statsSender.start();
@@ -333,31 +329,39 @@
public synchronized void stop(boolean sendNotification) throws Exception
{
+ if (trace) { log.trace(this + " stopping"); }
+
if (!started)
{
log.warn("Attempt to stop() but " + this + " is not started");
+ return;
}
- else
- {
- syncSendRequest(new LeaveClusterRequest(this.getNodeId()));
- super.stop(sendNotification);
+ stopping = true;
- unregisterListener(nodeAddressMapListener);
+ syncSendRequest(new LeaveClusterRequest(getNodeId()));
- statsSender.stop();
+ statsSender.stop();
- syncChannel.close();
+ super.stop(sendNotification);
- asyncChannel.close();
+ // TODO in case of shared channels, we should have some sort of unsetReceiver(r)
+ asyncChannel.setReceiver(null);
- started = false;
+ unregisterListener(nodeAddressMapListener);
- if (trace) { log.trace("Stopped " + this); }
- }
+ // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+ syncChannel.close();
+
+ // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+ asyncChannel.close();
+
+ started = false;
+
+ log.debug(this + " stopped");
}
- // NotificationBroadcaster implementation ------------------------
+ // NotificationBroadcaster implementation -------------------------------------------------------
public void addNotificationListener(NotificationListener listener,
NotificationFilter filter,
@@ -377,7 +381,7 @@
return new MBeanNotificationInfo[0];
}
- // Peer implementation -------------------------------------------
+ // Peer implementation --------------------------------------------------------------------------
public Set getNodeIDView()
{
@@ -418,7 +422,7 @@
return nodeIDView;
}
- // ClusteredPostOffice implementation ----------------------------
+ // ClusteredPostOffice implementation -----------------------------------------------------------
public Binding bindClusteredQueue(Condition condition, LocalClusteredQueue queue) throws Exception
{
@@ -484,7 +488,7 @@
}
}
- // PostOfficeInternal implementation -----------------------------
+ // PostOfficeInternal implementation ------------------------------------------------------------
/*
* Called when another node adds a binding
@@ -994,7 +998,7 @@
}
}
- // Replicator implementation -------------------------------------
+ // Replicator implementation --------------------------------------------------------------------
public void put(Serializable key, Serializable replicant) throws Exception
{
@@ -1061,7 +1065,7 @@
return failoverMapper;
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
{
@@ -1456,9 +1460,9 @@
return holdingArea.values();
}
- // Package protected ---------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
protected void addToNameMap(Binding binding)
{
@@ -1618,7 +1622,7 @@
return new DefaultBinding(nodeID, condition, queue, failed);
}
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
private void sendBindRequest(Condition condition, LocalClusteredQueue queue, Binding binding)
throws Exception
@@ -2214,7 +2218,7 @@
log.debug(this + " sent " + notificationType + " JMX notification");
}
- // Inner classes -------------------------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
/*
* This class is used to manage state on the control channel
@@ -2223,6 +2227,11 @@
{
public byte[] getState()
{
+ if (stopping)
+ {
+ return null;
+ }
+
try
{
lock.writeLock().acquire();
@@ -2251,10 +2260,19 @@
public void receive(Message message)
{
+ if (stopping)
+ {
+ return;
+ }
}
public void setState(byte[] bytes)
{
+ if (stopping)
+ {
+ return;
+ }
+
if (bytes != null)
{
try
@@ -2298,16 +2316,21 @@
{
public void block()
{
- //NOOP
+ // NOOP
}
public void suspect(Address address)
{
- //NOOP
+ // NOOP
}
public void viewAccepted(View newView)
{
+ if (stopping)
+ {
+ return;
+ }
+
try
{
// We queue up changes and execute them asynchronously.
@@ -2325,7 +2348,7 @@
public byte[] getState()
{
- //NOOP
+ // NOOP
return null;
}
}
@@ -2341,9 +2364,7 @@
public void run()
{
- //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
- //TODO: can't we do the same since this is pretty useful?
- log.info(DefaultClusteredPostOffice.this + " got new view: " + newView);
+ log.info(DefaultClusteredPostOffice.this + " got new view " + newView);
// JGroups will make sure this method is never called by more than one thread concurrently
@@ -2447,6 +2468,11 @@
{
public Object handle(Message message)
{
+ if (stopping)
+ {
+ return null;
+ }
+
if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
try
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2007-01-05 23:22:23 UTC (rev 1907)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2007-01-06 03:36:50 UTC (rev 1908)
@@ -4,7 +4,6 @@
import java.io.DataOutputStream;
/**
- *
* A LeaveClusterRequest
*
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -12,14 +11,21 @@
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
- *
*/
public class LeaveClusterRequest extends ClusterRequest
{
+ // Constants ------------------------------------------------------------------------------------
+
static final int TYPE = 11;
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
private int nodeId;
+ // Constructors ---------------------------------------------------------------------------------
+
public LeaveClusterRequest(int nodeId)
{
this.nodeId=nodeId;
@@ -27,16 +33,37 @@
/**
* This constructor only exist because it's an Streamable requirement.
- * @see ClusterRequest#createFromStream(java.io.DataInputStream)
+ * @see ClusterRequest#createFromStream(java.io.DataInputStream)
*/
public LeaveClusterRequest()
{
}
+ // Streamable implementation --------------------------------------------------------------------
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeInt(nodeId);
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ nodeId = in.readInt();
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public String toString()
+ {
+ return "LeaveClusterRequest[NID=" + nodeId + "]";
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
Object execute(PostOfficeInternal office) throws Throwable
{
office.handleNodeLeft(nodeId);
-
+
return null;
}
@@ -45,13 +72,10 @@
return TYPE;
}
- public void write(DataOutputStream out) throws Exception
- {
- out.writeInt(nodeId);
- }
+ // Protected ------------------------------------------------------------------------------------
- public void read(DataInputStream in) throws Exception
- {
- nodeId = in.readInt();
- }
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
}
More information about the jboss-cvs-commits
mailing list