[jboss-cvs] JBoss Messaging SVN: r5295 - in trunk/src/main/org/jboss/messaging/core: management/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 6 13:41:09 EST 2008


Author: timfox
Date: 2008-11-06 13:41:08 -0500 (Thu, 06 Nov 2008)
New Revision: 5295

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
Log:
Fixed expiry


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -167,8 +167,8 @@
 
                if (expired)
                {
-                  session.acknowledge(id, m.getMessageID());
-
+                  session.expire(id, m.getMessageID());
+                                   
                   if (toWait > 0)
                   {
                      continue;
@@ -302,7 +302,7 @@
             }
             else
             {
-               session.acknowledge(id, message.getMessageID());
+               session.expire(id, message.getMessageID());
             }
          }
          else
@@ -451,7 +451,7 @@
             }
             else
             {
-               session.acknowledge(id, message.getMessageID());
+               session.expire(id, message.getMessageID());
             }
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -59,6 +59,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -85,7 +86,6 @@
 import org.jboss.messaging.util.SimpleIDGenerator;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiterImpl;
-import org.jboss.messaging.util.TypedProperties;
 
 /*
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -602,7 +602,16 @@
          channel.send(message);
       }
    }
+   
+   public void expire(final long consumerID, final long messageID) throws MessagingException
+   {
+      checkClosed();
 
+      SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+
+      channel.send(message);      
+   }
+
    public void addConsumer(final ClientConsumerInternal consumer)
    {
       consumers.put(consumer.getID(), consumer);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -31,6 +31,8 @@
    String getName();
 
    void acknowledge(long consumerID, long messageID) throws MessagingException;
+   
+   void expire(long consumerID, long messageID) throws MessagingException;
 
    void addConsumer(ClientConsumerInternal consumer);
 

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -46,7 +46,6 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
@@ -60,8 +59,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class MessagingServerControl extends StandardMBean implements
-      MessagingServerControlMBean, NotificationEmitter
+public class MessagingServerControl extends StandardMBean implements MessagingServerControlMBean, NotificationEmitter
 {
 
    // Constants -----------------------------------------------------
@@ -69,11 +67,17 @@
    // Attributes ----------------------------------------------------
 
    private final PostOffice postOffice;
+
    private final StorageManager storageManager;
+
    private final Configuration configuration;
+
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
    private final MessagingServer server;
+
    private final MessageCounterManager messageCounterManager;
+
    private final NotificationBroadcasterSupport broadcaster;
 
    private boolean messageCounterEnabled;
@@ -82,21 +86,23 @@
 
    // Constructors --------------------------------------------------
 
-   public MessagingServerControl(PostOffice postOffice,
-         StorageManager storageManager, Configuration configuration,
-         HierarchicalRepository<QueueSettings> queueSettingsRepository,
-         MessagingServer messagingServer, MessageCounterManager messageCounterManager,
-         NotificationBroadcasterSupport broadcaster) throws Exception
+   public MessagingServerControl(final PostOffice postOffice,
+                                 final StorageManager storageManager,
+                                 final Configuration configuration,
+                                 final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                                 final MessagingServer messagingServer,
+                                 final MessageCounterManager messageCounterManager,
+                                 final NotificationBroadcasterSupport broadcaster) throws Exception
    {
       super(MessagingServerControlMBean.class);
       this.postOffice = postOffice;
       this.storageManager = storageManager;
       this.configuration = configuration;
       this.queueSettingsRepository = queueSettingsRepository;
-      this.server = messagingServer;
+      server = messagingServer;
       this.messageCounterManager = messageCounterManager;
       this.broadcaster = broadcaster;
-      
+
       messageCounterEnabled = configuration.isMessageCounterEnabled();
       if (messageCounterEnabled)
       {
@@ -106,17 +112,17 @@
 
    // Public --------------------------------------------------------
 
-   public void addDestination(SimpleString simpleAddress) throws Exception
+   public void addDestination(final SimpleString simpleAddress) throws Exception
    {
       postOffice.addDestination(simpleAddress, false);
    }
 
-   public void removeDestination(SimpleString simpleAddress) throws Exception
+   public void removeDestination(final SimpleString simpleAddress) throws Exception
    {
       postOffice.removeDestination(simpleAddress, false);
    }
 
-   public Queue getQueue(String address) throws Exception
+   public Queue getQueue(final String address) throws Exception
    {
       SimpleString sAddress = new SimpleString(address);
       Binding binding = postOffice.getBinding(sAddress);
@@ -133,26 +139,26 @@
       return configuration;
    }
 
-   public int expireMessages(Filter filter, SimpleString simpleAddress)
-         throws Exception
+   public int expireMessages(final Filter filter, final SimpleString simpleAddress) throws Exception
    {
       Binding binding = postOffice.getBinding(simpleAddress);
       if (binding != null)
       {
          Queue queue = binding.getQueue();
          List<MessageReference> refs = queue.list(filter);
+         
+         //FIXME - what if the refs have been consumed between listing them and expiring them?
+         
          for (MessageReference ref : refs)
          {
-            queue.expireMessage(ref.getMessage().getMessageID(),
-                  storageManager, postOffice, queueSettingsRepository);
+            queue.expireMessage(ref.getMessage().getMessageID(), storageManager, postOffice, queueSettingsRepository);
          }
          return refs.size();
       }
       return 0;
    }
 
-   public int sendMessagesToDLQ(Filter filter, SimpleString simpleAddress)
-         throws Exception
+   public int sendMessagesToDLQ(final Filter filter, final SimpleString simpleAddress) throws Exception
    {
       Binding binding = postOffice.getBinding(simpleAddress);
       if (binding != null)
@@ -161,16 +167,14 @@
          List<MessageReference> refs = queue.list(filter);
          for (MessageReference ref : refs)
          {
-            queue.sendMessageToDLQ(ref.getMessage().getMessageID(),
-                  storageManager, postOffice, queueSettingsRepository);
+            queue.sendMessageToDLQ(ref.getMessage().getMessageID(), storageManager, postOffice, queueSettingsRepository);
          }
          return refs.size();
       }
       return 0;
    }
 
-   public int changeMessagesPriority(Filter filter, byte newPriority,
-         SimpleString simpleAddress) throws Exception
+   public int changeMessagesPriority(final Filter filter, final byte newPriority, final SimpleString simpleAddress) throws Exception
    {
       Binding binding = postOffice.getBinding(simpleAddress);
       if (binding != null)
@@ -180,8 +184,10 @@
          for (MessageReference ref : refs)
          {
             queue.changeMessagePriority(ref.getMessage().getMessageID(),
-                  newPriority, storageManager, postOffice,
-                  queueSettingsRepository);
+                                        newPriority,
+                                        storageManager,
+                                        postOffice,
+                                        queueSettingsRepository);
          }
          return refs.size();
       }
@@ -194,10 +200,12 @@
    public MBeanInfo getMBeanInfo()
    {
       MBeanInfo info = super.getMBeanInfo();
-      return new MBeanInfo(info.getClassName(), info.getDescription(), info
-            .getAttributes(), info.getConstructors(), MBeanInfoHelper
-            .getMBeanOperationsInfo(MessagingServerControlMBean.class),
-            getNotificationInfo());
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(MessagingServerControlMBean.class),
+                           getNotificationInfo());
    }
 
    // MessagingServerControlMBean implementation --------------------
@@ -208,17 +216,18 @@
       if (backupConf != null)
       {
          return backupConf.getParams();
-      } else
+      }
+      else
       {
          return Collections.emptyMap();
       }
    }
-   
+
    public Map<String, Map<String, Object>> getAcceptorConfigurations()
    {
-      Map<String, Map<String, Object>> result = new HashMap<String, Map<String,Object>>();
+      Map<String, Map<String, Object>> result = new HashMap<String, Map<String, Object>>();
       Set<TransportConfiguration> acceptorConfs = configuration.getAcceptorConfigurations();
-      
+
       for (TransportConfiguration acceptorConf : acceptorConfs)
       {
          result.put(acceptorConf.getFactoryClassName(), acceptorConf.getParams());
@@ -226,7 +235,6 @@
       return result;
    }
 
-   
    public boolean isStarted()
    {
       return server.isStarted();
@@ -241,7 +249,7 @@
    {
       return configuration.isBackup();
    }
-      
+
    public String getBindingsDirectory()
    {
       return configuration.getBindingsDirectory();
@@ -249,7 +257,7 @@
 
    public long getConnectionScanPeriod()
    {
-      return configuration.getConnectionScanPeriod();      
+      return configuration.getConnectionScanPeriod();
    }
 
    public List<String> getInterceptorClassNames()
@@ -261,7 +269,7 @@
    {
       return configuration.getJournalBufferReuseSize();
    }
-   
+
    public String getJournalDirectory()
    {
       return configuration.getJournalDirectory();
@@ -288,15 +296,15 @@
    }
 
    public long getPagingMaxGlobalSizeBytes()
-   {   
+   {
       return configuration.getPagingMaxGlobalSizeBytes();
    }
-   
+
    public String getPagingDirectory()
-   {      
+   {
       return configuration.getPagingDirectory();
    }
-   
+
    public int getScheduledThreadPoolMaxSize()
    {
       return configuration.getScheduledThreadPoolMaxSize();
@@ -306,7 +314,7 @@
    {
       return configuration.getSecurityInvalidationInterval();
    }
-  
+
    public boolean isClustered()
    {
       return configuration.isClustered();
@@ -347,8 +355,7 @@
       return postOffice.addDestination(new SimpleString(address), false);
    }
 
-   public void createQueue(final String address, final String name)
-         throws Exception
+   public void createQueue(final String address, final String name) throws Exception
    {
       SimpleString sAddress = new SimpleString(address);
       SimpleString sName = new SimpleString(name);
@@ -358,14 +365,11 @@
       }
    }
 
-   public void createQueue(final String address, final String name,
-         final String filterStr, final boolean durable)
-         throws Exception
+   public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
    {
       SimpleString sAddress = new SimpleString(address);
       SimpleString sName = new SimpleString(name);
-      SimpleString sFilter = (filterStr == null || filterStr.length() == 0) ? null
-            : new SimpleString(filterStr);
+      SimpleString sFilter = filterStr == null || filterStr.length() == 0 ? null : new SimpleString(filterStr);
       Filter filter = null;
       if (sFilter != null)
       {
@@ -411,7 +415,7 @@
    {
       setMessageCounterEnabled(false);
    }
-      
+
    public void resetAllMessageCounters()
    {
       messageCounterManager.resetAllCounters();
@@ -425,14 +429,14 @@
    public boolean isMessageCounterEnabled()
    {
       return messageCounterEnabled;
-   } 
-   
+   }
+
    public synchronized long getMessageCounterSamplePeriod()
    {
       return messageCounterManager.getSamplePeriod();
    }
 
-   public synchronized void setMessageCounterSamplePeriod(long newPeriod)
+   public synchronized void setMessageCounterSamplePeriod(final long newPeriod)
    {
       if (newPeriod < 1000)
       {
@@ -444,13 +448,13 @@
          messageCounterManager.reschedule(newPeriod);
       }
    }
-   
+
    public int getMessageCounterMaxDayCount()
    {
       return messageCounterManager.getMaxDayCount();
    }
-   
-   public void setMessageCounterMaxDayCount(int count)
+
+   public void setMessageCounterMaxDayCount(final int count)
    {
       if (count <= 0)
       {
@@ -458,25 +462,24 @@
       }
       messageCounterManager.setMaxDayCount(count);
    }
-   
+
    // NotificationEmitter implementation ----------------------------
 
    public void removeNotificationListener(final NotificationListener listener,
-         final NotificationFilter filter, final Object handback)
-         throws ListenerNotFoundException
+                                          final NotificationFilter filter,
+                                          final Object handback) throws ListenerNotFoundException
    {
       broadcaster.removeNotificationListener(listener, filter, handback);
    }
 
-   public void removeNotificationListener(final NotificationListener listener)
-         throws ListenerNotFoundException
+   public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
    {
       broadcaster.removeNotificationListener(listener);
    }
 
    public void addNotificationListener(final NotificationListener listener,
-         final NotificationFilter filter, final Object handback)
-         throws IllegalArgumentException
+                                       final NotificationFilter filter,
+                                       final Object handback) throws IllegalArgumentException
    {
       broadcaster.addNotificationListener(listener, filter, handback);
    }
@@ -490,7 +493,8 @@
          names[i] = values[i].toString();
       }
       return new MBeanNotificationInfo[] { new MBeanNotificationInfo(names,
-            this.getClass().getName(), "Notifications emitted by a Core Server") };
+                                                                     this.getClass().getName(),
+                                                                     "Notifications emitted by a Core Server") };
    }
 
    // Package protected ---------------------------------------------
@@ -499,7 +503,7 @@
 
    // Private -------------------------------------------------------
 
-   private synchronized void setMessageCounterEnabled(boolean enable) 
+   private synchronized void setMessageCounterEnabled(boolean enable)
    {
       if (isStarted())
       {
@@ -510,7 +514,7 @@
          else if (!messageCounterEnabled && enable)
          {
             startMessageCounters();
-         }        
+         }
       }
       messageCounterEnabled = enable;
    }
@@ -519,11 +523,11 @@
    {
       messageCounterManager.start();
    }
-   
+
    private void stopMessageCounters()
    {
       messageCounterManager.stop();
-      
+
       messageCounterManager.resetAllCounters();
 
       messageCounterManager.resetAllCounterHistories();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -37,6 +37,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
@@ -117,6 +118,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -613,6 +615,11 @@
             packet = new SessionAcknowledgeMessage();
             break;
          }
+         case SESS_EXPIRED:
+         {
+            packet = new SessionExpiredMessage();
+            break;
+         }
          case SESS_COMMIT:
          {
             packet = new PacketImpl(PacketImpl.SESS_COMMIT);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -75,82 +75,84 @@
    public static final byte SESS_CREATEPRODUCER_RESP = 43;
 
    public static final byte SESS_ACKNOWLEDGE = 44;
+   
+   public static final byte SESS_EXPIRED = 45;
 
-   public static final byte SESS_COMMIT = 45;
+   public static final byte SESS_COMMIT = 46;
 
-   public static final byte SESS_ROLLBACK = 46;
+   public static final byte SESS_ROLLBACK = 47;
 
-   public static final byte SESS_QUEUEQUERY = 47;
+   public static final byte SESS_QUEUEQUERY = 48;
 
-   public static final byte SESS_QUEUEQUERY_RESP = 48;
+   public static final byte SESS_QUEUEQUERY_RESP = 49;
 
-   public static final byte SESS_CREATEQUEUE = 49;
+   public static final byte SESS_CREATEQUEUE = 50;
 
-   public static final byte SESS_DELETE_QUEUE = 50;
+   public static final byte SESS_DELETE_QUEUE = 51;
 
-   public static final byte SESS_ADD_DESTINATION = 51;
+   public static final byte SESS_ADD_DESTINATION = 52;
 
-   public static final byte SESS_REMOVE_DESTINATION = 52;
+   public static final byte SESS_REMOVE_DESTINATION = 53;
 
-   public static final byte SESS_BINDINGQUERY = 53;
+   public static final byte SESS_BINDINGQUERY = 54;
 
-   public static final byte SESS_BINDINGQUERY_RESP = 54;
+   public static final byte SESS_BINDINGQUERY_RESP = 55;
 
-   public static final byte SESS_XA_START = 55;
+   public static final byte SESS_XA_START = 56;
 
-   public static final byte SESS_XA_END = 56;
+   public static final byte SESS_XA_END = 57;
 
-   public static final byte SESS_XA_COMMIT = 57;
+   public static final byte SESS_XA_COMMIT = 58;
 
-   public static final byte SESS_XA_PREPARE = 58;
+   public static final byte SESS_XA_PREPARE = 59;
 
-   public static final byte SESS_XA_RESP = 59;
+   public static final byte SESS_XA_RESP = 60;
 
-   public static final byte SESS_XA_ROLLBACK = 60;
+   public static final byte SESS_XA_ROLLBACK = 61;
 
-   public static final byte SESS_XA_JOIN = 61;
+   public static final byte SESS_XA_JOIN = 62;
 
-   public static final byte SESS_XA_SUSPEND = 62;
+   public static final byte SESS_XA_SUSPEND = 63;
 
-   public static final byte SESS_XA_RESUME = 63;
+   public static final byte SESS_XA_RESUME = 64;
 
-   public static final byte SESS_XA_FORGET = 64;
+   public static final byte SESS_XA_FORGET = 65;
 
-   public static final byte SESS_XA_INDOUBT_XIDS = 65;
+   public static final byte SESS_XA_INDOUBT_XIDS = 66;
 
-   public static final byte SESS_XA_INDOUBT_XIDS_RESP = 66;
+   public static final byte SESS_XA_INDOUBT_XIDS_RESP = 67;
 
-   public static final byte SESS_XA_SET_TIMEOUT = 67;
+   public static final byte SESS_XA_SET_TIMEOUT = 68;
 
-   public static final byte SESS_XA_SET_TIMEOUT_RESP = 68;
+   public static final byte SESS_XA_SET_TIMEOUT_RESP = 69;
 
-   public static final byte SESS_XA_GET_TIMEOUT = 69;
+   public static final byte SESS_XA_GET_TIMEOUT = 70;
 
-   public static final byte SESS_XA_GET_TIMEOUT_RESP = 70;
+   public static final byte SESS_XA_GET_TIMEOUT_RESP = 71;
 
-   public static final byte SESS_START = 71;
+   public static final byte SESS_START = 72;
 
-   public static final byte SESS_STOP = 72;
+   public static final byte SESS_STOP = 73;
 
-   public static final byte SESS_CLOSE = 73;
+   public static final byte SESS_CLOSE = 74;
 
-   public static final byte SESS_FLOWTOKEN = 74;
+   public static final byte SESS_FLOWTOKEN = 75;
 
-   public static final byte SESS_SEND = 75;
+   public static final byte SESS_SEND = 76;
 
-   public static final byte SESS_CONSUMER_CLOSE = 76;
+   public static final byte SESS_CONSUMER_CLOSE = 77;
 
-   public static final byte SESS_PRODUCER_CLOSE = 77;
+   public static final byte SESS_PRODUCER_CLOSE = 78;
 
-   public static final byte SESS_RECEIVE_MSG = 78;
+   public static final byte SESS_RECEIVE_MSG = 79;
 
-   public static final byte SESS_MANAGEMENT_SEND = 79;
+   public static final byte SESS_MANAGEMENT_SEND = 80;
 
-   public static final byte SESS_SCHEDULED_SEND = 80;
+   public static final byte SESS_SCHEDULED_SEND = 81;
 
-   public static final byte SESS_FAILOVER_COMPLETE = 81;
+   public static final byte SESS_FAILOVER_COMPLETE = 82;
 
-   public static final byte SESS_REPLICATE_DELIVERY = 82;
+   public static final byte SESS_REPLICATE_DELIVERY = 83;
 
    // Static --------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAcknowledgeMessage.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -29,7 +29,7 @@
    private long messageID;
 
    private boolean requiresResponse;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -34,6 +34,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
@@ -80,6 +81,8 @@
    void sendScheduled(ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
 
    void handleAcknowledge(final SessionAcknowledgeMessage packet);
+   
+   void handleExpired(final SessionExpiredMessage packet);
 
    void handleRollback(Packet packet);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -187,6 +187,8 @@
       {
          Binding expiryBinding = postOffice.getBinding(expiryQueue);
 
+         //FIXME - this is not thread-safe - what if two refs get expired for the same queue at the same time?
+         //We might then try to create the binding twice
          if (expiryBinding == null)
          {
             expiryBinding = postOffice.addBinding(expiryQueue, expiryQueue, null, true, false);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -60,6 +60,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -979,6 +980,47 @@
       }
    }
    
+   public void handleExpired(final SessionExpiredMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+      
+      if (result == null)
+      {
+         doHandleExpired(packet);
+      }
+      else
+      {
+         //Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleExpired(packet);
+            }
+         });
+      }
+   }
+   
+   public void doHandleExpired(final SessionExpiredMessage packet)
+   {
+      try
+      {
+         MessageReference ref = consumers.get(packet.getConsumerID()).getReference(packet.getMessageID());
+
+         // Null implies a browser
+         if (ref != null)
+         {
+            ref.expire(storageManager, postOffice, queueSettingsRepository);
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to acknowledge", e);
+      }
+
+      channel.confirm(packet);
+   }
+   
    public void handleCommit(final Packet packet)
    {
       DelayedResult result = channel.replicatePacket(packet);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -23,6 +23,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
@@ -52,6 +53,7 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -61,6 +63,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
@@ -158,6 +161,12 @@
                session.handleAcknowledge(message);               
                break;
             }
+            case SESS_EXPIRED:
+            {
+               SessionExpiredMessage message = (SessionExpiredMessage)packet;
+               session.handleExpired(message);               
+               break;
+            }
             case SESS_COMMIT:
             {
                session.handleCommit(packet);               

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-11-06 17:56:25 UTC (rev 5294)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-11-06 18:41:08 UTC (rev 5295)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.core.transaction;
 
 import java.util.List;
-import java.util.LinkedList;
 
 import javax.transaction.xa.Xid;
 




More information about the jboss-cvs-commits mailing list