[jboss-cvs] JBoss Messaging SVN: r3174 - in trunk: src/main/org/jboss/jms/client and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 5 11:14:57 EDT 2007
Author: timfox
Date: 2007-10-05 11:14:57 -0400 (Fri, 05 Oct 2007)
New Revision: 3174
Modified:
trunk/src/etc/aop-messaging-client-debug.xml
trunk/src/etc/aop-messaging-client.xml
trunk/src/main/org/jboss/jms/client/Closeable.java
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/FailoverValve.java
trunk/src/main/org/jboss/jms/client/FailoverValve2.java
trunk/src/main/org/jboss/jms/client/JBossConnection.java
trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-880
Modified: trunk/src/etc/aop-messaging-client-debug.xml
===================================================================
--- trunk/src/etc/aop-messaging-client-debug.xml 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/etc/aop-messaging-client-debug.xml 2007-10-05 15:14:57 UTC (rev 3174)
@@ -163,7 +163,7 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->redeliver(..))">
<advice name="handleRedeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing())">
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing(..))">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
@@ -215,7 +215,7 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receiveNoWait())">
<advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing())">
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing(..))">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getDestination())">
@@ -276,7 +276,7 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setTimeToLive(..))">
<advice name="handleSetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing())">
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing(..))">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.ProducerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->close())">
Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/etc/aop-messaging-client.xml 2007-10-05 15:14:57 UTC (rev 3174)
@@ -145,7 +145,7 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->redeliver(..))">
<advice name="handleRedeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing())">
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing(..))">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
@@ -195,7 +195,7 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receiveNoWait())">
<advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing())">
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing(..))">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getDestination())">
@@ -254,7 +254,7 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->setTimeToLive(..))">
<advice name="handleSetTimeToLive" aspect="org.jboss.jms.client.container.ProducerAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing())">
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->closing(..))">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.ProducerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->close())">
Modified: trunk/src/main/org/jboss/jms/client/Closeable.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/Closeable.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/Closeable.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -46,5 +46,5 @@
*
* @throws JMSException
*/
- long closing() throws JMSException;
+ long closing(long sequence) throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -179,7 +179,7 @@
{
log.debug(this + " aborted failover");
ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)state.getDelegate();
- connDelegate.closing();
+ connDelegate.closing(-1);
connDelegate.close();
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED, this));
Modified: trunk/src/main/org/jboss/jms/client/FailoverValve.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -145,7 +145,7 @@
public void close() throws InterruptedException
{
- log.debug(this + " closing ...");
+ log.debug(this + " close ...");
// Before assuming a write lock, we need to release reentrant read locks.
// When simultaneous threads are closing a valve (as simultaneous threads are capturing a
Modified: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -105,7 +105,7 @@
public synchronized void close()
{
- if (trace) { log.trace(this + " closing " + (locked ? "LOCKED" : "UNLOCKED") + " valve"); }
+ if (trace) { log.trace(this + " close " + (locked ? "LOCKED" : "UNLOCKED") + " valve"); }
if (trace && threads.contains(Thread.currentThread()))
{
Modified: trunk/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnection.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossConnection.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -128,7 +128,7 @@
public void close() throws JMSException
{
- delegate.closing();
+ delegate.closing(-1);
delegate.close();
}
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -334,7 +334,7 @@
try
{
- sess.closing();
+ sess.closing(-1);
sess.close();
}
catch (Throwable t)
Modified: trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -93,7 +93,7 @@
public void close() throws JMSException
{
- delegate.closing();
+ delegate.closing(-1);
delegate.close();
}
Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -126,7 +126,7 @@
public void close() throws JMSException
{
- delegate.closing();
+ delegate.closing(-1);
delegate.close();
}
Modified: trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -62,7 +62,7 @@
public void close() throws JMSException
{
- delegate.closing();
+ delegate.closing(-1);
delegate.close();
}
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -171,7 +171,7 @@
public void close() throws JMSException
{
- delegate.closing();
+ delegate.closing(-1);
delegate.close();
}
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -344,7 +344,7 @@
public void close(long lastDeliveryId) throws JMSException
{
- log.trace(this + " closing");
+ log.trace(this + " close");
//Wait for the last delivery to arrive
waitForLastDelivery(lastDeliveryId);
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -302,7 +302,7 @@
Closeable del = (Closeable)child.getDelegate();
try
{
- del.closing();
+ del.closing(-1);
del.close();
}
catch (Throwable t)
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -195,7 +195,7 @@
cancelDeliveries(del, dels);
}
-
+
return invocation.invokeNext();
}
@@ -626,7 +626,7 @@
return null;
}
- if (trace) { log.trace("sending message NON-transactionally"); }
+ if (trace) { log.trace("sending message NON-transactionally"); }
return invocation.invokeNext();
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -105,9 +105,9 @@
doInvoke(client, req);
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- RequestSupport req = new ClosingRequest(id, version);
+ RequestSupport req = new ClosingRequest(sequence, id, version);
return ((Long)doInvoke(client, req)).longValue();
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -85,7 +85,6 @@
public synchronized void establishCallback()
{
-
log.debug(" Establishing CFCallback\n");
for (int server = delegates.length - 1; server >= 0; server--)
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -153,9 +153,9 @@
doInvoke(client, req);
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- RequestSupport req = new ClosingRequest(id, version);
+ RequestSupport req = new ClosingRequest(sequence, id, version);
return ((Long)doInvoke(client, req)).longValue();
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -122,9 +122,9 @@
doInvoke(client, req);
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- RequestSupport req = new ClosingRequest(id, version);
+ RequestSupport req = new ClosingRequest(sequence, id, version);
return ((Long)doInvoke(client, req)).longValue();
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -80,7 +80,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -32,6 +32,7 @@
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.HierarchicalState;
+import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.Ack;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.Cancel;
@@ -142,9 +143,9 @@
doInvoke(client, req);
}
- public long closing() throws JMSException
- {
- RequestSupport req = new ClosingRequest(id, version);
+ public long closing(long sequence) throws JMSException
+ {
+ RequestSupport req = new ClosingRequest(((SessionState)state).getNPSendSequence(), id, version);
return ((Long)doInvoke(client, req)).longValue();
}
@@ -440,11 +441,19 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- private long sequence;
-
public void send(JBossMessage m, boolean checkForDuplicates) throws JMSException
{
- long seq = m.isReliable() ? -1 : sequence++;
+ long seq;
+ if (m.isReliable())
+ {
+ seq = -1;
+ }
+ else
+ {
+ SessionState sstate = (SessionState)state;
+ seq = sstate.getNPSendSequence();
+ sstate.incNpSendSequence();
+ }
RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, seq);
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -305,7 +305,7 @@
public void stop()
{
- log.trace(this + " closing");
+ log.trace(this + " stop");
// explicitly remove the callback listener, to avoid race conditions on server
// (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -108,7 +108,9 @@
//In that case we want it to act as transacted, so when the session is subsequently enlisted the work can be converted into the
//XA transaction
private boolean treatAsNonTransactedWhenNotEnlisted = true;
-
+
+ private long npSendSequence;
+
// Constructors ---------------------------------------------------------------------------------
public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -206,6 +208,8 @@
String oldSessionID = sessionID;
sessionID = newState.sessionID;
+ npSendSequence = 0;
+
// We need to clear anything waiting in the session executor - since there may be messages
// from before failover waiting in there and we don't want them to get delivered after
// failover.
@@ -455,6 +459,16 @@
return sessionID;
}
+ public long getNPSendSequence()
+ {
+ return npSendSequence;
+ }
+
+ public void incNpSendSequence()
+ {
+ npSendSequence++;
+ }
+
public String toString()
{
return "SessionState[" + sessionID + "]";
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -359,7 +359,7 @@
try
{
log.debug("clPearing up state for connection " + sce);
- sce.closing();
+ sce.closing(-1);
sce.close();
log.debug("cleared up state for connection " + sce);
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -207,7 +207,7 @@
}
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
// Do nothing
return -1;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -425,7 +425,7 @@
}
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
log.trace(this + " closing (noop)");
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -326,7 +326,7 @@
// Closeable implementation ---------------------------------------------------------------------
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
try
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -127,6 +127,8 @@
static final String TEMP_QUEUE_MESSAGECOUNTER_PREFIX = "TempQueue.";
private static final long DELIVERY_WAIT_TIMEOUT = 5 * 1000;
+
+ private static final long CLOSE_WAIT_TIMEOUT = 5 * 1000;
// Static ---------------------------------------------------------------------------------------
@@ -174,6 +176,8 @@
private boolean waitingToClose = false;
+ private Object waitLock = new Object();
+
// Constructors ---------------------------------------------------------------------------------
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
@@ -328,15 +332,42 @@
}
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- // currently does nothing
- if (trace) log.trace(this + " closing (noop)");
+ if (trace) log.trace(this + " closing");
+ // Wait for last np message to arrive
+
+ if (sequence != 0)
+ {
+ synchronized (waitLock)
+ {
+ long wait = CLOSE_WAIT_TIMEOUT;
+
+ while (sequence != expectedSequence && wait > 0)
+ {
+ long start = System.currentTimeMillis();
+ try
+ {
+ waitLock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ wait -= (System.currentTimeMillis() - start);
+ }
+
+ if (wait <= 0)
+ {
+ log.warn("Timed out waiting for last message");
+ }
+ }
+ }
+
return -1;
}
- private long expectedSequence = 0;
+ private volatile long expectedSequence = 0;
private Map<Long, JBossMessage> heldBack = new HashMap<Long, JBossMessage>();
@@ -345,7 +376,7 @@
throw new IllegalStateException("Should not be handled on the server");
}
- public synchronized void send(JBossMessage message, boolean checkForDuplicates, long thisSequence) throws JMSException
+ public void send(JBossMessage message, boolean checkForDuplicates, long thisSequence) throws JMSException
{
try
{
@@ -356,26 +387,30 @@
//This is a workaround to allow us to use one way messages for np messages for performance
//reasons
-
- if (thisSequence == expectedSequence)
- {
- do
- {
- connectionEndpoint.sendMessage(message, null, false);
-
- expectedSequence++;
-
- message = (JBossMessage)heldBack.remove(expectedSequence);
-
- } while (message != null);
-
+
+ synchronized (waitLock)
+ {
+ if (thisSequence == expectedSequence)
+ {
+ do
+ {
+ connectionEndpoint.sendMessage(message, null, false);
+
+ expectedSequence++;
+
+ message = (JBossMessage)heldBack.remove(expectedSequence);
+
+ } while (message != null);
+ }
+ else
+ {
+ //Not the expected one - add it to the map
+
+ heldBack.put(thisSequence, message);
+ }
+
+ waitLock.notify();
}
- else
- {
- //Not the expected one - add it to the map
-
- heldBack.put(thisSequence, message);
- }
}
else
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -60,9 +60,9 @@
endpoint.close();
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- return endpoint.closing();
+ return endpoint.closing(sequence);
}
public void reset() throws JMSException
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -63,9 +63,9 @@
endpoint.close();
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- return endpoint.closing();
+ return endpoint.closing(sequence);
}
public SessionDelegate createSessionDelegate(boolean transacted,
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -59,9 +59,9 @@
endpoint.close();
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- return endpoint.closing();
+ return endpoint.closing(sequence);
}
public void changeRate(float newRate) throws JMSException
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -72,9 +72,9 @@
endpoint.close();
}
- public long closing() throws JMSException
+ public long closing(long sequence) throws JMSException
{
- return endpoint.closing();
+ return endpoint.closing(sequence);
}
public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException
Modified: trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -36,18 +36,24 @@
*/
public class ClosingRequest extends RequestSupport
{
+ private long sequence;
+
public ClosingRequest()
{
}
- public ClosingRequest(String objectId, byte version)
+ public ClosingRequest(long sequence, String objectId, byte version)
{
super(objectId, PacketSupport.REQ_CLOSING, version);
+
+ this.sequence = sequence;
}
public void read(DataInputStream is) throws Exception
{
super.read(is);
+
+ this.sequence = is.readLong();
}
public ResponseSupport serverInvoke() throws Exception
@@ -59,7 +65,7 @@
throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
}
- long id = endpoint.closing();
+ long id = endpoint.closing(sequence);
return new ClosingResponse(id);
}
@@ -67,6 +73,9 @@
public void write(DataOutputStream os) throws Exception
{
super.write(os);
+
+ os.writeLong(sequence);
+
os.flush();
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -48,11 +48,7 @@
/**
*
* This class handles connections to other nodes that are used to pull messages from remote queues to local queues
- *
- *
- * TODO - clean closing of suckers
- *
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: $</tt>20 Jun 2007
*
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -725,7 +725,7 @@
public void testClosingRequest() throws Exception
{
- RequestSupport req = new ClosingRequest("23", (byte)77);
+ RequestSupport req = new ClosingRequest(12, "23", (byte)77);
testPacket(req, PacketSupport.REQ_CLOSING);
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -189,10 +189,6 @@
prod.send(tm);
}
-
- //NP messages are sent async so if you close the connection too quickly you may lose the last
- //one
- Thread.sleep(1000);
}
finally
{
@@ -236,7 +232,7 @@
break;
}
- log.info("Got message " + tm.getText());
+ //log.info("Got message " + tm.getText());
msgs.add(tm.getText());
@@ -299,7 +295,7 @@
assertNotNull(tm);
- log.info("Got message " + tm.getText());
+ //log.info("Got message " + tm.getText());
assertEquals("message" + (i + start), tm.getText());
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -271,7 +271,7 @@
closed = true;
}
- public long closing() throws JMSException
+ public long closing(long seq) throws JMSException
{
return -1;
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2007-10-05 12:48:16 UTC (rev 3173)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2007-10-05 15:14:57 UTC (rev 3174)
@@ -576,6 +576,9 @@
prod.send(tm5);
prod.send(tm6);
+ //Give them time to arrive
+ Thread.sleep(3000);
+
ObjectName destObjectName =
new ObjectName("jboss.messaging.destination:service=Queue,name=QueueListMessages");
More information about the jboss-cvs-commits
mailing list