[jboss-cvs] JBoss Messaging SVN: r3397 - in branches/Branch_Stable: src/main/org/jboss/jms/server/endpoint and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 3 08:56:39 EST 2007


Author: timfox
Date: 2007-12-03 08:56:39 -0500 (Mon, 03 Dec 2007)
New Revision: 3397

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
Wait for last message before closing - knock on from remoting problem


Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-12-03 13:56:39 UTC (rev 3397)
@@ -458,7 +458,15 @@
    {   		
       boolean oneway = !(m.isReliable() || strictTck);
       
-      RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, oneway);
+      long sequence = -1;
+      if (oneway)
+      {
+         SessionState sstate = (SessionState)state;
+         sequence = sstate.getNPSendSequence();
+         sstate.incNpSendSequence();
+      }
+      
+      RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, oneway, sequence);
 
       if (oneway)
       {

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-12-03 13:56:39 UTC (rev 3397)
@@ -126,6 +126,8 @@
    
    private static final long DELIVERY_WAIT_TIMEOUT = 5 * 1000;
    
+   private static final long CLOSE_WAIT_TIMEOUT = 10 * 1000;
+      
    // Static ---------------------------------------------------------------------------------------
 
    // Attributes -----------------------------------------------------------------------------------
@@ -173,6 +175,11 @@
    
    private boolean waitingToClose = false;
    
+   private Object waitLock = new Object();
+   
+   private long lastSequence = -1;
+   
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
@@ -332,14 +339,58 @@
    {
       if (trace) log.trace(this + " closing");
       
+      //Need to wait for messages to arrive or they may be lost if connection/session is closed too
+      //quickly after sending
+      
+      if (sequence != -1)
+      {
+         synchronized (waitLock)
+         {           
+            long wait = CLOSE_WAIT_TIMEOUT;
+            
+            while (lastSequence != sequence - 1 && wait > 0)
+            {
+               long start = System.currentTimeMillis(); 
+               try
+               {
+                  waitLock.wait(wait);
+               }
+               catch (InterruptedException e)
+               {                 
+               }
+               wait -= (System.currentTimeMillis() - start);
+            }
+            
+            if (wait <= 0)
+            {
+               log.warn("Timed out waiting for last message");
+            }
+         }        
+      }  
+      
       return -1;
    }
  
    public void send(JBossMessage message, final boolean checkForDuplicates) throws JMSException
    {
+      throw new IllegalStateException("Should not be handled here");
+   }
+   
+   public void send(JBossMessage message, final boolean checkForDuplicates, long sequence) throws JMSException
+   {
       try
       {                
-         connectionEndpoint.sendMessage(message, null, checkForDuplicates);       
+         connectionEndpoint.sendMessage(message, null, checkForDuplicates); 
+         
+         if (sequence != -1)
+         {
+            synchronized (waitLock)
+            {
+               this.lastSequence = sequence;
+               
+               waitLock.notifyAll();
+            }
+         }
       }
       catch (Throwable t)
       {
@@ -566,8 +617,6 @@
          }
          
          deliveryIdSequence = maxDeliveryId + 1;
-         
-       //  log.info("*** set deliveryIdSequence to " + deliveryIdSequence);
       }
       catch (Throwable t)
       {
@@ -1253,8 +1302,6 @@
    	 
    	 long deliveryId = deliveryIdSequence++;
    	 
-   //	 log.info(System.identityHashCode(this) + " Delivery id is now " + deliveryId);
-   	 
    	 if (trace) { log.trace("Delivery id is now " + deliveryId); }
    	 
    	 //TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-12-03 13:56:39 UTC (rev 3397)
@@ -78,9 +78,14 @@
 
    public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException
    {
-      ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates);
+      throw new IllegalStateException("Invocation should not be handled here");
    }
    
+   public void send(JBossMessage msg, boolean checkForDuplicates, long sequence) throws JMSException
+   {
+      ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates, sequence);
+   }
+   
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
                                                   boolean connectionConsumer, boolean autoFlowControl) throws JMSException

Modified: branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-12-03 13:56:39 UTC (rev 3397)
@@ -48,6 +48,7 @@
    private JBossMessage msg;
    private boolean checkForDuplicates;
    private boolean oneway;
+   private long sequence;
  
    public SessionSendRequest()
    {      
@@ -57,12 +58,14 @@
                              byte version,
                              JBossMessage msg,
                              boolean checkForDuplicates,
-                             boolean oneway)
+                             boolean oneway,
+                             long sequence)
    {
       super(objectId, PacketSupport.REQ_SESSION_SEND, version);
       this.msg = msg;
       this.checkForDuplicates = checkForDuplicates;
       this.oneway = oneway;
+      this.sequence = sequence;
    }
 
    public void read(DataInputStream is) throws Exception
@@ -78,6 +81,8 @@
       checkForDuplicates = is.readBoolean();
       
       oneway = is.readBoolean();
+      
+      sequence = is.readLong();
    }
 
    public ResponseSupport serverInvoke() throws Exception
@@ -87,7 +92,7 @@
       
       if (advised != null)
       {         
-         advised.send(msg, checkForDuplicates);
+         advised.send(msg, checkForDuplicates, sequence);
       }
       else
       {      	
@@ -112,6 +117,8 @@
       
       os.writeBoolean(oneway);
       
+      os.writeLong(sequence);
+      
       os.flush();
    }
    

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-12-03 13:56:39 UTC (rev 3397)
@@ -667,7 +667,7 @@
          JBossMessage msg = new JBossMessage(123);
          
          RequestSupport req =
-            new SessionSendRequest("23", (byte)77, msg, false, true);
+            new SessionSendRequest("23", (byte)77, msg, false, true, -1);
                  
          testPacket(req, PacketSupport.REQ_SESSION_SEND);                           
       }




More information about the jboss-cvs-commits mailing list