[hornetq-commits] JBoss hornetq SVN: r8082 - in trunk: src/main/org/hornetq/core/remoting/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 13 05:02:46 EDT 2009


Author: jmesnil
Date: 2009-10-13 05:02:45 -0400 (Tue, 13 Oct 2009)
New Revision: 8082

Added:
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java
   trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/hornetq/core/server/ServerConsumer.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-50: ReceiveImmediate on consumer should check the server for messages

* added packet SessionForceConsumerDelivery. When it is sent one-way by the client, the server will prompt the queue
  for delivery and send back to the consumer a "forced delivery" message
* in ClientConsumerImpl.receive(long, boolean), added a boolean forcingDelivery to force the delivery and wait for a "forced delivery"
  message which is discarded when it is received (and receive() returns null)
* ClientConsumer.receiveImmediate() is the only receive*() method which forces message delivery
* JMS HornetQQueueBrowser uses ClientConsumer.receiveImmediate() to check if there are more messages to browse on the queue

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -15,6 +15,7 @@
 
 import java.io.File;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.ClientMessage;
@@ -52,6 +53,8 @@
    public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
 
    public static final int NUM_PRIORITIES = 10;
+   
+   public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
 
    // Attributes
    // -----------------------------------------------------------------------------------
@@ -108,6 +111,8 @@
 
    private boolean stopped = false;
 
+   private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+   
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -146,7 +151,7 @@
    // ClientConsumer implementation
    // -----------------------------------------------------------------
 
-   public ClientMessage receive(long timeout) throws HornetQException
+   private ClientMessage receive(long timeout, boolean forcingDelivery) throws HornetQException
    {
       checkClosed();
 
@@ -181,6 +186,8 @@
          timeout = Long.MAX_VALUE;
       }
 
+      boolean deliveryForced = false;
+      
       long start = -1;
 
       long toWait = timeout;
@@ -194,13 +201,22 @@
             synchronized (this)
             {
                while ((stopped || (m = buffer.removeFirst()) == null) && !closed && toWait > 0)
-
                {
                   if (start == -1)
                   {
                      start = System.currentTimeMillis();
                   }
 
+                  if (m == null && forcingDelivery)
+                  {                     
+                     // we only force delivery once per call to receive
+                     if (!deliveryForced)
+                     {
+                        session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
+                        deliveryForced = true;
+                     }
+                  }
+                  
                   try
                   {
                      wait(toWait);
@@ -213,6 +229,11 @@
                   {
                      break;
                   }
+                  
+                  if (forcingDelivery && stopped)
+                  {
+                     break;
+                  }
 
                   long now = System.currentTimeMillis();
 
@@ -224,6 +245,20 @@
 
             if (m != null)
             {
+               if (m.containsProperty(FORCED_DELIVERY_MESSAGE))
+               {
+                  Long seq = (Long)m.getProperty(FORCED_DELIVERY_MESSAGE);
+                  if (seq >= forceDeliveryCount.longValue())
+                  {
+                     // forced delivery messages are discarded, nothing has been delivered by the queue
+                     return null;
+                  }
+                  else
+                  {
+                     // ignore any previous forced delivery message
+                     continue;                  
+                  }
+               }
                // if we have already pre acked we cant expire
                boolean expired = m.isExpired();
 
@@ -269,14 +304,19 @@
       }
    }
 
+   public ClientMessage receive(long timeout) throws HornetQException
+   {
+      return receive(timeout, false);
+   }
+   
    public ClientMessage receive() throws HornetQException
    {
-      return receive(0);
+      return receive(0, false);
    }
 
    public ClientMessage receiveImmediate() throws HornetQException
    {
-      return receive(-1);
+      return receive(0, true);
    }
 
    public MessageHandler getMessageHandler() throws HornetQException

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -211,6 +211,17 @@
          return true;
       }
    }
+   
+   @Override
+   public String toString()
+   {
+      return "ClientMessage[messageID=" + messageID +
+             ", durable=" +
+             durable +
+             ", destination=" +
+             getDestination() +
+             "]";
+   }
 
 
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -49,6 +49,7 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -329,7 +330,16 @@
 
       return response;
    }
+   
+   public void forceDelivery(long consumerID, long sequence) throws HornetQException
+   {
+      checkClosed();
 
+      SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+
+      channel.send(request);
+   }
+
    public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
    {
       return createConsumer(queueName, null, false);

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -68,4 +68,6 @@
    FailoverManager getConnectionManager();
    
    void workDone();
+   
+   void forceDelivery(long consumerID, long sequence) throws HornetQException;
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -119,6 +119,11 @@
       return session.bindingQuery(address);
    }
 
+   public void forceDelivery(long consumerID, long sequence) throws HornetQException
+   {
+      session.forceDelivery(consumerID, sequence);
+   }
+   
    public void cleanUp() throws Exception
    {
       session.cleanUp();

Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -30,6 +30,7 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -77,6 +78,7 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -350,6 +352,11 @@
             packet = new SessionSendContinuationMessage();
             break;
          }        
+         case SESS_FORCE_CONSUMER_DELIVERY:
+         {
+            packet = new SessionForceConsumerDelivery();
+            break;
+         }        
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -137,6 +137,8 @@
 
    public static final byte SESS_RECEIVE_CONTINUATION = 76;
 
+   public static final byte SESS_FORCE_CONSUMER_DELIVERY = 77;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Added: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * 
+ * A SessionForceConsumerDelivery
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class SessionForceConsumerDelivery extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long consumerID;
+   private long sequence;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionForceConsumerDelivery(final long consumerID, final long sequence)
+   {
+      super(SESS_FORCE_CONSUMER_DELIVERY);
+
+      this.consumerID = consumerID;
+      this.sequence = sequence;
+   }
+
+   public SessionForceConsumerDelivery()
+   {
+      super(SESS_FORCE_CONSUMER_DELIVERY);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getConsumerID()
+   {
+      return consumerID;
+   }
+   
+   public long getSequence()
+   {
+      return sequence;
+   }
+
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
+   }
+
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(consumerID);
+      buffer.writeLong(sequence);
+   }
+
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      consumerID = buffer.readLong();
+      sequence = buffer.readLong();
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuffer buf = new StringBuffer(getParentString());
+      buf.append(", consumerID=" + consumerID);
+      buf.append(", sequence=" + sequence);
+      buf.append("]");
+      return buf.toString();
+   }
+
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionForceConsumerDelivery == false)
+      {
+         return false;
+      }
+
+      SessionForceConsumerDelivery r = (SessionForceConsumerDelivery)other;
+
+      return super.equals(other) && this.consumerID == r.consumerID && this.sequence == r.sequence;
+   }
+
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -30,8 +30,6 @@
 	
 	void close() throws Exception;
 	
-	int getCountOfPendingDeliveries();
-
 	List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
 	
 	void setStarted(boolean started);
@@ -43,4 +41,6 @@
 	MessageReference getExpired(long messageID) throws Exception;
 	
 	void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
+
+   void forceDelivery(long sequence);
 }

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -21,6 +21,7 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -66,7 +67,7 @@
 
    void close() throws Exception;
 
-   void promptDelivery(Queue queue);
+   void promptDelivery(Queue queue, boolean async);
 
    void handleAcknowledge(final SessionAcknowledgeMessage packet);
 
@@ -124,6 +125,8 @@
 
    void handleSendLargeMessage(SessionSendLargeMessage packet);
 
+   void handleForceConsumerDelivery(SessionForceConsumerDelivery message);
+
    void handleClose(Packet packet);
 
    int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -280,10 +280,9 @@
       }
    }
 
-   // Only used in testing - do not call directly!
    public synchronized void deliverNow()
    {
-      deliver();
+      deliverRunner.run();
    }
 
    public synchronized void addConsumer(final Consumer consumer) throws Exception

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -22,6 +22,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
 import org.hornetq.core.client.management.impl.ManagementHelper;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
@@ -46,7 +47,6 @@
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
 /**
@@ -255,9 +255,30 @@
       }
    }
 
-   public int getCountOfPendingDeliveries()
-   {
-      return deliveringRefs.size();
+   /**
+    * Prompt delivery and send a "forced delivery" message to the consumer.
+    * When the consumer receives such a "forced delivery" message, it discards it
+    * and knows that there are no other messages to be delivered.
+    */
+   public synchronized void forceDelivery(final long sequence)
+   {        
+      // The prompt delivery is called synchronously to ensure the "forced delivery" message is
+      // sent after any queue delivery.
+      executor.execute(new Runnable()
+      {         
+         public void run()
+         {
+            promptDelivery(false);
+            
+            ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+            forcedDeliveryMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
+            forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+            forcedDeliveryMessage.setDestination(messageQueue.getName());
+
+            final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+            channel.send(packet);
+         }
+      });
    }
 
    public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
@@ -305,7 +326,7 @@
       // Outside the lock
       if (started)
       {
-         promptDelivery();
+         promptDelivery(true);
       }
    }
 
@@ -331,7 +352,7 @@
 
          if (previous <= 0 && previous + credits > 0)
          {
-            promptDelivery();
+            promptDelivery(true);
          }
       }
    }
@@ -420,7 +441,7 @@
 
    // Private --------------------------------------------------------------------------------------
 
-   private void promptDelivery()
+   private void promptDelivery(boolean asyncDelivery)
    {
       lock.lock();
       try
@@ -435,11 +456,18 @@
          {
             if (browseOnly)
             {
-               executor.execute(browserDeliverer);
+               if (asyncDelivery)
+               {
+                  executor.execute(browserDeliverer);
+               }
+               else
+               {
+                  browserDeliverer.run();
+               }
             }
             else
             {
-               session.promptDelivery(messageQueue);
+               session.promptDelivery(messageQueue, asyncDelivery);
             }
          }
       }
@@ -590,7 +618,7 @@
                else
                {
                   // prompt Delivery only if chunk was finished
-                  session.promptDelivery(messageQueue);
+                  session.promptDelivery(messageQueue, true);
                }
             }
          }
@@ -865,7 +893,7 @@
             }
             catch (Exception e)
             {
-               log.warn("Exception while browser handled from " + messageQueue + ": " + current);
+               log.warn("Exception while browser handled from " + messageQueue + ": " + current, e);
                return;
             }
          }
@@ -886,7 +914,7 @@
             }
             catch (Exception e)
             {
-               log.warn("Exception while browser handled from " + messageQueue + ": " + ref);
+               log.warn("Exception while browser handled from " + messageQueue + ": " + ref, e);
                break;
             }
          }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -56,6 +56,7 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -320,9 +321,16 @@
       remotingConnection.removeFailureListener(this);
    }
 
-   public void promptDelivery(final Queue queue)
+   public void promptDelivery(final Queue queue, boolean async)
    {
-      queue.deliverAsync(executor);
+      if (async)
+      {
+         queue.deliverAsync(executor);
+      }
+      else
+      {
+         queue.deliverNow();
+      }
    }
 
    public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
@@ -626,7 +634,21 @@
 
       channel.send(response);
    }
+   
+   public void handleForceConsumerDelivery(SessionForceConsumerDelivery message)
+   {
+      try
+      {
+         ServerConsumer consumer = consumers.get(message.getConsumerID());
 
+         consumer.forceDelivery(message.getSequence());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to query consumer deliveries", e);
+      }
+   }
+
    public void handleAcknowledge(final SessionAcknowledgeMessage packet)
    {
       Packet response = null;

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -20,6 +20,7 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -51,6 +52,7 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -267,6 +269,12 @@
                session.handleSendContinuations(message);
                break;
             }
+            case SESS_FORCE_CONSUMER_DELIVERY:
+            {
+               SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
+               session.handleForceConsumerDelivery(message);
+               break;               
+            }
          }
       }
       catch (Throwable t)

Modified: trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java	2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -32,8 +32,8 @@
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- *         <p/>
- *         $Id$
+ *
+ * $Id$
  */
 public class HornetQQueueBrowser implements QueueBrowser
 {
@@ -41,8 +41,6 @@
 
    private static final Logger log = Logger.getLogger(HornetQQueueBrowser.class);
 
-   private static final long NEXT_MESSAGE_TIMEOUT = 1000;
-
    // Static ---------------------------------------------------------------------------------------
 
    // Attributes -----------------------------------------------------------------------------------
@@ -136,9 +134,7 @@
          {
             try
             {
-               // todo change this to consumer.receiveImmediate() once
-               // https://jira.jboss.org/jira/browse/JBMESSAGING-1432 is completed
-               current = consumer.receive(NEXT_MESSAGE_TIMEOUT);
+               current = consumer.receiveImmediate();
             }
             catch (HornetQException e)
             {

Added: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2009-10-13 09:02:45 UTC (rev 8082)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ReceiveImmediateTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(ReceiveImmediateTest.class);
+
+   private HornetQServer server;
+
+   private final SimpleString QUEUE = new SimpleString("ReceiveImmediateTest.queue");
+   
+   private final SimpleString ADDRESS = new SimpleString("ReceiveImmediateTest.address");
+   
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Configuration config = createDefaultConfig(false);
+      server = HornetQ.newHornetQServer(config, false);
+
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   private ClientSessionFactory sf;
+
+   public void testConsumerReceiveImmediateWithNoMessages() throws Exception
+   {
+      doConsumerReceiveImmediateWithNoMessages(false);
+   }
+
+   public void testConsumerReceiveImmediate() throws Exception
+   {
+      doConsumerReceiveImmediate(false);
+   }
+
+   public void testBrowserReceiveImmediateWithNoMessages() throws Exception
+   {
+      doConsumerReceiveImmediateWithNoMessages(true);
+   }
+
+   public void testBrowserReceiveImmediate() throws Exception
+   {
+      doConsumerReceiveImmediate(true);
+   }
+
+   private void doConsumerReceiveImmediateWithNoMessages(boolean browser) throws Exception
+   {
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnAcknowledge(true);
+      sf.setAckBatchSize(0);
+      
+      ClientSession session = sf.createSession(false, true, false);
+
+      session.createQueue(ADDRESS, QUEUE, null, false);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, browser);
+      session.start();
+
+      ClientMessage message = consumer.receiveImmediate();
+      assertNull(message);
+
+      session.close();
+
+      sf.close();
+   }
+
+   private void doConsumerReceiveImmediate(boolean browser) throws Exception
+   {
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnAcknowledge(true);
+      sf.setAckBatchSize(0);
+      
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(ADDRESS, QUEUE, null, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, browser);
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receiveImmediate();
+         assertNotNull("did not receive message " + i, message2);
+         assertEquals("m" + i, message2.getBody().readString());
+         if (!browser)
+         {
+            message2.acknowledge();
+         }
+      }
+
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+
+      assertNull(consumer.receiveImmediate());
+
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      int messagesOnServer = (browser ? numMessages : 0);
+      assertEquals(messagesOnServer, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+
+      consumer.close();
+
+      session.close();
+
+      sf.close();
+
+   }
+
+}



More information about the hornetq-commits mailing list