[jboss-cvs] JBoss Messaging SVN: r5130 - in trunk: src/main/org/jboss/messaging/core/client and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 17 08:22:46 EDT 2008
Author: timfox
Date: 2008-10-17 08:22:46 -0400 (Fri, 17 Oct 2008)
New Revision: 5130
Modified:
trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
trunk/examples/messaging/src/org/jboss/messaging/example/SSLClient.java
trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
trunk/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java
trunk/examples/messaging/src/org/jboss/messaging/example/WildCardClient.java
trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessageConsumer.java
trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageConsumerTest.java
Log:
More tweaks
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -95,7 +95,7 @@
}
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException e)
{
@@ -170,7 +170,7 @@
do
{
m = clientConsumer.receive(5000);
- m.processed();
+ m.acknowledge();
}
while (m != null);
clientSession.commit();
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/SSLClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/SSLClient.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/SSLClient.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -59,7 +59,7 @@
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
clientSession.start();
ClientMessage msg = clientConsumer.receive(5000);
- msg.processed();
+ msg.acknowledge();
System.out.println("msg.getPayload() = " + msg.getBody().getString());
}
catch(Exception e)
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -66,7 +66,7 @@
clientSession.start();
ClientMessage msg = clientConsumer.receive(7000);
log.info("message received at " + df.format(Calendar.getInstance().getTime()));
- msg.processed();
+ msg.acknowledge();
}
catch(Exception e)
{
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -58,7 +58,7 @@
clientSession.start();
ClientMessage msg = clientConsumer.receive(5000);
System.out.println("msg.getPayload() = " + msg.getBody().getString());
- msg.processed();
+ msg.acknowledge();
}
catch(Exception e)
{
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -71,7 +71,7 @@
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
clientSession.start();
ClientMessage msg = clientConsumer.receive(5000);
- msg.processed();
+ msg.acknowledge();
System.out.println("msg.getPayload() = " + msg.getBody().getString());
}
catch (Exception e)
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/WildCardClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/WildCardClient.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/WildCardClient.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -66,10 +66,10 @@
log.info("message sent to " + queue2);
clientSession.start();
ClientMessage msg = clientConsumer.receive(5000);
- msg.processed();
+ msg.acknowledge();
log.info("message received: " + msg.getBody().getString());
ClientMessage msg2 = clientConsumer.receive(5000);
- msg2.processed();
+ msg2.acknowledge();
log.info("message received: " + msg2.getBody().getString());
}
catch(Exception e)
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -43,5 +43,5 @@
void onReceipt(ClientSessionInternal session, long consumerID);
- void processed() throws MessagingException;
+ void acknowledge() throws MessagingException;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -50,10 +50,6 @@
{
super();
- this.session = session;
-
- this.consumerID = consumerID;
-
this.deliveryCount = deliveryCount;
}
@@ -98,7 +94,7 @@
return this.deliveryCount;
}
- public void processed() throws MessagingException
+ public void acknowledge() throws MessagingException
{
if (session != null)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -806,6 +806,7 @@
if (response.isError())
{
+ log.info(response.getMessage() + ":" + response.getResponseCode());
throw new XAException(response.getResponseCode());
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -63,13 +63,13 @@
Queue getQueue();
- boolean cancel(StorageManager persistenceManager, PostOffice postOffice,
+ boolean cancel(StorageManager storageManager, PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
- void sendToDLQ(StorageManager persistenceManager, PostOffice postOffice,
+ void sendToDLQ(StorageManager storageManager, PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
- void expire(StorageManager persistenceManager, PostOffice postOffice,
+ void expire(StorageManager storageManager, PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
void move(Binding otherBinding, StorageManager persistenceManager, PostOffice postOffice) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -128,6 +128,8 @@
{
persistenceManager.updateDeliveryCount(this);
}
+
+ log.info("cancelling ref " + this);
QueueSettings queueSettings = queueSettingsRepository.getMatch(queue.getName().toString());
int maxDeliveries = queueSettings.getMaxDeliveryAttempts();
@@ -142,6 +144,8 @@
else
{
long redeliveryDelay = queueSettings.getRedeliveryDelay();
+
+ log.info("redelivery delay " + redeliveryDelay);
if (redeliveryDelay > 0)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.server.impl;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -216,6 +217,18 @@
session.removeConsumer(this);
LinkedList<MessageReference> refs = cancelRefs();
+
+ Iterator<MessageReference> iter = refs.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+
+ if (!ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ {
+ iter.remove();
+ }
+ }
if (!refs.isEmpty())
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -392,6 +392,8 @@
private void doRollback(final Transaction theTx) throws Exception
{
+ log.info("rolling back");
+
boolean wasStarted = started;
List<MessageReference> toCancel = new ArrayList<MessageReference>();
@@ -425,18 +427,21 @@
for (MessageReference ref : rolledBack)
{
- Queue queue = ref.getQueue();
-
- LinkedList<MessageReference> list = queueMap.get(queue);
-
- if (list == null)
+ if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
{
- list = new LinkedList<MessageReference>();
-
- queueMap.put(queue, list);
+ Queue queue = ref.getQueue();
+
+ LinkedList<MessageReference> list = queueMap.get(queue);
+
+ if (list == null)
+ {
+ list = new LinkedList<MessageReference>();
+
+ queueMap.put(queue, list);
+ }
+
+ list.add(ref);
}
-
- list.add(ref);
}
for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
@@ -585,7 +590,7 @@
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
- Transaction theTx = resourceManager.removeTransaction(xid);
+ Transaction theTx = resourceManager.getTransaction(xid);
if (theTx == null)
{
@@ -596,10 +601,6 @@
if (theTx.getState() == Transaction.State.SUSPENDED)
{
- // Put it back
-
- resourceManager.putTransaction(xid, tx);
-
return new SessionXAResponseMessage(true,
XAException.XAER_PROTO,
"Cannot prepare transaction, it is suspended " + xid);
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -354,20 +354,20 @@
for (MessageReference ref : acknowledgements)
{
- Queue queue = ref.getQueue();
+// Queue queue = ref.getQueue();
+//
+// ServerMessage message = ref.getMessage();
- ServerMessage message = ref.getMessage();
-
// Putting back the size on pagingManager, and reverting the counters
- if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
- {
- pagingManager.addSize(message);
- }
+
+ //FIXME - why????
+ //Surely paging happens before routing, so cancellation shouldn't effect anything......
+// if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
+// {
+// pagingManager.addSize(message);
+// }
- if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
- {
- toCancel.add(ref);
- }
+ toCancel.add(ref);
}
clear();
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessageConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessageConsumer.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessageConsumer.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -214,7 +214,7 @@
if (message != null)
{
- message.processed();
+ message.acknowledge();
jbm = JBossMessage.createMessage(message, session.getCoreSession());
Modified: trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -79,7 +79,7 @@
{
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException e)
{
@@ -119,7 +119,7 @@
//We don't want to call this if the connection/session was closed from inside onMessage
if (!session.getCoreSession().isClosed() && !this.transactedOrClientAck)
{
- message.processed();
+ message.acknowledge();
}
}
catch (MessagingException e)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -243,7 +243,7 @@
messagesReceived++;
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException e)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -105,7 +105,7 @@
assertEquals("testINVMCoreClient", message2.getBody().getString());
- message2.processed();
+ message2.acknowledge();
}
session.close();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -272,7 +272,7 @@
{
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException me)
{
@@ -390,7 +390,7 @@
{
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException me)
{
@@ -523,7 +523,7 @@
{
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException me)
{
@@ -686,7 +686,7 @@
{
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException me)
{
@@ -830,7 +830,7 @@
assertNotNull(msg);
- msg.processed();
+ msg.acknowledge();
}
}
@@ -911,7 +911,7 @@
assertNotNull(msg);
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1005,7 +1005,7 @@
assertNotNull(msg);
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1022,7 +1022,7 @@
assertNotNull(msg);
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1123,7 +1123,7 @@
{
ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1138,7 +1138,7 @@
{
ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1194,7 +1194,7 @@
assertNotNull(message2);
- message2.processed();
+ message2.acknowledge();
sess.close();
@@ -1230,7 +1230,7 @@
assertNotNull(message2);
- message2.processed();
+ message2.acknowledge();
sess.close();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -308,7 +308,7 @@
try
{
- message.processed();
+ message.acknowledge();
}
catch (MessagingException me)
{
@@ -840,7 +840,7 @@
assertEquals(i, msg.getProperty(new SimpleString("count")));
- msg.processed();
+ msg.acknowledge();
}
}
@@ -933,7 +933,7 @@
assertEquals(i, msg.getProperty(new SimpleString("count")));
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1039,7 +1039,7 @@
assertEquals(i, msg.getProperty(new SimpleString("count")));
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1065,7 +1065,7 @@
assertEquals(i, msg.getProperty(new SimpleString("count")));
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1179,7 +1179,7 @@
assertEquals(i, msg.getProperty(new SimpleString("count")));
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1208,7 +1208,7 @@
assertEquals(i, msg.getProperty(new SimpleString("count")));
- msg.processed();
+ msg.acknowledge();
}
}
@@ -1274,7 +1274,7 @@
assertNotNull(message2);
- message2.processed();
+ message2.acknowledge();
sess.close();
@@ -1310,7 +1310,7 @@
assertNotNull(message2);
- message2.processed();
+ message2.acknowledge();
sess.close();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -125,7 +125,7 @@
assertEquals("aardvarks", message2.getBody().getString());
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
//ClientMessage message3 = consumer.receive(250);
@@ -179,7 +179,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
session.close();
@@ -198,7 +198,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
ClientMessage message3 = consumer.receive(250);
@@ -252,7 +252,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
session.close();
@@ -274,7 +274,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
ClientMessage message3 = consumer.receive(250);
@@ -328,7 +328,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
session.close();
@@ -391,7 +391,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
@@ -472,7 +472,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
session.close();
@@ -560,7 +560,7 @@
- message2.processed();
+ message2.acknowledge();
}
ClientMessage message3 = cons.receive(250);
@@ -696,7 +696,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
RemotingConnection conn2 = ((ClientSessionImpl)session).getConnection();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -115,7 +115,7 @@
assertEquals(i, message2.getProperty(new SimpleString("count")));
- message2.processed();
+ message2.acknowledge();
}
ClientMessage message3 = consumer.receive(250);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -21,6 +21,12 @@
*/
package org.jboss.messaging.tests.integration.scheduling;
+import java.io.File;
+import java.util.Calendar;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -29,6 +35,7 @@
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -38,16 +45,14 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.util.id.GUID;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.util.Calendar;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public class ScheduledMessageTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(ScheduledMessageTest.class);
+
+
private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
@@ -181,7 +186,7 @@
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m1", message2.getBody().getString());
- message2.processed();
+ message2.acknowledge();
// Make sure no more messages
consumer.close();
@@ -193,7 +198,6 @@
public void testPagedMessageDeliveredMultipleConsumersCorrectly() throws Exception
{
-
TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
configuration.setPagingMaxGlobalSizeBytes(0);
@@ -232,10 +236,11 @@
message2 = consumer2.receive(5250);
time += 5000;
assertTrue(System.currentTimeMillis() >= time);
+ log.info(message3.getBody().getString());
assertEquals("m1", message3.getBody().getString());
assertEquals("m1", message2.getBody().getString());
- message2.processed();
- message3.processed();
+ message2.acknowledge();
+ message3.acknowledge();
// Make sure no more messages
consumer.close();
@@ -298,8 +303,8 @@
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m1", message3.getBody().getString());
assertEquals("m1", message2.getBody().getString());
- message2.processed();
- message3.processed();
+ message2.acknowledge();
+ message3.acknowledge();
// Make sure no more messages
consumer.close();
@@ -354,7 +359,7 @@
assertTrue(System.currentTimeMillis() >= time);
assertEquals("testINVMCoreClient", message2.getBody().getString());
- message2.processed();
+ message2.acknowledge();
// Make sure no more messages
consumer.close();
@@ -415,27 +420,27 @@
ClientMessage message = consumer.receive(11000);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m1", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m2", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m3", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m4", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m5", message.getBody().getString());
- message.processed();
+ message.acknowledge();
// Make sure no more messages
consumer.close();
@@ -497,27 +502,27 @@
ClientMessage message = consumer.receive(10250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m1", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m3", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m5", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m2", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m4", message.getBody().getString());
- message.processed();
+ message.acknowledge();
// Make sure no more messages
consumer.close();
@@ -575,24 +580,24 @@
ClientMessage message = consumer.receive(1000);
assertEquals("m2", message.getBody().getString());
- message.processed();
+ message.acknowledge();
message = consumer.receive(1000);
assertEquals("m4", message.getBody().getString());
- message.processed();
+ message.acknowledge();
message = consumer.receive(10250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m1", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m3", message.getBody().getString());
- message.processed();
+ message.acknowledge();
time += 1000;
message = consumer.receive(1250);
assertTrue(System.currentTimeMillis() >= time);
assertEquals("m5", message.getBody().getString());
- message.processed();
+ message.acknowledge();
// Make sure no more messages
consumer.close();
@@ -653,7 +658,7 @@
assertNotNull(message2);
assertEquals("testINVMCoreClient", message2.getBody().getString());
- message2.processed();
+ message2.acknowledge();
session.end(xid2, XAResource.TMSUCCESS);
session.prepare(xid2);
session.commit(xid2, true);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -299,7 +299,7 @@
{
ClientMessage m = pageConsumer.receive(10000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
}
}
@@ -878,19 +878,19 @@
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m1");
m = clientConsumer.receive(1000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
assertEquals(m.getBody().getString(), "m2");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m3");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
@@ -936,19 +936,19 @@
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m1");
m = clientConsumer.receive(1000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
assertEquals(m.getBody().getString(), "m2");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m3");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
@@ -1018,19 +1018,19 @@
clientSession2.start(xid2, XAResource.TMNOFLAGS);
clientSession2.start();
ClientMessage m = clientConsumer2.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m5");
m = clientConsumer2.receive(1000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
assertEquals(m.getBody().getString(), "m6");
m = clientConsumer2.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m7");
m = clientConsumer2.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m8");
clientSession2.end(xid2, XAResource.TMSUCCESS);
@@ -1040,19 +1040,19 @@
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m1");
m = clientConsumer.receive(1000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
assertEquals(m.getBody().getString(), "m2");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m3");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
@@ -1108,19 +1108,19 @@
clientSession2.start(xid2, XAResource.TMNOFLAGS);
clientSession2.start();
ClientMessage m = clientConsumer2.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m5");
m = clientConsumer2.receive(1000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
assertEquals(m.getBody().getString(), "m6");
m = clientConsumer2.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m7");
m = clientConsumer2.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m8");
clientSession2.end(xid2, XAResource.TMSUCCESS);
@@ -1130,19 +1130,19 @@
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m1");
m = clientConsumer.receive(1000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
assertEquals(m.getBody().getString(), "m2");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m3");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
@@ -1164,19 +1164,19 @@
clientSession.rollback(xid);
clientSession.start();
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m1");
m = clientConsumer.receive(1000);
assertNotNull(m);
- m.processed();
+ m.acknowledge();
assertEquals(m.getBody().getString(), "m2");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m3");
m = clientConsumer.receive(1000);
- m.processed();
+ m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().getString(), "m4");
}
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -162,7 +162,7 @@
msg = consumer.receive(1000);
if (msg != null)
{
- msg.processed();
+ msg.acknowledge();
if (++msgs % 10000 == 0)
{
System.out.println("received " + msgs);
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -139,7 +139,7 @@
assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
assertEquals("testINVMCoreClient", message2.getBody().getString());
- message2.processed();
+ message2.acknowledge();
session.close();
}
@@ -186,7 +186,7 @@
assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
assertEquals("testINVMCoreClient", message2.getBody().getString());
- message2.processed();
+ message2.acknowledge();
session.close();
}
@@ -224,7 +224,7 @@
assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
assertEquals("testINVMCoreClient", message2.getBody().getString());
- message2.processed();
+ message2.acknowledge();
session.close();
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageConsumerTest.java 2008-10-17 11:44:20 UTC (rev 5129)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageConsumerTest.java 2008-10-17 12:22:46 UTC (rev 5130)
@@ -361,7 +361,7 @@
expect(session.getCoreSession()).andStubReturn(clientSession);
ClientConsumer clientConsumer = createStrictMock(ClientConsumer.class);
ClientMessage clientMessage = createStrictMock(ClientMessage.class);
- clientMessage.processed();
+ clientMessage.acknowledge();
expect(clientMessage.getType()).andReturn(JBossMessage.TYPE);
MessagingBuffer body = createStrictMock(MessagingBuffer.class);
expect(clientMessage.getBody()).andStubReturn(body );
More information about the jboss-cvs-commits
mailing list