[Jboss-cvs] JBoss Messaging SVN: r1323 - in branches/Branch_1_0: src/etc src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 19 20:49:11 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-09-19 20:48:57 -0400 (Tue, 19 Sep 2006)
New Revision: 1323
Modified:
branches/Branch_1_0/src/etc/aop-messaging-client.xml
branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java
branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java
branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
fix and extra tests for http://jira.jboss.org/jira/browse/JBMESSAGING-542
Modified: branches/Branch_1_0/src/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_0/src/etc/aop-messaging-client.xml 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/etc/aop-messaging-client.xml 2006-09-20 00:48:57 UTC (rev 1323)
@@ -122,7 +122,10 @@
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->redeliver(..))">
<advice name="handleRedeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
- </bind>
+ </bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->closing())">
+ <advice name="handleClosing" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
<advice name="handleClose" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/Closeable.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -28,13 +28,13 @@
* Implemented by JMS classes that can be closed
*
* @author <a href="mailto:adrian at jboss.org>Adrian Brock</a>
+ * @author <a href="mailto:ovidiu at jboss.org>Ovidiu Feodorov</a>
*
* @version $Revision$
*/
public interface Closeable
{
/**
- *
* Close the instance
*
* @throws JMSException
@@ -47,5 +47,7 @@
* @throws JMSException
*/
void closing() throws JMSException;
+
+ boolean isClosed();
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -49,7 +49,7 @@
*
* $Id$
*/
-public class ClosedInterceptor implements Interceptor
+public class ClosedInterceptor implements Interceptor
{
// Constants -----------------------------------------------------
@@ -76,6 +76,15 @@
private String delegateType;
// Static --------------------------------------------------------
+
+ public static String stateToString(int state)
+ {
+ return state == NOT_CLOSED ? "NOT_CLOSED" :
+ state == IN_CLOSING ? "IN_CLOSING" :
+ state == CLOSING ? "CLOSING" :
+ state == IN_CLOSE ? "IN_CLOSE" :
+ state == CLOSED ? "CLOSED" : "UNKNOWN";
+ }
// Constructors --------------------------------------------------
@@ -102,6 +111,11 @@
return sb.toString();
}
+ public boolean isClosed()
+ {
+ return state == IN_CLOSE || state == CLOSED;
+ }
+
// Interceptor implementation -----------------------------------
public String getName()
@@ -119,6 +133,12 @@
}
String methodName = ((MethodInvocation) invocation).getMethod().getName();
+
+ if ("isClosed".equals(methodName))
+ {
+ return new Boolean(isClosed());
+ }
+
boolean isClosing = methodName.equals("closing");
boolean isClose = methodName.equals("close");
@@ -138,7 +158,18 @@
}
else
{
- inuse();
+ synchronized(this)
+ {
+ // object "in use", increment inUseCount
+ if (state == IN_CLOSE || state == CLOSED)
+ {
+ log.error(this + ": method " + methodName + "() did not go through, " +
+ "the interceptor is " + stateToString(state));
+
+ throw new IllegalStateException("The object is closed");
+ }
+ ++inUseCount;
+ }
}
if (isClosing)
@@ -222,19 +253,6 @@
}
/**
- * Mark the object as inuse
- */
- protected synchronized void inuse() throws Throwable
- {
- if (state == IN_CLOSE || state == CLOSED)
- {
- log.error(this + " is closed");
- throw new IllegalStateException("The object is closed");
- }
- ++inUseCount;
- }
-
- /**
* Mark the object as no longer inuse
*/
protected synchronized void done() throws Throwable
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -23,9 +23,12 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.Iterator;
+import java.util.ArrayList;
import javax.jms.IllegalStateException;
import javax.jms.Session;
+import javax.jms.JMSException;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
@@ -64,18 +67,27 @@
// Public --------------------------------------------------------
+ public Object handleClosing(Invocation invocation) throws Throwable
+ {
+ // Send to the server all acknowledgments accumulated in toAck. This is useful, for example,
+ // when a message listener closes the session from its onMessage().
+ acknowledgeOnClosing(invocation);
+
+ return invocation.invokeNext();
+ }
+
+
public Object handleClose(Invocation invocation) throws Throwable
{
Object res = invocation.invokeNext();
-
+
+ // We must explicitly shutdown the executor
+
SessionState state = getState(invocation);
-
- //We must explicitly shutdown the executor
+ state.getExecutor().shutdownNow();
- state.getExecutor().shutdownNow();
-
return res;
- }
+ }
public Object handlePreDeliver(Invocation invocation) throws Throwable
{
@@ -99,7 +111,7 @@
Object[] args = mi.getArguments();
MessageProxy mp = (MessageProxy)args[0];
int consumerID = ((Integer)args[1]).intValue();
- AckInfo info = new AckInfo(mp, consumerID);
+ AckInfo info = new AckInfo(mp, consumerID, ackMode);
state.getToAck().add(info);
@@ -301,6 +313,35 @@
{
return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
}
+
+ /**
+ * The method sends to the server all eligible acknowledgments (those that are NOT
+ * CLIENT_ACKNOWLEDGE, for example).
+ */
+ private void acknowledgeOnClosing(Invocation invocation) throws JMSException
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+ SessionState state = getState(invocation);
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ // select eligible acknowledgments
+ List acks = new ArrayList();
+ for(Iterator i = state.getToAck().iterator(); i.hasNext(); )
+ {
+ AckInfo ack = (AckInfo)i.next();
+ if (ack.getAckMode() == Session.AUTO_ACKNOWLEDGE ||
+ ack.getAckMode() == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ acks.add(ack);
+ i.remove();
+ }
+ }
+
+ if (!acks.isEmpty())
+ {
+ del.acknowledgeBatch(acks);
+ }
+ }
// Inner Classes -------------------------------------------------
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -24,6 +24,7 @@
import javax.jms.IllegalStateException;
import javax.jms.Message;
import javax.jms.TransactionInProgressException;
+import javax.jms.Session;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
@@ -197,7 +198,7 @@
MethodInvocation mi = (MethodInvocation)invocation;
MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
int consumerID = ((Integer)mi.getArguments()[1]).intValue();
- AckInfo info = new AckInfo(proxy, consumerID);
+ AckInfo info = new AckInfo(proxy, consumerID, Session.SESSION_TRANSACTED);
ConnectionState connState = (ConnectionState)state.getParent();
if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -59,7 +59,6 @@
{
}
-
// BrowserDelegate implementation --------------------------------
/**
@@ -76,6 +75,11 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ public boolean isClosed()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
public boolean hasNextMessage() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -93,6 +93,15 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
+ public boolean isClosed()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public JBossConnectionConsumer createConnectionConsumer(Destination dest,
String subscriptionName,
String messageSelector,
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -99,6 +99,15 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
+ public boolean isClosed()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public MessageListener getMessageListener()
{
throw new IllegalStateException("This invocation should not be handled here!");
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -55,7 +55,6 @@
{
super(-1);
}
-
// ProducerDelegate implementation -------------------------------
@@ -81,6 +80,15 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
+ public boolean isClosed()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public int getDeliveryMode() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -83,25 +83,25 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void acknowledge(AckInfo ackInfo) throws JMSException
+ public void close() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
-
+
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void acknowledgeBatch(List ackInfos) throws JMSException
+ public void closing() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
-
+
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void acknowledgeAll() throws JMSException
+ public boolean isClosed()
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -110,25 +110,34 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void addTemporaryDestination(JBossDestination destination) throws JMSException
+ public void acknowledge(AckInfo ackInfo) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
-
+
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void redeliver() throws JMSException
+ public void acknowledgeBatch(List ackInfos) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
+
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
+ public void acknowledgeAll() throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void close() throws JMSException
+ public void addTemporaryDestination(JBossDestination destination) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -137,7 +146,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void closing() throws JMSException
+ public void redeliver() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -132,8 +132,12 @@
}
}
}
-
- postDeliver(sess, isConnectionConsumer);
+
+ if (!sess.isClosed())
+ {
+ // postDeliver only if the session is not closed
+ postDeliver(sess, isConnectionConsumer);
+ }
}
protected static void preDeliver(SessionDelegate sess,
@@ -344,12 +348,13 @@
{
// Wait for any onMessage() executions to complete
-// if (Thread.currentThread().equals(sessionExecutor.getThread()))
-// {
-// // the current thread already closing this MessageCallbackHandler, so no need to register
-// // another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
-// return;
-// }
+ if (Thread.currentThread().equals(sessionExecutor.getThread()))
+ {
+ // the current thread is already closing this MessageCallbackHandler (this happens when the
+ // session is closed from within the MessageListener.onMessage(), for example), so no need
+ // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+ return;
+ }
Future result = new Future();
@@ -665,7 +670,7 @@
if (trace) { log.trace("Closer starts running"); }
result.setResult(null);
-
+
if (trace) { log.trace("Closer finished run"); }
}
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -27,8 +27,6 @@
import java.util.List;
import java.util.Map;
-import javax.jms.Session;
-
import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.server.Version;
@@ -61,7 +59,8 @@
private QueuedExecutor executor;
private boolean recoverCalled;
-
+
+ // List<AckInfo>
private List toAck;
private Map callbackHandlers;
@@ -98,7 +97,10 @@
// to callbackhandlers) in the connection, instead of maintaining another map
callbackHandlers = new HashMap();
}
-
+
+ /**
+ * @return List<AckInfo>
+ */
public List getToAck()
{
return toAck;
Modified: branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/delegate/SessionDelegate.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -79,9 +79,9 @@
void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons);
- public boolean getTransacted();
+ boolean getTransacted();
- public int getAcknowledgeMode();
+ int getAcknowledgeMode();
void commit() throws JMSException;
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/destination/Queue.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -59,7 +59,8 @@
}
JBossQueue jbq = new JBossQueue(name);
- org.jboss.messaging.core.local.Queue q = (org.jboss.messaging.core.local.Queue)cm.getCoreDestination(jbq);
+ org.jboss.messaging.core.local.Queue q =
+ (org.jboss.messaging.core.local.Queue)cm.getCoreDestination(jbq);
return q.getMessageCount();
}
catch (Throwable t)
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -152,9 +152,14 @@
public void closing() throws JMSException
{
- //Do nothing
+ // Do nothing
}
+ public boolean isClosed()
+ {
+ return closed;
+ }
+
// Public --------------------------------------------------------
public String toString()
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -330,7 +330,12 @@
{
log.trace("closing (noop)");
}
-
+
+ public boolean isClosed()
+ {
+ return closed;
+ }
+
public void sendTransaction(TransactionRequest request) throws JMSException
{
try
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -388,6 +388,11 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " close");
}
}
+
+ public boolean isClosed()
+ {
+ return closed;
+ }
// ConsumerEndpoint implementation -------------------------------
@@ -739,47 +744,55 @@
}
}
- if (list != null)
- {
- ServerConnectionEndpoint connection =
- ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
+ if (list == null)
+ {
+ if (trace) { log.trace(this + " has a null list, returning"); }
+ return;
+ }
- try
- {
- if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
-
- ClientDelivery del = new ClientDelivery(list, id);
-
- // TODO How can we ensure that messages for the same consumer aren't delivered
- // concurrently to the same consumer on different threads?
- MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
-
- MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
+ ServerConnectionEndpoint connection =
+ ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
- if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
-
- HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+ try
+ {
+ if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
- // For now we don't look at how many messages are accepted since they all will be.
- // The field is a placeholder for the future.
- if (result.clientIsFull())
- {
- // Stop the server sending any more messages to the client.
- // This is ok outside lock.
- clientConsumerFull = true;
- }
- }
- catch(Throwable t)
+ ClientDelivery del = new ClientDelivery(list, id);
+
+ // TODO How can we ensure that messages for the same consumer aren't delivered
+ // concurrently to the same consumer on different threads?
+ MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
+
+ MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
+
+ if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
+
+ HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+
+ // For now we don't look at how many messages are accepted since they all will be.
+ // The field is a placeholder for the future.
+ if (result.clientIsFull())
{
- log.warn("Failed to deliver the message to the client. See the server log for more details.");
- log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
-
- ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
-
- mgr.handleClientFailure(connection.getRemotingClientSessionId());
+ // Stop the server sending any more messages to the client.
+ // This is ok outside lock.
+ clientConsumerFull = true;
}
- }
+ }
+ catch(Throwable t)
+ {
+ log.warn("Failed to deliver the message to the client. See the server log for more details.");
+ log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
+
+ ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
+
+ mgr.handleClientFailure(connection.getRemotingClientSessionId());
+ }
}
+
+ public String toString()
+ {
+ return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
+ }
}
/*
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -409,6 +409,11 @@
// currently does nothing
if (trace) log.trace("closing (noop)");
}
+
+ public boolean isClosed()
+ {
+ return closed;
+ }
public void send(JBossMessage message) throws JMSException
{
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -111,7 +111,6 @@
* This used at consumer close to cancel any undelivered messages left in the client buffer
* or at session recovery to cancel any messages that couldn't be redelivered locally
* @param ackInfos
- * @throws Exception
*/
void cancelDeliveries(List ackInfos) throws JMSException;
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -65,6 +65,11 @@
endpoint.closing();
}
+ public boolean isClosed()
+ {
+ return endpoint.isClosed();
+ }
+
public boolean hasNextMessage() throws JMSException
{
return endpoint.hasNextMessage();
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -67,6 +67,11 @@
endpoint.closing();
}
+ public boolean isClosed()
+ {
+ return endpoint.isClosed();
+ }
+
public SessionDelegate createSessionDelegate(boolean transacted,
int acknowledgmentMode,
boolean isXA) throws JMSException
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -63,7 +63,12 @@
{
endpoint.closing();
}
-
+
+ public boolean isClosed()
+ {
+ return endpoint.isClosed();
+ }
+
public void more() throws JMSException
{
endpoint.more();
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -72,7 +72,12 @@
{
endpoint.closing();
}
-
+
+ public boolean isClosed()
+ {
+ return endpoint.isClosed();
+ }
+
public void send(JBossMessage msg) throws JMSException
{
endpoint.send(msg);
Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -33,6 +33,7 @@
* for processing.
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
+ * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
*
* $Id$
*/
@@ -45,8 +46,10 @@
// Attributes ----------------------------------------------------
protected long messageID;
-
protected int consumerID;
+
+ // One of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
+ private int ackMode;
// The actual proxy must not get serialized
protected transient MessageProxy msg;
@@ -58,18 +61,31 @@
public AckInfo()
{
}
-
+
public AckInfo(MessageProxy proxy, int consumerID)
{
+ this(proxy, consumerID, -1);
+ }
+
+ /**
+ * @param ackMode - one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
+ */
+ public AckInfo(MessageProxy proxy, int consumerID, int ackMode)
+ {
this.msg = proxy;
this.messageID = proxy.getMessage().getMessageID();
- this.consumerID = consumerID;
+ this.consumerID = consumerID;
+ this.ackMode = ackMode;
}
- public AckInfo(long messageID, int consumerID)
+ /**
+ * @param ackMode - one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
+ */
+ public AckInfo(long messageID, int consumerID, int ackMode)
{
this.messageID = messageID;
this.consumerID = consumerID;
+ this.ackMode = ackMode;
}
// Public --------------------------------------------------------
@@ -89,6 +105,16 @@
return msg;
}
+ /**
+ *
+ * @return one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE ..., or -1 if it has not
+ * been previously set
+ */
+ public int getAckMode()
+ {
+ return ackMode;
+ }
+
public String toString()
{
return "AckInfo[" + messageID + ", " + consumerID + "]";
Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -570,6 +570,9 @@
return undelivered;
}
+ /**
+ * Returns the count of messages stored AND being delivered.
+ */
public int messageCount()
{
synchronized (refLock)
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -30,7 +30,9 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Message;
+import javax.jms.MessageListener;
import javax.naming.InitialContext;
+import javax.management.ObjectName;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -225,7 +227,7 @@
}
- public void test_NonPersistent_NonTransactional_Asynchronous_to_Client() throws Exception
+ public void test_Asynchronous_to_Client() throws Exception
{
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
@@ -278,7 +280,91 @@
conn.close();
}
+ public void test_MessageListener() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+ Connection conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue);
+
+ final Slot slot = new Slot();
+
+ cons.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ try
+ {
+ slot.put(m);
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("got InterruptedException", e);
+ }
+ }
+ });
+
+ conn.start();
+
+ MessageProducer prod = session.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ TextMessage m = session.createTextMessage("one");
+ prod.send(m);
+
+ TextMessage rm = (TextMessage)slot.poll(5000);
+
+ assertEquals("one", rm.getText());
+
+ conn.close();
+ }
+
+ public void test_ClientAcknowledge() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+ Connection conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer p = session.createProducer(queue);
+ p.send(session.createTextMessage("CLACK"));
+
+ MessageConsumer cons = session.createConsumer(queue);
+
+ conn.start();
+
+ TextMessage m = (TextMessage)cons.receive(1000);
+
+ assertEquals("CLACK", m.getText());
+
+ // make sure there's no other message in queue
+ Message m2 = cons.receive(1000);
+ assertNull(m2);
+
+ // make sure the message is still in "delivering" state
+ ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=JMSTestQueue");
+ Integer mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+
+ assertEquals(1, mc.intValue());
+
+ m.acknowledge();
+
+ // make sure there's nothing in queue anymore
+ mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+
+ assertEquals(0, mc.intValue());
+
+ conn.close();
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -252,15 +252,15 @@
assertEquals("Your mum", tm2.getText());
- //Don't ack
+ // Don't ack
- //Create another consumer
+ // Create another consumer
Session sessConsume2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer cons2 = sessConsume2.createConsumer(queue);
- //this should cancel message and cause delivery to other consumer
+ // this should cancel message and cause delivery to other consumer
sessConsume1.close();
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -33,6 +33,7 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.naming.InitialContext;
+import javax.management.ObjectName;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -96,9 +97,64 @@
/**
* Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-542
*/
- public void testClosingConnectionFromMessageListener() throws Exception
+ public void testClosingConsumerFromMessageListener() throws Exception
{
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
+ // load the queue
+
+ Connection c = cf.createConnection();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = s.createProducer(queue);
+ Message m = s.createMessage();
+ prod.send(m);
+ c.close();
+
+ final Result result = new Result();
+ Connection conn = cf.createConnection();
+ s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer cons = s.createConsumer(queue);
+ cons.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ // close the consumer on the same thread that processed the message
+ try
+ {
+ log.debug("attempting close");
+ cons.close();
+ log.debug("consumer closed");
+ result.setSuccess();
+ }
+ catch(Exception e)
+ {
+ result.setFailure(e);
+ }
+ }
+ });
+
+ conn.start();
+
+ // wait for the message to propagate
+ Thread.sleep(3000);
+
+ assertTrue(result.isSuccess());
+ assertNull(result.getFailure());
+
+ // make sure the acknowledgment made it back to the queue
+
+ Integer count = (Integer)ServerManagement.
+ getAttribute(new ObjectName("jboss.messaging.destination:service=Queue,name=MiscellaneousQueue"),
+ "MessageCount");
+ assertEquals(0, count.intValue());
+ }
+
+ /**
+ * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-542
+ */
+ public void testClosingSessionFromMessageListener() throws Exception
+ {
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
@@ -112,6 +168,63 @@
c.close();
final Result result = new Result();
+ Connection conn = cf.createConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = session.createConsumer(queue);
+ cons.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ // close the session on the same thread that processed the message
+ try
+ {
+ log.debug("attempting close");
+ session.close();
+ log.debug("session closed");
+ result.setSuccess();
+ }
+ catch(Exception e)
+ {
+ result.setFailure(e);
+ }
+ }
+ });
+
+ conn.start();
+
+ // wait for the message to propagate
+ Thread.sleep(3000);
+
+ assertTrue(result.isSuccess());
+ assertNull(result.getFailure());
+
+ // make sure the acknowledgment made it back to the queue
+
+ Integer count = (Integer)ServerManagement.
+ getAttribute(new ObjectName("jboss.messaging.destination:service=Queue,name=MiscellaneousQueue"),
+ "MessageCount");
+ assertEquals(0, count.intValue());
+
+ }
+
+ /**
+ * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-542
+ */
+ public void testClosingConnectionFromMessageListener() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ Queue queue = (Queue)ic.lookup("/queue/MiscellaneousQueue");
+
+ // load the queue
+
+ Connection c = cf.createConnection();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = s.createProducer(queue);
+ Message m = s.createMessage();
+ prod.send(m);
+ c.close();
+
+ final Result result = new Result();
final Connection conn = cf.createConnection();
s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = s.createConsumer(queue);
@@ -142,6 +255,12 @@
assertTrue(result.isSuccess());
assertNull(result.getFailure());
+ // make sure the acknowledgment made it back to the queue
+
+ Integer count = (Integer)ServerManagement.
+ getAttribute(new ObjectName("jboss.messaging.destination:service=Queue,name=MiscellaneousQueue"),
+ "MessageCount");
+ assertEquals(0, count.intValue());
}
// Package protected ---------------------------------------------
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/SessionTest.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -38,6 +38,7 @@
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.naming.InitialContext;
+import javax.management.ObjectName;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.JBossSession;
@@ -447,7 +448,81 @@
}
+ public void testCloseNoClientAcknowledgment() throws Exception
+ {
+ // send a message to the queue
+ Connection conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ s.createProducer(queue).send(s.createTextMessage("wont_ack"));
+ conn.close();
+
+ conn = cf.createConnection();
+ s = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ conn.start();
+
+ TextMessage m = (TextMessage)s.createConsumer(queue).receive(1000);
+
+ assertEquals("wont_ack", m.getText());
+
+ // Do NOT ACK
+
+ s.close(); // this should cancel the delivery
+
+ // get the message again
+ s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ m = (TextMessage)s.createConsumer(queue).receive(1000);
+
+ assertEquals("wont_ack", m.getText());
+
+ conn.close();
+ }
+
+ public void testCloseInTransaction() throws Exception
+ {
+ // send a message to the queue
+
+ Connection conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ s.createProducer(queue).send(s.createTextMessage("bex"));
+ conn.close();
+
+ conn = cf.createConnection();
+ Session session = conn.createSession(true, -1);
+ conn.start();
+
+ TextMessage m = (TextMessage)session.createConsumer(queue).receive(1000);
+
+ assertEquals("bex", m.getText());
+
+ // make sure the acknowledgment hasn't been sent to the channel
+ ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=TestQueue");
+ Integer mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+
+ assertEquals(1, mc.intValue());
+
+ // close the session
+ session.close();
+
+ // JMS 1.1 4.4.1: "Closing a transacted session must roll back its transaction in progress"
+
+ mc = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+ assertEquals(1, mc.intValue());
+
+ conn.close();
+
+ // make sure I can still get the right message
+
+ conn = cf.createConnection();
+ s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+ TextMessage rm = (TextMessage)s.createConsumer(queue).receive(1000);
+
+ assertEquals("bex", m.getText());
+
+ conn.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-09-19 19:29:41 UTC (rev 1322)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-09-20 00:48:57 UTC (rev 1323)
@@ -268,7 +268,7 @@
long messageID = 123456;
int consumerID = 65432;
- AckInfo ack = new AckInfo(messageID, consumerID);
+ AckInfo ack = new AckInfo(messageID, consumerID, -1);
Object[] args = new Object[] { ack };
@@ -358,9 +358,9 @@
mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
- AckInfo ackA = new AckInfo(1524, 71627);
- AckInfo ackB = new AckInfo(987987, 45354);
- AckInfo ackC = new AckInfo(32423, 4533);
+ AckInfo ackA = new AckInfo(1524, 71627, -1);
+ AckInfo ackB = new AckInfo(987987, 45354, -1);
+ AckInfo ackC = new AckInfo(32423, 4533, -1);
List acks = new ArrayList();
acks.add(ackA);
@@ -736,7 +736,7 @@
JBossMessage m = new JBossMessage(123);
MessageTest.configureMessage(m);
- AckInfo info = new AckInfo(123, 456);
+ AckInfo info = new AckInfo(123, 456, -1);
TxState state = new TxState();
state.getMessages().add(m);
@@ -854,8 +854,8 @@
List ids = new ArrayList();
- AckInfo ack1 = new AckInfo(1254, 78123);
- AckInfo ack2 = new AckInfo(786, 8979);
+ AckInfo ack1 = new AckInfo(1254, 78123, -1);
+ AckInfo ack2 = new AckInfo(786, 8979, -1);
ids.add(ack1);
ids.add(ack2);
More information about the jboss-cvs-commits
mailing list