[hornetq-commits] JBoss hornetq SVN: r9066 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Apr 7 08:12:03 EDT 2010


Author: timfox
Date: 2010-04-07 08:12:01 -0400 (Wed, 07 Apr 2010)
New Revision: 9066

Added:
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
Removed:
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java
Modified:
   trunk/docs/user-manual/en/client-reconnection.xml
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
   trunk/src/main/org/hornetq/core/server/ServerConsumer.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
   trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-275

Modified: trunk/docs/user-manual/en/client-reconnection.xml
===================================================================
--- trunk/docs/user-manual/en/client-reconnection.xml	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/docs/user-manual/en/client-reconnection.xml	2010-04-07 12:12:01 UTC (rev 9066)
@@ -50,11 +50,10 @@
             instance using the appropriate setter method.</para>
         <para>If you're using core you can set these values directly on the <literal
                 >ClientSessionFactory</literal> instance using the appropriate setter method.</para>
-        <para>The window is specified in bytes, and has a default value of <literal
-            >1MiB</literal>.</para>
+        <para>The window is specified in bytes.</para>
         <para>Setting this parameter to <literal>-1</literal> disables any buffering and prevents
             any re-attachment from occurring, forcing reconnect instead. The default value for this
-            parameter is <literal>-1</literal>.</para>
+            parameter is <literal>-1</literal>, which means that by default no automatic re-attachment will occur.</para>
     </section>
     <section>
         <title>Session reconnection</title>

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -79,7 +79,8 @@
 
    private final int ackBatchSize;
 
-   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(false, ClientConsumerImpl.NUM_PRIORITIES);
+   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(false,
+                                                                                                                      ClientConsumerImpl.NUM_PRIORITIES);
 
    private final Runner runner = new Runner();
 
@@ -114,9 +115,11 @@
    private boolean stopped = false;
 
    private final AtomicLong forceDeliveryCount = new AtomicLong(0);
-   
+
    private final SessionQueueQueryResponseMessage queueInfo;
 
+   private volatile boolean ackIndividually;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -151,7 +154,7 @@
       this.clientWindowSize = clientWindowSize;
 
       this.ackBatchSize = ackBatchSize;
-      
+
       this.queueInfo = queueInfo;
    }
 
@@ -192,7 +195,7 @@
          // Effectively infinite
          timeout = Long.MAX_VALUE;
       }
-      
+
       boolean deliveryForced = false;
 
       long start = -1;
@@ -414,6 +417,8 @@
       lastAckedMessage = null;
 
       creditsToSend = 0;
+      
+      ackIndividually = false;
    }
 
    public synchronized void start()
@@ -435,7 +440,7 @@
    {
       return queueInfo;
    }
-   
+
    public long getID()
    {
       return id;
@@ -463,16 +468,24 @@
          // This is ok - we just ignore the message
          return;
       }
-        
+
       ClientMessageInternal messageToHandle = message;
-      
+
       if (messageToHandle.getAddress() == null)
       {
          messageToHandle.setAddressTransient(queueInfo.getAddress());
       }
 
       messageToHandle.onReceipt(this);
-      
+
+      if (message.getPriority() != 4)
+      {
+         // We have messages of different priorities, so we need to ack them individually: their order
+         // in the ServerConsumerImpl delivery list might not be the same as the order in which they are
+         // consumed, which means that acknowledging everything up to a given message won't work
+         ackIndividually = true;
+      }
+
       // Add it to the buffer
       buffer.addLast(messageToHandle, messageToHandle.getPriority());
 
@@ -546,9 +559,9 @@
          // Need to send credits for the messages in the buffer
 
          HQIterator<ClientMessageInternal> iter = buffer.iterator();
-         
+
          ClientMessageInternal message;
-         
+
          while ((message = iter.next()) != null)
          {
             flowControlBeforeConsumption(message);
@@ -575,16 +588,28 @@
    public void acknowledge(final ClientMessage message) throws HornetQException
    {
       ClientMessageInternal cmi = (ClientMessageInternal)message;
-      
-      ackBytes += message.getEncodeSize();
 
-      if (ackBytes >= ackBatchSize)
+      if (ackIndividually)
       {
-         doAck(cmi);
+         if (lastAckedMessage != null)
+         {
+            flushAcks();
+         }
+         
+         session.individualAcknowledge(id, message.getMessageID());
       }
       else
       {
-         lastAckedMessage = cmi;
+         ackBytes += message.getEncodeSize();
+
+         if (ackBytes >= ackBatchSize)
+         {
+            doAck(cmi);
+         }
+         else
+         {
+            lastAckedMessage = cmi;
+         }
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -51,8 +51,9 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
@@ -576,10 +577,10 @@
    }
 
    public ClientMessage createMessage(final byte type,
-                                            final boolean durable,
-                                            final long expiration,
-                                            final long timestamp,
-                                            final byte priority)
+                                      final boolean durable,
+                                      final long expiration,
+                                      final long timestamp,
+                                      final byte priority)
    {
       return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize);
    }
@@ -712,6 +713,30 @@
       }
    }
 
+   public void individualAcknowledge(final long consumerID, final long messageID) throws HornetQException
+   {
+      // if we're pre-acknowledging then we don't need to do anything
+      if (preAcknowledge)
+      {
+         return;
+      }
+
+      checkClosed();
+
+      SessionIndividualAcknowledgeMessage message = new SessionIndividualAcknowledgeMessage(consumerID,
+                                                                                            messageID,
+                                                                                            blockOnAcknowledge);
+
+      if (blockOnAcknowledge)
+      {
+         channel.sendBlocking(message);
+      }
+      else
+      {
+         channel.sendBatched(message);
+      }
+   }
+
    public void expire(final long consumerID, final long messageID) throws HornetQException
    {
       checkClosed();
@@ -719,7 +744,7 @@
       // We don't send expiries for pre-ack since message will already have been acked on server
       if (!preAcknowledge)
       {
-         SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+         SessionExpireMessage message = new SessionExpireMessage(consumerID, messageID);
 
          channel.send(message);
       }
@@ -851,9 +876,9 @@
          backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
          remotingConnection = backupConnection;
-         
+
          int lcid = channel.getLastConfirmedCommandID();
-         
+
          Packet request = new ReattachSessionMessage(name, lcid);
 
          Channel channel1 = backupConnection.getChannel(1, -1);
@@ -864,11 +889,11 @@
          {
             // The session was found on the server - we reattached transparently ok
 
-            channel.replayCommands(response.getLastConfirmedCommandID());                        
+            channel.replayCommands(response.getLastConfirmedCommandID());
          }
          else
          {
-            
+
             // The session wasn't found on the server - probably we're failing over onto a backup server where the
             // session won't exist or the target server has been restarted - in this case the session will need to be
             // recreated,
@@ -892,7 +917,8 @@
                                                                autoCommitAcks,
                                                                preAcknowledge,
                                                                confirmationWindowSize,
-                                                               defaultAddress == null ? null : defaultAddress.toString());
+                                                               defaultAddress == null ? null
+                                                                                     : defaultAddress.toString());
                boolean retry = false;
                do
                {
@@ -924,7 +950,7 @@
                for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
                {
                   SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
-                  
+
                   // We try and recreate any non durable queues, since they probably won't be there unless
                   // they are defined in hornetq-configuration.xml
                   // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
@@ -950,7 +976,7 @@
                                                                                                         false);
 
                   sendPacketWithoutLock(createConsumerRequest);
-                  
+
                   int clientWindowSize = entry.getValue().getClientWindowSize();
 
                   if (clientWindowSize != 0)
@@ -995,7 +1021,7 @@
             channel.returnBlocking();
          }
 
-         channel.setTransferring(false);         
+         channel.setTransferring(false);
       }
       catch (Throwable t)
       {
@@ -1014,15 +1040,15 @@
          // not having any credits to send
       }
    }
-   
+
    private volatile SimpleString defaultAddress;
-   
+
    public void setAddress(final Message message, final SimpleString address)
    {
       if (defaultAddress == null)
       {
          defaultAddress = address;
-         
+
          message.setAddress(address);
       }
       else
@@ -1037,9 +1063,7 @@
          }
       }
    }
-   
-   
-   
+
    public void setPacketSize(final int packetSize)
    {
       if (packetSize > this.initialMessagePacketSize)
@@ -1083,7 +1107,7 @@
    {
       return producerCreditManager.getCredits(address, anon);
    }
-   
+
    public void returnCredits(final SimpleString address)
    {
       producerCreditManager.returnCredits(address);
@@ -1093,7 +1117,7 @@
    {
       producerCreditManager.receiveCredits(address, credits);
    }
-   
+
    public ClientProducerCreditManager getProducerCreditManager()
    {
       return producerCreditManager;

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -33,6 +33,8 @@
    String getName();
 
    void acknowledge(long consumerID, long messageID) throws HornetQException;
+   
+   void individualAcknowledge(long consumerID, long messageID) throws HornetQException;
 
    boolean isCacheLargeMessageClient();
 

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -103,6 +103,11 @@
    {
       session.acknowledge(consumerID, messageID);
    }
+   
+   public void individualAcknowledge(final long consumerID, final long messageID) throws HornetQException
+   {
+      session.individualAcknowledge(consumerID, messageID);
+   }
 
    public void addConsumer(final ClientConsumerInternal consumer)
    {

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -24,6 +24,7 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_SEND;
@@ -67,8 +68,9 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
@@ -281,7 +283,7 @@
                }
                case SESS_EXPIRED:
                {
-                  SessionExpiredMessage message = (SessionExpiredMessage)packet;
+                  SessionExpireMessage message = (SessionExpireMessage)packet;
                   session.expire(message.getConsumerID(), message.getMessageID());
                   break;
                }
@@ -414,6 +416,17 @@
                   closeChannel = true;
                   break;
                }
+               case SESS_INDIVIDUAL_ACKNOWLEDGE:
+               {
+                  SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage)packet;
+                  requiresResponse = message.isRequiresResponse();
+                  session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
+                  if (requiresResponse)
+                  {
+                     response = new NullResponseMessage();
+                  }
+                  break;
+               }
                case SESS_CONSUMER_CLOSE:
                {
                   requiresResponse = true;

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -188,13 +188,15 @@
             {
                resendCache.add(packet);
             }
-
-            connection.getTransportConnection().write(buffer, flush, batch);
          }
          finally
          {
             lock.unlock();
          }
+         
+         //The actual send must happen outside the lock; otherwise, with the OIO transport, the write can block
+         //when the TCP buffer is full, preventing any incoming buffers from being handled and blocking failover
+         connection.getTransportConnection().write(buffer, flush, batch);
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -48,6 +48,7 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_CREDITS;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
@@ -114,8 +115,9 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -216,7 +218,7 @@
          }
          case SESS_EXPIRED:
          {
-            packet = new SessionExpiredMessage();
+            packet = new SessionExpireMessage();
             break;
          }
          case SESS_COMMIT:
@@ -379,6 +381,11 @@
             packet = new SessionConsumerCloseMessage();
             break;
          }
+         case SESS_INDIVIDUAL_ACKNOWLEDGE:
+         {
+            packet = new SessionIndividualAcknowledgeMessage();
+            break;
+         }
          case NULL_RESPONSE:
          {
             packet = new NullResponseMessage();

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -149,6 +149,8 @@
    public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
 
    public static final byte SESS_PRODUCER_CREDITS = 80;
+   
+   public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
 
    // Replication
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -29,7 +29,7 @@
    private long consumerID;
 
    private long messageID;
-
+   
    private boolean requiresResponse;
 
    // Static --------------------------------------------------------

Copied: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java (from rev 9052, trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionExpireMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long consumerID;
+
+   private long messageID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionExpireMessage(final long consumerID, final long messageID)
+   {
+      super(PacketImpl.SESS_EXPIRED);
+
+      this.consumerID = consumerID;
+
+      this.messageID = messageID;
+   }
+
+   public SessionExpireMessage()
+   {
+      super(PacketImpl.SESS_EXPIRED);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getConsumerID()
+   {
+      return consumerID;
+   }
+
+   public long getMessageID()
+   {
+      return messageID;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(consumerID);
+
+      buffer.writeLong(messageID);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      consumerID = buffer.readLong();
+
+      messageID = buffer.readLong();
+   }
+
+   @Override
+   public boolean equals(final Object other)
+   {
+      if (other instanceof SessionExpireMessage == false)
+      {
+         return false;
+      }
+
+      SessionExpireMessage r = (SessionExpireMessage)other;
+
+      return super.equals(other) && consumerID == r.consumerID && messageID == r.messageID;
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Deleted: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -1,98 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- */
-public class SessionExpiredMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private long consumerID;
-
-   private long messageID;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionExpiredMessage(final long consumerID, final long messageID)
-   {
-      super(PacketImpl.SESS_EXPIRED);
-
-      this.consumerID = consumerID;
-
-      this.messageID = messageID;
-   }
-
-   public SessionExpiredMessage()
-   {
-      super(PacketImpl.SESS_EXPIRED);
-   }
-
-   // Public --------------------------------------------------------
-
-   public long getConsumerID()
-   {
-      return consumerID;
-   }
-
-   public long getMessageID()
-   {
-      return messageID;
-   }
-
-   @Override
-   public void encodeRest(final HornetQBuffer buffer)
-   {
-      buffer.writeLong(consumerID);
-
-      buffer.writeLong(messageID);
-   }
-
-   @Override
-   public void decodeRest(final HornetQBuffer buffer)
-   {
-      consumerID = buffer.readLong();
-
-      messageID = buffer.readLong();
-   }
-
-   @Override
-   public boolean equals(final Object other)
-   {
-      if (other instanceof SessionExpiredMessage == false)
-      {
-         return false;
-      }
-
-      SessionExpiredMessage r = (SessionExpiredMessage)other;
-
-      return super.equals(other) && consumerID == r.consumerID && messageID == r.messageID;
-   }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Added: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionIndividualAcknowledgeMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long consumerID;
+
+   private long messageID;
+   
+   private boolean requiresResponse;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionIndividualAcknowledgeMessage(final long consumerID, final long messageID, final boolean requiresResponse)
+   {
+      super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+
+      this.consumerID = consumerID;
+
+      this.messageID = messageID;
+
+      this.requiresResponse = requiresResponse;
+   }
+
+   public SessionIndividualAcknowledgeMessage()
+   {
+      super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getConsumerID()
+   {
+      return consumerID;
+   }
+
+   public long getMessageID()
+   {
+      return messageID;
+   }
+
+   public boolean isRequiresResponse()
+   {
+      return requiresResponse;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(consumerID);
+
+      buffer.writeLong(messageID);
+
+      buffer.writeBoolean(requiresResponse);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      consumerID = buffer.readLong();
+
+      messageID = buffer.readLong();
+
+      requiresResponse = buffer.readBoolean();
+   }
+
+   @Override
+   public boolean equals(final Object other)
+   {
+      if (other instanceof SessionIndividualAcknowledgeMessage == false)
+      {
+         return false;
+      }
+
+      SessionIndividualAcknowledgeMessage r = (SessionIndividualAcknowledgeMessage)other;
+
+      return super.equals(other) && consumerID == r.consumerID &&
+             messageID == r.messageID &&
+             requiresResponse == r.requiresResponse;
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -38,9 +38,11 @@
 
    Queue getQueue();
 
-   MessageReference getExpired(long messageID) throws Exception;
+   MessageReference removeReferenceByID(long messageID) throws Exception;
 
    void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
+   
+   void individualAcknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
 
    void forceDelivery(long sequence);   
    

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -18,7 +18,6 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.spi.core.protocol.SessionCallback;
 
 /**
  *
@@ -43,6 +42,8 @@
    void removeConsumer(long consumerID) throws Exception;
 
    void acknowledge(long consumerID, long messageID) throws Exception;
+   
+   void individualAcknowledge(long consumerID, long messageID) throws Exception;
 
    void expire(long consumerID, long messageID) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -517,8 +517,32 @@
       }
       while (ref.getMessage().getMessageID() != messageID);
    }
+   
+   /**
+    * Acknowledges the single reference that {@code removeReferenceByID(messageID)} returns.
+    * Returns immediately, acknowledging nothing, when this consumer is browse-only.
+    *
+    * @param autoCommitAcks when {@code true} the reference is acknowledged on its queue
+    *                       directly; otherwise it is acknowledged with {@code tx}
+    * @param tx             the transaction passed to the queue when {@code autoCommitAcks} is false
+    * @param messageID      ID of the message to acknowledge
+    * @throws IllegalStateException if no reference is found for {@code messageID}
+    */
+   public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+   {
+      if (browseOnly)
+      {
+         return;
+      }
+
+      final MessageReference reference = removeReferenceByID(messageID);
+      if (reference == null)
+      {
+         throw new IllegalStateException("Cannot find ref to ack " + messageID);
+      }
+
+      if (!autoCommitAcks)
+      {
+         reference.getQueue().acknowledge(tx, reference);
+      }
+      else
+      {
+         reference.getQueue().acknowledge(reference);
+      }
+   }
 
-   public MessageReference getExpired(final long messageID) throws Exception
+   public MessageReference removeReferenceByID(final long messageID) throws Exception
    {
       if (browseOnly)
       {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -473,10 +473,22 @@
 
       consumer.acknowledge(autoCommitAcks, tx, messageID);
    }
+   
+   /**
+    * Acknowledges one message on the consumer looked up under {@code consumerID}, delegating to
+    * {@code ServerConsumer.individualAcknowledge} with this session's {@code autoCommitAcks}
+    * and {@code tx}.
+    *
+    * NOTE(review): the result of {@code consumers.get(consumerID)} is used without a null
+    * check; if the lookup can return null for an unknown ID this fails with a
+    * NullPointerException -- confirm callers only pass registered consumer IDs.
+    *
+    * @param consumerID ID used to look up the consumer in {@code consumers}
+    * @param messageID  ID of the message to acknowledge
+    * @throws HornetQXAException with {@code XAER_PROTO} when {@code xa} is set and {@code tx} is null
+    */
+   public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
+   {
+      ServerConsumer consumer = consumers.get(consumerID);
+      
+      // The transaction state is checked before anything is delegated to the consumer.
+      if (this.xa && tx == null)
+      {
+         throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
+      }
 
+      consumer.individualAcknowledge(autoCommitAcks, tx, messageID);
+   }
+
    public void expire(final long consumerID, final long messageID) throws Exception
    {
-      MessageReference ref = consumers.get(consumerID).getExpired(messageID);
+      MessageReference ref = consumers.get(consumerID).removeReferenceByID(messageID);
 
       if (ref != null)
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -201,7 +201,68 @@
       session.deleteQueue(queue);
 
    }
+   
+   /**
+    * Regression test for https://jira.jboss.org/jira/browse/HORNETQ-275
+    *
+    * Sends ten messages with priorities 0..9 while a consumer is already open, receives and
+    * acknowledges only the first message handed out (expected to be priority 9), closes that
+    * consumer, and then checks that a new consumer on the same queue still receives the
+    * remaining nine messages in priority order 8..0.
+    */
+   public void testOutOfOrderAcknowledgement() throws Exception
+   {
+      SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString address = RandomUtil.randomSimpleString();
 
+      session.createQueue(address, queue, false);
+
+      ClientProducer producer = session.createProducer(address);
+
+      // The consumer is created and the session started before anything is sent
+      ClientConsumer consumer = session.createConsumer(queue);
+
+      session.start();
+
+      // Message i is sent with priority i, so the send order is lowest priority first
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage m = createTextMessage(Integer.toString(i), session);
+         m.setPriority((byte)i);
+         producer.send(m);
+
+         Thread.sleep(20);
+      }
+
+      // The 20 ms pause after each send above is the only wait in this test; it is meant to
+      // give the messages time to reach the client side buffer before the first receive
+
+      // They should have been added to the delivering list in the ServerConsumerImpl in the order
+      // they were sent, not priority order
+
+      // We receive one of the messages; it must be the highest priority one
+      ClientMessage m = consumer.receive(500);
+      Assert.assertNotNull(m);
+      Assert.assertEquals(9, m.getPriority());
+
+      // Ack only that message
+      m.acknowledge();
+
+      consumer.close();
+      
+      // The first consumer is now closed; open a new one and try to receive the other messages
+
+      consumer = session.createConsumer(queue);
+
+      // The other nine messages should be received now, highest priority first.
+      // Previously there was a bug whereby, if deliveries were stored on the server side in send
+      // order but received in priority order and then acked, the ack would ack all messages up
+      // to the one received - resulting in acking messages that hadn't been delivered yet
+      for (int i = 8; i >= 0; i--)
+      {
+         m = consumer.receive(500);
+         Assert.assertNotNull(m);
+         Assert.assertEquals(i, m.getPriority());
+
+         m.acknowledge();
+      }
+      
+      consumer.close();
+
+      session.deleteQueue(queue);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -51,6 +51,7 @@
       final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.integration.transports.netty.NettyConnectorFactory"));
       sf.setReconnectAttempts(-1);
       sf.setConfirmationWindowSize(1024 * 1024);
+      sf.setAckBatchSize(0);
       return sf;
    }
 

Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java	2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java	2010-04-07 12:12:01 UTC (rev 9066)
@@ -881,10 +881,10 @@
    protected ClientMessage createTextMessage(final String s, final boolean durable, final ClientSession clientSession)
    {
       ClientMessage message = clientSession.createMessage(HornetQTextMessage.TYPE,
-                                                                durable,
-                                                                0,
-                                                                System.currentTimeMillis(),
-                                                                (byte)1);
+                                                          durable,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)4);
       message.getBodyBuffer().writeString(s);
       return message;
    }



More information about the hornetq-commits mailing list