[jboss-cvs] JBoss Messaging SVN: r3606 - in trunk: src/main/org/jboss/jms/client/api and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 21 16:01:36 EST 2008
Author: timfox
Date: 2008-01-21 16:01:36 -0500 (Mon, 21 Jan 2008)
New Revision: 3606
Added:
trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
Removed:
trunk/src/main/org/jboss/jms/client/api/Consumer.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java
trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
trunk/src/main/org/jboss/jms/client/api/ClientSession.java
trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/Consumer.java
trunk/src/main/org/jboss/messaging/core/HandleStatus.java
trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java
trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java
trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
Log:
Client side cleanup part V
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -64,7 +64,7 @@
// Attributes ----------------------------------------------------
- private org.jboss.jms.client.api.Consumer cons;
+ private org.jboss.jms.client.api.ClientConsumer cons;
private org.jboss.jms.client.api.ClientSession sess;
@@ -117,7 +117,7 @@
cons = sess.createConsumerDelegate(dest.toCoreDestination(), messageSelector, false, subName, true);
- this.consumerID = cons.getConsumerID();
+ this.consumerID = cons.getID();
this.maxDeliveries = cons.getMaxDeliveries();
@@ -133,7 +133,7 @@
}
id = threadId.increment();
- internalThread = new Thread(this, "Connection Consumer for dest " + dest + " id=" + id);
+ internalThread = new Thread(this, "Connection ClientConsumer for dest " + dest + " id=" + id);
internalThread.start();
if (trace) { log.trace(this + " created"); }
Modified: trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -32,7 +32,7 @@
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.destination.JBossDestination;
@@ -53,11 +53,11 @@
// Attributes ----------------------------------------------------
- protected Consumer consumer;
+ protected ClientConsumer consumer;
// Constructors --------------------------------------------------
- public JBossMessageConsumer(Consumer consumer)
+ public JBossMessageConsumer(ClientConsumer consumer)
{
this.consumer = consumer;
}
@@ -102,7 +102,6 @@
// QueueReceiver implementation ----------------------------------
-
public Queue getQueue() throws JMSException
{
return (Queue)JBossDestination.fromCoreDestination(consumer.getDestination());
@@ -121,7 +120,7 @@
return consumer.getNoLocal();
}
- public Consumer getDelegate()
+ public ClientConsumer getDelegate()
{
return consumer;
}
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -52,7 +52,7 @@
import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTemporaryQueue;
@@ -237,7 +237,7 @@
log.trace("attempting to create consumer for destination:" + d + (messageSelector == null ? "" : ", messageSelector: " + messageSelector) + (noLocal ? ", noLocal = true" : ""));
- org.jboss.jms.client.api.Consumer cd = session.
+ org.jboss.jms.client.api.ClientConsumer cd = session.
createConsumerDelegate(((JBossDestination)d).toCoreDestination(), messageSelector, noLocal, null, false);
return new JBossMessageConsumer(cd);
@@ -279,7 +279,7 @@
throw new InvalidDestinationException("Not a JBossTopic:" + topic);
}
- Consumer cd =
+ ClientConsumer cd =
session.createConsumerDelegate(((JBossTopic)topic).toCoreDestination(), null, false, name, false);
return new JBossMessageConsumer(cd);
@@ -309,7 +309,7 @@
messageSelector = null;
}
- Consumer cd = session.
+ ClientConsumer cd = session.
createConsumerDelegate(((JBossTopic)topic).toCoreDestination(), messageSelector, noLocal, name, false);
return new JBossMessageConsumer(cd);
Modified: trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -31,6 +31,4 @@
boolean hasNextMessage() throws JMSException;
Message[] nextMessageBlock(int maxMessages) throws JMSException;
-
-
}
Modified: trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConnection.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConnection.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -29,13 +29,12 @@
public interface ClientConnection extends Closeable
{
ClientSession createSessionDelegate(boolean transacted,
- int acknowledgmentMode, boolean isXA) throws JMSException;
+ int acknowledgmentMode, boolean isXA) throws JMSException;
String getClientID() throws JMSException;
int getServerID();
-
void setClientID(String id) throws JMSException;
void start() throws JMSException;
@@ -59,11 +58,6 @@
ServerSessionPool sessionPool,
int maxMessages) throws JMSException;
-// void registerFailoverListener(FailoverListener failoverListener);
-//
-// boolean unregisterFailoverListener(FailoverListener failoverListener);
-
-
public void setRemotingConnection(JMSRemotingConnection conn);
public Client getClient();
@@ -77,7 +71,4 @@
public String getID();
public byte getVersion();
-
-
-
}
Copied: trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java (from rev 3604, trunk/src/main/org/jboss/jms/client/api/Consumer.java)
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.jms.client.api;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.jboss.jms.client.Closeable;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.messaging.core.Destination;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public interface ClientConsumer extends Closeable
+{
+ String getID();
+
+ void changeRate(float newRate) throws JMSException;
+
+ MessageListener getMessageListener() throws JMSException;
+
+ void setMessageListener(MessageListener listener) throws JMSException;
+
+ Destination getDestination() throws JMSException;
+
+ boolean getNoLocal() throws JMSException;
+
+ String getMessageSelector() throws JMSException;
+
+ Message receive(long timeout) throws JMSException;
+
+ int getMaxDeliveries();
+
+ boolean isShouldAck();
+
+ void handleMessage(JBossMessage message) throws Exception;
+
+ void addToFrontOfBuffer(JBossMessage message) throws JMSException;
+
+ long getRedeliveryDelay();
+}
Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -17,7 +17,6 @@
import org.jboss.jms.client.Closeable;
import org.jboss.jms.client.impl.Ack;
import org.jboss.jms.client.impl.Cancel;
-import org.jboss.jms.client.impl.ClientConsumer;
import org.jboss.jms.client.impl.DeliveryInfo;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
@@ -38,21 +37,13 @@
*/
public interface ClientSession extends Closeable
{
-
-
ClientConnection getConnection();
- ClientConsumer getCallbackHandler(String consumerID);
-
- void addCallbackHandler(ClientConsumer handler);
-
- void removeCallbackHandler(ClientConsumer handler);
-
String getID();
/// Methods that will perform a server invocation ----------------------------------------------------------
- Consumer createConsumerDelegate(Destination destination, String selector,
+ ClientConsumer createConsumerDelegate(Destination destination, String selector,
boolean noLocal, String subscriptionName,
boolean isCC) throws JMSException;
@@ -187,8 +178,4 @@
public Object getCurrentTxId();
public void setCurrentTxId(Object currentTxId);
-
-
-
-
}
Deleted: trunk/src/main/org/jboss/jms/client/api/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/Consumer.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/Consumer.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,47 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.jms.client.api;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import org.jboss.jms.client.Closeable;
-import org.jboss.messaging.core.Destination;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- */
-public interface Consumer extends Closeable
-{
- void changeRate(float newRate) throws JMSException;
-
- // ConsumerEndpoint -------------------------------------------------------------------------
-
- // ConsumerDelegate --------------------------------------------------------------------------
- MessageListener getMessageListener() throws JMSException;
-
- void setMessageListener(MessageListener listener) throws JMSException;
-
- Destination getDestination() throws JMSException;
-
- boolean getNoLocal() throws JMSException;
-
- String getMessageSelector() throws JMSException;
-
- Message receive(long timeout) throws JMSException;
-
- String getConsumerID();
-
- int getMaxDeliveries();
-
- boolean isShouldAck();
-
-}
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -232,7 +232,6 @@
clientID = ((GetClientIDResponse) sendBlocking(new GetClientIDRequest())).getClientID();
}
return clientID;
-
}
/**
@@ -382,9 +381,10 @@
{
log.trace("Failed to close", t);
}
- }
-
+ }
}
+
+ children.clear();
}
Deleted: trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,933 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.jboss.jms.client.api.ClientSession;
-import org.jboss.jms.client.api.Consumer;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.PriorityLinkedList;
-import org.jboss.messaging.core.impl.PriorityLinkedListImpl;
-import org.jboss.messaging.util.Future;
-
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-/**
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox/a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision: 2774 $</tt>
- *
- * $Id: MessageCallbackHandler.java 2774 2007-06-12 22:43:54Z timfox $
- */
-public class ClientConsumer
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log;
-
- // Static ---------------------------------------------------------------------------------------
-
- private static boolean trace;
-
- private static final int WAIT_TIMEOUT = 30000;
-
-
- static
- {
- log = Logger.getLogger(ClientConsumer.class);
- trace = log.isTraceEnabled();
- }
-
- private static boolean checkExpiredOrReachedMaxdeliveries(JBossMessage jbm,
- ClientSession del,
- int maxDeliveries, boolean shouldCancel)
- {
- Message msg = jbm.getCoreMessage();
-
- boolean expired = msg.isExpired();
-
- boolean reachedMaxDeliveries = jbm.getDeliveryCount() == maxDeliveries;
-
- if (expired || reachedMaxDeliveries)
- {
- if (trace)
- {
- if (expired)
- {
- log.trace(msg + " has expired, cancelling to server");
- }
- else
- {
- log.trace(msg + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
- }
- }
-
- if (shouldCancel)
- {
- final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
- expired, reachedMaxDeliveries);
- try
- {
- del.cancelDelivery(cancel);
- }
- catch (JMSException e)
- {
- log.error("Failed to cancel delivery", e);
- }
- }
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- //This is static so it can be called by the asf layer too
- public static void callOnMessage(ClientSession sess,
- MessageListener listener,
- String consumerID,
- boolean isConnectionConsumer,
- JBossMessage m,
- int ackMode,
- int maxDeliveries,
- ClientSession connectionConsumerSession,
- boolean shouldAck)
- throws JMSException
- {
- if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
- {
- //Message has been cancelled
- return;
- }
-
- DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, connectionConsumerSession, shouldAck);
-
- m.incDeliveryCount();
-
- // If this is the callback-handler for a connection consumer we don't want to acknowledge or
- // add anything to the tx for this session.
- if (!isConnectionConsumer)
- {
- //We need to call preDeliver, deliver the message then call postDeliver - this is because
- //it is legal to call session.recover(), or session.rollback() from within the onMessage()
- //method in which case the last message needs to be delivered so it needs to know about it
- sess.preDeliver(deliveryInfo);
- }
-
- try
- {
- if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
-
- listener.onMessage(m);
-
- if (trace) { log.trace("listener's onMessage() finished"); }
- }
- catch (RuntimeException e)
- {
- log.error("RuntimeException was thrown from onMessage, " + m.getJMSMessageID() + " will be redelivered", e);
-
- // See JMS 1.1 spec 4.5.2
-
- if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
- {
- sess.recover();
- }
- }
-
- // If this is the callback-handler for a connection consumer we don't want to acknowledge
- // or add anything to the tx for this session
- if (!isConnectionConsumer)
- {
- if (trace) { log.trace("Calling postDeliver"); }
-
- sess.postDeliver();
-
- if (trace) { log.trace("Called postDeliver"); }
- }
- }
-
- // Attributes -----------------------------------------------------------------------------------
-
- /*
- * The buffer is now a priority linked list
- * This resolves problems whereby messages are delivered from the server side queue in
- * correct priority order, but because the old consumer list was not a priority list
- * then if messages were sitting waiting to be consumed on the client side, then higher
- * priority messages might be behind lower priority messages and thus get consumed out of order
- */
- private PriorityLinkedList<JBossMessage> buffer;
- private ClientSession sessionDelegate;
- private Consumer consumerDelegate;
- private String consumerID;
- private boolean isConnectionConsumer;
- private volatile Thread receiverThread;
- private MessageListener listener;
- private int ackMode;
- private boolean closed;
- private Object mainLock;
- private QueuedExecutor sessionExecutor;
- private boolean listenerRunning;
- private int maxDeliveries;
- private long lastDeliveryId = -1;
- private boolean waitingForLastDelivery;
- private boolean shouldAck;
- private long redeliveryDelay;
- private boolean paused;
- private int consumeCount;
- private boolean firstTime = true;
- private int bufferSize;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public ClientConsumer(boolean isCC, int ackMode,
- ClientSession sess, Consumer cons, String consumerID,
- String queueName,
- int bufferSize, QueuedExecutor sessionExecutor,
- int maxDeliveries, boolean shouldAck,
- long redeliveryDelay)
- {
- if (bufferSize < 1)
- {
- throw new IllegalArgumentException(this + " bufferSize must be > 0");
- }
-
- buffer = new PriorityLinkedListImpl<JBossMessage>(10);
- isConnectionConsumer = isCC;
- this.ackMode = ackMode;
- this.sessionDelegate = sess;
- this.consumerDelegate = cons;
- this.consumerID = consumerID;
- mainLock = new Object();
- this.sessionExecutor = sessionExecutor;
- this.maxDeliveries = maxDeliveries;
- this.shouldAck = shouldAck;
- this.redeliveryDelay = redeliveryDelay;
- this.bufferSize = bufferSize;
- }
-
- // Public ---------------------------------------------------------------------------------------
-
-
- public boolean isClosed()
- {
- return closed;
- }
-
- /**
- * Handles a message sent from the server.
- *
- * @param message The message
- */
- public void handleMessage(final JBossMessage message) throws Exception
- {
- synchronized (mainLock)
- {
- if (closed)
- {
- // Sanity - this should never happen - we should always wait for all deliveries to arrive
- // when closing
- throw new IllegalStateException(this + " is closed, so ignoring message");
- }
-
- message.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
- message.doBeforeReceive();
-
- //Add it to the buffer
- buffer.addLast(message, message.getJMSPriority());
-
- lastDeliveryId = message.getDeliveryId();
-
- if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
-
- messageAdded();
- }
- }
-
- public void setMessageListener(MessageListener listener) throws JMSException
- {
- synchronized (mainLock)
- {
- if (receiverThread != null)
- {
- // Should never happen
- throw new IllegalStateException("Consumer is currently in receive(..). " +
- "Cannot set MessageListener");
- }
-
- this.listener = listener;
-
- if (listener != null && !buffer.isEmpty())
- {
- listenerRunning = true;
-
- this.queueRunner(new ListenerRunner());
- }
- }
- }
-
- public void cancelBuffer() throws JMSException
- {
- if (trace) { log.trace("Cancelling buffer: " + buffer.size()); }
-
- synchronized (mainLock)
- {
- // Now we cancel anything left in the buffer. The reason we do this now is that otherwise
- // the deliveries wouldn't get cancelled until session close (since we don't cancel
- // consumer's deliveries until then), which is too late - since we need to preserve the
- // order of messages delivered in a session.
-
- if (shouldAck && !buffer.isEmpty())
- {
- // Now we cancel any deliveries that might be waiting in our buffer. This is because
- // otherwise the messages wouldn't get cancelled until the corresponding session died.
- // So if another consumer in another session tried to consume from the channel before
- // that session died it wouldn't receive those messages.
- // We can't just cancel all the messages in the SCE since some of those messages might
- // have actually been delivered (unlike these) and we may want to acknowledge them
- // later, after this consumer has been closed
-
- List cancels = new ArrayList();
-
- for(Iterator i = buffer.iterator(); i.hasNext();)
- {
- JBossMessage mp = (JBossMessage)i.next();
-
- CancelImpl cancel =
- new CancelImpl(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
-
- cancels.add(cancel);
- }
-
- if (trace) { log.trace("Calling cancelDeliveries"); }
- sessionDelegate.cancelDeliveries(cancels);
- if (trace) { log.trace("Done call"); }
-
- buffer.clear();
- }
- }
- }
-
- public void close(long lastDeliveryId) throws JMSException
- {
- log.trace(this + " close");
-
- //Wait for the last delivery to arrive
- waitForLastDelivery(lastDeliveryId);
-
- //Important! We set the listener to null so the next ListenerRunner won't run
- if (listener != null)
- {
- setMessageListener(null);
- }
-
- //Now we wait for any current listener runners to run.
- waitForOnMessageToComplete();
-
- synchronized (mainLock)
- {
- if (closed)
- {
- return;
- }
-
- closed = true;
-
- if (receiverThread != null)
- {
- // Wake up any receive() thread that might be waiting
- mainLock.notify();
- }
-
- this.listener = null;
- }
-
- if (trace) { log.trace(this + " closed"); }
- }
-
- /**
- * Method used by the client thread to get a Message, if available.
- *
- * @param timeout - the timeout value in milliseconds. A zero timeount never expires, and the
- * call blocks indefinitely. A -1 timeout means receiveNoWait(): return the next message
- * or null if one is not immediately available. Returns null if the consumer is
- * concurrently closed.
- */
- public JBossMessage receive(long timeout) throws JMSException
- {
- JBossMessage m = null;
-
- synchronized (mainLock)
- {
- if (trace) { log.trace(this + " receiving, timeout = " + timeout); }
-
- if (closed)
- {
- // If consumer is closed or closing calling receive returns null
- if (trace) { log.trace(this + " closed, returning null"); }
- return null;
- }
-
- if (listener != null)
- {
- throw new JMSException("The consumer has a MessageListener set, " +
- "cannot call receive(..)");
- }
-
- receiverThread = Thread.currentThread();
-
- long startTimestamp = System.currentTimeMillis();
-
- try
- {
- while(true)
- {
- if (timeout == 0)
- {
- if (trace) { log.trace(this + ": receive, no timeout"); }
-
- m = getMessage(0);
-
- if (m == null)
- {
- return null;
- }
- }
- else if (timeout == -1)
- {
- //ReceiveNoWait
- if (trace) { log.trace(this + ": receive, noWait"); }
-
- m = getMessage(-1);
-
- if (m == null)
- {
- if (trace) { log.trace(this + ": no message available"); }
- return null;
- }
- }
- else
- {
- if (trace) { log.trace(this + ": receive, timeout " + timeout + " ms, blocking poll on queue"); }
-
- m = getMessage(timeout);
-
- if (m == null)
- {
- // timeout expired
- if (trace) { log.trace(this + ": " + timeout + " ms timeout expired"); }
-
- return null;
- }
- }
-
- if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
-
- boolean ignore =
- checkExpiredOrReachedMaxdeliveries(m, sessionDelegate, maxDeliveries, shouldAck);
-
- if (!isConnectionConsumer && !ignore)
- {
- DeliveryInfo info = new DeliveryInfo(m, consumerID, null, shouldAck);
-
- sessionDelegate.preDeliver(info);
-
- //If post deliver didn't succeed and acknowledgement mode is auto_ack
- //That means the ref wasn't acked since it couldn't be found.
- //In order to maintain at most once semantics we must therefore not return
- //the message
-
- ignore = !sessionDelegate.postDeliver();
-
- if (trace)
- {
- log.trace("Post deliver returned " + !ignore);
- }
-
- if (!ignore)
- {
- m.incDeliveryCount();
- }
- }
-
- if (!ignore)
- {
- if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
-
- break;
- }
-
- if (trace)
- {
- log.trace("Discarding message " + m);
- }
-
- // the message expired, so discard the message, adjust timeout and reenter the buffer
- if (timeout != 0)
- {
- timeout -= System.currentTimeMillis() - startTimestamp;
- if (timeout == 0)
- {
- // As 0 means waitForever, we make it noWait
- timeout = -1;
- }
-
- }
- }
- }
- finally
- {
- receiverThread = null;
- }
- }
-
- if (trace) { log.trace(this + " receive() returning " + m); }
-
- return m;
- }
-
- public MessageListener getMessageListener()
- {
- return listener;
- }
-
- public String toString()
- {
- return "ClientConsumer[" + consumerID + "]";
- }
-
- public String getConsumerId()
- {
- return consumerID;
- }
-
- public void setConsumerId(String consumerId)
- {
- this.consumerID = consumerId;
- }
-
- public void addToFrontOfBuffer(JBossMessage proxy) throws JMSException
- {
- synchronized (mainLock)
- {
- buffer.addFirst(proxy, proxy.getJMSPriority());
-
- consumeCount--;
-
- messageAdded();
- }
- }
-
- public long getRedeliveryDelay()
- {
- return redeliveryDelay;
- }
-
- public void pause()
- {
- synchronized (mainLock)
- {
- paused = true;
-
- sendChangeRateMessage(0f);
- }
- }
-
- public void resume()
- {
- synchronized (mainLock)
- {
- paused = false;
-
- if (firstTime)
- {
- consumeCount = 0;
-
- firstTime = false;
- }
- else
- {
- consumeCount = bufferSize / 3 - buffer.size();
- }
-
- sendChangeRateMessage(1f);
- }
- }
-
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private void checkSendChangeRate()
- {
- consumeCount++;
-
- if (!paused && consumeCount == bufferSize)
- {
- consumeCount = 0;
-
- sendChangeRateMessage(1.0f);
- }
- }
-
- /*
- * Wait for the last delivery to arrive
- */
- private void waitForLastDelivery(long id)
- {
- if (trace) { log.trace("Waiting for last delivery id " + id); }
-
- if (id == -1)
- {
- //No need to wait - nothing to wait for
- return;
- }
-
- synchronized (mainLock)
- {
- waitingForLastDelivery = true;
- try
- {
- long wait = WAIT_TIMEOUT;
- while (lastDeliveryId != id && wait > 0)
- {
- long start = System.currentTimeMillis();
- try
- {
- mainLock.wait(wait);
- }
- catch (InterruptedException e)
- {
- }
- wait -= (System.currentTimeMillis() - start);
- }
- if (trace && lastDeliveryId == id)
- {
- log.trace("Got last delivery");
- }
-
- if (lastDeliveryId != id)
- {
- log.warn("Timed out waiting for last delivery " + id + " got " + lastDeliveryId);
- }
- }
- finally
- {
- waitingForLastDelivery = false;
- }
- }
- }
-
- private void sendChangeRateMessage(float newRate)
- {
- try
- {
- // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
- // job to detect it and turn it into a remoting one way invocation.
- consumerDelegate.changeRate(newRate);
- }
- catch (JMSException e)
- {
- log.error("Failed to send changeRate message", e);
- }
- }
-
- private void waitForOnMessageToComplete()
- {
- // Wait for any onMessage() executions to complete
-
- if (Thread.currentThread().equals(sessionExecutor.getThread()))
- {
- // the current thread already closing this ClientConsumer (this happens when the
- // session is closed from within the MessageListener.onMessage(), for example), so no need
- // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
- return;
- }
-
- Future result = new Future();
-
- try
- {
- sessionExecutor.execute(new Closer(result));
-
- if (trace) { log.trace(this + " blocking wait for Closer execution"); }
- result.getResult();
- if (trace) { log.trace(this + " got Closer result"); }
- }
- catch (InterruptedException e)
- {
- }
- }
-
- private void queueRunner(ListenerRunner runner)
- {
- try
- {
- this.sessionExecutor.execute(runner);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- private void messageAdded()
- {
- boolean notified = false;
-
- if (trace) { log.trace("Receiver thread:" + receiverThread + " listener:" + listener + " listenerRunning:" + listenerRunning +
- " sessionExecutor:" + sessionExecutor); }
-
- // If we have a thread waiting on receive() we notify it
- if (receiverThread != null)
- {
- if (trace) { log.trace(this + " notifying receiver/waiter thread"); }
-
- mainLock.notifyAll();
-
- notified = true;
- }
- else if (listener != null)
- {
- // We have a message listener
- if (!listenerRunning)
- {
- listenerRunning = true;
-
- if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
-
- this.queueRunner(new ListenerRunner());
- }
-
- //TODO - Execute onMessage on same thread for even better throughput
- }
-
- // Make sure we notify any thread waiting for last delivery
- if (waitingForLastDelivery && !notified)
- {
- if (trace) { log.trace("Notifying"); }
-
- mainLock.notifyAll();
- }
- }
-
- private long waitOnLock(Object lock, long waitTime) throws InterruptedException
- {
- long start = System.currentTimeMillis();
-
- // Wait for last message to arrive
- lock.wait(waitTime);
-
- long waited = System.currentTimeMillis() - start;
-
- if (waited < waitTime)
- {
- waitTime = waitTime - waited;
-
- return waitTime;
- }
- else
- {
- return 0;
- }
- }
-
- private JBossMessage getMessage(long timeout)
- {
- if (timeout == -1)
- {
- // receiveNoWait so don't wait
- }
- else
- {
- try
- {
- if (timeout == 0)
- {
- // wait for ever potentially
- while (!closed && buffer.isEmpty())
- {
- if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
-
- mainLock.wait();
-
- if (trace) { log.trace(this + " done waiting on main lock"); }
- }
- }
- else
- {
- // wait with timeout
- long toWait = timeout;
-
- while (!closed && buffer.isEmpty() && toWait > 0)
- {
- if (trace) { log.trace(this + " waiting on main lock, timeout " + toWait + " ms"); }
-
- toWait = waitOnLock(mainLock, toWait);
-
- if (trace) { log.trace(this + " done waiting on lock, buffer is " + (buffer.isEmpty() ? "" : "NOT ") + "empty"); }
- }
- }
- }
- catch (InterruptedException e)
- {
- if (trace) { log.trace("InterruptedException, " + this + ".getMessage() returning null"); }
- return null;
- }
- }
-
- JBossMessage m = null;
-
- if (!closed && !buffer.isEmpty())
- {
- m = (JBossMessage)buffer.removeFirst();
-
- checkSendChangeRate();
- }
-
- return m;
- }
-
- // Inner classes --------------------------------------------------------------------------------
-
- /*
- * This class is used to put on the listener executor to wait for onMessage
- * invocations to complete when closing
- */
- private class Closer implements Runnable
- {
- Future result;
-
- Closer(Future result)
- {
- this.result = result;
- }
-
- public void run()
- {
- if (trace) { log.trace("Closer starts running"); }
-
- result.setResult(null);
-
- if (trace) { log.trace("Closer finished run"); }
- }
- }
-
- /*
- * This class handles the execution of onMessage methods
- */
- private class ListenerRunner implements Runnable
- {
- public void run()
- {
- JBossMessage msg = null;
-
- MessageListener theListener = null;
-
- synchronized (mainLock)
- {
- if (listener == null || buffer.isEmpty())
- {
- listenerRunning = false;
-
- if (trace) { log.trace("no listener or buffer is empty, returning"); }
-
- return;
- }
-
- theListener = listener;
-
- // remove a message from the buffer
-
- msg = (JBossMessage)buffer.removeFirst();
-
- checkSendChangeRate();
- }
-
- /*
- * Bug here is as follows:
- * The next runner gets scheduled BEFORE the on message is executed
- * so if the onmessage fails on acking it will be put on hold
- * and failover will kick in, this will clear the executor
- * so the next queud one disappears at everything grinds to a halt
- *
- * Solution - don't use a session executor - have a sesion thread instead much nicer
- */
-
- if (msg != null)
- {
- try
- {
- callOnMessage(sessionDelegate, theListener, consumerID,
- false, msg, ackMode, maxDeliveries, null, shouldAck);
-
- if (trace) { log.trace("Called callonMessage"); }
- }
- catch (Throwable t)
- {
- log.error("Failed to deliver message", t);
- }
- }
-
- synchronized (mainLock)
- {
- if (!buffer.isEmpty())
- {
- //Queue up the next runner to run
-
- if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
-
- queueRunner(this);
-
- if (trace) { log.trace("Queued next onMessage to run"); }
- }
- else
- {
- if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
-
- listenerRunning = false;
- }
- }
-
- if (trace) { log.trace("Exiting run()"); }
- }
- }
-}
-
-
-
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -23,27 +23,37 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageListener;
+import javax.jms.Session;
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.client.api.ClientSession;
-import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.exception.MessagingShutdownException;
-import org.jboss.messaging.util.Logger;
+import org.jboss.jms.message.JBossMessage;
import org.jboss.messaging.core.Destination;
import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.PriorityLinkedList;
+import org.jboss.messaging.core.impl.PriorityLinkedListImpl;
+import org.jboss.messaging.core.remoting.Client;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
-import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.Logger;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
/**
- * The client-side Consumer delegate class.
+ * The client-side ClientConsumer delegate class.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -54,13 +64,18 @@
*
* $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $
*/
-public class ClientConsumerImpl extends CommunicationSupport<ClientConsumerImpl> implements Consumer
+public class ClientConsumerImpl extends CommunicationSupport<ClientConsumerImpl> implements ClientConsumer
{
// Constants ------------------------------------------------------------------------------------
private static final long serialVersionUID = 3253922610778321868L;
private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ private static final int WAIT_TIMEOUT = 30000;
+
// Attributes -----------------------------------------------------------------------------------
@@ -68,33 +83,149 @@
private int bufferSize;
private int maxDeliveries;
private long redeliveryDelay;
-
- // State attributes -----------------------------------------------------------------------------
-
- private String consumerID;
private Destination destination;
private String selector;
private String subscriptionName;
private boolean noLocal;
private boolean isConnectionConsumer;
- private ClientConsumer clientConsumer;
private boolean storingDeliveries;
+ private PriorityLinkedList<JBossMessage> buffer = new PriorityLinkedListImpl<JBossMessage>(10);
+ private volatile Thread receiverThread;
+ private MessageListener listener;
+ private int ackMode;
+ private boolean closed;
+ private Object mainLock = new Object();
+ private QueuedExecutor sessionExecutor;
+ private boolean listenerRunning;
+ private long lastDeliveryId = -1;
+ private boolean waitingForLastDelivery;
+ private boolean shouldAck;
+ private boolean paused;
+ private int consumeCount;
+ private boolean firstTime = true;
+
+
// Static ---------------------------------------------------------------------------------------
+
+ private static boolean checkExpiredOrReachedMaxdeliveries(JBossMessage jbm,
+ ClientSession del,
+ int maxDeliveries, boolean shouldCancel)
+ {
+ Message msg = jbm.getCoreMessage();
- // Constructors ---------------------------------------------------------------------------------
- public ClientConsumerImpl(String objectID, int bufferSize, int maxDeliveries, long redeliveryDelay)
- {
- super(objectID);
- this.bufferSize = bufferSize;
- this.maxDeliveries = maxDeliveries;
- this.redeliveryDelay = redeliveryDelay;
+ boolean expired = msg.isExpired();
+
+ boolean reachedMaxDeliveries = jbm.getDeliveryCount() == maxDeliveries;
+
+ if (expired || reachedMaxDeliveries)
+ {
+ if (trace)
+ {
+ if (expired)
+ {
+ log.trace(msg + " has expired, cancelling to server");
+ }
+ else
+ {
+ log.trace(msg + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
+ }
+ }
+
+ if (shouldCancel)
+ {
+ final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
+ expired, reachedMaxDeliveries);
+ try
+ {
+ del.cancelDelivery(cancel);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to cancel delivery", e);
+ }
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- public ClientConsumerImpl(ClientSession session, String objectID, int bufferSize, int maxDeliveries, long redeliveryDelay,
- Destination dest,
- String selector, boolean noLocal, String subscriptionName, String consumerID,
- boolean isCC)
+ // This is static so it can be called by the asf layer too
+ public static void callOnMessage(ClientSession sess,
+ MessageListener listener,
+ String consumerID,
+ boolean isConnectionConsumer,
+ JBossMessage m,
+ int ackMode,
+ int maxDeliveries,
+ ClientSession connectionConsumerSession,
+ boolean shouldAck)
+ throws JMSException
+ {
+ if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+ {
+ // Message has been cancelled
+ return;
+ }
+
+ DeliveryInfo deliveryInfo =
+ new DeliveryInfo(m, consumerID, connectionConsumerSession, shouldAck);
+
+ m.incDeliveryCount();
+
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+ // add anything to the tx for this session.
+ if (!isConnectionConsumer)
+ {
+ // We need to call preDeliver, deliver the message then call postDeliver - this is because
+ // it is legal to call session.recover(), or session.rollback() from within the onMessage()
+ // method in which case the last message needs to be delivered so it needs to know about it
+ sess.preDeliver(deliveryInfo);
+ }
+
+ try
+ {
+ if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+ listener.onMessage(m);
+
+ if (trace) { log.trace("listener's onMessage() finished"); }
+ }
+ catch (RuntimeException e)
+ {
+ log.error("RuntimeException was thrown from onMessage, " + m.getJMSMessageID() + " will be redelivered", e);
+
+ // See JMS 1.1 spec 4.5.2
+
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ sess.recover();
+ }
+ }
+
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge
+ //or add anything to the tx for this session
+ if (!isConnectionConsumer)
+ {
+ if (trace) { log.trace("Calling postDeliver"); }
+
+ sess.postDeliver();
+
+ if (trace) { log.trace("Called postDeliver"); }
+ }
+ }
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ClientConsumerImpl(ClientSession session, String objectID, int bufferSize,
+ int maxDeliveries, long redeliveryDelay,
+ Destination dest,
+ String selector, boolean noLocal, String subscriptionName,
+ boolean isCC, QueuedExecutor sessionExecutor)
{
super(objectID);
this.session = session;
@@ -105,16 +236,11 @@
this.selector = selector;
this.noLocal = noLocal;
this.subscriptionName = subscriptionName;
- this.consumerID = consumerID;
this.isConnectionConsumer = isCC;
+ this.sessionExecutor = sessionExecutor;
+ this.shouldAck = !(destination.getType() == DestinationType.TOPIC && subscriptionName == null);
}
- public ClientConsumerImpl()
- {
- }
-
- // DelegateSupport overrides --------------------------------------------------------------------
-
@Override
protected byte getVersion()
{
@@ -145,17 +271,12 @@
// First we call close on the ClientConsumer which waits for onMessage invocations
// to complete and the last delivery to arrive
- getClientConsumer().close(lastDeliveryId);
+ close(lastDeliveryId);
- session.removeCallbackHandler(getClientConsumer());
+ PacketDispatcher.client.unregister(id);
- CallbackManager cm = session.getConnection().getRemotingConnection().getCallbackManager();
- cm.unregisterHandler(getConsumerID());
-
- PacketDispatcher.client.unregister(getConsumerID());
-
//And then we cancel any messages still in the message callback handler buffer
- getClientConsumer().cancelBuffer();
+ cancelBuffer();
return lastDeliveryId;
@@ -167,11 +288,10 @@
if (proxiedException instanceof MessagingShutdownException /* ||
(connectionState.getFailoverCommandCenter() == null ) */ )
-
{
- if (!getClientConsumer().isClosed())
+ if (!this.isClosed())
{
- getClientConsumer().close(-1);
+ close(-1);
}
}
JMSException ex = new JMSException(proxiedException.toString());
@@ -187,44 +307,25 @@
return response.getID();
}
- // ConsumerDelegate implementation --------------------------------------------------------------
-
+ public boolean isClosed()
+ {
+ return closed;
+ }
+
public void changeRate(float newRate) throws JMSException
{
sendOneWay(new ChangeRateMessage(newRate));
}
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
+
public MessageListener getMessageListener()
{
- return getClientConsumer().getMessageListener();
+ return this.listener;
}
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public Message receive(long timeout) throws JMSException
- {
- return getClientConsumer().receive(timeout);
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void setMessageListener(MessageListener listener) throws JMSException
- {
- getClientConsumer().setMessageListener(listener);
- }
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
public boolean getNoLocal()
{
return this.noLocal;
@@ -294,55 +395,690 @@
return redeliveryDelay;
}
- public String getConsumerID()
+ public boolean isConnectionConsumer()
{
- return consumerID;
+ return isConnectionConsumer;
}
+
+ public String getSubscriptionName()
+ {
+ return subscriptionName;
+ }
- public boolean isConnectionConsumer()
+ public void setSubscriptionName(String subscriptionName)
{
- return isConnectionConsumer;
+ this.subscriptionName = subscriptionName;
}
- public void setClientConsumer(ClientConsumer handler)
+ public boolean isStoringDeliveries()
{
- this.clientConsumer = handler;
+ return storingDeliveries;
}
+
+ // Protected ------------------------------------------------------------------------------------
- public ClientConsumer getClientConsumer()
+ // Package Private ------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner Classes --------------------------------------------------------------------------------
+
+ public void handleMessage(final JBossMessage message) throws Exception
{
- return clientConsumer;
+ synchronized (mainLock)
+ {
+ if (closed)
+ {
+ // Sanity - this should never happen - we should always wait for all deliveries to arrive
+ // when closing
+ throw new IllegalStateException(this + " is closed, so ignoring message");
+ }
+
+ message.setSessionDelegate(session, isConnectionConsumer);
+
+ message.doBeforeReceive();
+
+ //Add it to the buffer
+ buffer.addLast(message, message.getJMSPriority());
+
+ lastDeliveryId = message.getDeliveryId();
+
+ if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+ messageAdded();
+ }
}
- public String getSubscriptionName()
+ public void setMessageListener(MessageListener listener) throws JMSException
+ {
+ synchronized (mainLock)
+ {
+ if (receiverThread != null)
+ {
+ // Should never happen
+ throw new IllegalStateException("ClientConsumer is currently in receive(..). " +
+ "Cannot set MessageListener");
+ }
+
+ this.listener = listener;
+
+ if (listener != null && !buffer.isEmpty())
+ {
+ listenerRunning = true;
+
+ this.queueRunner(new ListenerRunner());
+ }
+ }
+ }
+
+ public void cancelBuffer() throws JMSException
{
- return subscriptionName;
+ if (trace) { log.trace("Cancelling buffer: " + buffer.size()); }
+
+ synchronized (mainLock)
+ {
+ // Now we cancel anything left in the buffer. The reason we do this now is that otherwise
+ // the deliveries wouldn't get cancelled until session close (since we don't cancel
+ // consumer's deliveries until then), which is too late - since we need to preserve the
+ // order of messages delivered in a session.
+
+ if (shouldAck && !buffer.isEmpty())
+ {
+ // Now we cancel any deliveries that might be waiting in our buffer. This is because
+ // otherwise the messages wouldn't get cancelled until the corresponding session died.
+ // So if another consumer in another session tried to consume from the channel before
+ // that session died it wouldn't receive those messages.
+ // We can't just cancel all the messages in the SCE since some of those messages might
+ // have actually been delivered (unlike these) and we may want to acknowledge them
+ // later, after this consumer has been closed
+
+ List cancels = new ArrayList();
+
+ for(Iterator i = buffer.iterator(); i.hasNext();)
+ {
+ JBossMessage mp = (JBossMessage)i.next();
+
+ CancelImpl cancel =
+ new CancelImpl(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
+
+ cancels.add(cancel);
+ }
+
+ if (trace) { log.trace("Calling cancelDeliveries"); }
+ session.cancelDeliveries(cancels);
+ if (trace) { log.trace("Done call"); }
+
+ buffer.clear();
+ }
+ }
}
+
+ public void close(long lastDeliveryId) throws JMSException
+ {
+ log.trace(this + " close");
+
+ //Wait for the last delivery to arrive
+ waitForLastDelivery(lastDeliveryId);
+
+ //Important! We set the listener to null so the next ListenerRunner won't run
+ if (listener != null)
+ {
+ setMessageListener(null);
+ }
+
+ //Now we wait for any current listener runners to run.
+ waitForOnMessageToComplete();
+
+ synchronized (mainLock)
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ closed = true;
+
+ if (receiverThread != null)
+ {
+ // Wake up any receive() thread that might be waiting
+ mainLock.notify();
+ }
+
+ this.listener = null;
+ }
+
+ if (trace) { log.trace(this + " closed"); }
+ }
+
+ /**
+ * Method used by the client thread to get a Message, if available.
+ *
+ * @param timeout - the timeout value in milliseconds. A zero timeout never expires, and the
+ * call blocks indefinitely. A -1 timeout means receiveNoWait(): return the next message
+ * or null if one is not immediately available. Returns null if the consumer is
+ * concurrently closed.
+ */
+ public JBossMessage receive(long timeout) throws JMSException
+ {
+ JBossMessage m = null;
+
+ synchronized (mainLock)
+ {
+ if (trace) { log.trace(this + " receiving, timeout = " + timeout); }
+
+ if (closed)
+ {
+ // If consumer is closed or closing calling receive returns null
+ if (trace) { log.trace(this + " closed, returning null"); }
+ return null;
+ }
+
+ if (listener != null)
+ {
+ throw new JMSException("The consumer has a MessageListener set, " +
+ "cannot call receive(..)");
+ }
+
+ receiverThread = Thread.currentThread();
+
+ long startTimestamp = System.currentTimeMillis();
+
+ try
+ {
+ while(true)
+ {
+ if (timeout == 0)
+ {
+ if (trace) { log.trace(this + ": receive, no timeout"); }
+
+ m = getMessage(0);
+
+ if (m == null)
+ {
+ return null;
+ }
+ }
+ else if (timeout == -1)
+ {
+ //ReceiveNoWait
+ if (trace) { log.trace(this + ": receive, noWait"); }
+
+ m = getMessage(-1);
+
+ if (m == null)
+ {
+ if (trace) { log.trace(this + ": no message available"); }
+ return null;
+ }
+ }
+ else
+ {
+ if (trace) { log.trace(this + ": receive, timeout " + timeout + " ms, blocking poll on queue"); }
+
+ m = getMessage(timeout);
+
+ if (m == null)
+ {
+ // timeout expired
+ if (trace) { log.trace(this + ": " + timeout + " ms timeout expired"); }
+
+ return null;
+ }
+ }
+
+ if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
+
+ boolean ignore =
+ checkExpiredOrReachedMaxdeliveries(m, session, maxDeliveries, shouldAck);
+
+ if (!isConnectionConsumer && !ignore)
+ {
+ DeliveryInfo info = new DeliveryInfo(m, id, null, shouldAck);
+
+ session.preDeliver(info);
+
+ //If post deliver didn't succeed and acknowledgement mode is auto_ack
+ //That means the ref wasn't acked since it couldn't be found.
+ //In order to maintain at most once semantics we must therefore not return
+ //the message
+
+ ignore = !session.postDeliver();
+
+ if (trace)
+ {
+ log.trace("Post deliver returned " + !ignore);
+ }
+
+ if (!ignore)
+ {
+ m.incDeliveryCount();
+ }
+ }
+
+ if (!ignore)
+ {
+ if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
+
+ break;
+ }
+
+ if (trace)
+ {
+ log.trace("Discarding message " + m);
+ }
+
+ // the message expired, reached max deliveries or postDeliver failed, so discard it, adjust timeout and reenter the buffer
+ if (timeout != 0)
+ {
+ timeout -= System.currentTimeMillis() - startTimestamp;
+ if (timeout == 0)
+ {
+ // As 0 means waitForever, we make it noWait
+ timeout = -1;
+ }
- public void setSubscriptionName(String subscriptionName)
+ }
+ }
+ }
+ finally
+ {
+ receiverThread = null;
+ }
+ }
+
+ if (trace) { log.trace(this + " receive() returning " + m); }
+
+ return m;
+ }
+
+ public void addToFrontOfBuffer(JBossMessage proxy) throws JMSException
{
- this.subscriptionName = subscriptionName;
+ synchronized (mainLock)
+ {
+ buffer.addFirst(proxy, proxy.getJMSPriority());
+
+ consumeCount--;
+
+ messageAdded();
+ }
}
- public boolean isStoringDeliveries()
+ public void pause()
{
- return storingDeliveries;
+ synchronized (mainLock)
+ {
+ paused = true;
+
+ sendChangeRateMessage(0f);
+ }
}
+
+ public void resume()
+ {
+ synchronized (mainLock)
+ {
+ paused = false;
+
+ if (firstTime)
+ {
+ consumeCount = 0;
+
+ firstTime = false;
+ }
+ else
+ {
+ consumeCount = bufferSize / 3 - buffer.size();
+ }
+
+ sendChangeRateMessage(1f);
+ }
+ }
public boolean isShouldAck()
{
- //If e are a non durable subscriber to a topic then there is no need
- //to send acks to the server - we wouldn't have stored them on the server side anyway
+ return this.shouldAck;
+ }
+
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ private void checkSendChangeRate()
+ {
+ consumeCount++;
- return !(destination.getType() == DestinationType.TOPIC && subscriptionName == null);
+ if (!paused && consumeCount == bufferSize)
+ {
+ consumeCount = 0;
+
+ sendChangeRateMessage(1.0f);
+ }
}
+
+ /*
+ * Wait for the last delivery to arrive
+ */
+ private void waitForLastDelivery(long id)
+ {
+ if (trace) { log.trace("Waiting for last delivery id " + id); }
+
+ if (id == -1)
+ {
+ //No need to wait - nothing to wait for
+ return;
+ }
+
+ synchronized (mainLock)
+ {
+ waitingForLastDelivery = true;
+ try
+ {
+ long wait = WAIT_TIMEOUT;
+ while (lastDeliveryId != id && wait > 0)
+ {
+ long start = System.currentTimeMillis();
+ try
+ {
+ mainLock.wait(wait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ wait -= (System.currentTimeMillis() - start);
+ }
+ if (trace && lastDeliveryId == id)
+ {
+ log.trace("Got last delivery");
+ }
+
+ if (lastDeliveryId != id)
+ {
+ log.warn("Timed out waiting for last delivery " + id + " got " + lastDeliveryId);
+ }
+ }
+ finally
+ {
+ waitingForLastDelivery = false;
+ }
+ }
+ }
+
+ private void sendChangeRateMessage(float newRate)
+ {
+ try
+ {
+ // this invocation is sent asynchronously to the server: changeRate() passes a
+ // ChangeRateMessage to sendOneWay().
+ changeRate(newRate);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to send changeRate message", e);
+ }
+ }
+
+ private void waitForOnMessageToComplete()
+ {
+ // Wait for any onMessage() executions to complete
+
+ if (Thread.currentThread().equals(sessionExecutor.getThread()))
+ {
+ // the current thread is already closing this ClientConsumer (this happens when the
+ // session is closed from within the MessageListener.onMessage(), for example), so no need
+ // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+ return;
+ }
+
+ Future result = new Future();
+
+ try
+ {
+ sessionExecutor.execute(new Closer(result));
+
+ if (trace) { log.trace(this + " blocking wait for Closer execution"); }
+ result.getResult();
+ if (trace) { log.trace(this + " got Closer result"); }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ private void queueRunner(ListenerRunner runner)
+ {
+ try
+ {
+ this.sessionExecutor.execute(runner);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ private void messageAdded()
+ {
+ boolean notified = false;
+
+ if (trace) { log.trace("Receiver thread:" + receiverThread + " listener:" + listener + " listenerRunning:" + listenerRunning +
+ " sessionExecutor:" + sessionExecutor); }
+
+ // If we have a thread waiting on receive() we notify it
+ if (receiverThread != null)
+ {
+ if (trace) { log.trace(this + " notifying receiver/waiter thread"); }
+
+ mainLock.notifyAll();
+
+ notified = true;
+ }
+ else if (listener != null)
+ {
+ // We have a message listener
+ if (!listenerRunning)
+ {
+ listenerRunning = true;
+
+ if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+
+ this.queueRunner(new ListenerRunner());
+ }
+
+ //TODO - Execute onMessage on same thread for even better throughput
+ }
+
+ // Make sure we notify any thread waiting for last delivery
+ if (waitingForLastDelivery && !notified)
+ {
+ if (trace) { log.trace("Notifying"); }
+
+ mainLock.notifyAll();
+ }
+ }
+
+ private long waitOnLock(Object lock, long waitTime) throws InterruptedException
+ {
+ long start = System.currentTimeMillis();
+
+ // Wait on the lock for up to waitTime ms
+ lock.wait(waitTime);
- // Protected ------------------------------------------------------------------------------------
+ long waited = System.currentTimeMillis() - start;
+
+ if (waited < waitTime)
+ {
+ waitTime = waitTime - waited;
+
+ return waitTime;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ private JBossMessage getMessage(long timeout)
+ {
+ if (timeout == -1)
+ {
+ // receiveNoWait so don't wait
+ }
+ else
+ {
+ try
+ {
+ if (timeout == 0)
+ {
+ // wait for ever potentially
+ while (!closed && buffer.isEmpty())
+ {
+ if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
- // Package Private ------------------------------------------------------------------------------
+ mainLock.wait();
- // Private --------------------------------------------------------------------------------------
+ if (trace) { log.trace(this + " done waiting on main lock"); }
+ }
+ }
+ else
+ {
+ // wait with timeout
+ long toWait = timeout;
+
+ while (!closed && buffer.isEmpty() && toWait > 0)
+ {
+ if (trace) { log.trace(this + " waiting on main lock, timeout " + toWait + " ms"); }
- // Inner Classes --------------------------------------------------------------------------------
+ toWait = waitOnLock(mainLock, toWait);
+ if (trace) { log.trace(this + " done waiting on lock, buffer is " + (buffer.isEmpty() ? "" : "NOT ") + "empty"); }
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ if (trace) { log.trace("InterruptedException, " + this + ".getMessage() returning null"); }
+ return null;
+ }
+ }
+
+ JBossMessage m = null;
+
+ if (!closed && !buffer.isEmpty())
+ {
+ m = (JBossMessage)buffer.removeFirst();
+
+ checkSendChangeRate();
+ }
+
+ return m;
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ /*
+ * An instance of this class is put on the session executor to wait for onMessage
+ * invocations to complete when closing
+ */
+ private class Closer implements Runnable
+ {
+ Future result;
+
+ Closer(Future result)
+ {
+ this.result = result;
+ }
+
+ public void run()
+ {
+ if (trace) { log.trace("Closer starts running"); }
+
+ result.setResult(null);
+
+ if (trace) { log.trace("Closer finished run"); }
+ }
+ }
+
+ /*
+ * This class handles the execution of onMessage methods
+ */
+ private class ListenerRunner implements Runnable
+ {
+ public void run()
+ {
+ JBossMessage msg = null;
+
+ MessageListener theListener = null;
+
+ synchronized (mainLock)
+ {
+ if (listener == null || buffer.isEmpty())
+ {
+ listenerRunning = false;
+
+ if (trace) { log.trace("no listener or buffer is empty, returning"); }
+
+ return;
+ }
+
+ theListener = listener;
+
+ // remove a message from the buffer
+
+ msg = (JBossMessage)buffer.removeFirst();
+
+ checkSendChangeRate();
+ }
+
+ /*
+ * Bug here is as follows:
+ * The next runner gets scheduled BEFORE the on message is executed
+ * so if the onmessage fails on acking it will be put on hold
+ * and failover will kick in, this will clear the executor
+ * so the next queued one disappears and everything grinds to a halt
+ *
+ * Solution - don't use a session executor - have a session thread instead, much nicer
+ */
+
+ if (msg != null)
+ {
+ try
+ {
+ callOnMessage(session, theListener, id,
+ false, msg, ackMode, maxDeliveries, null, shouldAck);
+
+ if (trace) { log.trace("Called callonMessage"); }
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to deliver message", t);
+ }
+ }
+
+ synchronized (mainLock)
+ {
+ if (!buffer.isEmpty())
+ {
+ //Queue up the next runner to run
+
+ if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
+
+ queueRunner(this);
+
+ if (trace) { log.trace("Queued next onMessage to run"); }
+ }
+ else
+ {
+ if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
+
+ listenerRunning = false;
+ }
+ }
+
+ if (trace) { log.trace("Exiting run()"); }
+ }
+ }
+
+
}
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,5 +1,6 @@
package org.jboss.jms.client.impl;
+import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.message.JBossMessage;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
@@ -15,23 +16,18 @@
*/
public class ClientConsumerPacketHandler implements PacketHandler
{
- /**
- *
- */
- private final ClientConsumer messageHandler;
- /**
- *
- */
+ private final ClientConsumer clientConsumer;
+
private final String consumerID;
/**
* @param messageHandler
* @param consumerID
*/
- public ClientConsumerPacketHandler(ClientConsumer messageHandler,
+ public ClientConsumerPacketHandler(ClientConsumer clientConsumer,
String consumerID)
{
- this.messageHandler = messageHandler;
+ this.clientConsumer = clientConsumer;
this.consumerID = consumerID;
}
@@ -53,7 +49,7 @@
msg.doBeforeReceive();
- messageHandler.handleMessage(msg);
+ clientConsumer.handleMessage(msg);
}
} catch (Exception e)
{
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.IllegalStateException;
@@ -43,10 +42,9 @@
import org.jboss.jms.client.SelectorTranslator;
import org.jboss.jms.client.api.ClientBrowser;
import org.jboss.jms.client.api.ClientConnection;
+import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.client.api.ClientProducer;
import org.jboss.jms.client.api.ClientSession;
-import org.jboss.jms.client.api.Consumer;
-import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
@@ -60,7 +58,6 @@
import org.jboss.jms.tx.MessagingXAResource;
import org.jboss.jms.tx.ResourceManager;
import org.jboss.messaging.core.Destination;
-import org.jboss.messaging.core.DestinationType;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.remoting.Client;
import org.jboss.messaging.core.remoting.PacketDispatcher;
@@ -83,9 +80,7 @@
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
import org.jboss.messaging.util.ClearableQueuedExecutor;
-import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.util.MessageQueueNameHelper;
import org.jboss.messaging.util.ProxyFactory;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
@@ -120,13 +115,12 @@
private ClientConnection connection;
- // Attributes that used to live on SessionState -------------------------------------------------
+ protected Map<String, Closeable> children = new ConcurrentHashMap<String, Closeable>();
- protected Set<Closeable> children = new ConcurrentHashSet<Closeable>();
-
-
private int acknowledgeMode;
+
private boolean transacted;
+
private boolean xa;
private MessagingXAResource xaResource;
@@ -141,7 +135,7 @@
private List<Ack> clientAckList;
private DeliveryInfo autoAckInfo;
- private Map callbackHandlers = new ConcurrentHashMap();
+ //private Map callbackHandlers = new ConcurrentHashMap();
private LinkedList asfMessages = new LinkedList();
@@ -159,11 +153,6 @@
// Constructors ---------------------------------------------------------------------------------
-
- // Static ---------------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
public ClientSessionImpl(ClientConnection connection, String objectID, int dupsOKBatchSize)
{
super(objectID);
@@ -204,10 +193,6 @@
{
}
- // DelegateSupport overrides --------------------------------------------------------------------
-
- // Closeable implementation ---------------------------------------------------------------------
-
public void close() throws JMSException
{
sendBlocking(new CloseMessage());
@@ -223,7 +208,6 @@
// We must explicitly shutdown the executor
getExecutor().shutdownNow();
-
}
private long invokeClosing(long sequence) throws JMSException
@@ -236,17 +220,19 @@
private void closeChildren() throws JMSException
{
- for (Closeable child: children)
+ for (Closeable child: children.values())
{
child.closing(-1);
child.close();
}
+
+ children.clear();
}
public long closing(long sequence) throws JMSException
{
if (trace) { log.trace("handleClosing()"); }
-
+
closeChildren();
//Sanity check
@@ -426,7 +412,7 @@
CreateBrowserResponse response = (CreateBrowserResponse) sendBlocking(request);
ClientBrowserImpl delegate = new ClientBrowserImpl(this, response.getBrowserID(), queue, messageSelector);
ClientBrowser proxy = (ClientBrowser)ProxyFactory.proxy(delegate, ClientBrowser.class);
- children.add(proxy);
+ children.put(delegate.getID(), proxy);
return proxy;
}
@@ -441,7 +427,7 @@
}
- public Consumer createConsumerDelegate(Destination destination, String selector,
+ public ClientConsumer createConsumerDelegate(Destination destination, String selector,
boolean noLocal, String subscriptionName,
boolean isCC) throws JMSException
{
@@ -451,46 +437,19 @@
CreateConsumerResponse response = (CreateConsumerResponse) sendBlocking(request);
- ClientConsumerImpl consumerDelegate = new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(), response.getMaxDeliveries(), response.getRedeliveryDelay(),
+ ClientConsumerImpl consumerDelegate =
+ new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(),
+ response.getMaxDeliveries(), response.getRedeliveryDelay(),
destination,
- selector, noLocal, subscriptionName, response.getConsumerID(),isCC);
+ selector, noLocal, subscriptionName,
+ isCC, this.getExecutor());
- Consumer proxy = (Consumer)ProxyFactory.proxy(consumerDelegate, Consumer.class);
+ ClientConsumer proxy = (ClientConsumer)ProxyFactory.proxy(consumerDelegate, ClientConsumer.class);
- children.add(proxy);
+ children.put(consumerDelegate.getID(), proxy);
- //We need the queue name for recovering any deliveries after failover
- String queueName = null;
- if (subscriptionName != null)
- {
- // I have to use the clientID from connectionDelegate instead of connectionState...
- // this is because when a pre configured CF is used we need to get the clientID from
- // server side.
- // This was a condition verified by the TCK and it was fixed as part of
- // http://jira.jboss.com/jira/browse/JBMESSAGING-939
- queueName = MessageQueueNameHelper.
- createSubscriptionName(this.getID(),subscriptionName);
- }
- else if (destination.getType() == DestinationType.QUEUE)
- {
- queueName = destination.getName();
- }
+ PacketDispatcher.client.register(new ClientConsumerPacketHandler(consumerDelegate, consumerDelegate.getID()));
- final ClientConsumer messageHandler =
- new ClientConsumer(isCC, this.getAcknowledgeMode(),
- this, consumerDelegate, consumerDelegate.getID(), queueName,
- consumerDelegate.getBufferSize(), this.getExecutor(), consumerDelegate.getMaxDeliveries(), consumerDelegate.isShouldAck(),
- consumerDelegate.getRedeliveryDelay());
-
- this.addCallbackHandler(messageHandler);
-
- PacketDispatcher.client.register(new ClientConsumerPacketHandler(messageHandler, consumerDelegate.getID()));
-
- CallbackManager cm = connection.getRemotingConnection().getCallbackManager();
- cm.registerHandler(consumerDelegate.getID(), messageHandler);
-
- consumerDelegate.setClientConsumer(messageHandler);
-
//Now we have finished creating the client consumer, we can tell the SCD
//we are ready
consumerDelegate.changeRate(1);
@@ -550,7 +509,7 @@
ClientProducerImpl producerDelegate = new ClientProducerImpl(connection, this, destination );
ClientProducer proxy = (ClientProducer) ProxyFactory.proxy(producerDelegate, ClientProducer.class);
- children.add(proxy);
+ children.put(producerDelegate.getID(), proxy);
return proxy;
}
@@ -837,7 +796,7 @@
DeliveryInfo info = (DeliveryInfo)toRedeliver.get(i);
JBossMessage msg = info.getMessage();
- ClientConsumer handler = getCallbackHandler(info.getConsumerId());
+ ClientConsumer handler = (ClientConsumer)children.get(info.getConsumerId());
if (handler == null)
{
@@ -861,23 +820,6 @@
}
- public ClientConsumer getCallbackHandler(String consumerID)
- {
- return (ClientConsumer)callbackHandlers.get(consumerID);
- }
-
- public void addCallbackHandler(ClientConsumer handler)
- {
- callbackHandlers.put(handler.getConsumerId(), handler);
- }
-
- public void removeCallbackHandler(ClientConsumer handler)
- {
- callbackHandlers.remove(handler.getConsumerId());
- }
-
-
-
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
@@ -925,7 +867,7 @@
if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
- ClientConsumer.callOnMessage(this, getDistinguishedListener(), holder.consumerID,
+ ClientConsumerImpl.callOnMessage(this, getDistinguishedListener(), holder.consumerID,
false,
holder.msg, ackMode, holder.maxDeliveries,
holder.connectionConsumerDelegate, holder.shouldAck);
@@ -1294,16 +1236,16 @@
this.autoAckInfo = autoAckInfo;
}
- public Map getCallbackHandlers()
- {
- return callbackHandlers;
- }
+// public Map getCallbackHandlers()
+// {
+// return callbackHandlers;
+// }
+//
+// public void setCallbackHandlers(Map callbackHandlers)
+// {
+// this.callbackHandlers = callbackHandlers;
+// }
- public void setCallbackHandlers(Map callbackHandlers)
- {
- this.callbackHandlers = callbackHandlers;
- }
-
public LinkedList getAsfMessages()
{
return asfMessages;
Modified: trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -58,9 +58,7 @@
{
this("NO_ID_SET");
}
-
-
-
+
// Streamable implementation --------------------------------------------------------------------
public void read(DataInputStream in) throws Exception
Deleted: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,95 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.remoting;
-
-import java.util.Map;
-
-import org.jboss.jms.client.impl.ClientConsumer;
-import org.jboss.messaging.util.Logger;
-
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
-/**
- * The CallbackManager is an InvocationHandler used for handling callbacks to message consumers.
- * The callback is received and dispatched off to the relevant consumer.
- *
- * There is one instance of this class per remoting connection - which is to a unique server -
- * therefore there is no need to add the server id to the key when doing look ups.
- *
- * TODO this class should be merged with use of PacketDispatcher.client instance and
- * ClientConsumerPacketHandler should wrap ClientConsumer class
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class CallbackManager
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(CallbackManager.class);
-
- public static final String JMS_CALLBACK_SUBSYSTEM = "CALLBACK";
-
- // Static ---------------------------------------------------------------------------------------
-
- private static boolean trace = log.isTraceEnabled();
-
- // Attributes -----------------------------------------------------------------------------------
-
- protected Map<String, ClientConsumer> callbackHandlers;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public CallbackManager()
- {
- callbackHandlers = new ConcurrentReaderHashMap();
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void registerHandler(String consumerID, ClientConsumer handler)
- {
- callbackHandlers.put(consumerID, handler);
- }
-
- public ClientConsumer unregisterHandler(String consumerID)
- {
- return callbackHandlers.remove(consumerID);
- }
-
- public String toString()
- {
- return "CallbackManager[" + Integer.toHexString(hashCode()) + "]";
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -56,7 +56,7 @@
private Client client;
- private CallbackManager callbackManager;
+ //private CallbackManager callbackManager;
// When a failover is performed, this flag is set to true
protected boolean failed = false;
@@ -80,7 +80,7 @@
{
if (log.isTraceEnabled()) { log.trace(this + " created client"); }
- callbackManager = new CallbackManager();
+ //callbackManager = new CallbackManager();
NIOConnector connector = REGISTRY.getConnector(serverLocator);
client = new ClientImpl(connector, serverLocator);
@@ -118,12 +118,12 @@
return client;
}
- public CallbackManager getCallbackManager()
- {
- return callbackManager;
- }
+// public CallbackManager getCallbackManager()
+// {
+// return callbackManager;
+// }
+//
-
public synchronized boolean isFailed()
{
return failed;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -55,7 +55,7 @@
import org.jboss.messaging.util.Logger;
/**
- * Concrete implementation of a Consumer.
+ * Concrete implementation of a ClientConsumer.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -51,12 +51,11 @@
import javax.jms.JMSException;
import org.jboss.jms.client.api.ClientBrowser;
-import org.jboss.jms.client.api.Consumer;
import org.jboss.jms.client.impl.Ack;
+import org.jboss.jms.client.impl.AckImpl;
import org.jboss.jms.client.impl.Cancel;
import org.jboss.jms.client.impl.ClientBrowserImpl;
import org.jboss.jms.client.impl.ClientConsumerImpl;
-import org.jboss.jms.client.impl.AckImpl;
import org.jboss.jms.client.impl.DeliveryInfo;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
@@ -65,7 +64,6 @@
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.container.SecurityAspect;
import org.jboss.jms.server.security.CheckType;
-import org.jboss.messaging.util.Logger;
import org.jboss.messaging.core.Binding;
import org.jboss.messaging.core.Condition;
import org.jboss.messaging.core.Destination;
@@ -106,6 +104,7 @@
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessageQueueNameHelper;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
@@ -234,11 +233,11 @@
}
}
- public Consumer createConsumerDelegate(Destination destination,
- String filterString,
- boolean noLocal,
- String subscriptionName,
- boolean isCC) throws JMSException
+ public CreateConsumerResponse createConsumerDelegate(Destination destination,
+ String filterString,
+ boolean noLocal,
+ String subscriptionName,
+ boolean isCC) throws JMSException
{
checkSecurityCreateConsumerDelegate(destination, subscriptionName);
@@ -1199,10 +1198,10 @@
return false;
}
- private Consumer createConsumerDelegateInternal(Destination destination,
- String filterString,
- boolean noLocal,
- String subscriptionName)
+ private CreateConsumerResponse createConsumerDelegateInternal(Destination destination,
+ String filterString,
+ boolean noLocal,
+ String subscriptionName)
throws Exception
{
if (closed)
@@ -1381,7 +1380,7 @@
}
else
{
- // Consumer on a jms queue
+ // ClientConsumer on a jms queue
List<Binding> bindings = postOffice.getBindingsForQueueName(destination.getName());
@@ -1425,10 +1424,10 @@
// rep.put(queue.getName(), DUR_SUB_STATE_CONSUMERS);
// }
connectionEndpoint.getMessagingServer().getMinaService().getDispatcher().register(ep.newHandler());
+
+ CreateConsumerResponse response = new CreateConsumerResponse(consumerID, prefetchSize,
+ maxDeliveryAttemptsToUse, redeliveryDelayToUse );
- ClientConsumerImpl stub =
- new ClientConsumerImpl(consumerID, prefetchSize, maxDeliveryAttemptsToUse, redeliveryDelayToUse);
-
synchronized (consumers)
{
consumers.put(consumerID, ep);
@@ -1436,7 +1435,7 @@
log.trace(this + " created and registered " + ep);
- return stub;
+ return response;
}
private ClientBrowser createBrowserDelegateInternal(Destination destination,
@@ -1664,14 +1663,10 @@
} else if (type == REQ_CREATECONSUMER)
{
CreateConsumerRequest request = (CreateConsumerRequest) packet;
- ClientConsumerImpl consumer = (ClientConsumerImpl) createConsumerDelegate(
- request.getDestination(), request.getSelector(), request
+ response = createConsumerDelegate(
+ request.getDestination(), request.getSelector(), request
.isNoLocal(), request.getSubscriptionName(), request
.isConnectionConsumer());
-
- response = new CreateConsumerResponse(consumer.getID(), consumer
- .getBufferSize(), consumer.getMaxDeliveries(), consumer
- .getRedeliveryDelay());
} else if (type == REQ_CREATEDESTINATION)
{
CreateDestinationRequest request = (CreateDestinationRequest) packet;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -28,13 +28,13 @@
import org.jboss.jms.client.Closeable;
import org.jboss.jms.client.api.ClientBrowser;
-import org.jboss.jms.client.api.Consumer;
import org.jboss.jms.client.impl.Ack;
import org.jboss.jms.client.impl.Cancel;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
import org.jboss.messaging.core.Destination;
import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -47,7 +47,7 @@
*/
public interface SessionEndpoint extends Closeable
{
- Consumer createConsumerDelegate(Destination destination, String selector,
+ CreateConsumerResponse createConsumerDelegate(Destination destination, String selector,
boolean noLocal, String subscriptionName,
boolean connectionConsumer) throws JMSException;
Modified: trunk/src/main/org/jboss/messaging/core/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Consumer.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/Consumer.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -24,7 +24,7 @@
/**
*
- * A Consumer
+ * A ClientConsumer
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
Modified: trunk/src/main/org/jboss/messaging/core/HandleStatus.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/HandleStatus.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/HandleStatus.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -29,7 +29,7 @@
*
* NO_MATCH means the MessageReference was rejected by a Filter
*
- * BUSY means the MessageReference was rejected since the Consumer was busy
+ * BUSY means the MessageReference was rejected since the ClientConsumer was busy
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -516,7 +516,7 @@
if (status == null)
{
- throw new IllegalStateException("Consumer.handle() should never return null");
+ throw new IllegalStateException("ClientConsumer.handle() should never return null");
}
if (status == HandleStatus.HANDLED)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -56,7 +56,7 @@
MSG_CANCELDELIVERIES ((byte)56),
MSG_UNSUBSCRIBE ((byte)57),
- // Consumer
+ // ClientConsumer
MSG_CHANGERATE ((byte)70),
// Browser
Modified: trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -43,7 +43,7 @@
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossSession;
import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.client.impl.ClientConnectionImpl;
import org.jboss.jms.client.impl.ClientConsumerImpl;
import org.jboss.jms.client.impl.ClientSessionImpl;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -365,7 +365,7 @@
// OK
}
- // Consumer
+ // ClientConsumer
try
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -2708,7 +2708,7 @@
assertEquals(Session.AUTO_ACKNOWLEDGE, sess3.getAcknowledgeMode());
MessageConsumer consumer3 = sess3.createConsumer(topic1, null, true);
- //Consumer 1 should not get the message but consumers 2 and 3 should
+ //ClientConsumer 1 should not get the message but consumers 2 and 3 should
conn1.start();
conn2.start();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java 2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java 2008-01-21 21:01:36 UTC (rev 3606)
@@ -154,7 +154,7 @@
else
if (finished.getWorker() instanceof SeveralClientsStressTest.Consumer)
{
- if (info) log.info("Scheduling new Consumer " + numberOfConsumers);
+ if (info) log.info("Scheduling new ClientConsumer " + numberOfConsumers);
SeveralClientsStressTest.Consumer consumer = new SeveralClientsStressTest.Consumer(numberOfConsumers++, testChannel);
threads.add(consumer);
consumer.start();
@@ -378,7 +378,7 @@
{
public Consumer(int consumerId, LinkedQueue messageQueue)
{
- super("Consumer:" + consumerId, consumerId, messageQueue);
+ super("ClientConsumer:" + consumerId, consumerId, messageQueue);
}
public void run()
@@ -395,7 +395,7 @@
Connection conn = cf.createConnection();
Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = sess.createConsumer(queue);
- if (info) log.info("Consumer was created");
+ if (info) log.info("ClientConsumer was created");
conn.start();
More information about the jboss-cvs-commits
mailing list