[jboss-cvs] jboss-jms/tests/src/org/jboss/test/messaging/jms ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:52 EDT 2006
User: timfox
Date: 06/07/17 13:14:52
Modified: tests/src/org/jboss/test/messaging/jms
AcknowledgmentTest.java BrowserTest.java
CTSMiscellaneousTest.java
ConnectionConsumerTest.java
MessageConsumerTest.java MessageProxyTest.java
PersistenceTest.java ReferencingTest.java
SessionTest.java WireFormatTest.java XATest.java
XATransactionTest.java
Log:
Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
Revision Changes Path
1.32 +213 -35 jboss-jms/tests/src/org/jboss/test/messaging/jms/AcknowledgmentTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: AcknowledgmentTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/AcknowledgmentTest.java,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -b -r1.31 -r1.32
--- AcknowledgmentTest.java 24 Jun 2006 09:05:40 -0000 1.31
+++ AcknowledgmentTest.java 17 Jul 2006 17:14:52 -0000 1.32
@@ -24,7 +24,6 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -36,6 +35,7 @@
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
import javax.naming.InitialContext;
import org.jboss.jms.client.JBossConnectionFactory;
@@ -47,7 +47,7 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
- * $Id: AcknowledgmentTest.java,v 1.31 2006/06/24 09:05:40 timfox Exp $
+ * $Id: AcknowledgmentTest.java,v 1.32 2006/07/17 17:14:52 timfox Exp $
*/
public class AcknowledgmentTest extends MessagingTestCase
{
@@ -160,6 +160,7 @@
//receive but rollback
TextMessage m2 = (TextMessage)sub.receive(3000);
+
assertNotNull(m2);
assertEquals("testing123", m2.getText());
@@ -256,7 +257,93 @@
}
}
+ public void testTransactionalAcknowlegment() throws Exception
+ {
+
+ Connection conn = cf.createConnection();
+
+ Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSess.createProducer(queue);
+
+ Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ conn.start();
+
+ final int NUM_MESSAGES = 20;
+
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
+
+ assertRemainingMessages(0);
+
+ producerSess.rollback();
+
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
+ assertRemainingMessages(0);
+
+ producerSess.commit();
+
+ assertRemainingMessages(NUM_MESSAGES);
+
+ log.trace("Sent messages");
+
+ int count = 0;
+ while (true)
+ {
+ Message m = consumer.receive(200);
+ if (m == null) break;
+ count++;
+ }
+
+ assertRemainingMessages(NUM_MESSAGES);
+
+ log.trace("Received " + count + " messages");
+
+ assertEquals(count, NUM_MESSAGES);
+
+ consumerSess.rollback();
+
+ assertRemainingMessages(NUM_MESSAGES);
+
+ log.trace("Session rollback called");
+
+ Message m = null;
+
+ int i = 0;
+ for(; i < NUM_MESSAGES; i++)
+ {
+ m = consumer.receive();
+ log.trace("Received message " + i);
+
+ }
+
+ assertRemainingMessages(NUM_MESSAGES);
+
+ // if I don't receive enough messages, the test will timeout
+
+ log.trace("Received " + i + " messages after recover");
+
+ consumerSess.commit();
+
+ assertRemainingMessages(0);
+
+ // make sure I don't receive anything else
+
+ m = consumer.receive(200);
+ assertNull(m);
+ conn.close();
+
+ }
/**
* Send some messages, don't acknowledge them and verify that they are re-sent on recovery.
@@ -282,6 +369,8 @@
producer.send(m);
}
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Sent messages");
int count = 0;
@@ -292,30 +381,41 @@
count++;
}
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Received " + count + " messages");
assertEquals(count, NUM_MESSAGES);
consumerSess.recover();
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Session recover called");
+ Message m = null;
int i = 0;
for(; i < NUM_MESSAGES; i++)
{
- consumer.receive();
+ m = consumer.receive();
log.trace("Received message " + i);
}
+ assertRemainingMessages(NUM_MESSAGES);
+
// if I don't receive enough messages, the test will timeout
log.trace("Received " + i + " messages after recover");
+ m.acknowledge();
+
+ assertRemainingMessages(0);
+
// make sure I don't receive anything else
- Message m = consumer.receive(200);
+ m = consumer.receive(200);
assertNull(m);
conn.close();
@@ -340,7 +440,7 @@
MessageConsumer consumer = consumerSess.createConsumer(queue);
conn.start();
- final int NUM_MESSAGES = 1;
+ final int NUM_MESSAGES = 20;
for (int i = 0; i < NUM_MESSAGES; i++)
{
@@ -348,6 +448,8 @@
producer.send(m);
}
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Sent " + NUM_MESSAGES + " messages");
int count = 0;
@@ -360,10 +462,17 @@
break;
}
log.trace("Acking session");
+
+ assertRemainingMessages(NUM_MESSAGES - count);
+
m.acknowledge();
+
+ assertRemainingMessages(NUM_MESSAGES - (count + 1));
count++;
}
+ assertRemainingMessages(0);
+
assertEquals(NUM_MESSAGES, count);
log.trace("received and acknowledged " + count + " messages");
@@ -409,6 +518,8 @@
producer.send(m);
}
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Sent messages");
Message m = null;
@@ -420,10 +531,14 @@
count++;
}
+ assertRemainingMessages(NUM_MESSAGES);
+
assertNotNull(m);
m.acknowledge();
+ assertRemainingMessages(0);
+
log.trace("Received " + count + " messages");
assertEquals(count, NUM_MESSAGES);
@@ -443,7 +558,6 @@
}
-
/**
* Send some messages, acknowledge some of them, and verify that the others are resent after
* delivery
@@ -470,6 +584,8 @@
producer.send(m);
}
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Sent messages");
int count = 0;
@@ -489,6 +605,8 @@
count++;
}
+ assertRemainingMessages(NUM_MESSAGES - ACKED_MESSAGES);
+
assertNotNull(m);
log.trace("Received " + count + " messages");
@@ -507,7 +625,7 @@
count++;
}
- assertEquals(count, NUM_MESSAGES - ACKED_MESSAGES);
+ assertEquals(NUM_MESSAGES - ACKED_MESSAGES, count);
conn.close();
@@ -540,6 +658,8 @@
producer.send(m);
}
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Sent messages");
int count = 0;
@@ -547,11 +667,18 @@
Message m = null;
for (int i = 0; i < NUM_MESSAGES; i++)
{
+ assertRemainingMessages(NUM_MESSAGES - i);
+
m = consumer.receive(200);
+
+ assertRemainingMessages(NUM_MESSAGES - (i + 1));
+
if (m == null) break;
count++;
}
+ assertRemainingMessages(0);
+
assertNotNull(m);
log.trace("Received " + count + " messages");
@@ -598,6 +725,8 @@
producer.send(m);
}
+ assertRemainingMessages(NUM_MESSAGES);
+
log.trace("Sent messages");
int count = 0;
@@ -610,6 +739,8 @@
count++;
}
+ assertRemainingMessages(0);
+
assertNotNull(m);
log.trace("Received " + count + " messages");
@@ -644,7 +775,7 @@
prod.send(tm3);
sessSend.close();
- log.debug("all messages sent");
+ assertRemainingMessages(3);
conn.start();
@@ -656,6 +787,8 @@
listener.waitForMessages();
+ assertRemainingMessages(0);
+
conn.close();
assertFalse(listener.failed);
}
@@ -674,13 +807,18 @@
prod.send(tm3);
sessSend.close();
+ assertRemainingMessages(3);
+
conn.start();
Session sessReceive = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer cons = sessReceive.createConsumer(queue);
MessageListenerClientAck listener = new MessageListenerClientAck(sessReceive);
cons.setMessageListener(listener);
+
listener.waitForMessages();
+ assertRemainingMessages(0);
+
conn.close();
assertFalse(listener.failed);
@@ -701,6 +839,8 @@
prod.send(tm3);
sessSend.close();
+ assertRemainingMessages(3);
+
conn.start();
Session sessReceive = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = sessReceive.createConsumer(queue);
@@ -708,6 +848,8 @@
cons.setMessageListener(listener);
listener.waitForMessages();
+ assertRemainingMessages(0);
+
conn.close();
assertFalse(listener.failed);
@@ -742,6 +884,7 @@
public void waitForMessages() throws InterruptedException
{
latch.acquire();
+ Thread.sleep(500);
}
public void onMessage(Message m)
@@ -750,16 +893,14 @@
{
count++;
- log.trace("Message is:" + m);
-
TextMessage tm = (TextMessage)m;
- log.trace("Got message:" + tm.getText());
-
// Receive first three messages then recover() session
// Only last message should be redelivered
if (count == 1)
{
+ assertRemainingMessages(3);
+
if (!"a".equals(tm.getText()))
{
failed = true;
@@ -768,6 +909,8 @@
}
if (count == 2)
{
+ assertRemainingMessages(2);
+
if (!"b".equals(tm.getText()))
{
failed = true;
@@ -776,6 +919,8 @@
}
if (count == 3)
{
+ assertRemainingMessages(1);
+
if (!"c".equals(tm.getText()))
{
failed = true;
@@ -785,6 +930,8 @@
}
if (count == 4)
{
+ assertRemainingMessages(1);
+
if (!"c".equals(tm.getText()))
{
failed = true;
@@ -794,7 +941,7 @@
}
}
- catch (JMSException e)
+ catch (Exception e)
{
failed = true;
latch.release();
@@ -824,6 +971,7 @@
public void waitForMessages() throws InterruptedException
{
latch.acquire();
+ Thread.sleep(500);
}
public void onMessage(Message m)
@@ -833,10 +981,10 @@
count++;
TextMessage tm = (TextMessage)m;
- log.trace("Got message " + tm.getText() + " count=" + count);
if (count == 1)
{
+ assertRemainingMessages(3);
if (!"a".equals(tm.getText()))
{
log.trace("Expected a but got " + tm.getText());
@@ -846,6 +994,7 @@
}
if (count == 2)
{
+ assertRemainingMessages(3);
if (!"b".equals(tm.getText()))
{
log.trace("Expected b but got " + tm.getText());
@@ -855,9 +1004,10 @@
}
if (count == 3)
{
- log.trace("Expected c but got " + tm.getText());
+ assertRemainingMessages(3);
if (!"c".equals(tm.getText()))
{
+ log.trace("Expected c but got " + tm.getText());
failed = true;
latch.release();
}
@@ -865,20 +1015,23 @@
}
if (count == 4)
{
- log.trace("Expected a but got " + tm.getText());
+ assertRemainingMessages(3);
if (!"a".equals(tm.getText()))
{
+ log.trace("Expected a but got " + tm.getText());
failed = true;
latch.release();
}
tm.acknowledge();
+ assertRemainingMessages(2);
sess.recover();
}
if (count == 5)
{
- log.trace("Expected b but got " + tm.getText());
+ assertRemainingMessages(2);
if (!"b".equals(tm.getText()))
{
+ log.trace("Expected b but got " + tm.getText());
failed = true;
latch.release();
}
@@ -886,27 +1039,32 @@
}
if (count == 6)
{
- log.trace("Expected b but got " + tm.getText());
+ assertRemainingMessages(2);
if (!"b".equals(tm.getText()))
{
+ log.trace("Expected b but got " + tm.getText());
failed = true;
latch.release();
}
}
if (count == 7)
{
- log.trace("Expected c but got " + tm.getText());
+ assertRemainingMessages(2);
if (!"c".equals(tm.getText()))
{
+ log.trace("Expected c but got " + tm.getText());
failed = true;
latch.release();
}
- }
+ tm.acknowledge();
+ assertRemainingMessages(0);
latch.release();
+ }
}
- catch (JMSException e)
+ catch (Exception e)
{
+ log.error("Caught exception", e);
failed = true;
latch.release();
}
@@ -934,6 +1092,9 @@
public void waitForMessages() throws InterruptedException
{
latch.acquire();
+
+ //Wait for postdeliver to be called
+ Thread.sleep(500);
}
public void onMessage(Message m)
@@ -944,10 +1105,9 @@
TextMessage tm = (TextMessage)m;
- log.trace("Got message:" + tm.getText());
-
if (count == 1)
{
+ assertRemainingMessages(3);
if (!"a".equals(tm.getText()))
{
failed = true;
@@ -956,6 +1116,7 @@
}
if (count == 2)
{
+ assertRemainingMessages(3);
if (!"b".equals(tm.getText()))
{
failed = true;
@@ -964,6 +1125,7 @@
}
if (count == 3)
{
+ assertRemainingMessages(3);
if (!"c".equals(tm.getText()))
{
failed = true;
@@ -974,6 +1136,7 @@
}
if (count == 4)
{
+ assertRemainingMessages(3);
if (!"a".equals(tm.getText()))
{
failed = true;
@@ -982,6 +1145,7 @@
}
if (count == 5)
{
+ assertRemainingMessages(3);
if (!"b".equals(tm.getText()))
{
failed = true;
@@ -989,9 +1153,11 @@
}
log.trace("commit");
sess.commit();
+ assertRemainingMessages(1);
}
if (count == 6)
{
+ assertRemainingMessages(1);
if (!"c".equals(tm.getText()))
{
failed = true;
@@ -1002,6 +1168,7 @@
}
if (count == 7)
{
+ assertRemainingMessages(1);
if (!"c".equals(tm.getText()))
{
failed = true;
@@ -1009,10 +1176,11 @@
}
log.trace("Commit");
sess.commit();
+ assertRemainingMessages(0);
latch.release();
}
}
- catch (JMSException e)
+ catch (Exception e)
{
//log.error(e);
failed = true;
@@ -1022,6 +1190,16 @@
}
+ private boolean assertRemainingMessages(int expected) throws Exception
+ {
+ ObjectName destObjectName =
+ new ObjectName("jboss.messaging.destination:service=Queue,name=Queue");
+ Integer messageCount = (Integer)ServerManagement.getAttribute(destObjectName, "MessageCount");
+ assertEquals(expected, messageCount.intValue());
+ return expected == messageCount.intValue();
+ }
+
+
}
1.19 +4 -3 jboss-jms/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BrowserTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/BrowserTest.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -b -r1.18 -r1.19
--- BrowserTest.java 28 Mar 2006 14:26:20 -0000 1.18
+++ BrowserTest.java 17 Jul 2006 17:14:52 -0000 1.19
@@ -43,9 +43,9 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1.18 $</tt>
+ * @version <tt>$Revision: 1.19 $</tt>
*
- * $Id: BrowserTest.java,v 1.18 2006/03/28 14:26:20 timfox Exp $
+ * $Id: BrowserTest.java,v 1.19 2006/07/17 17:14:52 timfox Exp $
*/
public class BrowserTest extends MessagingTestCase
{
@@ -157,6 +157,7 @@
for (int i = 0; i < numMessages; i++)
{
Message m = session.createMessage();
+ m.setIntProperty("cnt", i);
producer.send(m);
}
@@ -185,7 +186,7 @@
for (int i = 0; i < numMessages; i++)
{
- mc.receive();
+ Message m = mc.receive();
}
browser = session.createBrowser(queue);
1.6 +7 -2 jboss-jms/tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CTSMiscellaneousTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- CTSMiscellaneousTest.java 24 Jun 2006 09:05:40 -0000 1.5
+++ CTSMiscellaneousTest.java 17 Jul 2006 17:14:52 -0000 1.6
@@ -54,9 +54,9 @@
* Safeguards for previously detected TCK failures.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1.5 $</tt>
+ * @version <tt>$Revision: 1.6 $</tt>
*
- * $Id: CTSMiscellaneousTest.java,v 1.5 2006/06/24 09:05:40 timfox Exp $
+ * $Id: CTSMiscellaneousTest.java,v 1.6 2006/07/17 17:14:52 timfox Exp $
*/
public class CTSMiscellaneousTest extends MessagingTestCase
{
@@ -209,6 +209,11 @@
}
}
+ /*
+ * I don't think this test is valid since it assumes the message is going to go cons2 on rollback-
+ * whereas in reality the point to point router will give it to the first available consumer
+ * which in this case is cons, not cons2.
+ */
public void testContestedQueueOnRollback() throws Exception
{
ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
1.28 +3 -11 jboss-jms/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ConnectionConsumerTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -b -r1.27 -r1.28
--- ConnectionConsumerTest.java 24 Jun 2006 09:05:40 -0000 1.27
+++ ConnectionConsumerTest.java 17 Jul 2006 17:14:52 -0000 1.28
@@ -45,9 +45,9 @@
* ConnectionConsumer tests
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.27 $</tt>
+ * @version <tt>$Revision: 1.28 $</tt>
*
- * $Id: ConnectionConsumerTest.java,v 1.27 2006/06/24 09:05:40 timfox Exp $
+ * $Id: ConnectionConsumerTest.java,v 1.28 2006/07/17 17:14:52 timfox Exp $
*/
public class ConnectionConsumerTest extends MessagingTestCase
{
@@ -123,8 +123,6 @@
JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue, null, pool, 1);
- log.trace("Started connection consumer");
-
connProducer = cf.createConnection();
Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -136,8 +134,6 @@
prod.send(m);
}
- log.trace("Sent messages");
-
//Wait for messages
listener.waitForLatch(10000);
@@ -343,10 +339,6 @@
{
try
{
-
-
- //log.trace("Received message " + msgsReceived);
-
TextMessage tm = (TextMessage)message;
if (!tm.getText().equals("testing testing"))
1.85 +167 -128 jboss-jms/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MessageConsumerTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java,v
retrieving revision 1.84
retrieving revision 1.85
diff -u -b -r1.84 -r1.85
--- MessageConsumerTest.java 24 Jun 2006 09:05:40 -0000 1.84
+++ MessageConsumerTest.java 17 Jul 2006 17:14:52 -0000 1.85
@@ -61,9 +61,9 @@
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.84 $</tt>
+ * @version <tt>$Revision: 1.85 $</tt>
*
- * $Id: MessageConsumerTest.java,v 1.84 2006/06/24 09:05:40 timfox Exp $
+ * $Id: MessageConsumerTest.java,v 1.85 2006/07/17 17:14:52 timfox Exp $
*/
public class MessageConsumerTest extends MessagingTestCase
{
@@ -135,7 +135,6 @@
public void tearDown() throws Exception
{
-
producerConnection.close();
consumerConnection.close();
@@ -325,6 +324,15 @@
// start consumer connection after the message is submitted
consumerConnection.start();
+ //NOTE! There semantics of receiveNoWait do not guarantee the message is available
+ //immediately after the message is sent
+ //It will be available some indeterminate time later.
+ //This is fine and as per spec.
+ //To implement receiveNoWait otherwise would be very costly
+ //Also other messaging systems e.g. Sun implement it this way
+
+ Thread.sleep(500);
+
TextMessage m = (TextMessage)queueConsumer.receiveNoWait();
assertEquals(tm.getText(), m.getText());
}
@@ -382,37 +390,40 @@
// closed consumer tests
//
- public void testClose1() throws Exception
- {
- // there is a consumer already open by setup
-
- consumerConnection.start();
-
- Message m = producerSession.createMessage();
- queueProducer.send(m);
-
- // the message is in the channel, however the queue maintains it as "not delivered"
+ //This test is not valid -
+ //The message will not be in the new consumer it will be in the original consumer
- QueueBrowser browser = producerSession.createBrowser(queue);
- Enumeration e = browser.getEnumeration();
- Message bm = (Message)e.nextElement();
- assertEquals(m.getJMSMessageID(), bm.getJMSMessageID());
- assertFalse(e.hasMoreElements());
-
- // create a second consumer and try to receive from queue, it should return the message
- MessageConsumer queueConsumer2 = consumerSession.createConsumer(queue);
-
- Message rm = queueConsumer2.receive(3000);
- assertEquals(m.getJMSMessageID(), rm.getJMSMessageID());
-
- queueConsumer.close();
-
-
- // try to receive from queue again, it should get a message
- rm = queueConsumer2.receive(3000);
- assertNull(rm);
-
- }
+// public void testClose1() throws Exception
+// {
+// // there is a consumer already open by setup
+//
+// consumerConnection.start();
+//
+// Message m = producerSession.createMessage();
+// queueProducer.send(m);
+//
+// // the message is in the channel, however the queue maintains it as "not delivered"
+//
+// QueueBrowser browser = producerSession.createBrowser(queue);
+// Enumeration e = browser.getEnumeration();
+// Message bm = (Message)e.nextElement();
+// assertEquals(m.getJMSMessageID(), bm.getJMSMessageID());
+// assertFalse(e.hasMoreElements());
+//
+// // create a second consumer and try to receive from queue, it should return the message
+// MessageConsumer queueConsumer2 = consumerSession.createConsumer(queue);
+//
+// Message rm = queueConsumer2.receive(3000);
+// assertEquals(m.getJMSMessageID(), rm.getJMSMessageID());
+//
+// queueConsumer.close();
+//
+//
+// // try to receive from queue again, it should get a message
+// rm = queueConsumer2.receive(3000);
+// assertNull(rm);
+//
+// }
/* Test that an ack can be sent after the consumer that received the message has been closed.
* Acks are scoped per session.
@@ -763,8 +774,10 @@
assertEquals("hello1", rm1.getText());
log.trace(rm1.getJMSMessageID());
+ log.trace("rolling back");
//rollback should cause redelivery of messages not acked
sess.rollback();
+ log.trace("rolled back");
TextMessage rm2 = (TextMessage)cons1.receive(1500);
assertEquals("hello1", rm2.getText());
@@ -818,14 +831,14 @@
cons1.close();
- MessageConsumer cons2 = sess.createConsumer(queue);
-
//rollback should cause redelivery of messages
//in this case redelivery occurs to a different receiver
sess.rollback();
+ MessageConsumer cons2 = sess.createConsumer(queue);
+
TextMessage rm2 = (TextMessage)cons2.receive(1500);
assertNotNull(rm2);
assertEquals("hello1", rm2.getText());
@@ -933,13 +946,13 @@
cons1.close();
- MessageConsumer cons2 = sess.createConsumer(queue);
-
log.debug("sess.recover()");
//redeliver
sess.recover();
+ MessageConsumer cons2 = sess.createConsumer(queue);
+
log.debug("receiving ...");
TextMessage rm2 = (TextMessage)cons2.receive(1500);
@@ -994,12 +1007,19 @@
Message r1 = cons1.receive();
+ log.trace("Got first message");
+
cons1.close();
+ log.trace("Closed consumer");
+
MessageConsumer cons2 = sess.createConsumer(queue);
+ log.trace("Wairting for second message");
Message r2 = cons2.receive();
+ log.trace("got second message");
+
Message r3 = cons2.receive();
r1.acknowledge();
@@ -1066,96 +1086,102 @@
/**
* Test server-side consumer delegate activation (on receive())
*/
- public void testReceive1() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- s.createConsumer(queue);
-
- conn.start();
-
- MessageProducer p = s.createProducer(queue);
- Message m = s.createTextMessage("1");
- p.send(m);
-
- MessageConsumer c2 = s.createConsumer(queue);
-
- // TODO the test should be modified to deal with the multiple consumer receive undeterminism
-
- Message r = c2.receive(2000);
+// This test is not valid since the message will be in the first consumer, not c2
- assertEquals(m.getJMSMessageID(), r.getJMSMessageID());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
+// public void testReceive1() throws Exception
+// {
+// Connection conn = null;
+//
+// try
+// {
+// conn = cf.createConnection();
+//
+// Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// s.createConsumer(queue);
+//
+// conn.start();
+//
+// MessageProducer p = s.createProducer(queue);
+// Message m = s.createTextMessage("1");
+// p.send(m);
+//
+// MessageConsumer c2 = s.createConsumer(queue);
+//
+// // TODO the test should be modified to deal with the multiple consumer receive undeterminism
+//
+// Message r = c2.receive(2000);
+//
+//
+// assertEquals(m.getJMSMessageID(), r.getJMSMessageID());
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// conn.close();
+// }
+// }
+// }
/**
* Test server-side consumer delegate activation (on receive())
*/
- public void testReceive2() throws Exception
- {
- Connection conn = null;
- try
- {
- conn = cf.createConnection();
-
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- s.createConsumer(queue);
-
- conn.start();
+ //This test is not valid since the message will be in the first consumer, not c2
- MessageProducer p = s.createProducer(queue);
- Message m = s.createTextMessage("1");
- p.send(m);
-
- MessageConsumer c2 = s.createConsumer(queue);
- final Set received = new HashSet();
-
- // TODO the test should be modified to deal with the multiple consumer receive undeterminism
-
- class Listener implements MessageListener
- {
- Latch latch = new Latch();
-
- public void onMessage(Message m)
- {
- received.add(m);
- latch.release();
- }
- }
-
- Listener list = new Listener();
- c2.setMessageListener(list);
-
- list.latch.acquire();
- assertEquals(1, received.size());
- Message r = (Message)received.iterator().next();
- assertEquals(m.getJMSMessageID(), r.getJMSMessageID());
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
+// public void testReceive2() throws Exception
+// {
+// Connection conn = null;
+//
+// try
+// {
+// conn = cf.createConnection();
+//
+// Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// s.createConsumer(queue);
+//
+// conn.start();
+//
+// MessageProducer p = s.createProducer(queue);
+// Message m = s.createTextMessage("1");
+// p.send(m);
+//
+// MessageConsumer c2 = s.createConsumer(queue);
+// final Set received = new HashSet();
+//
+// // TODO the test should be modified to deal with the multiple consumer receive undeterminism
+//
+// class Listener implements MessageListener
+// {
+// Latch latch = new Latch();
+//
+// public void onMessage(Message m)
+// {
+// received.add(m);
+// latch.release();
+// }
+// }
+//
+// Listener list = new Listener();
+// c2.setMessageListener(list);
+//
+// list.latch.acquire();
+// assertEquals(1, received.size());
+// Message r = (Message)received.iterator().next();
+// assertEquals(m.getJMSMessageID(), r.getJMSMessageID());
+//
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// conn.close();
+// }
+// }
+// }
public void testSendAndReceivePersistentDifferentConnections() throws Exception
@@ -1526,7 +1552,7 @@
topicProducer.send(m1);
// block this thread for a while to allow ServerConsumerDelegate's delivery thread to kick in
- Thread.sleep(5);
+ Thread.sleep(500);
m = topicConsumer.receiveNoWait();
@@ -2076,10 +2102,19 @@
Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session sess2 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ log.trace("careting consumer1");
MessageConsumer cons1 = sess1.createConsumer(topic);
+
+ log.trace("creating consumer2");
MessageConsumer cons2 = sess2.createConsumer(topic);
+
+ log.trace("starting connection");
+
conn1.start();
+ log.trace("started connection");
+
Session sess3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess3.createProducer(topic);
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
@@ -2116,6 +2151,7 @@
{
if (conn1 != null)
{
+ log.trace("closing connection");
conn1.close();
}
}
@@ -2831,6 +2867,9 @@
public void testRedeliveredDifferentSessions() throws Exception
{
+ producerSession.close();
+ consumerSession.close();
+
Session sessProducer = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessProducer.createProducer(queue);
TextMessage tm = sessProducer.createTextMessage("testRedeliveredDifferentSessions");
1.5 +118 -5 jboss-jms/tests/src/org/jboss/test/messaging/jms/MessageProxyTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MessageProxyTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/MessageProxyTest.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- MessageProxyTest.java 31 Mar 2006 21:20:12 -0000 1.4
+++ MessageProxyTest.java 17 Jul 2006 17:14:52 -0000 1.5
@@ -23,14 +23,15 @@
import java.util.Map;
-import javax.naming.InitialContext;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MapMessage;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.naming.InitialContext;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.message.JBossMessage;
@@ -91,6 +92,121 @@
// Public --------------------------------------------------------
+
+ public void testMessageIDs1() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ return;
+ }
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ Message msent = sess.createMessage();
+
+ prod.send(msent);
+
+ Message mrec = cons.receive();
+
+ //The two ids should be the same
+
+ long id1 = ((MessageProxy)msent).getMessage().getMessageID();
+ long id2 = ((MessageProxy)mrec).getMessage().getMessageID();
+
+ assertEquals(id1, id2);
+
+ //Now send the message again
+ prod.send(msent);
+
+ //The sent id should be different
+ long id3 = ((MessageProxy)msent).getMessage().getMessageID();
+ long id4 = ((MessageProxy)mrec).getMessage().getMessageID();
+
+ assertFalse(id1 == id3);
+
+ //But this shouldn't affect the received id
+ assertEquals(id2, id4);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+
+ public void testMessageIDs2() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ return;
+ }
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ Message msent = sess.createMessage();
+
+ prod.send(msent);
+
+ Message mrec = cons.receive();
+
+ //The two ids should be the same
+
+ long id1 = ((MessageProxy)msent).getMessage().getMessageID();
+ long id2 = ((MessageProxy)mrec).getMessage().getMessageID();
+
+ assertEquals(id1, id2);
+
+ //Now send the received again
+ prod.send(mrec);
+
+ //The sent id should be different
+ long id3 = ((MessageProxy)msent).getMessage().getMessageID();
+
+ //But this shouldn't affect the sent id
+ assertEquals(id1, id3);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
public void testCopyAfterSend() throws Exception
{
if (ServerManagement.isRemote())
@@ -259,9 +375,6 @@
//And the bodies shouldn't be copied since we didn't change it either
assertTrue(usent_2.getPayload() == usent_1.getPayload());
- log.info("urec2:" + urec_2.getPayload());
- log.info("urec1:" + urec_1.getPayload());
-
assertTrue(urec_2.getPayload() == urec_1.getPayload());
assertTrue(usent_1.getPayload() == urec_1.getPayload());
1.17 +2 -2 jboss-jms/tests/src/org/jboss/test/messaging/jms/PersistenceTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: PersistenceTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/PersistenceTest.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -b -r1.16 -r1.17
--- PersistenceTest.java 30 Mar 2006 23:52:43 -0000 1.16
+++ PersistenceTest.java 17 Jul 2006 17:14:52 -0000 1.17
@@ -39,7 +39,7 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
- * $Id: PersistenceTest.java,v 1.16 2006/03/30 23:52:43 ovidiu Exp $
+ * $Id: PersistenceTest.java,v 1.17 2006/07/17 17:14:52 timfox Exp $
*/
public class PersistenceTest extends MessagingTestCase
{
@@ -246,7 +246,7 @@
assertEquals("a", t.getText());
}
{
- TextMessage t = (TextMessage)cons.receiveNoWait();
+ TextMessage t = (TextMessage)cons.receive(500);
assertNull(t);
}
1.18 +5 -3 jboss-jms/tests/src/org/jboss/test/messaging/jms/ReferencingTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ReferencingTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/ReferencingTest.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -b -r1.17 -r1.18
--- ReferencingTest.java 28 Mar 2006 14:26:20 -0000 1.17
+++ ReferencingTest.java 17 Jul 2006 17:14:52 -0000 1.18
@@ -41,9 +41,9 @@
* A ReferencingTest.
*
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision: 1.17 $
+ * @version $Revision: 1.18 $
*
- * $Id: ReferencingTest.java,v 1.17 2006/03/28 14:26:20 timfox Exp $
+ * $Id: ReferencingTest.java,v 1.18 2006/07/17 17:14:52 timfox Exp $
*/
public class ReferencingTest extends MessagingTestCase
{
@@ -87,6 +87,8 @@
queue = (Destination)initialContext.lookup("/queue/Queue");
+ this.drainDestination(cf, queue);
+
}
public void tearDown() throws Exception
1.29 +5 -3 jboss-jms/tests/src/org/jboss/test/messaging/jms/SessionTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SessionTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/SessionTest.java,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -b -r1.28 -r1.29
--- SessionTest.java 20 Apr 2006 20:42:27 -0000 1.28
+++ SessionTest.java 17 Jul 2006 17:14:52 -0000 1.29
@@ -50,9 +50,9 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1.28 $</tt>
+ * @version <tt>$Revision: 1.29 $</tt>
*
- * $Id: SessionTest.java,v 1.28 2006/04/20 20:42:27 timfox Exp $
+ * $Id: SessionTest.java,v 1.29 2006/07/17 17:14:52 timfox Exp $
*/
public class SessionTest extends MessagingTestCase
{
@@ -162,7 +162,9 @@
MessageConsumer c = sess.createConsumer(queue);
conn.start();
- TextMessage rm = (TextMessage)c.receiveNoWait();
+
+ //receiveNoWait is not guaranteed to return message immediately
+ TextMessage rm = (TextMessage)c.receive(1000);
assertEquals("something", rm.getText());
}
1.12 +470 -550 jboss-jms/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: WireFormatTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- WireFormatTest.java 27 Jun 2006 00:43:26 -0000 1.11
+++ WireFormatTest.java 17 Jul 2006 17:14:52 -0000 1.12
@@ -24,6 +24,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -33,12 +35,13 @@
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.remoting.HandleMessageResponse;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.server.endpoint.DeliveryRunnable;
+import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.remoting.JMSWireFormat;
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.tx.AckInfo;
@@ -49,10 +52,8 @@
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvocationResponse;
import org.jboss.remoting.InvokerLocator;
-import org.jboss.serial.io.JBossObjectInputStream;
-import org.jboss.serial.io.JBossObjectOutputStream;
-import org.jboss.test.messaging.jms.message.MessageTest;
import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.jms.message.MessageTest;
import org.jboss.util.id.GUID;
/**
@@ -71,7 +72,7 @@
private static final Logger log = Logger.getLogger(WireFormatTest.class);
// TODO - replace with a dynamic value
- private static final byte CURRENT_VERSION = 3;
+ private static final byte CURRENT_VERSION = 4;
// Static --------------------------------------------------------
@@ -79,21 +80,26 @@
protected TestWireFormat wf;
+ //Session
+
protected Method sendMethod;
protected Method acknowledgeMethod;
- protected Method activateMethod;
+ protected Method acknowledgeBatchMethod;
+
+ protected Method cancelDeliveriesMethod;
- protected Method deactivateMethod;
+ //Consumer
- protected Method getMessageNowMethod;
+ protected Method moreMethod;
- protected Method sendTransactionMethod;
- protected Method cancelDeliveryMethod;
+ //connection
- protected Method cancelDeliveriesMethod;
+ protected Method sendTransactionMethod;
+
+ //callback
// Constructors --------------------------------------------------
@@ -116,21 +122,25 @@
Class connectionDelegate = ConnectionDelegate.class;
+ //Session
+
sendMethod = sessionDelegate.getMethod("send", new Class[] { JBossMessage.class });
- acknowledgeMethod = sessionDelegate.getMethod("acknowledge", null);
+ acknowledgeMethod = sessionDelegate.getMethod("acknowledge", new Class[] { AckInfo.class });
- activateMethod = consumerDelegate.getMethod("activate", null);
+ acknowledgeBatchMethod = sessionDelegate.getMethod("acknowledgeBatch", new Class[] { java.util.List.class });
- deactivateMethod = consumerDelegate.getMethod("deactivate", null);
+ cancelDeliveriesMethod = sessionDelegate.getMethod("cancelDeliveries", new Class[] { java.util.List.class });
- getMessageNowMethod = consumerDelegate.getMethod("getMessageNow", new Class[] { Boolean.TYPE });
- sendTransactionMethod = connectionDelegate.getMethod("sendTransaction", new Class[] { TransactionRequest.class });
+ //Consumer
+
- cancelDeliveryMethod = consumerDelegate.getMethod("cancelDelivery", new Class[] { Long.TYPE });
+ moreMethod = consumerDelegate.getMethod("more", null);
- cancelDeliveriesMethod = consumerDelegate.getMethod("cancelDeliveries", new Class[] { List.class });
+ //Connection
+
+ sendTransactionMethod = connectionDelegate.getMethod("sendTransaction", new Class[] { TransactionRequest.class });
log.debug("setup done");
}
@@ -140,55 +150,53 @@
super.tearDown();
}
+ //Session
public void testAcknowledge() throws Exception
{
wf.testAcknowledge();
}
- public void testActivate() throws Exception
+ public void testAcknowledgeBatch() throws Exception
{
- wf.testActivate();
+ wf.testAcknowledgeBatch();
}
- public void testCallback() throws Exception
+ public void testSend() throws Exception
{
- wf.testCallback();
+ wf.testSend();
}
- public void testDeactivate() throws Exception
- {
- wf.testDeactivate();
- }
+ //Consumer
- public void testExceptionResponse() throws Exception
+ public void testMore() throws Exception
{
- wf.testExceptionResponse();
+ wf.testMore();
}
- public void testGetMessageNow() throws Exception
+ public void testCancelDeliveries() throws Exception
{
- wf.testGetMessageNow();
+ wf.testCancelDeliveries();
}
- public void testMessageResponse() throws Exception
- {
- wf.testMessageResponse();
- }
+ //Connection
- public void testNullResponse() throws Exception
+ public void testSendTransaction() throws Exception
{
- wf.testNullResponse();
+ wf.testSendTransaction();
}
- public void testSend() throws Exception
+ //Others
+
+
+ public void testExceptionResponse() throws Exception
{
- wf.testSend();
+ wf.testExceptionResponse();
}
- public void testSendTransaction() throws Exception
+ public void testNullResponse() throws Exception
{
- wf.testSendTransaction();
+ wf.testNullResponse();
}
public void testSerializableRequest() throws Exception
@@ -201,22 +209,24 @@
wf.testSerializableResponse();
}
- public void testDeactivateResponse() throws Exception
+ public void testCallBack() throws Exception
{
- wf.testDeactivateResponse();
+ wf.testCallback();
}
- public void testCancelDelivery() throws Exception
+ public void testIDBlockResponse() throws Exception
{
- wf.testCancelDelivery();
+ wf.testGetIdBlockResponse();
}
- public void testCancelDeliveries() throws Exception
+ public void testHandleMessageResponse() throws Exception
{
- wf.testCancelDeliveries();
+ wf.testHandleMessageResponse();
}
+
+
// Public --------------------------------------------------------
public static class SerializableObject implements Serializable
@@ -246,6 +256,221 @@
*/
class TestWireFormat extends JMSWireFormat
{
+ public void testAcknowledge() throws Exception
+ {
+ long methodHash = 62365354;
+
+ int objectId = 54321;
+
+ MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeMethod, acknowledgeMethod, null);
+
+ mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+
+ long messageID = 123456;
+ int consumerID = 65432;
+ AckInfo ack = new AckInfo(messageID, consumerID);
+
+ Object[] args = new Object[] { ack };
+
+ mi.setArguments(args);
+
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+
+ InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+
+ wf.write(ir, oos);
+
+ oos.flush();
+
+ byte[] bytes = bos.toByteArray();
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+
+ ObjectInputStream ois = new ObjectInputStream(bis);
+
+ //Check the bytes
+
+ //First byte should be version
+ assertEquals(77, ois.readByte());
+
+ //First byte should be ACKNOWLEDGE
+ assertEquals(JMSWireFormat.ACKNOWLEDGE, ois.readByte());
+
+ //Next int should be objectId
+ assertEquals(objectId, ois.readInt());
+
+ //Next long should be methodHash
+ assertEquals(methodHash, ois.readLong());
+
+ //Next should be the externalized AckInfo
+ AckInfo ack2 = new AckInfo();
+
+ ack2.readExternal(ois);
+
+ assertEquals(ack.getMessageID(), ack2.getMessageID());
+ assertEquals(ack.getConsumerID(), ack2.getConsumerID());
+
+ //Now eos
+ try
+ {
+ ois.readByte();
+ fail("End of stream expected");
+ }
+ catch (EOFException e)
+ {
+ //Ok
+ }
+
+ bis.reset();
+
+ ois = new ObjectInputStream(bis);
+
+ InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+
+ mm = (MessagingMarshallable)ir2.getParameter();
+
+ assertEquals(77, mm.getVersion());
+
+ MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+
+ assertEquals(methodHash, mi2.getMethodHash());
+
+ assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+
+ AckInfo ack3 = (AckInfo)mi2.getArguments()[0];
+
+ assertEquals(ack3.getMessageID(), ack3.getMessageID());
+ assertEquals(ack3.getConsumerID(), ack3.getConsumerID());
+
+ }
+
+ public void testAcknowledgeBatch() throws Exception
+ {
+ long methodHash = 62365354;
+
+ int objectId = 54321;
+
+ MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeBatchMethod, acknowledgeBatchMethod, null);
+
+ mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+
+ AckInfo ackA = new AckInfo(1524, 71627);
+ AckInfo ackB = new AckInfo(987987, 45354);
+ AckInfo ackC = new AckInfo(32423, 4533);
+
+ List acks = new ArrayList();
+ acks.add(ackA);
+ acks.add(ackB);
+ acks.add(ackC);
+
+ Object[] args = new Object[] { acks };
+
+ mi.setArguments(args);
+
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+
+ InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+
+ wf.write(ir, oos);
+
+ oos.flush();
+
+ byte[] bytes = bos.toByteArray();
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+
+ ObjectInputStream ois = new ObjectInputStream(bis);
+
+ //Check the bytes
+
+ //First byte should be version
+ assertEquals(77, ois.readByte());
+
+ //First byte should be ACKNOWLEDGE
+ assertEquals(JMSWireFormat.ACKNOWLEDGE_BATCH, ois.readByte());
+
+ //Next int should be objectId
+ assertEquals(objectId, ois.readInt());
+
+ //Next long should be methodHash
+ assertEquals(methodHash, ois.readLong());
+
+ //Next should be number of acks
+ assertEquals(3, ois.readInt());
+
+ //Now the acks
+ AckInfo ack = new AckInfo();
+
+ ack.readExternal(ois);
+
+ assertEquals(ackA.getMessageID(), ack.getMessageID());
+ assertEquals(ackA.getConsumerID(), ack.getConsumerID());
+
+ ack = new AckInfo();
+
+ ack.readExternal(ois);
+
+ assertEquals(ackB.getMessageID(), ack.getMessageID());
+ assertEquals(ackB.getConsumerID(), ack.getConsumerID());
+
+ ack = new AckInfo();
+
+ ack.readExternal(ois);
+
+ assertEquals(ackC.getMessageID(), ack.getMessageID());
+ assertEquals(ackC.getConsumerID(), ack.getConsumerID());
+
+
+ //Now eos
+ try
+ {
+ ois.readByte();
+ fail("End of stream expected");
+ }
+ catch (EOFException e)
+ {
+ //Ok
+ }
+
+ bis.reset();
+
+ ois = new ObjectInputStream(bis);
+
+ InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+
+ mm = (MessagingMarshallable)ir2.getParameter();
+
+ assertEquals(77, mm.getVersion());
+
+ MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+
+ assertEquals(methodHash, mi2.getMethodHash());
+
+ assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+
+ List acks2 = (List)mi2.getArguments()[0];
+
+ assertEquals(3, acks.size());
+
+ assertEquals(ackA.getMessageID(), ((AckInfo)(acks2.get(0))).getMessageID());
+ assertEquals(ackA.getConsumerID(), ((AckInfo)(acks2.get(0))).getConsumerID());
+
+ assertEquals(ackB.getMessageID(), ((AckInfo)(acks2.get(1))).getMessageID());
+ assertEquals(ackB.getConsumerID(), ((AckInfo)(acks2.get(1))).getConsumerID());
+
+ assertEquals(ackC.getMessageID(), ((AckInfo)(acks2.get(2))).getMessageID());
+ assertEquals(ackC.getConsumerID(), ((AckInfo)(acks2.get(2))).getConsumerID());
+ }
+
+
/*
* Test that general serializable invocation requests are marshalled correctky
*/
@@ -275,7 +500,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -285,7 +510,7 @@
// Check the bytes
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
// First byte should be version
byte version = ois.readByte();
@@ -294,7 +519,7 @@
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
@@ -336,7 +561,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -344,7 +569,7 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
// First byte should be version
byte version = ois.readByte();
@@ -353,7 +578,7 @@
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
@@ -380,7 +605,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -388,7 +613,7 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
// First byte should be version
byte version = ois.readByte();
@@ -397,7 +622,7 @@
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
@@ -435,7 +660,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -445,7 +670,7 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
//Check the bytes
@@ -486,7 +711,7 @@
MessageTest.ensureEquivalent(m, m2);
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
@@ -535,7 +760,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -545,7 +770,7 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
//Check the bytes
@@ -591,7 +816,7 @@
assertEquals(info.getMessageID(), info2.getMessageID());
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
@@ -620,19 +845,25 @@
}
- public void testCancelDelivery() throws Exception
+
+ public void testCancelDeliveries() throws Exception
{
long methodHash = 62365354;
int objectId = 54321;
- Long lid = new Long(87654321);
+ List ids = new ArrayList();
+
+ AckInfo ack1 = new AckInfo(1254, 78123);
+ AckInfo ack2 = new AckInfo(786, 8979);
+ ids.add(ack1);
+ ids.add(ack2);
- MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveryMethod, cancelDeliveryMethod, null);
+ MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveriesMethod, cancelDeliveriesMethod, null);
mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
- mi.setArguments(new Object[] {lid});
+ mi.setArguments(new Object[] {ids});
MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
@@ -640,7 +871,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -650,15 +881,15 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
//Check the bytes
//First byte should be version
assertEquals(77, ois.readByte());
- //Next byte should be CANCEL_MESSAGE
- assertEquals(JMSWireFormat.CANCEL_DELIVERY, ois.readByte());
+ //Next byte should be CANCEL_MESSAGES
+ assertEquals(JMSWireFormat.CANCEL_DELIVERIES, ois.readByte());
//Next int should be objectId
assertEquals(objectId, ois.readInt());
@@ -666,11 +897,28 @@
//Next long should be methodHash
assertEquals(methodHash, ois.readLong());
- //Next should come the ID
+ //Next should the size of the list
+
+ int size = ois.readInt();
+
+ assertEquals(2, size);
+
+ //then the AckInfos
+ AckInfo rack1 = new AckInfo();
+
+ AckInfo rack2 = new AckInfo();
+
+ rack1.readExternal(ois);
+
+ rack2.readExternal(ois);
+
+ assertEquals(ack1.getConsumerID(), rack1.getConsumerID());
+
+ assertEquals(ack1.getMessageID(), rack1.getMessageID());
- long id = ois.readLong();
+ assertEquals(ack2.getConsumerID(), rack2.getConsumerID());
- assertEquals(lid, new Long(id));
+ assertEquals(ack2.getMessageID(), rack2.getMessageID());
//should be eos
@@ -686,7 +934,7 @@
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
@@ -700,76 +948,46 @@
assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
- Long lid2 = (Long)mi2.getArguments()[0];
-
- assertEquals(lid, lid2);
- }
-
- public void testCancelDeliveries() throws Exception
- {
- long methodHash = 62365354;
+ List list = (List)mi2.getArguments()[0];
- int objectId = 54321;
+ assertEquals(2, list.size());
- List ids = new ArrayList();
- ids.add(new Long(123));
- ids.add(new Long(456));
+ AckInfo xack1 = (AckInfo)list.get(0);
+ AckInfo xack2 = (AckInfo)list.get(1);
- MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveriesMethod, cancelDeliveriesMethod, null);
+ assertEquals(ack1.getConsumerID(), xack1.getConsumerID());
- mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+ assertEquals(ack1.getMessageID(), xack1.getMessageID());
- mi.setArguments(new Object[] {ids});
+ assertEquals(ack2.getConsumerID(), xack2.getConsumerID());
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+ assertEquals(ack2.getMessageID(), xack2.getMessageID());
+ }
- InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+ public void testNullResponse() throws Exception
+ {
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, null);
+ InvocationResponse resp = new InvocationResponse(null, mm, false, null);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
-
- wf.write(ir, oos);
-
+ wf.write(resp, oos);
oos.flush();
- byte[] bytes = bos.toByteArray();
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ // First byte should be version
+ assertEquals(77, ois.readByte());
- //Check the bytes
+ // Should be 1 byte
+ byte b = ois.readByte();
- //First byte should be version
- assertEquals(77, ois.readByte());
-
- //Next byte should be CANCEL_MESSAGES
- assertEquals(JMSWireFormat.CANCEL_DELIVERIES, ois.readByte());
-
- //Next int should be objectId
- assertEquals(objectId, ois.readInt());
-
- //Next long should be methodHash
- assertEquals(methodHash, ois.readLong());
-
- //Next should the size of the list
-
- int size = ois.readInt();
-
- assertEquals(2, size);
-
- //then the longs
- long l1 = ois.readLong();
-
- long l2 = ois.readLong();
-
- assertEquals(123, l1);
-
- assertEquals(456, l2);
-
- //should be eos
+ assertEquals(JMSWireFormat.NULL_RESPONSE, b);
+ // Should be eos
try
{
ois.readByte();
@@ -780,73 +998,11 @@
//Ok
}
-
bis.reset();
- ois = new JBossObjectInputStream(bis);
-
- InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
-
- mm = (MessagingMarshallable)ir2.getParameter();
-
- assertEquals(77, mm.getVersion());
-
- MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
-
- assertEquals(methodHash, mi2.getMethodHash());
-
- assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
-
- List list = (List)mi2.getArguments()[0];
-
- assertEquals(2, list.size());
-
- assertEquals(new Long(123), list.get(0));
- assertEquals(new Long(456), list.get(1));
- }
-
- public void testNullResponse() throws Exception
- {
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, null);
- InvocationResponse resp = new InvocationResponse(null, mm, false, null);
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
-
- wf.write(resp, oos);
- oos.flush();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-
-
- // TODO - this section seems invalid
- // See http://jira.jboss.org/jira/browse/JBMESSAGING-361 and adjust the test accordingly
-
-// DataInputStream dis = new DataInputStream(bis);
-//
-// // First byte should be version
-// assertEquals(77, dis.readByte());
-//
-// // Should be 1 byte
-// byte b = dis.readByte();
-//
-// assertEquals(JMSWireFormat.NULL_RESPONSE, b);
-//
-// // Should be eos
-// try
-// {
-// dis.readByte();
-// fail("End of stream expected");
-// }
-// catch (EOFException e)
-// {
-// //Ok
-// }
-//
-// dis.reset();
// END of the invalid section
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
@@ -858,82 +1014,25 @@
}
- public void testDeactivateResponse() throws Exception
- {
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, new Long(123456));
-
- InvocationResponse resp = new InvocationResponse(null, mm, false, null);
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
-
- wf.write(resp, oos);
-
- oos.flush();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-
- // TODO - this section seems invalid
- // See http://jira.jboss.org/jira/browse/JBMESSAGING-361 and adjust the test accordingly
-
-// DataInputStream dis = new DataInputStream(bis);
-//
-// //First byte should be version
-// assertEquals(77, dis.readByte());
-//
-// byte b = dis.readByte();
-//
-// assertEquals(JMSWireFormat.DEACTIVATE_RESPONSE, b);
-//
-// long l = dis.readLong();
-//
-// assertEquals(123456, l);
-//
-// //Should be eos
-// try
-// {
-// dis.readByte();
-// fail("End of stream expected");
-// }
-// catch (EOFException e)
-// {
-// //Ok
-// }
-//
-// dis.reset();
-
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
- InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
-
- mm = (MessagingMarshallable)ir2.getResult();
-
- assertEquals(77, mm.getVersion());
- assertEquals(new Long(123456), mm.getLoad());
-
- }
-
- public void testGetMessageNow() throws Exception
+ public void testMore() throws Exception
{
long methodHash = 62365354;
int objectId = 54321;
- MethodInvocation mi = new MethodInvocation(null, methodHash, getMessageNowMethod, getMessageNowMethod, null);
+ MethodInvocation mi = new MethodInvocation(null, methodHash, moreMethod, moreMethod, null);
mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
- mi.setArguments(new Object[] {Boolean.valueOf(true)});
-
MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -943,15 +1042,15 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
//Check the bytes
//First byte should be version
assertEquals(77, ois.readByte());
- //Second byte should be GETMESSAGENOW
- assertEquals(JMSWireFormat.GETMESSAGENOW, ois.readByte());
+ //Second byte should be MORE
+ assertEquals(JMSWireFormat.MORE, ois.readByte());
//Next int should be objectId
assertEquals(objectId, ois.readInt());
@@ -959,9 +1058,6 @@
//Next long should be methodHash
assertEquals(methodHash, ois.readLong());
- //Next boolean should be wait
- assertEquals(true, ois.readBoolean());
-
//Now eos
try
{
@@ -974,7 +1070,7 @@
}
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
@@ -987,273 +1083,104 @@
assertEquals(methodHash, mi2.getMethodHash());
assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
-
- boolean wait = ((Boolean)mi2.getArguments()[0]).booleanValue();
-
- assertEquals(true, wait);
-
}
- public void testActivate() throws Exception
- {
- long methodHash = 62365354;
- int objectId = 54321;
-
- MethodInvocation mi = new MethodInvocation(null, methodHash, activateMethod, activateMethod, null);
-
- mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
-
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
-
- InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
-
- wf.write(ir, oos);
-
- oos.flush();
-
- byte[] bytes = bos.toByteArray();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
-
- //Check the bytes
-
- //First byte should be version
- assertEquals(77, ois.readByte());
-
- //Second byte should be ACTIVATE
- assertEquals(JMSWireFormat.ACTIVATE, ois.readByte());
-
- //Next int should be objectId
- assertEquals(objectId, ois.readInt());
-
- //Next long should be methodHash
- assertEquals(methodHash, ois.readLong());
- //Now eos
- try
- {
- ois.readByte();
- fail("End of stream expected");
- }
- catch (EOFException e)
+ public void testCallback() throws Exception
{
- //Ok
- }
-
- bis.reset();
- ois = new JBossObjectInputStream(bis);
-
- InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
-
- mm = (MessagingMarshallable)ir2.getParameter();
-
- assertEquals(77, mm.getVersion());
-
- MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
-
- assertEquals(methodHash, mi2.getMethodHash());
-
- assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
- }
+ int consumerID = 12345678;
- public void testDeactivate() throws Exception
- {
- long methodHash = 62365354;
+ JBossMessage m1 = new JBossMessage(123);
+ JBossMessage m2 = new JBossMessage(456);
+ JBossMessage m3 = new JBossMessage(789);
- int objectId = 54321;
+ List msgs = new ArrayList();
- MethodInvocation mi = new MethodInvocation(null, methodHash, deactivateMethod, deactivateMethod, null);
+ MessageProxy del1 = JBossMessage.createThinDelegate(m1, 7);
+ MessageProxy del2 = JBossMessage.createThinDelegate(m2, 8);
+ MessageProxy del3 = JBossMessage.createThinDelegate(m3, 9);
- mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+ MessageTest.configureMessage(m1);
+ MessageTest.configureMessage(m2);
+ MessageTest.configureMessage(m3);
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+ msgs.add(del1);
+ msgs.add(del2);
+ msgs.add(del3);
- InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+ ClientDelivery dr = new ClientDelivery(msgs, consumerID);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
-
- wf.write(ir, oos);
-
- oos.flush();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
- byte[] bytes = bos.toByteArray();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
-
- //Check the bytes
-
- //First byte should be version
- assertEquals(77, ois.readByte());
-
- //Second byte should be ACTIVATE
- assertEquals(JMSWireFormat.DEACTIVATE, ois.readByte());
-
- //Next int should be objectId
- assertEquals(objectId, ois.readInt());
-
- //Next long should be methodHash
- assertEquals(methodHash, ois.readLong());
-
- //Now eos
- try
- {
- ois.readByte();
- fail("End of stream expected");
- }
- catch (EOFException e)
- {
- //Ok
- }
-
- bis.reset();
-
- ois = new JBossObjectInputStream(bis);
-
- InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
-
- mm = (MessagingMarshallable)ir2.getParameter();
-
- assertEquals(77, mm.getVersion());
-
- MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
-
- assertEquals(methodHash, mi2.getMethodHash());
-
- assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
- }
-
- public void testAcknowledge() throws Exception
- {
- long methodHash = 62365354;
-
- int objectId = 54321;
-
- MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeMethod, acknowledgeMethod, null);
-
- mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
-
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, dr);
InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
-
wf.write(ir, oos);
oos.flush();
- byte[] bytes = bos.toByteArray();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
//Check the bytes
//First byte should be version
assertEquals(77, ois.readByte());
- //First byte should be ACKNOWLEDGE
- assertEquals(JMSWireFormat.ACKNOWLEDGE, ois.readByte());
-
- //Next int should be objectId
- assertEquals(objectId, ois.readInt());
-
- //Next long should be methodHash
- assertEquals(methodHash, ois.readLong());
-
- //Now eos
- try
- {
- ois.readByte();
- fail("End of stream expected");
- }
- catch (EOFException e)
- {
- //Ok
- }
-
- bis.reset();
-
- ois = new JBossObjectInputStream(bis);
-
- InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
-
- mm = (MessagingMarshallable)ir2.getParameter();
-
- assertEquals(77, mm.getVersion());
-
- MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
-
- assertEquals(methodHash, mi2.getMethodHash());
-
- assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
- }
-
- public void testCallback() throws Exception
- {
- int consumerID = 12345678;
-
- JBossMessage m = new JBossMessage(123);
-
- MessageProxy del = JBossMessage.createThinDelegate(m, 7);
+ //Second byte should be CALLBACK
+ assertEquals(JMSWireFormat.CALLBACK, ois.readByte());
- MessageTest.configureMessage(m);
+ //Next int should be consumer id
+ assertEquals(12345678, ois.readInt());
- DeliveryRunnable dr = new DeliveryRunnable(del, consumerID, null, false);
+ //Next int should be number of messages
+ assertEquals(3, ois.readInt());
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, dr);
+ //Next byte should be type
+ assertEquals(JBossMessage.TYPE, ois.readByte());
- InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+ //Next int should be delivery count
+ assertEquals(7, ois.readInt());
- wf.write(ir, oos);
+ //And now the message itself
+ JBossMessage r1 = new JBossMessage();
- oos.flush();
+ r1.readExternal(ois);
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ //Next byte should be type
+ assertEquals(JBossMessage.TYPE, ois.readByte());
- //Check the bytes
+ //Next int should be delivery count
+ assertEquals(8, ois.readInt());
- //First byte should be version
- assertEquals(77, ois.readByte());
+ //And now the message itself
+ JBossMessage r2 = new JBossMessage();
- //Second byte should be CALLBACK
- assertEquals(JMSWireFormat.CALLBACK, ois.readByte());
+ r2.readExternal(ois);
- //Next int should be consumer id
- assertEquals(12345678, ois.readInt());
//Next byte should be type
assertEquals(JBossMessage.TYPE, ois.readByte());
//Next int should be delivery count
- assertEquals(7, ois.readInt());
+ assertEquals(9, ois.readInt());
//And now the message itself
- JBossMessage m2 = new JBossMessage();
+ JBossMessage r3 = new JBossMessage();
- m2.readExternal(ois);
+ r3.readExternal(ois);
- MessageTest.ensureEquivalent(m, m2);
+ MessageTest.ensureEquivalent(m1, r1);
+ MessageTest.ensureEquivalent(m2, r2);
+ MessageTest.ensureEquivalent(m3, r3);
//eos
try
@@ -1268,7 +1195,7 @@
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
@@ -1276,35 +1203,41 @@
assertEquals(77, mm.getVersion());
- DeliveryRunnable dr2 = (DeliveryRunnable)mm.getLoad();
-
- MessageProxy del2 = dr2.getMessageProxy();
+ ClientDelivery dr2 = (ClientDelivery)mm.getLoad();
- JBossMessage m3 = del2.getMessage();
+ List msgs2 = dr2.getMessages();
assertEquals(consumerID, dr2.getConsumerID());
- assertEquals(7, del2.getDeliveryCount());
-
- MessageTest.ensureEquivalent(m, m3);
-
+ MessageProxy p1 = (MessageProxy)msgs2.get(0);
+ MessageProxy p2 = (MessageProxy)msgs2.get(1);
+ MessageProxy p3 = (MessageProxy)msgs2.get(2);
+
+ assertEquals(del1.getDeliveryCount(), p1.getDeliveryCount());
+ assertEquals(del2.getDeliveryCount(), p2.getDeliveryCount());
+ assertEquals(del3.getDeliveryCount(), p3.getDeliveryCount());
+
+ JBossMessage q1 = p1.getMessage();
+ JBossMessage q2 = p1.getMessage();
+ JBossMessage q3 = p1.getMessage();
+
+ MessageTest.ensureEquivalent(m1, q1);
+ MessageTest.ensureEquivalent(m2, q2);
+ MessageTest.ensureEquivalent(m3, q3);
}
- public void testMessageResponse() throws Exception
- {
- JBossMessage m = new JBossMessage(123);
-
- MessageTest.configureMessage(m);
- MessageProxy del = JBossMessage.createThinDelegate(m, 4);
+ public void testGetIdBlockResponse() throws Exception
+ {
+ IdBlock block = new IdBlock(132, 465);
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, del);
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, block);
InvocationResponse ir = new InvocationResponse(null, mm, false, null);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -1312,31 +1245,21 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
- //First byte should be version
+ // First byte should be version
assertEquals(77, ois.readByte());
int b = ois.readByte();
- assertEquals(JMSWireFormat.MESSAGE_RESPONSE, b);
-
- // Next byte is type
- byte type = ois.readByte();
-
- assertEquals(JBossMessage.TYPE, type);
-
- //Next is delivery count
- int deliveryCount = ois.readInt();
-
- assertEquals(4, deliveryCount);
+ assertEquals(JMSWireFormat.ID_BLOCK_RESPONSE, b);
- //And now the message itself
- JBossMessage m2 = new JBossMessage();
+ IdBlock block2 = new IdBlock();
- m2.readExternal(ois);
+ block2.readExternal(ois);
- MessageTest.ensureEquivalent(m, m2);
+ assertEquals(block.getLow(), block2.getLow());
+ assertEquals(block.getHigh(), block2.getHigh());
//eos
try
@@ -1351,7 +1274,7 @@
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
@@ -1359,26 +1282,23 @@
assertEquals(77, mm.getVersion());
- MessageProxy del2 = (MessageProxy)mm.getLoad();
-
- JBossMessage m3 = del2.getMessage();
-
- MessageTest.ensureEquivalent(m, m3);
+ IdBlock block3 = (IdBlock)mm.getLoad();
- assertEquals(4, del2.getDeliveryCount());
+ assertEquals(block.getLow(), block3.getLow());
+ assertEquals(block.getHigh(), block3.getHigh());
}
- public void testGetIdBlockResponse() throws Exception
+ public void testHandleMessageResponse() throws Exception
{
- IdBlock block = new IdBlock(132, 465);
+ HandleMessageResponse h = new HandleMessageResponse(true, 76876);
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, block);
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, h);
InvocationResponse ir = new InvocationResponse(null, mm, false, null);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- JBossObjectOutputStream oos = new JBossObjectOutputStream(bos);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
wf.write(ir, oos);
@@ -1386,21 +1306,21 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- JBossObjectInputStream ois = new JBossObjectInputStream(bis);
+ ObjectInputStream ois = new ObjectInputStream(bis);
// First byte should be version
- assertEquals(CURRENT_VERSION, ois.readByte());
+ assertEquals(77, ois.readByte());
int b = ois.readByte();
- assertEquals(JMSWireFormat.ID_BLOCK_RESPONSE, b);
+ assertEquals(JMSWireFormat.HANDLE_MESSAGE_RESPONSE, b);
- IdBlock block2 = new IdBlock();
+ HandleMessageResponse h2 = new HandleMessageResponse();
- block2.readExternal(ois);
+ h2.readExternal(ois);
- assertEquals(block.getLow(), block2.getLow());
- assertEquals(block.getHigh(), block2.getHigh());
+ assertEquals(h.clientIsFull(), h2.clientIsFull());
+ assertEquals(h.getNumberAccepted(), h2.getNumberAccepted());
//eos
try
@@ -1415,18 +1335,18 @@
bis.reset();
- ois = new JBossObjectInputStream(bis);
+ ois = new ObjectInputStream(bis);
InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
mm = (MessagingMarshallable)ir2.getResult();
- assertEquals(2, mm.getVersion());
+ assertEquals(77, mm.getVersion());
- IdBlock block3 = (IdBlock)mm.getLoad();
+ HandleMessageResponse h3 = (HandleMessageResponse)mm.getLoad();
- assertEquals(block.getLow(), block3.getLow());
- assertEquals(block.getHigh(), block3.getHigh());
+ assertEquals(h.clientIsFull(), h3.clientIsFull());
+ assertEquals(h.getNumberAccepted(), h3.getNumberAccepted());
}
}
}
\ No newline at end of file
1.17 +28 -18 jboss-jms/tests/src/org/jboss/test/messaging/jms/XATest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: XATest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/XATest.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -b -r1.16 -r1.17
--- XATest.java 19 Jun 2006 15:16:08 -0000 1.16
+++ XATest.java 17 Jul 2006 17:14:52 -0000 1.17
@@ -96,6 +96,15 @@
{
ServerManagement.undeployQueue("Queue");
+ if (!ServerManagement.isRemote())
+ {
+ if (tm.getTransaction() != null)
+ {
+ //roll it back
+ tm.rollback();
+ }
+ }
+
if (suspendedTx != null)
{
tm.resume(suspendedTx);
@@ -1193,6 +1202,8 @@
assertNotNull(r2);
assertEquals("jellyfish4", r2.getText());
+ cons2.close();
+
//rollback
tx.rollback();
@@ -1208,12 +1219,10 @@
r3 = (TextMessage)cons.receive(1000);
assertNotNull(r3);
assertEquals("jellyfish2", r3.getText());
- //log.info(r3.getText());
TextMessage r4 = (TextMessage)cons.receive(1000);
assertNotNull(r4);
assertEquals("jellyfish3", r4.getText());
- //log.info(r4.getText());
r4 = (TextMessage)cons.receive(1000);
assertNotNull(r4);
@@ -1302,6 +1311,8 @@
//rollback
+ cons2.close();
+
tx.rollback();
Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -1321,12 +1332,10 @@
r3 = (TextMessage)cons.receive(1000);
assertNotNull(r3);
assertEquals("jellyfish4", r3.getText());
- //log.info(r3.getText());
TextMessage r4 = (TextMessage)cons.receive(1000);
assertNotNull(r4);
assertEquals("jellyfish1", r4.getText());
- //log.info(r4.getText());
r4 = (TextMessage)cons.receive(1000);
assertNotNull(r4);
@@ -1357,7 +1366,7 @@
try
{
- //First send 2 messages
+ //First send 4 messages
conn2 = cf.createConnection();
Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessProducer.createProducer(queue);
@@ -1385,7 +1394,6 @@
tx.enlistResource(res1);
tx.enlistResource(res2);
- //Receive the messages, two on each consumer
MessageConsumer cons1 = sess1.createConsumer(queue);
TextMessage r1 = (TextMessage)cons1.receive(1000);
@@ -1433,12 +1441,10 @@
r3 = (TextMessage)cons.receive(1000);
assertNotNull(r3);
assertEquals("jellyfish2", r3.getText());
- //log.info(r3.getText());
TextMessage r4 = (TextMessage)cons.receive(1000);
assertNotNull(r4);
assertEquals("jellyfish3", r4.getText());
- //log.info(r4.getText());
r4 = (TextMessage)cons.receive(1000);
assertNotNull(r4);
@@ -1873,6 +1879,8 @@
assertNotNull(r2);
assertEquals("jellyfish2", r2.getText());
+ cons1.close();
+
//rollback this transaction
tx2.rollback();
@@ -1881,7 +1889,9 @@
Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn2.start();
MessageConsumer cons = sess.createConsumer(queue);
+
TextMessage r3 = (TextMessage)cons.receive(1000);
+
assertNotNull(r3);
assertEquals("jellyfish2", r3.getText());
r3 = (TextMessage)cons.receive(1000);
1.4 +2 -2 jboss-jms/tests/src/org/jboss/test/messaging/jms/XATransactionTest.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: XATransactionTest.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/XATransactionTest.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -b -r1.3 -r1.4
--- XATransactionTest.java 21 Apr 2006 07:33:12 -0000 1.3
+++ XATransactionTest.java 17 Jul 2006 17:14:52 -0000 1.4
@@ -39,7 +39,7 @@
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * $Id: XATransactionTest.java,v 1.3 2006/04/21 07:33:12 timfox Exp $
+ * $Id: XATransactionTest.java,v 1.4 2006/07/17 17:14:52 timfox Exp $
*/
public class XATransactionTest extends MessagingTestCase
{
@@ -91,7 +91,7 @@
s = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
conn.start();
- TextMessage rm = (TextMessage)s.createConsumer(queue).receiveNoWait();
+ TextMessage rm = (TextMessage)s.createConsumer(queue).receive(500);
assertEquals("one", rm.getText());
}
More information about the jboss-cvs-commits
mailing list