[jboss-cvs] JBoss Messaging SVN: r2452 - in trunk: src/main/org/jboss/jms/client/container and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 26 19:43:46 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-02-26 19:43:45 -0500 (Mon, 26 Feb 2007)
New Revision: 2452

Modified:
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/FailoverValve2.java
   trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
   trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/tests/etc/container-qalab.xml
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
minor

Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -100,7 +100,8 @@
             remotingConnection.setFailed();
          }
          
-         //Note - failover doesn't occur until _after_ the above check - so the next comment belongs here
+         // Note - failover doesn't occur until _after_ the above check - so the next comment
+         // belongs here
          log.debug(this + " starting client-side failover");
          
          // generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,

Modified: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -53,18 +53,18 @@
    
    private boolean locked;
 
+   // Constructors ---------------------------------------------------------------------------------
+
    public FailoverValve2()
    {
       trace = log.isTraceEnabled();
-      
+
       if (trace)
       {
          threads = new HashSet();
       }
    }
 
-   // Constructors ---------------------------------------------------------------------------------
-
    // Public ---------------------------------------------------------------------------------------
 
    public synchronized void enter()

Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -174,10 +174,10 @@
       {                  
          if (isClosing)
          {
-            //We make sure we remove ourself AFTER the invocation has been made
-            //otherwise in a failover situation we would end up divorced from the hierarchy
-            //and failover will not occur properly since failover would not be able to
-            //traverse the hierarchy and update the delegates properly
+            // We make sure we remove ourself AFTER the invocation has been made otherwise in a
+            // failover situation we would end up divorced from the hierarchy and failover will not
+            // occur properly since failover would not be able to traverse the hierarchy and update
+            // the delegates properly
             removeSelf(invocation);
             
             closing();

Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -34,7 +34,6 @@
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.util.MessageQueueNameHelper;
-import org.jboss.logging.Logger;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -52,17 +51,15 @@
  */
 public class ConsumerAspect
 {
-   // Constants -----------------------------------------------------
+   // Constants ------------------------------------------------------------------------------------
    
-   private static final Logger log = Logger.getLogger(ConsumerAspect.class);
-   
-   // Static --------------------------------------------------------
+   // Static ---------------------------------------------------------------------------------------
 
-   // Attributes ----------------------------------------------------
+   // Attributes -----------------------------------------------------------------------------------
 
-   // Constructors --------------------------------------------------
+   // Constructors ---------------------------------------------------------------------------------
 
-   // Public --------------------------------------------------------
+   // Public ---------------------------------------------------------------------------------------
 
    public Object handleCreateConsumerDelegate(Invocation invocation) throws Throwable
    {
@@ -87,7 +84,9 @@
       String queueName = null;
       if (consumerState.getSubscriptionName() != null)
       {
-         queueName = MessageQueueNameHelper.createSubscriptionName(connectionState.getClientID(), consumerState.getSubscriptionName());
+         queueName = MessageQueueNameHelper.
+            createSubscriptionName(connectionState.getClientID(),
+                                   consumerState.getSubscriptionName());
       }
       else if (consumerState.getDestination().isQueue())
       {
@@ -198,11 +197,11 @@
       return getState(invocation).getSelector();
    }
    
-   // Package protected ---------------------------------------------
+   // Package protected ----------------------------------------------------------------------------
 
-   // Protected -----------------------------------------------------
+   // Protected ------------------------------------------------------------------------------------
 
-   // Private -------------------------------------------------------
+   // Private --------------------------------------------------------------------------------------
    
    private ConsumerState getState(Invocation inv)
    {
@@ -215,5 +214,5 @@
       return state.getMessageCallbackHandler();      
    }
    
-   // Inner classes -------------------------------------------------
+   // Inner classes --------------------------------------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -259,36 +259,6 @@
          });
    }
    
-   private void handleMessageInternal(Object message) throws Exception
-   {
-      MessageProxy proxy = (MessageProxy) message;
-
-      if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
-
-      synchronized (mainLock)
-      {
-         if (closed)
-         {             
-            // Ignore
-            if (trace) { log.trace(this + " is closed, so ignore message"); }
-            return;
-         }
-
-         proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-                        
-         //Add it to the buffer
-         buffer.addLast(proxy, proxy.getJMSPriority());         
-         
-         lastDeliveryId = proxy.getDeliveryId();
-         
-         if (trace) { log.trace(this + " added message(s) to the buffer"); }
-         
-         messageAdded(); 
-         
-         checkStop();         
-      }
-   }
-         
    public void setMessageListener(MessageListener listener) throws JMSException
    {     
       synchronized (mainLock)
@@ -565,7 +535,37 @@
    // Protected ------------------------------------------------------------------------------------
             
    // Private --------------------------------------------------------------------------------------
-   
+
+   private void handleMessageInternal(Object message) throws Exception
+   {
+      MessageProxy proxy = (MessageProxy) message;
+
+      if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
+
+      synchronized (mainLock)
+      {
+         if (closed)
+         {
+            // Ignore
+            if (trace) { log.trace(this + " is closed, so ignore message"); }
+            return;
+         }
+
+         proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+
+         //Add it to the buffer
+         buffer.addLast(proxy, proxy.getJMSPriority());
+
+         lastDeliveryId = proxy.getDeliveryId();
+
+         if (trace) { log.trace(this + " added message(s) to the buffer"); }
+
+         messageAdded();
+
+         checkStop();
+      }
+   }
+
    private void checkStop()
    {
       int size = buffer.size();
@@ -640,9 +640,7 @@
          log.warn("Thread interrupted", e);
       }
    }
-   
 
-   
    private void queueRunner(ListenerRunner runner)
    {
       try

Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -41,7 +41,6 @@
 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
 
 /**
- * 
  * State corresponding to a connection. This state is acessible inside aspects/interceptors.
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>

Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -36,18 +36,16 @@
 import org.jboss.messaging.core.plugin.contract.PersistenceManager.ReferenceInfo;
 
 /**
- * A PagingChannel
+ * This channel implementation automatically pages message references to and from storage to prevent
+ * more than a maximum number of references being stored in memory at once.
  * 
- * This channel implementation automatically pages message references to and from storage to prevent more
- * than a maximum number of references being stored in memory at once.
- * 
- * This allows us to support logical channels holding many millions of messages without running out of memory.
+ * This allows us to support logical channels holding many millions of messages without running out
+ * of memory.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
- *
  */
 public abstract class PagingChannelSupport extends ChannelSupport
 {
@@ -88,17 +86,7 @@
    protected long nextPagingOrder;
    
    /**
-    * Constructor with default paging params
-    * @param channelID
-    * @param ms
-    * @param pm
-    * @param mm
-    * @param acceptReliableMessages
-    * @param recoverable
-    * @param fullSize
-    * @param pageSize
-    * @param downCacheSize
-    * @param executor
+    * Constructor with default paging params.
     */
    public PagingChannelSupport(long channelID, MessageStore ms, PersistenceManager pm,
                                boolean acceptReliableMessages, boolean recoverable,                        
@@ -112,16 +100,7 @@
    }
    
    /**
-    * Constructor specifying paging params
-    * @param channelID
-    * @param ms
-    * @param pm
-    * @param acceptReliableMessages
-    * @param recoverable
-    * @param executor
-    * @param fullSize
-    * @param pageSize
-    * @param downCacheSize
+    * Constructor specifying paging params.
     */
    public PagingChannelSupport(long channelID, MessageStore ms, PersistenceManager pm,
                                boolean acceptReliableMessages, boolean recoverable,                        
@@ -528,8 +507,6 @@
          nextPagingOrder = ili.getMaxPageOrdering().longValue() + 1;
                            
          paging = true;
-         
-         log.info("set paging to true");
       }
       else
       {

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -64,9 +64,8 @@
 import org.jboss.messaging.util.Util;
 
 /**
+ * JDBC implementation of PersistenceManager.
  *  
- * JDBC implementation of PersistenceManager 
- *  
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -28,7 +28,7 @@
 import org.jboss.messaging.core.tx.Transaction;
 
 /**
- * The interface to the persistence manager
+ * The interface to the persistence manager.
  *
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -64,13 +64,15 @@
     
    void updatePageOrder(long channelID, List references) throws Exception;
    
-   void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception;
+   void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num)
+      throws Exception;
    
    List getPagedReferenceInfos(long channelID, long orderStart, int number) throws Exception;
    
    InitialLoadInfo loadFromStart(long channelID, int fullSize) throws Exception;
    
-   InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception;  
+   InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad,
+                                long firstPagingOrder, long nextPagingOrder) throws Exception;
      
    List getMessages(List messageIds) throws Exception;
          
@@ -86,9 +88,8 @@
 
    boolean referenceExists(long messageID) throws Exception;
 
-   // Interface value classes
-   //---------------------------------------------------------------
-   
+   // Interface value classes ----------------------------------------------------------------------
+
    class MessageChannelPair
    {
       private Message message;

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -21,9 +21,6 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.util.Iterator;
-import java.util.Map;
-
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.Filter;
@@ -34,7 +31,6 @@
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager.ReferenceInfo;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.util.Future;
@@ -114,11 +110,10 @@
       
    public QueueStats getStats()
    {      
-      //Currently we only return the current message reference count for the channel
-      //Note we are only interested in the number of refs in the main queue, not
-      //in any deliveries
-      //Also we are only interested in the value obtained after delivery is complete.
-      //This is so we don't end up with transient values since delivery is half way through
+      // Currently we only return the current message reference count for the channel. Note we are
+      // only interested in the number of refs in the main queue, not in any deliveries. Also we are
+      // only interested in the value obtained after delivery is complete. This is so we don't end
+      // up with transient values since delivery is half way through.
       
       int cnt = getRefCount();
       
@@ -126,8 +121,8 @@
       {
          lastCount = cnt;
          
-         //We only return stats if it has changed since last time - this is so when we only
-         //broadcast data when necessary
+         // We only return stats if the count has changed since last time, so that we only
+         // broadcast data when necessary.
          return new QueueStats(name, cnt);
       }
       else
@@ -181,7 +176,8 @@
    }
    
    public void handlePullMessagesResult(RemoteQueueStub remoteQueue, Message message,
-                                        long holdingTxId, boolean failBeforeCommit, boolean failAfterCommit) throws Exception
+                                        long holdingTxId, boolean failBeforeCommit,
+                                        boolean failAfterCommit) throws Exception
    { 
       //This needs to be run on a different thread to the one used by JGroups to deliver the message
       //to avoid deadlock
@@ -192,7 +188,8 @@
    }
    
    //TODO it's not ideal that we need to pass in a PullMessagesRequest
-   public void handleGetDeliveriesRequest(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx) throws Exception
+   public void handleGetDeliveriesRequest(int returnNodeId, int number, TransactionId txId,
+                                          PullMessagesRequest tx) throws Exception
    {
       //This needs to be run on a different thread to the one used by JGroups to deliver the message
       //to avoid deadlock
@@ -224,30 +221,25 @@
       return ((Integer)result.getResult()).intValue();
    }
    
-   /*
-    * Merge the contents of one queue with another - this happens at failover when
-    * a queue is failed over to another node, but a queue with the same name already exists
-    * In this case we merge the two queues
+   /**
+    * Merge the contents of one queue with another - this happens at failover when a queue is failed
+    * over to another node, but a queue with the same name already exists. In this case we merge the
+    * two queues.
     */
    public void mergeIn(RemoteQueueStub remoteQueue) throws Exception
    {
       if (trace) { log.trace("Merging queue " + remoteQueue + " into " + this); }
            
-      log.info("queue is paging:" + this.paging + " message refs size " +
-               this.messageRefs.size() + " fullsize:" + this.fullSize +
-               " delivering:" + this.deliveringCount.get());
-      
       synchronized (refLock)
       {
          flushDownCache();
                   
          PersistenceManager.InitialLoadInfo ili =
-            pm.mergeAndLoad(remoteQueue.getChannelID(), channelID, fullSize - messageRefs.size(), firstPagingOrder, nextPagingOrder);
+            pm.mergeAndLoad(remoteQueue.getChannelID(), channelID, fullSize - messageRefs.size(),
+                            firstPagingOrder, nextPagingOrder);
             
          if (trace) { log.trace("Loaded " + ili.getRefInfos().size() + " refs"); }            
-                           
-         log.info("firstpageord:" + ili.getMinPageOrdering() + " lastpageord:" + ili.getMaxPageOrdering());
-         
+
          doLoad(ili);         
          
          deliverInternal();

Modified: trunk/tests/etc/container-qalab.xml
===================================================================
--- trunk/tests/etc/container-qalab.xml	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/tests/etc/container-qalab.xml	2007-02-27 00:43:45 UTC (rev 2452)
@@ -35,7 +35,6 @@
          <password>messaging</password>
       </database-configuration>
 
-
       <database-configuration name="oracle">
          <url>jdbc:oracle:thin:@dev01-priv:1521:qadb01</url>
          <driver>oracle.jdbc.driver.OracleDriver</driver>

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-02-26 23:59:44 UTC (rev 2451)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-02-27 00:43:45 UTC (rev 2452)
@@ -944,10 +944,12 @@
     * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
     */
    public static void deployQueue(String name, String jndiName, int fullSize, int pageSize,
-                                  int downCacheSize, int serverIndex, boolean clustered) throws Exception
+                                  int downCacheSize, int serverIndex, boolean clustered)
+      throws Exception
    {
       insureStarted();
-      servers[serverIndex].getServer().deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+      servers[serverIndex].getServer().
+         deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
    }
 
    /**




More information about the jboss-cvs-commits mailing list