[jboss-cvs] JBoss Messaging SVN: r5295 - in trunk/src/main/org/jboss/messaging/core: management/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 6 13:41:09 EST 2008
Author: timfox
Date: 2008-11-06 13:41:08 -0500 (Thu, 06 Nov 2008)
New Revision: 5295
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
Log:
Fixed expiry
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -167,8 +167,8 @@
if (expired)
{
- session.acknowledge(id, m.getMessageID());
-
+ session.expire(id, m.getMessageID());
+
if (toWait > 0)
{
continue;
@@ -302,7 +302,7 @@
}
else
{
- session.acknowledge(id, message.getMessageID());
+ session.expire(id, message.getMessageID());
}
}
else
@@ -451,7 +451,7 @@
}
else
{
- session.acknowledge(id, message.getMessageID());
+ session.expire(id, message.getMessageID());
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -59,6 +59,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -85,7 +86,6 @@
import org.jboss.messaging.util.SimpleIDGenerator;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiterImpl;
-import org.jboss.messaging.util.TypedProperties;
/*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -602,7 +602,16 @@
channel.send(message);
}
}
+
+ public void expire(final long consumerID, final long messageID) throws MessagingException
+ {
+ checkClosed();
+ SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+
+ channel.send(message);
+ }
+
public void addConsumer(final ClientConsumerInternal consumer)
{
consumers.put(consumer.getID(), consumer);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -31,6 +31,8 @@
String getName();
void acknowledge(long consumerID, long messageID) throws MessagingException;
+
+ void expire(long consumerID, long messageID) throws MessagingException;
void addConsumer(ClientConsumerInternal consumer);
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -46,7 +46,6 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
@@ -60,8 +59,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class MessagingServerControl extends StandardMBean implements
- MessagingServerControlMBean, NotificationEmitter
+public class MessagingServerControl extends StandardMBean implements MessagingServerControlMBean, NotificationEmitter
{
// Constants -----------------------------------------------------
@@ -69,11 +67,17 @@
// Attributes ----------------------------------------------------
private final PostOffice postOffice;
+
private final StorageManager storageManager;
+
private final Configuration configuration;
+
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
private final MessagingServer server;
+
private final MessageCounterManager messageCounterManager;
+
private final NotificationBroadcasterSupport broadcaster;
private boolean messageCounterEnabled;
@@ -82,21 +86,23 @@
// Constructors --------------------------------------------------
- public MessagingServerControl(PostOffice postOffice,
- StorageManager storageManager, Configuration configuration,
- HierarchicalRepository<QueueSettings> queueSettingsRepository,
- MessagingServer messagingServer, MessageCounterManager messageCounterManager,
- NotificationBroadcasterSupport broadcaster) throws Exception
+ public MessagingServerControl(final PostOffice postOffice,
+ final StorageManager storageManager,
+ final Configuration configuration,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final MessagingServer messagingServer,
+ final MessageCounterManager messageCounterManager,
+ final NotificationBroadcasterSupport broadcaster) throws Exception
{
super(MessagingServerControlMBean.class);
this.postOffice = postOffice;
this.storageManager = storageManager;
this.configuration = configuration;
this.queueSettingsRepository = queueSettingsRepository;
- this.server = messagingServer;
+ server = messagingServer;
this.messageCounterManager = messageCounterManager;
this.broadcaster = broadcaster;
-
+
messageCounterEnabled = configuration.isMessageCounterEnabled();
if (messageCounterEnabled)
{
@@ -106,17 +112,17 @@
// Public --------------------------------------------------------
- public void addDestination(SimpleString simpleAddress) throws Exception
+ public void addDestination(final SimpleString simpleAddress) throws Exception
{
postOffice.addDestination(simpleAddress, false);
}
- public void removeDestination(SimpleString simpleAddress) throws Exception
+ public void removeDestination(final SimpleString simpleAddress) throws Exception
{
postOffice.removeDestination(simpleAddress, false);
}
- public Queue getQueue(String address) throws Exception
+ public Queue getQueue(final String address) throws Exception
{
SimpleString sAddress = new SimpleString(address);
Binding binding = postOffice.getBinding(sAddress);
@@ -133,26 +139,26 @@
return configuration;
}
- public int expireMessages(Filter filter, SimpleString simpleAddress)
- throws Exception
+ public int expireMessages(final Filter filter, final SimpleString simpleAddress) throws Exception
{
Binding binding = postOffice.getBinding(simpleAddress);
if (binding != null)
{
Queue queue = binding.getQueue();
List<MessageReference> refs = queue.list(filter);
+
+ //FIXME - what if the refs have been consumed between listing them and expiring them?
+
for (MessageReference ref : refs)
{
- queue.expireMessage(ref.getMessage().getMessageID(),
- storageManager, postOffice, queueSettingsRepository);
+ queue.expireMessage(ref.getMessage().getMessageID(), storageManager, postOffice, queueSettingsRepository);
}
return refs.size();
}
return 0;
}
- public int sendMessagesToDLQ(Filter filter, SimpleString simpleAddress)
- throws Exception
+ public int sendMessagesToDLQ(final Filter filter, final SimpleString simpleAddress) throws Exception
{
Binding binding = postOffice.getBinding(simpleAddress);
if (binding != null)
@@ -161,16 +167,14 @@
List<MessageReference> refs = queue.list(filter);
for (MessageReference ref : refs)
{
- queue.sendMessageToDLQ(ref.getMessage().getMessageID(),
- storageManager, postOffice, queueSettingsRepository);
+ queue.sendMessageToDLQ(ref.getMessage().getMessageID(), storageManager, postOffice, queueSettingsRepository);
}
return refs.size();
}
return 0;
}
- public int changeMessagesPriority(Filter filter, byte newPriority,
- SimpleString simpleAddress) throws Exception
+ public int changeMessagesPriority(final Filter filter, final byte newPriority, final SimpleString simpleAddress) throws Exception
{
Binding binding = postOffice.getBinding(simpleAddress);
if (binding != null)
@@ -180,8 +184,10 @@
for (MessageReference ref : refs)
{
queue.changeMessagePriority(ref.getMessage().getMessageID(),
- newPriority, storageManager, postOffice,
- queueSettingsRepository);
+ newPriority,
+ storageManager,
+ postOffice,
+ queueSettingsRepository);
}
return refs.size();
}
@@ -194,10 +200,12 @@
public MBeanInfo getMBeanInfo()
{
MBeanInfo info = super.getMBeanInfo();
- return new MBeanInfo(info.getClassName(), info.getDescription(), info
- .getAttributes(), info.getConstructors(), MBeanInfoHelper
- .getMBeanOperationsInfo(MessagingServerControlMBean.class),
- getNotificationInfo());
+ return new MBeanInfo(info.getClassName(),
+ info.getDescription(),
+ info.getAttributes(),
+ info.getConstructors(),
+ MBeanInfoHelper.getMBeanOperationsInfo(MessagingServerControlMBean.class),
+ getNotificationInfo());
}
// MessagingServerControlMBean implementation --------------------
@@ -208,17 +216,18 @@
if (backupConf != null)
{
return backupConf.getParams();
- } else
+ }
+ else
{
return Collections.emptyMap();
}
}
-
+
public Map<String, Map<String, Object>> getAcceptorConfigurations()
{
- Map<String, Map<String, Object>> result = new HashMap<String, Map<String,Object>>();
+ Map<String, Map<String, Object>> result = new HashMap<String, Map<String, Object>>();
Set<TransportConfiguration> acceptorConfs = configuration.getAcceptorConfigurations();
-
+
for (TransportConfiguration acceptorConf : acceptorConfs)
{
result.put(acceptorConf.getFactoryClassName(), acceptorConf.getParams());
@@ -226,7 +235,6 @@
return result;
}
-
public boolean isStarted()
{
return server.isStarted();
@@ -241,7 +249,7 @@
{
return configuration.isBackup();
}
-
+
public String getBindingsDirectory()
{
return configuration.getBindingsDirectory();
@@ -249,7 +257,7 @@
public long getConnectionScanPeriod()
{
- return configuration.getConnectionScanPeriod();
+ return configuration.getConnectionScanPeriod();
}
public List<String> getInterceptorClassNames()
@@ -261,7 +269,7 @@
{
return configuration.getJournalBufferReuseSize();
}
-
+
public String getJournalDirectory()
{
return configuration.getJournalDirectory();
@@ -288,15 +296,15 @@
}
public long getPagingMaxGlobalSizeBytes()
- {
+ {
return configuration.getPagingMaxGlobalSizeBytes();
}
-
+
public String getPagingDirectory()
- {
+ {
return configuration.getPagingDirectory();
}
-
+
public int getScheduledThreadPoolMaxSize()
{
return configuration.getScheduledThreadPoolMaxSize();
@@ -306,7 +314,7 @@
{
return configuration.getSecurityInvalidationInterval();
}
-
+
public boolean isClustered()
{
return configuration.isClustered();
@@ -347,8 +355,7 @@
return postOffice.addDestination(new SimpleString(address), false);
}
- public void createQueue(final String address, final String name)
- throws Exception
+ public void createQueue(final String address, final String name) throws Exception
{
SimpleString sAddress = new SimpleString(address);
SimpleString sName = new SimpleString(name);
@@ -358,14 +365,11 @@
}
}
- public void createQueue(final String address, final String name,
- final String filterStr, final boolean durable)
- throws Exception
+ public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
{
SimpleString sAddress = new SimpleString(address);
SimpleString sName = new SimpleString(name);
- SimpleString sFilter = (filterStr == null || filterStr.length() == 0) ? null
- : new SimpleString(filterStr);
+ SimpleString sFilter = filterStr == null || filterStr.length() == 0 ? null : new SimpleString(filterStr);
Filter filter = null;
if (sFilter != null)
{
@@ -411,7 +415,7 @@
{
setMessageCounterEnabled(false);
}
-
+
public void resetAllMessageCounters()
{
messageCounterManager.resetAllCounters();
@@ -425,14 +429,14 @@
public boolean isMessageCounterEnabled()
{
return messageCounterEnabled;
- }
-
+ }
+
public synchronized long getMessageCounterSamplePeriod()
{
return messageCounterManager.getSamplePeriod();
}
- public synchronized void setMessageCounterSamplePeriod(long newPeriod)
+ public synchronized void setMessageCounterSamplePeriod(final long newPeriod)
{
if (newPeriod < 1000)
{
@@ -444,13 +448,13 @@
messageCounterManager.reschedule(newPeriod);
}
}
-
+
public int getMessageCounterMaxDayCount()
{
return messageCounterManager.getMaxDayCount();
}
-
- public void setMessageCounterMaxDayCount(int count)
+
+ public void setMessageCounterMaxDayCount(final int count)
{
if (count <= 0)
{
@@ -458,25 +462,24 @@
}
messageCounterManager.setMaxDayCount(count);
}
-
+
// NotificationEmitter implementation ----------------------------
public void removeNotificationListener(final NotificationListener listener,
- final NotificationFilter filter, final Object handback)
- throws ListenerNotFoundException
+ final NotificationFilter filter,
+ final Object handback) throws ListenerNotFoundException
{
broadcaster.removeNotificationListener(listener, filter, handback);
}
- public void removeNotificationListener(final NotificationListener listener)
- throws ListenerNotFoundException
+ public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
{
broadcaster.removeNotificationListener(listener);
}
public void addNotificationListener(final NotificationListener listener,
- final NotificationFilter filter, final Object handback)
- throws IllegalArgumentException
+ final NotificationFilter filter,
+ final Object handback) throws IllegalArgumentException
{
broadcaster.addNotificationListener(listener, filter, handback);
}
@@ -490,7 +493,8 @@
names[i] = values[i].toString();
}
return new MBeanNotificationInfo[] { new MBeanNotificationInfo(names,
- this.getClass().getName(), "Notifications emitted by a Core Server") };
+ this.getClass().getName(),
+ "Notifications emitted by a Core Server") };
}
// Package protected ---------------------------------------------
@@ -499,7 +503,7 @@
// Private -------------------------------------------------------
- private synchronized void setMessageCounterEnabled(boolean enable)
+ private synchronized void setMessageCounterEnabled(boolean enable)
{
if (isStarted())
{
@@ -510,7 +514,7 @@
else if (!messageCounterEnabled && enable)
{
startMessageCounters();
- }
+ }
}
messageCounterEnabled = enable;
}
@@ -519,11 +523,11 @@
{
messageCounterManager.start();
}
-
+
private void stopMessageCounters()
{
messageCounterManager.stop();
-
+
messageCounterManager.resetAllCounters();
messageCounterManager.resetAllCounterHistories();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -37,6 +37,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
@@ -117,6 +118,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -613,6 +615,11 @@
packet = new SessionAcknowledgeMessage();
break;
}
+ case SESS_EXPIRED:
+ {
+ packet = new SessionExpiredMessage();
+ break;
+ }
case SESS_COMMIT:
{
packet = new PacketImpl(PacketImpl.SESS_COMMIT);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -75,82 +75,84 @@
public static final byte SESS_CREATEPRODUCER_RESP = 43;
public static final byte SESS_ACKNOWLEDGE = 44;
+
+ public static final byte SESS_EXPIRED = 45;
- public static final byte SESS_COMMIT = 45;
+ public static final byte SESS_COMMIT = 46;
- public static final byte SESS_ROLLBACK = 46;
+ public static final byte SESS_ROLLBACK = 47;
- public static final byte SESS_QUEUEQUERY = 47;
+ public static final byte SESS_QUEUEQUERY = 48;
- public static final byte SESS_QUEUEQUERY_RESP = 48;
+ public static final byte SESS_QUEUEQUERY_RESP = 49;
- public static final byte SESS_CREATEQUEUE = 49;
+ public static final byte SESS_CREATEQUEUE = 50;
- public static final byte SESS_DELETE_QUEUE = 50;
+ public static final byte SESS_DELETE_QUEUE = 51;
- public static final byte SESS_ADD_DESTINATION = 51;
+ public static final byte SESS_ADD_DESTINATION = 52;
- public static final byte SESS_REMOVE_DESTINATION = 52;
+ public static final byte SESS_REMOVE_DESTINATION = 53;
- public static final byte SESS_BINDINGQUERY = 53;
+ public static final byte SESS_BINDINGQUERY = 54;
- public static final byte SESS_BINDINGQUERY_RESP = 54;
+ public static final byte SESS_BINDINGQUERY_RESP = 55;
- public static final byte SESS_XA_START = 55;
+ public static final byte SESS_XA_START = 56;
- public static final byte SESS_XA_END = 56;
+ public static final byte SESS_XA_END = 57;
- public static final byte SESS_XA_COMMIT = 57;
+ public static final byte SESS_XA_COMMIT = 58;
- public static final byte SESS_XA_PREPARE = 58;
+ public static final byte SESS_XA_PREPARE = 59;
- public static final byte SESS_XA_RESP = 59;
+ public static final byte SESS_XA_RESP = 60;
- public static final byte SESS_XA_ROLLBACK = 60;
+ public static final byte SESS_XA_ROLLBACK = 61;
- public static final byte SESS_XA_JOIN = 61;
+ public static final byte SESS_XA_JOIN = 62;
- public static final byte SESS_XA_SUSPEND = 62;
+ public static final byte SESS_XA_SUSPEND = 63;
- public static final byte SESS_XA_RESUME = 63;
+ public static final byte SESS_XA_RESUME = 64;
- public static final byte SESS_XA_FORGET = 64;
+ public static final byte SESS_XA_FORGET = 65;
- public static final byte SESS_XA_INDOUBT_XIDS = 65;
+ public static final byte SESS_XA_INDOUBT_XIDS = 66;
- public static final byte SESS_XA_INDOUBT_XIDS_RESP = 66;
+ public static final byte SESS_XA_INDOUBT_XIDS_RESP = 67;
- public static final byte SESS_XA_SET_TIMEOUT = 67;
+ public static final byte SESS_XA_SET_TIMEOUT = 68;
- public static final byte SESS_XA_SET_TIMEOUT_RESP = 68;
+ public static final byte SESS_XA_SET_TIMEOUT_RESP = 69;
- public static final byte SESS_XA_GET_TIMEOUT = 69;
+ public static final byte SESS_XA_GET_TIMEOUT = 70;
- public static final byte SESS_XA_GET_TIMEOUT_RESP = 70;
+ public static final byte SESS_XA_GET_TIMEOUT_RESP = 71;
- public static final byte SESS_START = 71;
+ public static final byte SESS_START = 72;
- public static final byte SESS_STOP = 72;
+ public static final byte SESS_STOP = 73;
- public static final byte SESS_CLOSE = 73;
+ public static final byte SESS_CLOSE = 74;
- public static final byte SESS_FLOWTOKEN = 74;
+ public static final byte SESS_FLOWTOKEN = 75;
- public static final byte SESS_SEND = 75;
+ public static final byte SESS_SEND = 76;
- public static final byte SESS_CONSUMER_CLOSE = 76;
+ public static final byte SESS_CONSUMER_CLOSE = 77;
- public static final byte SESS_PRODUCER_CLOSE = 77;
+ public static final byte SESS_PRODUCER_CLOSE = 78;
- public static final byte SESS_RECEIVE_MSG = 78;
+ public static final byte SESS_RECEIVE_MSG = 79;
- public static final byte SESS_MANAGEMENT_SEND = 79;
+ public static final byte SESS_MANAGEMENT_SEND = 80;
- public static final byte SESS_SCHEDULED_SEND = 80;
+ public static final byte SESS_SCHEDULED_SEND = 81;
- public static final byte SESS_FAILOVER_COMPLETE = 81;
+ public static final byte SESS_FAILOVER_COMPLETE = 82;
- public static final byte SESS_REPLICATE_DELIVERY = 82;
+ public static final byte SESS_REPLICATE_DELIVERY = 83;
// Static --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -29,7 +29,7 @@
private long messageID;
private boolean requiresResponse;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
@@ -80,6 +81,8 @@
void sendScheduled(ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
void handleAcknowledge(final SessionAcknowledgeMessage packet);
+
+ void handleExpired(final SessionExpiredMessage packet);
void handleRollback(Packet packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -187,6 +187,8 @@
{
Binding expiryBinding = postOffice.getBinding(expiryQueue);
+ //FIXME - this is not threadsafe - what if two refs get expired for same queue at same time
+ //might try and create the binding twice?
if (expiryBinding == null)
{
expiryBinding = postOffice.addBinding(expiryQueue, expiryQueue, null, true, false);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -60,6 +60,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -979,6 +980,47 @@
}
}
+ public void handleExpired(final SessionExpiredMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleExpired(packet);
+ }
+ else
+ {
+ //Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleExpired(packet);
+ }
+ });
+ }
+ }
+
+ public void doHandleExpired(final SessionExpiredMessage packet)
+ {
+ try
+ {
+ MessageReference ref = consumers.get(packet.getConsumerID()).getReference(packet.getMessageID());
+
+ // Null implies a browser
+ if (ref != null)
+ {
+ ref.expire(storageManager, postOffice, queueSettingsRepository);
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to acknowledge", e);
+ }
+
+ channel.confirm(packet);
+ }
+
public void handleCommit(final Packet packet)
{
DelayedResult result = channel.replicatePacket(packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -23,6 +23,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
@@ -52,6 +53,7 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -61,6 +63,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
@@ -158,6 +161,12 @@
session.handleAcknowledge(message);
break;
}
+ case SESS_EXPIRED:
+ {
+ SessionExpiredMessage message = (SessionExpiredMessage)packet;
+ session.handleExpired(message);
+ break;
+ }
case SESS_COMMIT:
{
session.handleCommit(packet);
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-11-06 18:41:08 UTC (rev 5295)
@@ -23,7 +23,6 @@
package org.jboss.messaging.core.transaction;
import java.util.List;
-import java.util.LinkedList;
import javax.transaction.xa.Xid;
More information about the jboss-cvs-commits
mailing list