[jboss-cvs] JBoss Messaging SVN: r1908 - in trunk/src/main/org/jboss/messaging/core/plugin: . postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 5 22:36:52 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-01-05 22:36:50 -0500 (Fri, 05 Jan 2007)
New Revision: 1908

Modified:
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
Log:
Prevent transitory failures at shutdown that occurred because listeners were not unregistered.

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java	2007-01-05 23:22:23 UTC (rev 1907)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java	2007-01-06 03:36:50 UTC (rev 1908)
@@ -152,7 +152,6 @@
    {
       log.debug(this + " stopped");
    }
-    
    
    // Protected ----------------------------------------------------------     
    

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-05 23:22:23 UTC (rev 1907)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-06 03:36:50 UTC (rev 1908)
@@ -100,7 +100,7 @@
 public class DefaultClusteredPostOffice extends DefaultPostOffice
    implements ClusteredPostOffice, PostOfficeInternal, Replicator
 {
-   // Constants -----------------------------------------------------
+   // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
 
@@ -110,7 +110,7 @@
    // Key for looking up node id -> failed over for node id mapping from replicated data
    public static final String FAILED_OVER_FOR_KEY = "FAILED_OVER_FOR";
 
-   // Static --------------------------------------------------------
+   // Static ---------------------------------------------------------------------------------------
 
    /**
     * @param map - Map<Integer(nodeID)-Integer(failoverNodeID)>
@@ -146,7 +146,7 @@
       return sb.toString();
    }
 
-   // Attributes ----------------------------------------------------
+   // Attributes -----------------------------------------------------------------------------------
 
    // Used for failure testing
 
@@ -160,7 +160,8 @@
 
    private String groupName;
 
-   private boolean started;
+   private volatile boolean started;
+   private volatile boolean stopping;
 
    private ChannelFactory channelFactory;
 
@@ -212,7 +213,7 @@
    private QueuedExecutor viewExecutor;
 
 
-   // Constructors --------------------------------------------------
+   // Constructors ---------------------------------------------------------------------------------
 
    /*
     * Constructor using Element for configuration
@@ -276,7 +277,7 @@
       this.channelFactory = channelFactory;
    }
 
-   // MessagingComponent overrides ----------------------------------
+   // MessagingComponent overrides -----------------------------------------------------------------
 
    public synchronized void start() throws Exception
    {
@@ -299,7 +300,7 @@
       MembershipListener ml = new ControlMembershipListener();
       RequestHandler rh = new PostOfficeRequestHandler();
 
-      //Register as a listener for nodeid-adress mapping events
+      // register as a listener for nodeid-adress mapping events
       nodeAddressMapListener = new NodeAddressMapListener();
 
       registerListener(nodeAddressMapListener);
@@ -307,21 +308,16 @@
       this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
 
       Receiver r = new DataReceiver();
-
       asyncChannel.setReceiver(r);
 
       syncChannel.connect(groupName);
-
       asyncChannel.connect(groupName);
 
       super.start();
 
       Address syncAddress = syncChannel.getLocalAddress();
-
       Address asyncAddress = asyncChannel.getLocalAddress();
-
       PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
-
       put(ADDRESS_INFO_KEY, info);
 
       statsSender.start();
@@ -333,31 +329,39 @@
 
    public synchronized void stop(boolean sendNotification) throws Exception
    {
+      if (trace) { log.trace(this + " stopping"); }
+
       if (!started)
       {
          log.warn("Attempt to stop() but " + this + " is not started");
+         return;
       }
-      else
-      {
-         syncSendRequest(new LeaveClusterRequest(this.getNodeId()));
 
-         super.stop(sendNotification);
+      stopping = true;
 
-         unregisterListener(nodeAddressMapListener);
+      syncSendRequest(new LeaveClusterRequest(getNodeId()));
 
-         statsSender.stop();
+      statsSender.stop();
 
-         syncChannel.close();
+      super.stop(sendNotification);
 
-         asyncChannel.close();
+      //  TODO in case of shared channels, we should have some sort of unsetReceiver(r)
+      asyncChannel.setReceiver(null);
 
-         started = false;
+      unregisterListener(nodeAddressMapListener);
 
-         if (trace) { log.trace("Stopped " + this); }
-      }
+      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+      syncChannel.close();
+
+      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+      asyncChannel.close();
+
+      started = false;
+
+      log.debug(this + " stopped");
    }
 
-   // NotificationBroadcaster implementation ------------------------
+   // NotificationBroadcaster implementation -------------------------------------------------------
 
    public void addNotificationListener(NotificationListener listener,
                                        NotificationFilter filter,
@@ -377,7 +381,7 @@
       return new MBeanNotificationInfo[0];
    }
 
-   // Peer implementation -------------------------------------------
+   // Peer implementation --------------------------------------------------------------------------
 
    public Set getNodeIDView()
    {
@@ -418,7 +422,7 @@
       return nodeIDView;
    }
 
-   // ClusteredPostOffice implementation ----------------------------
+   // ClusteredPostOffice implementation -----------------------------------------------------------
 
    public Binding bindClusteredQueue(Condition condition, LocalClusteredQueue queue) throws Exception
    {
@@ -484,7 +488,7 @@
       }
    }
 
-   // PostOfficeInternal implementation -----------------------------
+   // PostOfficeInternal implementation ------------------------------------------------------------
 
    /*
     * Called when another node adds a binding
@@ -994,7 +998,7 @@
       }
    }
 
-   // Replicator implementation -------------------------------------
+   // Replicator implementation --------------------------------------------------------------------
 
    public void put(Serializable key, Serializable replicant) throws Exception
    {
@@ -1061,7 +1065,7 @@
       return failoverMapper;
    }
 
-   // Public --------------------------------------------------------
+   // Public ---------------------------------------------------------------------------------------
 
    public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
    {
@@ -1456,9 +1460,9 @@
       return holdingArea.values();
    }
 
-   // Package protected ---------------------------------------------
+   // Package protected ----------------------------------------------------------------------------
 
-   // Protected -----------------------------------------------------
+   // Protected ------------------------------------------------------------------------------------
 
    protected void addToNameMap(Binding binding)
    {
@@ -1618,7 +1622,7 @@
       return new DefaultBinding(nodeID, condition, queue, failed);
    }
 
-   // Private -------------------------------------------------------
+   // Private --------------------------------------------------------------------------------------
 
    private void sendBindRequest(Condition condition, LocalClusteredQueue queue, Binding binding)
       throws Exception
@@ -2214,7 +2218,7 @@
       log.debug(this + " sent " + notificationType + " JMX notification");
    }
 
-   // Inner classes -------------------------------------------------------------------
+   // Inner classes --------------------------------------------------------------------------------
 
    /*
     * This class is used to manage state on the control channel
@@ -2223,6 +2227,11 @@
    {
       public byte[] getState()
       {
+         if (stopping)
+         {
+            return null;
+         }
+
          try
          {
             lock.writeLock().acquire();
@@ -2251,10 +2260,19 @@
 
       public void receive(Message message)
       {
+         if (stopping)
+         {
+            return;
+         }
       }
 
       public void setState(byte[] bytes)
       {
+         if (stopping)
+         {
+            return;
+         }
+
          if (bytes != null)
          {
             try
@@ -2298,16 +2316,21 @@
    {
       public void block()
       {
-         //NOOP
+         // NOOP
       }
 
       public void suspect(Address address)
       {
-         //NOOP
+         // NOOP
       }
 
       public void viewAccepted(View newView)
       {
+         if (stopping)
+         {
+            return;
+         }
+
          try
          {
             // We queue up changes and execute them asynchronously.
@@ -2325,7 +2348,7 @@
 
       public byte[] getState()
       {
-         //NOOP
+         // NOOP
          return null;
       }
    }
@@ -2341,9 +2364,7 @@
 
       public void run()
       {
-         //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
-         //TODO:     can't we do the same since this is pretty useful?
-         log.info(DefaultClusteredPostOffice.this  + " got new view: " + newView);
+         log.info(DefaultClusteredPostOffice.this  + " got new view " + newView);
 
          // JGroups will make sure this method is never called by more than one thread concurrently
 
@@ -2447,6 +2468,11 @@
    {
       public Object handle(Message message)
       {
+         if (stopping)
+         {
+            return null;
+         }
+
          if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
 
          try

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2007-01-05 23:22:23 UTC (rev 1907)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2007-01-06 03:36:50 UTC (rev 1908)
@@ -4,7 +4,6 @@
 import java.io.DataOutputStream;
 
 /**
- * 
  * A LeaveClusterRequest
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -12,14 +11,21 @@
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
- *
  */
 public class LeaveClusterRequest extends ClusterRequest
 {
+   // Constants ------------------------------------------------------------------------------------
+
    static final int TYPE = 11;
 
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
    private int nodeId;
 
+   // Constructors ---------------------------------------------------------------------------------
+
    public LeaveClusterRequest(int nodeId)
    {
       this.nodeId=nodeId;
@@ -27,16 +33,37 @@
 
    /**
     * This constructor only exist because it's an Streamable requirement.
-    * @see ClusterRequest#createFromStream(java.io.DataInputStream)  
+    * @see ClusterRequest#createFromStream(java.io.DataInputStream)
     */
    public LeaveClusterRequest()
    {
    }
 
+   // Streamable implementation --------------------------------------------------------------------
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      out.writeInt(nodeId);
+   }
+
+   public void read(DataInputStream in) throws Exception
+   {
+      nodeId = in.readInt();
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public String toString()
+   {
+      return "LeaveClusterRequest[NID="  + nodeId + "]";
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
    Object execute(PostOfficeInternal office) throws Throwable
    {
       office.handleNodeLeft(nodeId);
-      
+
       return null;
    }
 
@@ -45,13 +72,10 @@
       return TYPE;
    }
 
-   public void write(DataOutputStream out) throws Exception
-   {
-      out.writeInt(nodeId);
-   }
+   // Protected ------------------------------------------------------------------------------------
 
-   public void read(DataInputStream in) throws Exception
-   {
-      nodeId = in.readInt();
-   }
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
 }




More information about the jboss-cvs-commits mailing list