[jboss-cvs] JBoss Messaging SVN: r2088 - in trunk: src/main/org/jboss/jms/client/delegate and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 29 08:11:20 EST 2007


Author: timfox
Date: 2007-01-29 08:11:20 -0500 (Mon, 29 Jan 2007)
New Revision: 2088

Modified:
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
   trunk/src/main/org/jboss/jms/tx/LocalTx.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
   trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-789 http://jira.jboss.com/jira/browse/JBMESSAGING-788


Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -457,7 +457,7 @@
 
       ConnectionState connState = (ConnectionState)state.getParent();
       ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
+  
       try
       {
          connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -46,6 +46,7 @@
 import org.jboss.jms.server.endpoint.Ack;
 import org.jboss.jms.server.endpoint.Cancel;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
+import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 
 /**
@@ -64,6 +65,9 @@
    // Constants ------------------------------------------------------------------------------------
 
    private static final long serialVersionUID = -8096852898620279131L;
+   
+   private static final Logger log = Logger.getLogger(ClientSessionDelegate.class);
+   
 
    // Attributes -----------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -179,7 +179,7 @@
          log.debug(this + " creating " + (transacted ? "transacted" : "non transacted") +
             " session, " + ToString.acknowledgmentMode(acknowledgmentMode) + ", " +
             (isXA ? "XA": "non XA"));
-
+         
          if (closed)
          {
             throw new IllegalStateException("Connection is closed");
@@ -209,7 +209,7 @@
          ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
 
          log.debug("created " + d);
-
+         
          return d;
       }
       catch (Throwable t)
@@ -695,7 +695,7 @@
    private void processTransaction(ClientTransaction txState, Transaction tx) throws Throwable
    {
       if (trace) { log.trace(this + " processing transaction " + tx); }
-      
+         
       synchronized (sessions)
       {         
          for (Iterator i = txState.getSessionStates().iterator(); i.hasNext(); )
@@ -718,6 +718,11 @@
             
             ServerSessionEndpoint session =
                serverPeer.getSession(new Integer(sessionState.getSessionId()));
+            
+            if (session == null)
+            {               
+               throw new IllegalStateException("Cannot find session with id " + sessionState.getSessionId());
+            }
 
             session.acknowledgeTransactionally(sessionState.getAcks(), tx);
          }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -247,7 +247,19 @@
          {
             // one way invocation, no acknowledgment sent back by the client
             if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }
-            callbackHandler.handleCallbackOneway(callback);
+            
+            //FIXME - due to a design flaw in the socket-based transports, they use a pool of TCP
+            //connections, so subsequent invocations can end up using different underlying connections,
+            //meaning that later invocations can overtake earlier invocations if there is more than
+            //one user concurrently invoking on the same transport.
+            //We need some way of pinning the client object to the underlying invocation.
+            //For now we just serialize all access so that only the first connection in the pool
+            //is ever used - but this is far from ideal!!!
+            
+            synchronized (Object.class)
+            {            
+               callbackHandler.handleCallbackOneway(callback);
+            }
          }
          catch (HandleCallbackException e)
          {

Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -195,16 +195,23 @@
          throw new IllegalStateException("Cannot call this method on the server side");
       }
 
-      SessionTxState state = getSessionTxState(sessionID);
-
-      if (state != null)
+      if (sessionStatesMap == null)
       {
-         return state.getAcks();
+         return Collections.EMPTY_LIST;
       }
       else
-      {
-         return Collections.EMPTY_LIST;
-      }
+      {         
+         SessionTxState state = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
+   
+         if (state != null)
+         {
+            return state.getAcks();
+         }
+         else
+         {
+            return Collections.EMPTY_LIST;
+         }
+      }            
    }
 
    // Streamable implementation ---------------------------------

Modified: trunk/src/main/org/jboss/jms/tx/LocalTx.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/LocalTx.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/tx/LocalTx.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -21,6 +21,8 @@
   */
 package org.jboss.jms.tx;
 
+import org.jboss.util.id.GUID;
+
 /**
  * 
  * A LocalTx
@@ -32,8 +34,27 @@
  */
 public class LocalTx
 {
+   private String id = new GUID().toString();
+   
    public String toString()
    {
-      return "LocalTx[" + Integer.toHexString(hashCode()) + "]";
+      return "LocalTx[" + id + "]";
    }
+   
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof LocalTx))
+      {
+         return false;
+      }
+      
+      LocalTx tother = (LocalTx)other;
+      
+      return this.id.equals(tother.id);
+   }
+   
+   public int hashCode()
+   {
+      return id.hashCode();
+   }
 }  

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -189,7 +189,7 @@
       {
          throw new IllegalStateException("Cannot find transaction " + xid);
       }
-      
+                  
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
       
@@ -199,7 +199,10 @@
          
          // If we get this far we can remove the transaction
          
-         this.removeTxInternal(xid);
+         if (this.removeTxInternal(xid) == null)
+         {
+            throw new IllegalStateException("Cannot find xid to remove " + xid);
+         }
       }
       catch (Throwable t)
       {
@@ -333,6 +336,11 @@
       
       ClientTransaction tx = removeTxInternal(xid);
       
+      if (tx == null)
+      {
+         throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
+      }
+      
       //It's possible we don't actually have the prepared tx here locally - this
       //may happen if we have recovered from failure and the transaction manager
       //is calling rollback on the transaction as part of the recovery process.
@@ -486,6 +494,11 @@
 
       ClientTransaction s = removeTxInternal(anonXid);
       
+      if (s == null)
+      {
+         throw new java.lang.IllegalStateException("Cannot find xid to remove " + anonXid);
+      }
+      
       transactions.put(xid, s);
       
       return xid;

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -263,7 +263,42 @@
                       "MessageCount");
       assertEquals(0, count.intValue());
    }
+   
+   // Test case for http://jira.jboss.com/jira/browse/JBMESSAGING-788
+   public void testGetDeliveriesForSession() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+      Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
 
+      Connection conn = null;
+      
+      try
+      {
+         conn = cf.createConnection();
+         
+         Session session1 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         
+         Session session2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         
+         MessageProducer prod = session2.createProducer(queue);
+         
+         Message msg = session2.createMessage();
+         
+         prod.send(msg);
+         
+         session1.close();
+         
+         session2.commit();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java	2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java	2007-01-29 13:11:20 UTC (rev 2088)
@@ -24,6 +24,7 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
@@ -32,7 +33,10 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.naming.InitialContext;
+
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.logging.Logger;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.jms.ConnectionTest;
@@ -286,6 +290,7 @@
                      log.debug("commit");
                      messagesProduced += (messageCount - lastMessage);
                      lastMessage = messageCount;
+                     
                      sess.commit();
                   }
                   else




More information about the jboss-cvs-commits mailing list