[jboss-cvs] JBoss Messaging SVN: r5898 - in trunk: src/main/org/jboss/messaging/core/client/impl and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 18 23:08:33 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-02-18 23:08:32 -0500 (Wed, 18 Feb 2009)
New Revision: 5898
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
JBMESSAGING-1339 and JBMESSAGING-1294 - Fix on Delivery Counter + implement rollback(lastMessageAsDelivered)
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -161,6 +161,12 @@
void rollback() throws MessagingException;
+ /**
+ * @param isLastMessageAsDelived if true, the first message in the delivering buffer is considered as delivered: it is acknowledged and its delivery count is not decremented by the rollback
+ * @throws MessagingException
+ */
+ void rollback(boolean isLastMessageAsDelived) throws MessagingException;
+
void close() throws MessagingException;
boolean isClosed();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -51,6 +51,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -534,6 +535,11 @@
public void rollback() throws MessagingException
{
+ rollback(false);
+ }
+
+ public void rollback(final boolean isLastMessageAsDelived) throws MessagingException
+ {
checkClosed();
flushAcks();
@@ -556,8 +562,8 @@
consumer.clear();
}
- channel.sendBlocking(new PacketImpl(PacketImpl.SESS_ROLLBACK));
-
+ channel.sendBlocking(new RollbackMessage(isLastMessageAsDelived));
+
if (wasStarted)
{
start();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -100,6 +100,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -701,7 +702,7 @@
}
case SESS_ROLLBACK:
{
- packet = new PacketImpl(PacketImpl.SESS_ROLLBACK);
+ packet = new RollbackMessage();
break;
}
case SESS_QUEUEQUERY:
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * A RollbackMessage: the SESS_ROLLBACK packet, whose body carries the isLastMessageAsDelived flag as a single boolean.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Feb 18, 2009 2:11:17 PM
+ *
+ *
+ */
+public class RollbackMessage extends PacketImpl
+{
+
+ /**
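+ * Creates a SESS_ROLLBACK packet with isLastMessageAsDelived left at its default value (false); used when decoding.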
+ */
+ public RollbackMessage()
+ {
+ super(SESS_ROLLBACK);
+ }
+
+ public RollbackMessage(final boolean isLastMessageAsDelived)
+ {
+ super(SESS_ROLLBACK);
+ this.isLastMessageAsDelived = isLastMessageAsDelived;
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private boolean isLastMessageAsDelived;
+
+ /**
+ * @return the isLastMessageAsDelived
+ */
+ public boolean isLastMessageAsDelived()
+ {
+ return isLastMessageAsDelived;
+ }
+
+ /**
+ * @param isLastMessageAsDelived the isLastMessageAsDelived to set
+ */
+ public void setLastMessageAsDelived(final boolean isLastMessageAsDelived)
+ {
+ this.isLastMessageAsDelived = isLastMessageAsDelived;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl#getRequiredBufferSize()
+ */
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BOOLEAN;
+ }
+
+ @Override
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putBoolean(isLastMessageAsDelived);
+ }
+
+ @Override
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ this.isLastMessageAsDelived = buffer.getBoolean();
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -56,6 +56,8 @@
void setDeliveryCount(int deliveryCount);
void incrementDeliveryCount();
+
+ void decrementDeliveryCount();
Queue getQueue();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -42,7 +42,7 @@
void close() throws Exception;
- List<MessageReference> cancelRefs() throws Exception;
+ List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
void setStarted(boolean started);
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -25,6 +25,7 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -81,7 +82,7 @@
void handleExpired(final SessionExpiredMessage packet);
- void handleRollback(Packet packet);
+ void handleRollback(RollbackMessage packet);
void handleCommit(Packet packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -103,6 +103,11 @@
{
deliveryCount++;
}
+
+ public void decrementDeliveryCount()
+ {
+ deliveryCount--;
+ }
public long getScheduledDeliveryTime()
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -120,7 +120,7 @@
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
*/
private final boolean browseOnly;
-
+
private final boolean updateDeliveries;
private final StorageManager storageManager;
@@ -184,7 +184,7 @@
binding.getQueue().addConsumer(this);
minLargeMessageSize = session.getMinLargeMessageSize();
-
+
this.updateDeliveries = updateDeliveries;
}
@@ -276,7 +276,7 @@
session.removeConsumer(this);
- LinkedList<MessageReference> refs = cancelRefs();
+ LinkedList<MessageReference> refs = cancelRefs(false, null);
Iterator<MessageReference> iter = refs.iterator();
@@ -300,11 +300,10 @@
props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
props.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
-
+
props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putStringProperty(ManagementHelper.HDR_FILTERSTRING,
- filter == null ? null : filter.getFilterString());
+ props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null : filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -314,15 +313,27 @@
}
}
- public LinkedList<MessageReference> cancelRefs() throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
{
+
+ boolean performACK = lastConsumedAsDelivered;
+
LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
{
for (MessageReference ref : deliveringRefs)
{
- refs.add(ref);
+ if (performACK)
+ {
+ acknowledge(false, tx, ref.getMessage().getMessageID());
+ performACK = false;
+ }
+ else
+ {
+ ref.decrementDeliveryCount();
+ refs.add(ref);
+ }
}
deliveringRefs.clear();
@@ -586,7 +597,7 @@
}
lock.lock();
-
+
try
{
@@ -629,18 +640,32 @@
}
ref.getQueue().referenceHandled();
- }
- if (preAcknowledge)
- {
- if (message.isLargeMessage())
+
+ ref.incrementDeliveryCount();
+
+ // If updateDeliveries = false (set by strict-update),
+ // the updateDeliveryCount would still be updated after cancel
+ if (updateDeliveries)
{
- // we must hold one reference, or the file will be deleted before it could be delivered
- message.incrementRefCount();
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ {
+ storageManager.updateDeliveryCount(ref);
+ }
}
- // With pre-ack, we ack *before* sending to the client
- ref.getQueue().acknowledge(ref);
+ if (preAcknowledge)
+ {
+ if (message.isLargeMessage())
+ {
+ // we must hold one reference, or the file will be deleted before it could be delivered
+ message.incrementRefCount();
+ }
+
+ // With pre-ack, we ack *before* sending to the client
+ ref.getQueue().acknowledge(ref);
+ }
+
}
if (message.isLargeMessage())
@@ -652,18 +677,6 @@
deliverStandardMessage(ref, message);
}
- ref.incrementDeliveryCount();
-
- // If updateDeliveries = false (set by strict-update),
- // the updateDeliveryCount would still be updated after cancel
- if (updateDeliveries)
- {
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
- {
- storageManager.updateDeliveryCount(ref);
- }
- }
-
return HandleStatus.HANDLED;
}
finally
@@ -724,7 +737,7 @@
availableCredits.addAndGet(-message.getEncodeSize());
}
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
@@ -824,7 +837,7 @@
pendingLargeMessage.encodeProperties(headerBuffer);
- initialMessage = new SessionReceiveMessage(id, headerBuffer.array(), ref.getDeliveryCount() + 1);
+ initialMessage = new SessionReceiveMessage(id, headerBuffer.array(), ref.getDeliveryCount());
}
int precalculateAvailableCredits;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -50,6 +50,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -305,7 +306,7 @@
{
// We only rollback local txs on close, not XA tx branches
- rollback();
+ rollback(false);
}
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -556,7 +557,7 @@
}
}
- public void handleRollback(final Packet packet)
+ public void handleRollback(final RollbackMessage packet)
{
DelayedResult result = channel.replicatePacket(packet);
@@ -1806,13 +1807,13 @@
channel.send(response);
}
- private void doHandleRollback(final Packet packet)
+ private void doHandleRollback(final RollbackMessage packet)
{
Packet response = null;
try
{
- rollback();
+ rollback(packet.isLastMessageAsDelived());
response = new NullResponseMessage();
}
@@ -2134,7 +2135,7 @@
}
else
{
- doRollback(theTx);
+ doRollback(false, theTx);
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
@@ -2702,7 +2703,7 @@
return largeMessage;
}
- private void doRollback(final Transaction theTx) throws Exception
+ private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
{
boolean wasStarted = started;
@@ -2715,7 +2716,7 @@
consumer.setStarted(false);
}
- toCancel.addAll(consumer.cancelRefs());
+ toCancel.addAll(consumer.cancelRefs(lastMessageAsDelived, theTx));
}
for (MessageReference ref : toCancel)
@@ -2734,7 +2735,7 @@
}
}
- private void rollback() throws Exception
+ private void rollback(boolean lastMessageAsDelived) throws Exception
{
if (tx == null)
{
@@ -2743,7 +2744,7 @@
tx = new TransactionImpl(storageManager);
}
- doRollback(tx);
+ doRollback(lastMessageAsDelived, tx);
tx = new TransactionImpl(storageManager);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -49,6 +49,7 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -159,7 +160,7 @@
}
case SESS_ROLLBACK:
{
- session.handleRollback(packet);
+ session.handleRollback((RollbackMessage)packet);
break;
}
case SESS_XA_COMMIT:
Modified: trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -106,7 +106,7 @@
{
try
{
- session.getCoreSession().rollback();
+ session.getCoreSession().rollback(true);
session.setRecoverCalled(true);
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -30,8 +30,6 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> <p/> $Id: AcknowledgementTest.java 3173 2007-10-05 12:48:16Z
* timfox $
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -518,14 +518,11 @@
assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
log.info("rolling back");
- conn.stop();
sess.rollback();
log.info("closing");
sess.close();
log.info("Closed");
-
- conn.start();
Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -164,26 +164,26 @@
tm = (TextMessage)c.receive(1000);
assertEquals("message2", tm.getText());
- assertTrue(tm.getJMSRedelivered());
- assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+ assertFalse(tm.getJMSRedelivered());
+ assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
tm = (TextMessage)c.receive(1000);
assertEquals("message3", tm.getText());
- assertTrue(tm.getJMSRedelivered());
- assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+ assertFalse(tm.getJMSRedelivered());
+ assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
tm = (TextMessage)c.receive(1000);
assertEquals("message4", tm.getText());
- assertTrue(tm.getJMSRedelivered());
- assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+ assertFalse(tm.getJMSRedelivered());
+ assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
tm = (TextMessage)c.receive(1000);
assertEquals("message5", tm.getText());
- assertTrue(tm.getJMSRedelivered());
- assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+ assertFalse(tm.getJMSRedelivered());
+ assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
tm.acknowledge();
}
@@ -633,7 +633,7 @@
assertEquals(tm.getText(), rm.getText());
- assertEquals(5, rm.getIntProperty("JMSXDeliveryCount"));
+ assertEquals(4, rm.getIntProperty("JMSXDeliveryCount"));
assertTrue(rm.getJMSRedelivered());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -78,7 +78,7 @@
public void testSimpleConsumerBrowser() throws Exception
{
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setBlockOnNonPersistentSend(true);
ClientSession session = sf.createSession(false, true, true);
@@ -284,12 +284,22 @@
}
- public void testConsumerBrowserMessagesArentAcked() throws Exception
+ public void testConsumerBrowserMessages() throws Exception
{
+ testConsumerBrowserMessagesArentAcked(false);
+ }
+
+ public void testConsumerBrowserMessagesPreACK() throws Exception
+ {
+ testConsumerBrowserMessagesArentAcked(true); // pre-ack variant: must pass true, otherwise it duplicates testConsumerBrowserMessages
+ }
+
+ private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
+ {
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
- ClientSession session = sf.createSession(false, true, true);
+ ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
session.createQueue(QUEUE, QUEUE, null, false, false);
@@ -312,8 +322,10 @@
assertEquals("m" + i, message2.getBody().getString());
}
// assert that all the messages are there and none have been acked
- assertEquals(((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount(), 0);
- assertEquals(((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount(), 100);
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(100,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -347,8 +359,10 @@
assertEquals("m" + i, message2.getBody().getString());
}
// assert that all the messages are there and none have been acked
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(100, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(100,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -441,8 +455,10 @@
assertEquals("m" + i, message2.getBody().getString());
}
// assert that all the messages are there and none have been acked
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -475,8 +491,10 @@
assertEquals("m" + i, message2.getBody().getString());
}
// assert that all the messages are there and none have been acked
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -513,8 +531,10 @@
}
}
// assert that all the messages are there and none have been acked
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -551,13 +571,17 @@
}
}
// assert that all the messages are there and none have been acked
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0,
+ ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
}
private ClientMessage createMessage(final ClientSession session, final String msg)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -29,7 +29,6 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.util.SimpleString;
@@ -72,45 +71,153 @@
testDedeliveryMessageOnPersistent(false);
}
- protected void testDedeliveryMessageOnPersistent(boolean strictUpdate) throws Exception
+ public void testDeliveryNonPersistent() throws Exception
{
- setUp(strictUpdate);
- ClientSession session = factory.createSession(false, true, false);
+ testDelivery(false);
+ }
+
+ public void testDeliveryPersistent() throws Exception
+ {
+ testDelivery(true);
+ }
+
+ public void testDelivery(final boolean persistent) throws Exception
+ {
+ setUp(true);
+ ClientSession session = factory.createSession(false, false, false);
ClientProducer prod = session.createProducer(ADDRESS);
- prod.send(createTextMessage(session, "Hello"));
+
+ for (int i = 0; i < 10; i++)
+ {
+ prod.send(createTextMessage(session, Integer.toString(i), persistent));
+ }
+
session.commit();
session.close();
+
- messagingService.stop();
- messagingService.start();
+ session = factory.createSession(null, null, false, true, true, true, 0);
- session = factory.createSession(false, true, false);
session.start();
+ for (int loopAck = 0; loopAck < 5; loopAck++)
+ {
+ ClientConsumer browser = session.createConsumer(ADDRESS, null, true);
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = browser.receive(1000);
+ assertNotNull("element i=" + i + " loopAck = " + loopAck + " was expected", msg);
+ msg.acknowledge();
+ assertEquals(Integer.toString(i), getTextMessage(msg));
+
+ // We don't change the deliveryCounter on Browser, so this should always be 0
+ assertEquals(0, msg.getDeliveryCount());
+ }
+
+ session.commit();
+ browser.close();
+ }
+
+ session.close();
+
+
+
+ session = factory.createSession(false, false, false);
+ session.start();
+
ClientConsumer consumer = session.createConsumer(ADDRESS);
-
+
+ for (int loopAck = 0; loopAck < 5; loopAck++)
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(Integer.toString(i), getTextMessage(msg));
+
+ // No ACK done, so deliveryCount should always be 1
+ assertEquals(1, msg.getDeliveryCount());
+ }
+ session.rollback();
+ }
+
+ if (persistent)
+ {
+ session.close();
+ messagingService.stop();
+ messagingService.start();
+ session = factory.createSession(false, false, false);
+ session.start();
+ consumer = session.createConsumer(ADDRESS);
+ }
+
+ for (int loopAck = 1; loopAck <= 5; loopAck++)
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(Integer.toString(i), getTextMessage(msg));
+ assertEquals(loopAck, msg.getDeliveryCount());
+ }
+ if (loopAck < 5)
+ {
+ if (persistent)
+ {
+ session.close();
+ messagingService.stop();
+ messagingService.start();
+ session = factory.createSession(false, false, false);
+ session.start();
+ consumer = session.createConsumer(ADDRESS);
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+ }
+
+ session.close();
+ }
+
+ protected void testDedeliveryMessageOnPersistent(final boolean strictUpdate) throws Exception
+ {
+ setUp(strictUpdate);
+ ClientSession session = factory.createSession(false, false, false);
+ ClientProducer prod = session.createProducer(ADDRESS);
+ prod.send(createTextMessage(session, "Hello"));
+ session.commit();
+ session.close();
+
+ session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
ClientMessage msg = consumer.receive(1000);
assertEquals(1, msg.getDeliveryCount());
- assertNotNull(msg);
session.stop();
-
- // if strictUpdate == true, this will simulating a crash, where the server is stopped without closing/rolling back the session
+
+ // if strictUpdate == true, this will simulate a crash, where the server is stopped without closing/rolling back
+ // the session
if (!strictUpdate)
{
// If non Strict, at least rollback/cancel should still update the delivery-counts
- session.rollback();
+ session.rollback(true);
session.close();
}
-
+
messagingService.stop();
-
+
messagingService.start();
-
+
session = factory.createSession(false, true, false);
session.start();
consumer = session.createConsumer(ADDRESS);
msg = consumer.receive(1000);
assertNotNull(msg);
assertEquals(2, msg.getDeliveryCount());
+ session.close();
}
// Package protected ---------------------------------------------
@@ -128,7 +235,7 @@
* @throws Exception
* @throws MessagingException
*/
- private void setUp(boolean strictUpdateDelivery) throws Exception, MessagingException
+ private void setUp(final boolean strictUpdateDelivery) throws Exception, MessagingException
{
Configuration config = createFileConfig();
config.setJournalFileSize(10 * 1024);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -73,7 +73,7 @@
final SimpleString queue2 = new SimpleString("queue2");
- MessagingService messagingService = Messaging.newMessagingService(conf);
+ MessagingService messagingService = createService(true, conf);
messagingService.start();
@@ -139,7 +139,7 @@
consumer2 = session2.createConsumer(queue2);
- m1 = consumer1.receive(1000);
+ m1 = consumer1.receive(100);
assertNull(m1);
@@ -171,11 +171,11 @@
consumer2 = session2.createConsumer(queue2);
- m1 = consumer1.receive(1000);
+ m1 = consumer1.receive(100);
assertNull(m1);
- m2 = consumer2.receive(1000);
+ m2 = consumer2.receive(100);
assertNull(m2);
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-02-19 04:08:32 UTC (rev 5898)
@@ -244,6 +244,12 @@
{
return createTextMessage(session, s, true);
}
+
+ public String getTextMessage(ClientMessage m)
+ {
+ m.getBody().rewind();
+ return m.getBody().getString();
+ }
protected ClientMessage createTextMessage(final ClientSession session, final String s, final boolean durable)
{
More information about the jboss-cvs-commits
mailing list