[jboss-cvs] JBoss Messaging SVN: r3174 - in trunk: src/main/org/jboss/jms/client and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 5 11:14:57 EDT 2007


Author: timfox
Date: 2007-10-05 11:14:57 -0400 (Fri, 05 Oct 2007)
New Revision: 3174

Modified:
   trunk/src/etc/aop-messaging-client-debug.xml
   trunk/src/etc/aop-messaging-client.xml
   trunk/src/main/org/jboss/jms/client/Closeable.java
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/FailoverValve.java
   trunk/src/main/org/jboss/jms/client/FailoverValve2.java
   trunk/src/main/org/jboss/jms/client/JBossConnection.java
   trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
   trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
   trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-880


Modified: trunk/src/etc/aop-messaging-client-debug.xml
===================================================================
--- trunk/src/etc/aop-messaging-client-debug.xml	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/etc/aop-messaging-client-debug.xml	2007-10-05 15:14:57 UTC (rev 3174)
@@ -163,7 +163,7 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->redeliver(..))">
       <advice name="handleRedeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing())">
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing(..))">
       <advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>         
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
@@ -215,7 +215,7 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receiveNoWait())">
       <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>    
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing())">
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing(..))">
       <advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getDestination())">
@@ -276,7 +276,7 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setTimeToLive(..))">
       <advice name="handleSetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>         
    </bind> 
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing())">
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing(..))">
       <advice name="handleClosing" aspect="org.jboss.jms.client.container.ProducerAspect"/>
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->close())">

Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/etc/aop-messaging-client.xml	2007-10-05 15:14:57 UTC (rev 3174)
@@ -145,7 +145,7 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->redeliver(..))">
       <advice name="handleRedeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing())">
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing(..))">
       <advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>         
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
@@ -195,7 +195,7 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receiveNoWait())">
       <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>    
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing())">
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing(..))">
       <advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getDestination())">
@@ -254,7 +254,7 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setTimeToLive(..))">
       <advice name="handleSetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>         
    </bind> 
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing())">
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing(..))">
       <advice name="handleClosing" aspect="org.jboss.jms.client.container.ProducerAspect"/>
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->close())">

Modified: trunk/src/main/org/jboss/jms/client/Closeable.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/Closeable.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/Closeable.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -46,5 +46,5 @@
     * 
     * @throws JMSException
     */
-   long closing() throws JMSException;
+   long closing(long sequence) throws JMSException;
 }

Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -179,7 +179,7 @@
          {
             log.debug(this + " aborted failover");
             ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)state.getDelegate();
-            connDelegate.closing();
+            connDelegate.closing(-1);
             connDelegate.close();
             
             broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED, this));

Modified: trunk/src/main/org/jboss/jms/client/FailoverValve.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -145,7 +145,7 @@
 
    public void close() throws InterruptedException
    {
-      log.debug(this + " closing ...");
+      log.debug(this + " close ...");
 
       // Before assuming a write lock, we need to release reentrant read locks.
       // When simultaneous threads are closing a valve (as simultaneous threads are capturing a

Modified: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -105,7 +105,7 @@
 
    public synchronized void close()
    {
-      if (trace) { log.trace(this + " closing " + (locked ? "LOCKED" : "UNLOCKED") + " valve"); }
+      if (trace) { log.trace(this + " close " + (locked ? "LOCKED" : "UNLOCKED") + " valve"); }
 
       if (trace && threads.contains(Thread.currentThread()))
       {

Modified: trunk/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnection.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossConnection.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -128,7 +128,7 @@
 
    public void close() throws JMSException
    {
-      delegate.closing();
+      delegate.closing(-1);
       delegate.close();
    }
 

Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -334,7 +334,7 @@
       
       try
       {      
-         sess.closing();
+         sess.closing(-1);
          sess.close();
       }
       catch (Throwable t)

Modified: trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -93,7 +93,7 @@
 
    public void close() throws JMSException
    {
-      delegate.closing();
+      delegate.closing(-1);
       delegate.close();
    }
 

Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -126,7 +126,7 @@
    
    public void close() throws JMSException
    {
-      delegate.closing();
+      delegate.closing(-1);
       delegate.close();
    }
    

Modified: trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -62,7 +62,7 @@
 
    public void close() throws JMSException
    {
-      delegate.closing();
+      delegate.closing(-1);
       delegate.close();
    }
 

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -171,7 +171,7 @@
 
    public void close() throws JMSException
    {
-      delegate.closing();
+      delegate.closing(-1);
       delegate.close();
    }
 

Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -344,7 +344,7 @@
    
    public void close(long lastDeliveryId) throws JMSException
    {     
-   	log.trace(this + " closing");
+   	log.trace(this + " close");
          	
    	//Wait for the last delivery to arrive
       waitForLastDelivery(lastDeliveryId);

Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -302,7 +302,7 @@
          Closeable del = (Closeable)child.getDelegate();
          try
          {
-            del.closing();
+            del.closing(-1);
             del.close();
          }
          catch (Throwable t)

Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -195,7 +195,7 @@
          
          cancelDeliveries(del, dels);        
       }
-                  
+
       return invocation.invokeNext();
    }      
    
@@ -626,7 +626,7 @@
          return null;
       }
 
-      if (trace) { log.trace("sending message NON-transactionally"); }
+      if (trace) { log.trace("sending message NON-transactionally"); }            
 
       return invocation.invokeNext();
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -105,9 +105,9 @@
       doInvoke(client, req);
    }
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      RequestSupport req = new ClosingRequest(id, version);
+      RequestSupport req = new ClosingRequest(sequence, id, version);
 
       return ((Long)doInvoke(client, req)).longValue();
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -85,7 +85,6 @@
 
    public synchronized void establishCallback()
    {
-
       log.debug(" Establishing CFCallback\n");
 
       for (int server = delegates.length - 1; server >= 0; server--)

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -153,9 +153,9 @@
       doInvoke(client, req);
    }
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      RequestSupport req = new ClosingRequest(id, version);
+      RequestSupport req = new ClosingRequest(sequence, id, version);
 
       return ((Long)doInvoke(client, req)).longValue();
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -122,9 +122,9 @@
       doInvoke(client, req);
    }
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      RequestSupport req = new ClosingRequest(id, version);
+      RequestSupport req = new ClosingRequest(sequence, id, version);
 
       return ((Long)doInvoke(client, req)).longValue();
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -80,7 +80,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -32,6 +32,7 @@
 
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.HierarchicalState;
+import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.delegate.Ack;
 import org.jboss.jms.delegate.BrowserDelegate;
 import org.jboss.jms.delegate.Cancel;
@@ -142,9 +143,9 @@
       doInvoke(client, req);
    }
 
-   public long closing() throws JMSException
-   {
-      RequestSupport req = new ClosingRequest(id, version);
+   public long closing(long sequence) throws JMSException
+   {   	   	
+      RequestSupport req = new ClosingRequest(((SessionState)state).getNPSendSequence(), id, version);
 
       return ((Long)doInvoke(client, req)).longValue();
    }
@@ -440,11 +441,19 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
    
-   private long sequence;
-
    public void send(JBossMessage m, boolean checkForDuplicates) throws JMSException
    {   	
-   	long seq = m.isReliable() ? -1 : sequence++;
+   	long seq;
+   	if (m.isReliable())
+   	{
+   		seq = -1;
+   	}
+   	else
+   	{
+   		SessionState sstate = (SessionState)state;
+   		seq = sstate.getNPSendSequence();
+   		sstate.incNpSendSequence();
+   	}
    	
       RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, seq);
 

Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -305,7 +305,7 @@
 
    public void stop()
    {
-      log.trace(this + " closing");
+      log.trace(this + " stop");
 
       // explicitly remove the callback listener, to avoid race conditions on server
       // (http://jira.jboss.org/jira/browse/JBMESSAGING-535)

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -108,7 +108,9 @@
    //In that case we want it to act as transacted, so when the session is subsequently enlisted the work can be converted into the
    //XA transaction
    private boolean treatAsNonTransactedWhenNotEnlisted = true;
-      
+   
+   private long npSendSequence;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -206,6 +208,8 @@
       String oldSessionID = sessionID;
       sessionID = newState.sessionID;
       
+      npSendSequence = 0;
+      
       // We need to clear anything waiting in the session executor - since there may be messages
       // from before failover waiting in there and we don't want them to get delivered after
       // failover.
@@ -455,6 +459,16 @@
       return sessionID;
    }
    
+   public long getNPSendSequence()
+   {
+   	return npSendSequence;
+   }
+   
+   public void incNpSendSequence()
+   {
+   	npSendSequence++;
+   }
+   
    public String toString()
    {
       return "SessionState[" + sessionID + "]";

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -359,7 +359,7 @@
             try
             {
       			log.debug("clPearing up state for connection " + sce);
-               sce.closing();
+               sce.closing(-1);
                sce.close();
                log.debug("cleared up state for connection " + sce);
             }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -207,7 +207,7 @@
       }
    }
          
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
       // Do nothing
       return -1;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -425,7 +425,7 @@
       } 
    }
    
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
       log.trace(this + " closing (noop)");    
       

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -326,7 +326,7 @@
 
    // Closeable implementation ---------------------------------------------------------------------
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
       try
       {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -127,6 +127,8 @@
    static final String TEMP_QUEUE_MESSAGECOUNTER_PREFIX = "TempQueue.";
    
    private static final long DELIVERY_WAIT_TIMEOUT = 5 * 1000;
+   
+   private static final long CLOSE_WAIT_TIMEOUT = 5 * 1000;
       
    // Static ---------------------------------------------------------------------------------------
 
@@ -174,6 +176,8 @@
    
    private boolean waitingToClose = false;
    
+   private Object waitLock = new Object();
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
@@ -328,15 +332,42 @@
       }
    }
       
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      // currently does nothing
-      if (trace) log.trace(this + " closing (noop)");
+      if (trace) log.trace(this + " closing");
       
+      // Wait for last np message to arrive
+      
+      if (sequence != 0)
+      {
+      	synchronized (waitLock)
+      	{      		
+      		long wait = CLOSE_WAIT_TIMEOUT;
+      		
+	      	while (sequence != expectedSequence && wait > 0)
+	      	{
+	      		long start = System.currentTimeMillis(); 
+	      		try
+	      		{
+	      			waitLock.wait();
+	      		}
+	      		catch (InterruptedException e)
+	      		{	      			
+	      		}
+	      		wait -= (System.currentTimeMillis() - start);
+	      	}
+	      	
+	      	if (wait <= 0)
+	      	{
+	      		log.warn("Timed out waiting for last message");
+	      	}
+      	}      	
+      }      
+      
       return -1;
    }
  
-   private long expectedSequence = 0;
+   private volatile long expectedSequence = 0;
    
    private Map<Long, JBossMessage> heldBack = new HashMap<Long, JBossMessage>();
    
@@ -345,7 +376,7 @@
    	throw new IllegalStateException("Should not be handled on the server");
    }
    
-   public synchronized void send(JBossMessage message, boolean checkForDuplicates, long thisSequence) throws JMSException
+   public void send(JBossMessage message, boolean checkForDuplicates, long thisSequence) throws JMSException
    {
       try
       {                
@@ -356,26 +387,30 @@
       		
       		//This is a workaround to allow us to use one way messages for np messages for performance
       		//reasons
-      		      	         
-      		if (thisSequence == expectedSequence)
-      		{
-      			do
-      			{
-      				connectionEndpoint.sendMessage(message, null, false); 
-      				
-         			expectedSequence++;
-         			
-         			message = (JBossMessage)heldBack.remove(expectedSequence);
-      				
-      			} while (message != null);
-      			
+      		
+      		synchronized (waitLock)
+      		{	      		      	        
+	      		if (thisSequence == expectedSequence)
+	      		{
+	      			do
+	      			{
+	      				connectionEndpoint.sendMessage(message, null, false); 
+	      				
+	         			expectedSequence++;
+	         			
+	         			message = (JBossMessage)heldBack.remove(expectedSequence);
+	      				
+	      			} while (message != null);	      			
+	      		}
+	      		else
+	      		{
+	      			//Not the expected one - add it to the map
+	      			
+	      			heldBack.put(thisSequence, message);      			
+	      		}
+	      		
+	      		waitLock.notify();
       		}
-      		else
-      		{
-      			//Not the expected one - add it to the map
-      			
-      			heldBack.put(thisSequence, message);      			
-      		}
       	}
       	else
       	{

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -60,9 +60,9 @@
       endpoint.close();
    }
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      return endpoint.closing();
+      return endpoint.closing(sequence);
    }
 
    public void reset() throws JMSException

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -63,9 +63,9 @@
       endpoint.close();
    }
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      return endpoint.closing();
+      return endpoint.closing(sequence);
    }
 
    public SessionDelegate createSessionDelegate(boolean transacted,

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -59,9 +59,9 @@
       endpoint.close();
    }
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      return endpoint.closing();
+      return endpoint.closing(sequence);
    }
 
    public void changeRate(float newRate) throws JMSException

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -72,9 +72,9 @@
       endpoint.close();
    }
 
-   public long closing() throws JMSException
+   public long closing(long sequence) throws JMSException
    {
-      return endpoint.closing();
+      return endpoint.closing(sequence);
    }
 
    public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException

Modified: trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -36,18 +36,24 @@
  */
 public class ClosingRequest extends RequestSupport
 {
+	private long sequence;
+	
    public ClosingRequest()
    {      
    }
    
-   public ClosingRequest(String objectId, byte version)
+   public ClosingRequest(long sequence, String objectId, byte version)
    {
       super(objectId, PacketSupport.REQ_CLOSING, version);
+      
+      this.sequence = sequence;
    }
    
    public void read(DataInputStream is) throws Exception
    {
       super.read(is);
+      
+      this.sequence = is.readLong();
    }
 
    public ResponseSupport serverInvoke() throws Exception
@@ -59,7 +65,7 @@
          throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
       }
       
-      long id = endpoint.closing();
+      long id = endpoint.closing(sequence);
       
       return new ClosingResponse(id);
    }
@@ -67,6 +73,9 @@
    public void write(DataOutputStream os) throws Exception
    {
       super.write(os);
+      
+      os.writeLong(sequence);
+      
       os.flush();
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -48,11 +48,7 @@
 /**
  * 
  * This class handles connections to other nodes that are used to pull messages from remote queues to local queues
- * 
- *
- * TODO - clean closing of suckers
- * 
- * 
+ *  
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: $</tt>20 Jun 2007
  *

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -725,7 +725,7 @@
       
       public void testClosingRequest() throws Exception
       { 
-         RequestSupport req =  new ClosingRequest("23", (byte)77);
+         RequestSupport req =  new ClosingRequest(12, "23", (byte)77);
                  
          testPacket(req, PacketSupport.REQ_CLOSING);                           
       }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -189,10 +189,6 @@
             
             prod.send(tm);
          }
-         
-         //NP messages are sent async so if you close the connection too quickly you may lose the last
-         //one
-         Thread.sleep(1000);
       }
       finally
       {
@@ -236,7 +232,7 @@
                break;
             }
             
-            log.info("Got message " + tm.getText());
+            //log.info("Got message " + tm.getText());
             
             msgs.add(tm.getText());
 
@@ -299,7 +295,7 @@
             
             assertNotNull(tm);
             
-            log.info("Got message " + tm.getText());
+            //log.info("Got message " + tm.getText());
               
             assertEquals("message" + (i + start), tm.getText());
          } 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -271,7 +271,7 @@
          closed = true;
       }
 
-      public long closing() throws JMSException
+      public long closing(long seq) throws JMSException
       {
          return -1;
       }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java	2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java	2007-10-05 15:14:57 UTC (rev 3174)
@@ -576,6 +576,9 @@
          prod.send(tm5);
          prod.send(tm6);
          
+         //Give them time to arrive
+         Thread.sleep(3000);
+         
          ObjectName destObjectName = 
             new ObjectName("jboss.messaging.destination:service=Queue,name=QueueListMessages");
           




More information about the jboss-cvs-commits mailing list