[jboss-cvs] JBoss Messaging SVN: r2088 - in trunk: src/main/org/jboss/jms/client/delegate and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 29 08:11:20 EST 2007
Author: timfox
Date: 2007-01-29 08:11:20 -0500 (Mon, 29 Jan 2007)
New Revision: 2088
Modified:
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
trunk/src/main/org/jboss/jms/tx/LocalTx.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-789 http://jira.jboss.com/jira/browse/JBMESSAGING-788
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -457,7 +457,7 @@
ConnectionState connState = (ConnectionState)state.getParent();
ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
+
try
{
connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -46,6 +46,7 @@
import org.jboss.jms.server.endpoint.Ack;
import org.jboss.jms.server.endpoint.Cancel;
import org.jboss.jms.server.endpoint.DeliveryInfo;
+import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
/**
@@ -64,6 +65,9 @@
// Constants ------------------------------------------------------------------------------------
private static final long serialVersionUID = -8096852898620279131L;
+
+ private static final Logger log = Logger.getLogger(ClientSessionDelegate.class);
+
// Attributes -----------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -179,7 +179,7 @@
log.debug(this + " creating " + (transacted ? "transacted" : "non transacted") +
" session, " + ToString.acknowledgmentMode(acknowledgmentMode) + ", " +
(isXA ? "XA": "non XA"));
-
+
if (closed)
{
throw new IllegalStateException("Connection is closed");
@@ -209,7 +209,7 @@
ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
log.debug("created " + d);
-
+
return d;
}
catch (Throwable t)
@@ -695,7 +695,7 @@
private void processTransaction(ClientTransaction txState, Transaction tx) throws Throwable
{
if (trace) { log.trace(this + " processing transaction " + tx); }
-
+
synchronized (sessions)
{
for (Iterator i = txState.getSessionStates().iterator(); i.hasNext(); )
@@ -718,6 +718,11 @@
ServerSessionEndpoint session =
serverPeer.getSession(new Integer(sessionState.getSessionId()));
+
+ if (session == null)
+ {
+ throw new IllegalStateException("Cannot find session with id " + sessionState.getSessionId());
+ }
session.acknowledgeTransactionally(sessionState.getAcks(), tx);
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -247,7 +247,19 @@
{
// one way invocation, no acknowledgment sent back by the client
if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }
- callbackHandler.handleCallbackOneway(callback);
+
+ //FIXME - due a design flaw in the socket based transports, they use a pool of TCP
+ //connections, so subsequent invocations can end up using different underlying connections
+ //meaning that later invocations can overtake earlier invocations, if there are more than
+ //one user concurrently invoking on the same transport
+ //We need someway of pinning the client object to the underlying invocation
+ //For now we just serialize all access so that only the first connection in the pool
+ //is ever used - bit this is far from ideal!!!
+
+ synchronized (Object.class)
+ {
+ callbackHandler.handleCallbackOneway(callback);
+ }
}
catch (HandleCallbackException e)
{
Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -195,16 +195,23 @@
throw new IllegalStateException("Cannot call this method on the server side");
}
- SessionTxState state = getSessionTxState(sessionID);
-
- if (state != null)
+ if (sessionStatesMap == null)
{
- return state.getAcks();
+ return Collections.EMPTY_LIST;
}
else
- {
- return Collections.EMPTY_LIST;
- }
+ {
+ SessionTxState state = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
+
+ if (state != null)
+ {
+ return state.getAcks();
+ }
+ else
+ {
+ return Collections.EMPTY_LIST;
+ }
+ }
}
// Streamable implementation ---------------------------------
Modified: trunk/src/main/org/jboss/jms/tx/LocalTx.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/LocalTx.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/tx/LocalTx.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.tx;
+import org.jboss.util.id.GUID;
+
/**
*
* A LocalTx
@@ -32,8 +34,27 @@
*/
public class LocalTx
{
+ private String id = new GUID().toString();
+
public String toString()
{
- return "LocalTx[" + Integer.toHexString(hashCode()) + "]";
+ return "LocalTx[" + id + "]";
}
+
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof LocalTx))
+ {
+ return false;
+ }
+
+ LocalTx tother = (LocalTx)other;
+
+ return this.id.equals(tother.id);
+ }
+
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
}
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -189,7 +189,7 @@
{
throw new IllegalStateException("Cannot find transaction " + xid);
}
-
+
TransactionRequest request =
new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
@@ -199,7 +199,10 @@
// If we get this far we can remove the transaction
- this.removeTxInternal(xid);
+ if (this.removeTxInternal(xid) == null)
+ {
+ throw new IllegalStateException("Cannot find xid to remove " + xid);
+ }
}
catch (Throwable t)
{
@@ -333,6 +336,11 @@
ClientTransaction tx = removeTxInternal(xid);
+ if (tx == null)
+ {
+ throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
+ }
+
//It's possible we don't actually have the prepared tx here locally - this
//may happen if we have recovered from failure and the transaction manager
//is calling rollback on the transaction as part of the recovery process.
@@ -486,6 +494,11 @@
ClientTransaction s = removeTxInternal(anonXid);
+ if (s == null)
+ {
+ throw new java.lang.IllegalStateException("Cannot find xid to remove " + anonXid);
+ }
+
transactions.put(xid, s);
return xid;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -263,7 +263,42 @@
"MessageCount");
assertEquals(0, count.intValue());
}
+
+ // Test case for http://jira.jboss.com/jira/browse/JBMESSAGING-788
+ public void testGetDeliveriesForSession() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session session1 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ Session session2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = session2.createProducer(queue);
+
+ Message msg = session2.createMessage();
+
+ prod.send(msg);
+
+ session1.close();
+
+ session2.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java 2007-01-29 10:11:25 UTC (rev 2087)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java 2007-01-29 13:11:20 UTC (rev 2088)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
@@ -32,7 +33,10 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
+
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.jms.ConnectionTest;
@@ -286,6 +290,7 @@
log.debug("commit");
messagesProduced += (messageCount - lastMessage);
lastMessage = messageCount;
+
sess.commit();
}
else
More information about the jboss-cvs-commits
mailing list