[hornetq-commits] JBoss hornetq SVN: r8082 - in trunk: src/main/org/hornetq/core/remoting/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 13 05:02:46 EDT 2009
Author: jmesnil
Date: 2009-10-13 05:02:45 -0400 (Tue, 13 Oct 2009)
New Revision: 8082
Added:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-50: ReceiveImmediate on consumer should check the server for messages
* added packet SessionForceConsumerDelivery. when it is sent one-way by the client, the server will prompt the queue
for delivery and sent back to the consumer a "force delivery" message
* in ClientConsumerImpl.receive(long, boolean), added a boolean forcingDelivery to force the delivery and wait for a "forced delivery"
message which is discarded when it is received (and receive() returns null)
* ClientConsumer.receiveImmediate() is the only receive*() method which forces message delivery
* JMS HornetQQueueBrowser uses ClientConsumer.receiveImmediate() to check if there are more messages to browse on the queue
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -15,6 +15,7 @@
import java.io.File;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientMessage;
@@ -52,6 +53,8 @@
public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
public static final int NUM_PRIORITIES = 10;
+
+ public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
// Attributes
// -----------------------------------------------------------------------------------
@@ -108,6 +111,8 @@
private boolean stopped = false;
+ private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -146,7 +151,7 @@
// ClientConsumer implementation
// -----------------------------------------------------------------
- public ClientMessage receive(long timeout) throws HornetQException
+ private ClientMessage receive(long timeout, boolean forcingDelivery) throws HornetQException
{
checkClosed();
@@ -181,6 +186,8 @@
timeout = Long.MAX_VALUE;
}
+ boolean deliveryForced = false;
+
long start = -1;
long toWait = timeout;
@@ -194,13 +201,22 @@
synchronized (this)
{
while ((stopped || (m = buffer.removeFirst()) == null) && !closed && toWait > 0)
-
{
if (start == -1)
{
start = System.currentTimeMillis();
}
+ if (m == null && forcingDelivery)
+ {
+ // we only force delivery once per call to receive
+ if (!deliveryForced)
+ {
+ session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
+ deliveryForced = true;
+ }
+ }
+
try
{
wait(toWait);
@@ -213,6 +229,11 @@
{
break;
}
+
+ if (forcingDelivery && stopped)
+ {
+ break;
+ }
long now = System.currentTimeMillis();
@@ -224,6 +245,20 @@
if (m != null)
{
+ if (m.containsProperty(FORCED_DELIVERY_MESSAGE))
+ {
+ Long seq = (Long)m.getProperty(FORCED_DELIVERY_MESSAGE);
+ if (seq >= forceDeliveryCount.longValue())
+ {
+ // forced delivery messages are discarded, nothing has been delivered by the queue
+ return null;
+ }
+ else
+ {
+ // ignore any previous forced delivery message
+ continue;
+ }
+ }
// if we have already pre acked we cant expire
boolean expired = m.isExpired();
@@ -269,14 +304,19 @@
}
}
+ public ClientMessage receive(long timeout) throws HornetQException
+ {
+ return receive(timeout, false);
+ }
+
public ClientMessage receive() throws HornetQException
{
- return receive(0);
+ return receive(0, false);
}
public ClientMessage receiveImmediate() throws HornetQException
{
- return receive(-1);
+ return receive(0, true);
}
public MessageHandler getMessageHandler() throws HornetQException
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -211,6 +211,17 @@
return true;
}
}
+
+ @Override
+ public String toString()
+ {
+ return "ClientMessage[messageID=" + messageID +
+ ", durable=" +
+ durable +
+ ", destination=" +
+ getDestination() +
+ "]";
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -49,6 +49,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -329,7 +330,16 @@
return response;
}
+
+ public void forceDelivery(long consumerID, long sequence) throws HornetQException
+ {
+ checkClosed();
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+
+ channel.send(request);
+ }
+
public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
{
return createConsumer(queueName, null, false);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -68,4 +68,6 @@
FailoverManager getConnectionManager();
void workDone();
+
+ void forceDelivery(long consumerID, long sequence) throws HornetQException;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -119,6 +119,11 @@
return session.bindingQuery(address);
}
+ public void forceDelivery(long consumerID, long sequence) throws HornetQException
+ {
+ session.forceDelivery(consumerID, sequence);
+ }
+
public void cleanUp() throws Exception
{
session.cleanUp();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -30,6 +30,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -77,6 +78,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -350,6 +352,11 @@
packet = new SessionSendContinuationMessage();
break;
}
+ case SESS_FORCE_CONSUMER_DELIVERY:
+ {
+ packet = new SessionForceConsumerDelivery();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -137,6 +137,8 @@
public static final byte SESS_RECEIVE_CONTINUATION = 76;
+ public static final byte SESS_FORCE_CONSUMER_DELIVERY = 77;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ *
+ * A SessionConsumerForceDelivery
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class SessionForceConsumerDelivery extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+ private long sequence;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionForceConsumerDelivery(final long consumerID, final long sequence)
+ {
+ super(SESS_FORCE_CONSUMER_DELIVERY);
+
+ this.consumerID = consumerID;
+ this.sequence = sequence;
+ }
+
+ public SessionForceConsumerDelivery()
+ {
+ super(SESS_FORCE_CONSUMER_DELIVERY);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getSequence()
+ {
+ return sequence;
+ }
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
+ }
+
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+ buffer.writeLong(sequence);
+ }
+
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ consumerID = buffer.readLong();
+ sequence = buffer.readLong();
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getParentString());
+ buf.append(", consumerID=" + consumerID);
+ buf.append(", sequence=" + sequence);
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof SessionForceConsumerDelivery == false)
+ {
+ return false;
+ }
+
+ SessionForceConsumerDelivery r = (SessionForceConsumerDelivery)other;
+
+ return super.equals(other) && this.consumerID == r.consumerID && this.sequence == r.sequence;
+ }
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -30,8 +30,6 @@
void close() throws Exception;
- int getCountOfPendingDeliveries();
-
List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
void setStarted(boolean started);
@@ -43,4 +41,6 @@
MessageReference getExpired(long messageID) throws Exception;
void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
+
+ void forceDelivery(long sequence);
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -21,6 +21,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -66,7 +67,7 @@
void close() throws Exception;
- void promptDelivery(Queue queue);
+ void promptDelivery(Queue queue, boolean async);
void handleAcknowledge(final SessionAcknowledgeMessage packet);
@@ -124,6 +125,8 @@
void handleSendLargeMessage(SessionSendLargeMessage packet);
+ void handleForceConsumerDelivery(SessionForceConsumerDelivery message);
+
void handleClose(Packet packet);
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -280,10 +280,9 @@
}
}
- // Only used in testing - do not call directly!
public synchronized void deliverNow()
{
- deliver();
+ deliverRunner.run();
}
public synchronized void addConsumer(final Consumer consumer) throws Exception
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -22,6 +22,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
@@ -46,7 +47,6 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
/**
@@ -255,9 +255,30 @@
}
}
- public int getCountOfPendingDeliveries()
- {
- return deliveringRefs.size();
+ /**
+ * Prompt delivery and send a "forced delivery" message to the consumer.
+ * When the consumer receives such a "forced delivery" message, it discards it
+ * and knows that there are no other messages to be delivered.
+ */
+ public synchronized void forceDelivery(final long sequence)
+ {
+ // The prompt delivery is called synchronously to ensure the "forced delivery" message is
+ // sent after any queue delivery.
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ promptDelivery(false);
+
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+ forcedDeliveryMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setDestination(messageQueue.getName());
+
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+ channel.send(packet);
+ }
+ });
}
public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
@@ -305,7 +326,7 @@
// Outside the lock
if (started)
{
- promptDelivery();
+ promptDelivery(true);
}
}
@@ -331,7 +352,7 @@
if (previous <= 0 && previous + credits > 0)
{
- promptDelivery();
+ promptDelivery(true);
}
}
}
@@ -420,7 +441,7 @@
// Private --------------------------------------------------------------------------------------
- private void promptDelivery()
+ private void promptDelivery(boolean asyncDelivery)
{
lock.lock();
try
@@ -435,11 +456,18 @@
{
if (browseOnly)
{
- executor.execute(browserDeliverer);
+ if (asyncDelivery)
+ {
+ executor.execute(browserDeliverer);
+ }
+ else
+ {
+ browserDeliverer.run();
+ }
}
else
{
- session.promptDelivery(messageQueue);
+ session.promptDelivery(messageQueue, asyncDelivery);
}
}
}
@@ -590,7 +618,7 @@
else
{
// prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
+ session.promptDelivery(messageQueue, true);
}
}
}
@@ -865,7 +893,7 @@
}
catch (Exception e)
{
- log.warn("Exception while browser handled from " + messageQueue + ": " + current);
+ log.warn("Exception while browser handled from " + messageQueue + ": " + current, e);
return;
}
}
@@ -886,7 +914,7 @@
}
catch (Exception e)
{
- log.warn("Exception while browser handled from " + messageQueue + ": " + ref);
+ log.warn("Exception while browser handled from " + messageQueue + ": " + ref, e);
break;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -56,6 +56,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -320,9 +321,16 @@
remotingConnection.removeFailureListener(this);
}
- public void promptDelivery(final Queue queue)
+ public void promptDelivery(final Queue queue, boolean async)
{
- queue.deliverAsync(executor);
+ if (async)
+ {
+ queue.deliverAsync(executor);
+ }
+ else
+ {
+ queue.deliverNow();
+ }
}
public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
@@ -626,7 +634,21 @@
channel.send(response);
}
+
+ public void handleForceConsumerDelivery(SessionForceConsumerDelivery message)
+ {
+ try
+ {
+ ServerConsumer consumer = consumers.get(message.getConsumerID());
+ consumer.forceDelivery(message.getSequence());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to query consumer deliveries", e);
+ }
+ }
+
public void handleAcknowledge(final SessionAcknowledgeMessage packet)
{
Packet response = null;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -20,6 +20,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -51,6 +52,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -267,6 +269,12 @@
session.handleSendContinuations(message);
break;
}
+ case SESS_FORCE_CONSUMER_DELIVERY:
+ {
+ SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
+ session.handleForceConsumerDelivery(message);
+ break;
+ }
}
}
catch (Throwable t)
Modified: trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -32,8 +32,8 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * <p/>
- * $Id$
+ *
+ * $Id$
*/
public class HornetQQueueBrowser implements QueueBrowser
{
@@ -41,8 +41,6 @@
private static final Logger log = Logger.getLogger(HornetQQueueBrowser.class);
- private static final long NEXT_MESSAGE_TIMEOUT = 1000;
-
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
@@ -136,9 +134,7 @@
{
try
{
- // todo change this to consumer.receiveImmediate() once
- // https://jira.jboss.org/jira/browse/JBMESSAGING-1432 is completed
- current = consumer.receive(NEXT_MESSAGE_TIMEOUT);
+ current = consumer.receiveImmediate();
}
catch (HornetQException e)
{
Added: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ReceiveImmediateTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ReceiveImmediateTest.class);
+
+ private HornetQServer server;
+
+ private final SimpleString QUEUE = new SimpleString("ReceiveImmediateTest.queue");
+
+ private final SimpleString ADDRESS = new SimpleString("ReceiveImmediateTest.address");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = createDefaultConfig(false);
+ server = HornetQ.newHornetQServer(config, false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ private ClientSessionFactory sf;
+
+ public void testConsumerReceiveImmediateWithNoMessages() throws Exception
+ {
+ doConsumerReceiveImmediateWithNoMessages(false);
+ }
+
+ public void testConsumerReceiveImmediate() throws Exception
+ {
+ doConsumerReceiveImmediate(false);
+ }
+
+ public void testBrowserReceiveImmediateWithNoMessages() throws Exception
+ {
+ doConsumerReceiveImmediateWithNoMessages(true);
+ }
+
+ public void testBrowserReceiveImmediate() throws Exception
+ {
+ doConsumerReceiveImmediate(true);
+ }
+
+ private void doConsumerReceiveImmediateWithNoMessages(boolean browser) throws Exception
+ {
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setAckBatchSize(0);
+
+ ClientSession session = sf.createSession(false, true, false);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, browser);
+ session.start();
+
+ ClientMessage message = consumer.receiveImmediate();
+ assertNull(message);
+
+ session.close();
+
+ sf.close();
+ }
+
+ private void doConsumerReceiveImmediate(boolean browser) throws Exception
+ {
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setAckBatchSize(0);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, browser);
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receiveImmediate();
+ assertNotNull("did not receive message " + i, message2);
+ assertEquals("m" + i, message2.getBody().readString());
+ if (!browser)
+ {
+ message2.acknowledge();
+ }
+ }
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+
+ assertNull(consumer.receiveImmediate());
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ int messagesOnServer = (browser ? numMessages : 0);
+ assertEquals(messagesOnServer, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+
+ consumer.close();
+
+ session.close();
+
+ sf.close();
+
+ }
+
+}
More information about the hornetq-commits
mailing list