[jboss-cvs] JBoss Messaging SVN: r3688 - in trunk: src/main/org/jboss/jms/server/endpoint and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 8 14:11:24 EST 2008
Author: timfox
Date: 2008-02-08 14:11:23 -0500 (Fri, 08 Feb 2008)
New Revision: 3688
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java
trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/MessageReference.java
trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java
trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
Log:
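Fixed some stuff related to redelivery: the delivery count is now incremented in ServerSessionEndpoint when a reference is added to the transaction (instead of inside MessageReferenceImpl.cancel()), and cancel() now returns a boolean so that TransactionImpl only puts back on the queue those references for which it returns true.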
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -39,7 +39,6 @@
*/
public class JBossConnectionMetaData implements ConnectionMetaData
{
-
// Constants -----------------------------------------------------
// Static --------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -100,7 +100,7 @@
if (!transactedOrClientAck)
{
try
- {
+ {
session.getCoreSession().rollback();
session.setRecoverCalled(true);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -21,18 +21,15 @@
*/
package org.jboss.jms.server.endpoint;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_RESET;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_HASNEXTMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_NEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_RESET;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidSelectorException;
-
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
@@ -40,11 +37,11 @@
import org.jboss.messaging.core.impl.filter.FilterImpl;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionBrowserNextMessageResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.SessionBrowserHasNextMessageResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionBrowserNextMessageResponseMessage;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
@@ -69,10 +66,10 @@
// Attributes -----------------------------------------------------------------------------------
- private String id;
- private ServerSessionEndpoint session;
- private Queue destination;
- private Filter filter;
+ private final String id;
+ private final ServerSessionEndpoint session;
+ private final Queue destination;
+ private final Filter filter;
private Iterator iterator;
// Constructors ---------------------------------------------------------------------------------
@@ -86,15 +83,12 @@
if (messageFilter != null)
{
- try
- {
- filter = new FilterImpl(messageFilter);
- }
- catch (Exception e)
- {
- throw new InvalidSelectorException("Invalid selector " + messageFilter);
- }
+ filter = new FilterImpl(messageFilter);
}
+ else
+ {
+ filter = null;
+ }
}
// BrowserEndpoint implementation ---------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -73,31 +73,31 @@
// Attributes -----------------------------------------------------------------------------------
- private String id;
+ private final String id;
private volatile boolean started;
- private String username;
+ private final String username;
- private String password;
+ private final String password;
- private String remotingClientSessionID;
+ private final String remotingClientSessionID;
- private String jmsClientVMID;
+ private final String jmsClientVMID;
- private MessagingServer messagingServer;
+ private final MessagingServer messagingServer;
- private PostOffice postOffice;
+ private final PostOffice postOffice;
- private SecurityStore sm;
+ private final SecurityStore sm;
- private ConnectionManager cm;
+ private final ConnectionManager cm;
- private ConcurrentMap<String, ServerSessionEndpoint> sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
+ private final ConcurrentMap<String, ServerSessionEndpoint> sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
- private Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
+ private final Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
- private int prefetchSize;
+ private final int prefetchSize;
// Constructors ---------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -139,35 +139,35 @@
// Attributes
// -----------------------------------------------------------------------------------
- private SecurityAspect security = new SecurityAspect();
+ private final SecurityAspect security = new SecurityAspect();
- private boolean trace = log.isTraceEnabled();
+ private final boolean trace = log.isTraceEnabled();
- private String id;
+ private final String id;
- private ServerConnectionEndpoint connectionEndpoint;
+ private final ServerConnectionEndpoint connectionEndpoint;
- private MessagingServer sp;
+ private final MessagingServer sp;
- private Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
+ private final Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
- private Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
+ private final Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
- private PostOffice postOffice;
+ private final PostOffice postOffice;
- private volatile LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
+ private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
private long deliveryIDSequence = 0;
- ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Transaction tx;
- private boolean autoCommitSends;
+ private final boolean autoCommitSends;
- private boolean autoCommitAcks;
+ private final boolean autoCommitAcks;
- private ResourceManager resourceManager;
+ private final ResourceManager resourceManager;
// Constructors
// ---------------------------------------------------------------------------------
@@ -378,6 +378,9 @@
else
{
tx.addAcknowledgement(ref);
+
+ // The delivery count is not actually updated in storage unless the reference is cancelled
+ ref.incrementDeliveryCount();
}
if (rec.getDeliveryID() == deliveryID)
@@ -413,6 +416,9 @@
else
{
tx.addAcknowledgement(ref);
+
+ // The delivery count is not actually updated in storage unless the reference is cancelled
+ ref.incrementDeliveryCount();
}
break;
@@ -800,17 +806,19 @@
private void addAddress(String address) throws Exception
{
- if (postOffice.containsAllowableAddress(address)) { throw new MessagingException(
- MessagingException.ADDRESS_EXISTS, "Address already exists: "
- + address); }
+ if (postOffice.containsAllowableAddress(address))
+ {
+ throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
+ }
postOffice.addAllowableAddress(address);
}
private void removeAddress(String address) throws Exception
{
- if (!postOffice.removeAllowableAddress(address)) { throw new MessagingException(
- MessagingException.ADDRESS_DOES_NOT_EXIST,
- "Address does not exist: " + address); }
+ if (!postOffice.removeAllowableAddress(address))
+ {
+ throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST, "Address does not exist: " + address);
+ }
}
private void createQueue(String address, String queueName,
@@ -819,8 +827,10 @@
{
Binding binding = postOffice.getBinding(queueName);
- if (binding != null) { throw new MessagingException(
- MessagingException.QUEUE_EXISTS); }
+ if (binding != null)
+ {
+ throw new MessagingException(MessagingException.QUEUE_EXISTS);
+ }
if (temporary)
{
@@ -849,7 +859,10 @@
{
Binding binding = postOffice.removeBinding(queueName);
- if (binding == null) { throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST); }
+ if (binding == null)
+ {
+ throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ }
Queue queue = binding.getQueue();
@@ -869,7 +882,10 @@
{
Binding binding = postOffice.getBinding(queueName);
- if (binding == null) { throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST); }
+ if (binding == null)
+ {
+ throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ }
int prefetchSize = connectionEndpoint.getPrefetchSize();
@@ -961,8 +977,10 @@
{
Binding binding = postOffice.getBinding(queueName);
- if (binding == null) { throw new MessagingException(
- MessagingException.QUEUE_DOES_NOT_EXIST); }
+ if (binding == null)
+ {
+ throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ }
String browserID = UUID.randomUUID().toString();
@@ -1170,8 +1188,7 @@
}
else
{
- throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
- "Unsupported packet " + type);
+ throw new MessagingException(MessagingException.UNSUPPORTED_PACKET, "Unsupported packet " + type);
}
// reply if necessary
Modified: trunk/src/main/org/jboss/messaging/core/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessageReference.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/messaging/core/MessageReference.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -56,11 +56,13 @@
void setDeliveryCount(int deliveryCount);
+ void incrementDeliveryCount();
+
Queue getQueue();
void acknowledge(PersistenceManager persistenceManager) throws Exception;
- void cancel(PersistenceManager persistenceManager) throws Exception;
+ boolean cancel(PersistenceManager persistenceManager) throws Exception;
void expire(PersistenceManager persistenceManager) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -53,12 +53,8 @@
// Constructors --------------------------------------------------
- /**
- * Required by externalization.
- */
public MessageReferenceImpl()
{
- if (trace) { log.trace("Creating using default constructor"); }
}
public MessageReferenceImpl(MessageReferenceImpl other, Queue queue)
@@ -96,6 +92,11 @@
this.deliveryCount = deliveryCount;
}
+ public void incrementDeliveryCount()
+ {
+ deliveryCount++;
+ }
+
public long getScheduledDeliveryTime()
{
return scheduledDeliveryTime;
@@ -126,10 +127,8 @@
queue.decrementDeliveringCount();
}
- public void cancel(PersistenceManager persistenceManager) throws Exception
+ public boolean cancel(PersistenceManager persistenceManager) throws Exception
{
- deliveryCount++;
-
if (message.isDurable() && queue.isDurable())
{
persistenceManager.updateDeliveryCount(queue, this);
@@ -154,8 +153,14 @@
log.warn("Message has reached maximum delivery attempts, no DLQ is configured so dropping it");
acknowledge(persistenceManager);
- }
+ }
+
+ return false;
}
+ else
+ {
+ return true;
+ }
}
public void expire(PersistenceManager persistenceManager) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -254,18 +254,16 @@
queueMap.put(queue, list);
}
- list.add(ref);
+ if (ref.cancel(persistenceManager))
+ {
+ list.add(ref);
+ }
}
for (Map.Entry<Queue, LinkedList<MessageReference>> entry: queueMap.entrySet())
{
LinkedList<MessageReference> refs = entry.getValue();
-
- for (MessageReference ref: refs)
- {
- ref.cancel(persistenceManager);
- }
-
+
entry.getKey().addListFirst(refs);
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2008-02-08 19:11:23 UTC (rev 3688)
@@ -550,8 +550,7 @@
assertEquals(tm.getText(), rm.getText());
- //Delivery count is not hard and fast - is best effort
- assertEquals(5, rm.getIntProperty("JMSXDeliveryCount"));
+ assertEquals(4, rm.getIntProperty("JMSXDeliveryCount"));
assertTrue(rm.getJMSRedelivered());
@@ -575,8 +574,7 @@
catch (Exception ignore)
{
}
- }
-
+ }
if (xaConn != null)
{
xaConn.close();
More information about the jboss-cvs-commits
mailing list