[jboss-cvs] JBoss Messaging SVN: r5122 - in trunk: src/main/org/jboss/messaging/core/client/impl and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 16 13:23:43 EDT 2008
Author: ataylor
Date: 2008-10-16 13:23:43 -0400 (Thu, 16 Oct 2008)
New Revision: 5122
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
Removed:
trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1426 - first step of queue browser reimplementation
Deleted: trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.client;
-
-import org.jboss.messaging.core.exception.MessagingException;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
- */
-public interface ClientBrowser
-{
- long getID();
-
- void reset() throws MessagingException;
-
- ClientMessage nextMessage() throws MessagingException;
-
- boolean hasNextMessage() throws MessagingException;
-
- void close() throws MessagingException;
-
- boolean isClosed();
-
- void cleanUp();
-}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -27,7 +27,7 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public interface ClientConsumer
{
@@ -45,5 +45,13 @@
boolean isClosed();
- boolean isDirect();
+ boolean isDirect();
+
+ boolean awaitMessage(long timeOut) throws Exception;
+
+ void stop() throws MessagingException;
+
+ void start() throws MessagingException;
+
+ void restart() throws MessagingException;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,21 +22,21 @@
package org.jboss.messaging.core.client;
-import javax.transaction.xa.XAResource;
-
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.XAResource;
+
/*
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public interface ClientSession extends XAResource
{
@@ -64,11 +64,12 @@
SimpleString filterString,
boolean direct,
int windowSize,
- int maxRate) throws MessagingException;
+ int maxRate,
+ boolean isBrowser) throws MessagingException;
- ClientBrowser createBrowser(SimpleString queueName, SimpleString filterString) throws MessagingException;
+ ClientConsumer createBrowser(SimpleString queueName, SimpleString filterString) throws MessagingException;
- ClientBrowser createBrowser(SimpleString queueName) throws MessagingException;
+ ClientConsumer createBrowser(SimpleString queueName) throws MessagingException;
ClientProducer createProducer(SimpleString address) throws MessagingException;
Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,140 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.client.impl;
-
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
- * @version <tt>$Revision: 3602 $</tt> $Id: ClientBrowserImpl.java 3602 2008-01-21 17:48:32Z timfox $
- */
-public class ClientBrowserImpl implements ClientBrowser
-{
- // Constants ------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- private final long id;
-
- private final ClientSessionInternal session;
-
- private final Channel channel;
-
- private volatile boolean closed;
-
- // Static ---------------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
- public ClientBrowserImpl(final ClientSessionInternal session, final long id, final Channel channel)
- {
- this.id = id;
-
- this.session = session;
-
- this.channel = channel;
- }
-
- // ClientBrowser implementation -----------------------------------------------------------------
-
- public long getID()
- {
- return id;
- }
-
- public synchronized void close() throws MessagingException
- {
- if (closed)
- {
- return;
- }
-
- try
- {
- channel.sendBlocking(new SessionBrowserCloseMessage(id));
- }
- finally
- {
- session.removeBrowser(this);
-
- closed = true;
- }
- }
-
- public synchronized void cleanUp()
- {
- session.removeBrowser(this);
-
- closed = true;
- }
-
- public boolean isClosed()
- {
- return closed;
- }
-
- public void reset() throws MessagingException
- {
- checkClosed();
-
- channel.sendBlocking(new SessionBrowserResetMessage(id));
- }
-
- public boolean hasNextMessage() throws MessagingException
- {
- checkClosed();
-
- SessionBrowserHasNextMessageResponseMessage response = (SessionBrowserHasNextMessageResponseMessage)channel.sendBlocking(new SessionBrowserHasNextMessageMessage(id));
-
- return response.hasNext();
- }
-
- public ClientMessage nextMessage() throws MessagingException
- {
- checkClosed();
-
- SessionBrowseMessage response = (SessionBrowseMessage)channel.sendBlocking(new SessionBrowserNextMessageMessage(id));
-
- return response.getClientMessage();
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Package Private ------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private void checkClosed() throws MessagingException
- {
- if (closed)
- {
- throw new MessagingException(MessagingException.OBJECT_CLOSED, "Browser is closed");
- }
- }
-
- // Inner Classes --------------------------------------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -12,10 +12,6 @@
package org.jboss.messaging.core.client.impl;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.exception.MessagingException;
@@ -23,14 +19,20 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStopMessage;
import org.jboss.messaging.util.Future;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version <tt>$Revision: 3603 $</tt> $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $
*/
public class ClientConsumerImpl implements ClientConsumerInternal
@@ -63,6 +65,8 @@
private final Runner runner = new Runner();
+ private final boolean isBrowser;
+
private volatile Thread receiverThread;
private volatile Thread onMessageThread;
@@ -74,6 +78,8 @@
private volatile int creditsToSend;
private boolean cleared;
+
+ private boolean messagesWaiting = true;
// Constructors
// ---------------------------------------------------------------------------------
@@ -83,7 +89,7 @@
final int clientWindowSize,
final boolean direct,
final Executor executor,
- final Channel channel)
+ final Channel channel, final boolean isBrowser)
{
this.id = id;
@@ -96,6 +102,8 @@
this.clientWindowSize = clientWindowSize;
this.direct = direct;
+
+ this.isBrowser = isBrowser;
}
// ClientConsumer implementation
@@ -202,6 +210,11 @@
{
checkClosed();
+ if(isBrowser)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE,
+ "Cannot set MessageHandler - consumer is in browser mode");
+ }
if (receiverThread != null)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE,
@@ -239,6 +252,15 @@
}
}
+ public void deliveryComplete() // clears the pending-messages flag and wakes one thread waiting on this consumer
+ {
+ synchronized (this)
+ {
+ messagesWaiting = false;
+ notify(); // NOTE(review): notify() wakes a single waiter; confirm only one thread can be in wait() on this monitor
+ }
+ }
+
public boolean isClosed()
{
return closed;
@@ -249,6 +271,84 @@
return direct;
}
+ /**
+ * Returns true at once if the buffer holds messages; if it is empty and none are pending (messagesWaiting is false) returns false; otherwise waits once, up to timeOut, then reports whether the buffer is non-empty.
+ * @param timeOut passed to Object.wait(long): milliseconds to wait, 0 meaning wait until notified
+ * @return true if the buffer is non-empty when the method returns, false otherwise
+ * @throws Exception declared broadly; wait() can throw InterruptedException
+ */
+ public boolean awaitMessage(long timeOut) throws Exception
+ {
+ if(!buffer.isEmpty())
+ {
+ return true;
+ }
+ else
+ {
+ //we only need to synchronize if the buffer is empty
+ synchronized (this)
+ {
+ if(!buffer.isEmpty())
+ {
+ return true;
+ }
+ if(messagesWaiting)
+ {
+ wait(timeOut); // NOTE(review): single wait, not re-checked in a loop - a spurious or unrelated wakeup returns early
+ }
+ return !buffer.isEmpty();
+ }
+ }
+ }
+
+ // Browser-only: sends a SessionConsumerStopMessage if messages are still pending, waits until messagesWaiting is cleared, then empties the buffer.
+ public void stop() throws MessagingException
+ {
+ if(!isBrowser)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot stop Consumer in non browser mode");
+ }
+ synchronized (this)
+ {
+ //if there are still messages in transit tell the server to stop and wait
+ if(messagesWaiting)
+ {
+ channel.send(new SessionConsumerStopMessage(id));
+ do
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt(); // restore the interrupt status so callers can still observe it
+ throw new IllegalStateException(e.getMessage(), e); // keep the cause; the message alone is usually null
+ }
+ }
+ while(messagesWaiting); // loop guards against wakeups that did not clear the flag
+ }
+ buffer.clear();
+ }
+ }
+
+ // Browser-only: marks messages as pending again and sends a SessionConsumerStartMessage for this consumer.
+ public void start() throws MessagingException
+ {
+ if(!isBrowser)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot start Consumer in non browser mode");
+ }
+ synchronized (this) { messagesWaiting = true; } // written under the same monitor awaitMessage()/stop() read it under
+ channel.send(new SessionConsumerStartMessage(id));
+ }
+
+ public void restart() throws MessagingException // stop() followed by start(); both reject non-browser consumers
+ {
+ stop();
+ start();
+ }
+
// ClientConsumerInternal implementation
// --------------------------------------------------------------
@@ -337,7 +437,7 @@
{
return creditsToSend;
}
-
+
// Public
// ---------------------------------------------------------------------------------------
@@ -469,6 +569,7 @@
{
synchronized (this)
{
+ messagesWaiting = false;
// Wake up any receive() thread that might be waiting
notify();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -30,6 +30,7 @@
* A ClientConsumerInternal
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public interface ClientConsumerInternal extends ClientConsumer
@@ -49,4 +50,6 @@
int getCreditsToSend();
void cleanUp() throws Exception;
+
+ void deliveryComplete();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -21,20 +21,6 @@
*/
package org.jboss.messaging.core.client.impl;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientBrowser;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -56,7 +42,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
@@ -90,6 +75,18 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
/*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -99,7 +96,7 @@
*
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
* @version <tt>$Revision: 3603 $</tt> $Id: ClientSessionImpl.java 3603 2008-01-21 18:49:20Z timfox $
*
@@ -132,8 +129,6 @@
private volatile RemotingConnection remotingConnection;
- private final Map<Long, ClientBrowser> browsers = new ConcurrentHashMap<Long, ClientBrowser>();
-
private final Map<Long, ClientProducerInternal> producers = new ConcurrentHashMap<Long, ClientProducerInternal>();
private final Map<Long, ClientConsumerInternal> consumers = new ConcurrentHashMap<Long, ClientConsumerInternal>();
@@ -296,21 +291,24 @@
filterString,
direct,
connectionFactory.getConsumerWindowSize(),
- connectionFactory.getConsumerMaxRate());
+ connectionFactory.getConsumerMaxRate(),
+ false);
}
public ClientConsumer createConsumer(final SimpleString queueName,
final SimpleString filterString,
final boolean direct,
final int windowSize,
- final int maxRate) throws MessagingException
+ final int maxRate,
+ final boolean isBrowser) throws MessagingException
{
checkClosed();
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(queueName,
filterString,
windowSize,
- maxRate);
+ maxRate,
+ isBrowser);
SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)channel.sendBlocking(request);
@@ -348,7 +346,8 @@
clientWindowSize,
direct,
executor,
- channel);
+ channel,
+ isBrowser);
addConsumer(consumer);
@@ -361,24 +360,19 @@
return consumer;
}
- public ClientBrowser createBrowser(final SimpleString queueName) throws MessagingException
+ public ClientConsumer createBrowser(final SimpleString queueName) throws MessagingException
{
return createBrowser(queueName, null);
}
- public ClientBrowser createBrowser(final SimpleString queueName, final SimpleString filterString) throws MessagingException
+ public ClientConsumer createBrowser(final SimpleString queueName, final SimpleString filterString) throws MessagingException
{
- checkClosed();
-
- SessionCreateBrowserMessage request = new SessionCreateBrowserMessage(queueName, filterString);
-
- channel.sendBlocking(request);
-
- ClientBrowser browser = new ClientBrowserImpl(this, idGenerator.generateID(), channel);
-
- addBrowser(browser);
-
- return browser;
+ return createConsumer(queueName,
+ filterString,
+ false,
+ connectionFactory.getConsumerWindowSize(),
+ connectionFactory.getConsumerMaxRate(),
+ true);
}
public ClientProducer createProducer(final SimpleString address) throws MessagingException
@@ -613,10 +607,6 @@
producers.put(producer.getID(), producer);
}
- public void addBrowser(final ClientBrowser browser)
- {
- browsers.put(browser.getID(), browser);
- }
public void removeConsumer(final ClientConsumerInternal consumer) throws MessagingException
{
@@ -633,11 +623,6 @@
}
}
- public void removeBrowser(final ClientBrowser browser)
- {
- browsers.remove(browser.getID());
- }
-
public Set<ClientProducerInternal> getProducers()
{
return new HashSet<ClientProducerInternal>(producers.values());
@@ -648,10 +633,6 @@
return new HashSet<ClientConsumerInternal>(consumers.values());
}
- public Set<ClientBrowser> getBrowsers()
- {
- return new HashSet<ClientBrowser>(browsers.values());
- }
public Map<SimpleString, ClientProducerInternal> getProducerCache()
{
@@ -668,6 +649,16 @@
}
}
+ public void deliveryComplete(long consumerID) // forwards a delivery-complete signal to the consumer registered under consumerID
+ {
+ ClientConsumerInternal consumer = consumers.get(consumerID);
+
+ if (consumer != null) // an id with no registered consumer is ignored silently
+ {
+ consumer.deliveryComplete();
+ }
+ }
+
public void receiveProducerCredits(final long producerID, final int credits) throws Exception
{
ClientProducerInternal producer = producers.get(producerID);
@@ -1125,13 +1116,6 @@
{
producer.cleanUp();
}
-
- Set<ClientBrowser> browsersClone = new HashSet<ClientBrowser>(browsers.values());
-
- for (ClientBrowser browser : browsersClone)
- {
- browser.cleanUp();
- }
}
private void closeChildren() throws MessagingException
@@ -1149,13 +1133,6 @@
{
producer.close();
}
-
- Set<ClientBrowser> browsersClone = new HashSet<ClientBrowser>(browsers.values());
-
- for (ClientBrowser browser : browsersClone)
- {
- browser.close();
- }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -12,20 +12,20 @@
package org.jboss.messaging.core.client.impl;
-import java.util.Map;
-import java.util.Set;
-
-import org.jboss.messaging.core.client.ClientBrowser;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.util.SimpleString;
+import java.util.Map;
+import java.util.Set;
+
/**
* A ClientSessionInternal
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public interface ClientSessionInternal extends ClientSession
{
@@ -37,20 +37,14 @@
void addProducer(ClientProducerInternal producer);
- void addBrowser(ClientBrowser browser);
-
void removeConsumer(ClientConsumerInternal consumer) throws MessagingException;
void removeProducer(ClientProducerInternal producer);
- void removeBrowser(ClientBrowser browser);
-
Set<ClientProducerInternal> getProducers();
Set<ClientConsumerInternal> getConsumers();
- Set<ClientBrowser> getBrowsers();
-
Map<SimpleString, ClientProducerInternal> getProducerCache();
//void cleanUp() throws Exception;
@@ -60,4 +54,6 @@
void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
boolean handleFailover(final RemotingConnection backupConnection);
+
+ void deliveryComplete(long consumerID);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,15 +22,15 @@
package org.jboss.messaging.core.client.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELIVERY_COMPLETE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeliveryCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
@@ -39,6 +39,7 @@
* A ClientSessionPacketHandler
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public class ClientSessionPacketHandler implements ChannelHandler
@@ -76,6 +77,14 @@
break;
}
+ case SESS_DELIVERY_COMPLETE:
+ {
+ SessionDeliveryCompleteMessage message = (SessionDeliveryCompleteMessage) packet;
+
+ clientSession.deliveryComplete(message.getConsumerID());
+
+ break;
+ }
case EXCEPTION:
{
//TODO - we can provide a means for async exceptions to get back to to client
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -12,9 +12,24 @@
package org.jboss.messaging.core.remoting.impl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.INITIAL_BUFFER_SIZE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
@@ -25,21 +40,18 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_STOP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELIVERY_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
@@ -72,39 +84,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -113,22 +92,18 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStopMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeliveryCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
@@ -161,9 +136,28 @@
import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.SimpleIDGenerator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *
* @version <tt>$Revision$</tt> $Id$
*/
public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
@@ -495,7 +489,7 @@
private void doWrite(final Packet packet)
{
- final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+ final MessagingBuffer buffer = transportConnection.createBuffer(INITIAL_BUFFER_SIZE);
packet.encode(buffer);
@@ -517,7 +511,7 @@
}
case PONG:
{
- packet = new PacketImpl(PacketImpl.PONG);
+ packet = new PacketImpl(PONG);
break;
}
case EXCEPTION:
@@ -585,11 +579,16 @@
packet = new SessionCreateProducerResponseMessage();
break;
}
- case SESS_CREATEBROWSER:
+ case SESS_CONSUMER_STOP:
{
- packet = new SessionCreateBrowserMessage();
+ packet = new SessionConsumerStopMessage();
break;
}
+ case SESS_CONSUMER_START:
+ {
+ packet = new SessionConsumerStartMessage();
+ break;
+ }
case SESS_PROCESSED:
{
packet = new SessionProcessedMessage();
@@ -597,12 +596,12 @@
}
case SESS_COMMIT:
{
- packet = new PacketImpl(PacketImpl.SESS_COMMIT);
+ packet = new PacketImpl(SESS_COMMIT);
break;
}
case SESS_ROLLBACK:
{
- packet = new PacketImpl(PacketImpl.SESS_ROLLBACK);
+ packet = new PacketImpl(SESS_ROLLBACK);
break;
}
case SESS_QUEUEQUERY:
@@ -645,31 +644,6 @@
packet = new SessionBindingQueryResponseMessage();
break;
}
- case PacketImpl.SESS_BROWSER_MESSAGE:
- {
- packet = new SessionBrowseMessage();
- break;
- }
- case SESS_BROWSER_RESET:
- {
- packet = new SessionBrowserResetMessage();
- break;
- }
- case SESS_BROWSER_HASNEXTMESSAGE:
- {
- packet = new SessionBrowserHasNextMessageMessage();
- break;
- }
- case SESS_BROWSER_HASNEXTMESSAGE_RESP:
- {
- packet = new SessionBrowserHasNextMessageResponseMessage();
- break;
- }
- case SESS_BROWSER_NEXTMESSAGE:
- {
- packet = new SessionBrowserNextMessageMessage();
- break;
- }
case SESS_XA_START:
{
packet = new SessionXAStartMessage();
@@ -707,7 +681,7 @@
}
case SESS_XA_SUSPEND:
{
- packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
+ packet = new PacketImpl(SESS_XA_SUSPEND);
break;
}
case SESS_XA_RESUME:
@@ -722,7 +696,7 @@
}
case SESS_XA_INDOUBT_XIDS:
{
- packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
+ packet = new PacketImpl(SESS_XA_INDOUBT_XIDS);
break;
}
case SESS_XA_INDOUBT_XIDS_RESP:
@@ -742,7 +716,7 @@
}
case SESS_XA_GET_TIMEOUT:
{
- packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
+ packet = new PacketImpl(SESS_XA_GET_TIMEOUT);
break;
}
case SESS_XA_GET_TIMEOUT_RESP:
@@ -752,12 +726,12 @@
}
case SESS_START:
{
- packet = new PacketImpl(PacketImpl.SESS_START);
+ packet = new PacketImpl(SESS_START);
break;
}
case SESS_STOP:
{
- packet = new PacketImpl(PacketImpl.SESS_STOP);
+ packet = new PacketImpl(SESS_STOP);
break;
}
case SESS_FLOWTOKEN:
@@ -790,9 +764,9 @@
packet = new SessionProducerCloseMessage();
break;
}
- case SESS_BROWSER_CLOSE:
+ case SESS_DELIVERY_COMPLETE:
{
- packet = new SessionBrowserCloseMessage();
+ packet = new SessionDeliveryCompleteMessage();
break;
}
case SESS_SCHEDULED_SEND:
@@ -1064,7 +1038,7 @@
"Timed out waiting for response when sending packet " + packet.getType());
}
- if (response.getType() == PacketImpl.EXCEPTION)
+ if (response.getType() == EXCEPTION)
{
final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
@@ -1348,7 +1322,6 @@
if (executor == null)
{
checkConfirmation(packet);
-
handler.handlePacket(packet);
}
else
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -20,6 +20,7 @@
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version <tt>$Revision$</tt>
*/
public class PacketImpl implements Packet
@@ -69,9 +70,9 @@
public static final byte SESS_CREATEPRODUCER_RESP = 43;
- public static final byte SESS_CREATEBROWSER = 44;
+ public static final byte SESS_CONSUMER_STOP = 44;
- public static final byte SESS_CREATEBROWSER_RESP = 45;
+ public static final byte SESS_CONSUMER_START = 45;
public static final byte SESS_PROCESSED = 46;
@@ -95,16 +96,6 @@
public static final byte SESS_BINDINGQUERY_RESP = 56;
- public static final byte SESS_BROWSER_MESSAGE = 57;
-
- public static final byte SESS_BROWSER_RESET = 58;
-
- public static final byte SESS_BROWSER_HASNEXTMESSAGE = 59;
-
- public static final byte SESS_BROWSER_HASNEXTMESSAGE_RESP = 60;
-
- public static final byte SESS_BROWSER_NEXTMESSAGE = 61;
-
public static final byte SESS_XA_START = 62;
public static final byte SESS_XA_END = 63;
@@ -153,8 +144,6 @@
public static final byte SESS_PRODUCER_CLOSE = 85;
- public static final byte SESS_BROWSER_CLOSE = 86;
-
public static final byte SESS_RECEIVE_MSG = 87;
public static final byte SESS_MANAGEMENT_SEND = 88;
@@ -165,6 +154,8 @@
public static final byte SESS_REPLICATE_DELIVERY = 91;
+ public static final byte SESS_DELIVERY_COMPLETE = 92;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.impl.ClientMessageImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionBrowseMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(SessionReceiveMessage.class);
-
- // Attributes ----------------------------------------------------
-
- private ClientMessage clientMessage;
-
- private ServerMessage serverMessage;
-
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionBrowseMessage(final ServerMessage message)
- {
- super(SESS_BROWSER_MESSAGE);
-
- this.serverMessage = message;
-
- this.clientMessage = null;
- }
-
- public SessionBrowseMessage()
- {
- super(SESS_BROWSER_MESSAGE);
- }
-
- // Public --------------------------------------------------------
-
- public boolean isResponse()
- {
- return true;
- }
-
- public ClientMessage getClientMessage()
- {
- return clientMessage;
- }
-
- public ServerMessage getServerMessage()
- {
- return serverMessage;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- serverMessage.encode(buffer);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- //TODO can be optimised
-
- clientMessage = new ClientMessageImpl();
-
- clientMessage.decode(buffer);
-
- clientMessage.getBody().flip();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,99 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionBrowserCloseMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long browserID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionBrowserCloseMessage(final long browserID)
- {
- super(SESS_BROWSER_CLOSE);
-
- this.browserID = browserID;
- }
-
- public SessionBrowserCloseMessage()
- {
- super(SESS_BROWSER_CLOSE);
- }
-
- // Public --------------------------------------------------------
-
- public long getBrowserID()
- {
- return browserID;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putLong(browserID);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- browserID = buffer.getLong();
- }
-
-
- @Override
- public String toString()
- {
- return getParentString() + ", browserID=" + browserID + "]";
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SessionBrowserCloseMessage == false)
- {
- return false;
- }
-
- SessionBrowserCloseMessage r = (SessionBrowserCloseMessage)other;
-
- return super.equals(other) && this.browserID == r.browserID;
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,88 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class SessionBrowserHasNextMessageMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long browserID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionBrowserHasNextMessageMessage(final long browserID)
- {
- super(SESS_BROWSER_HASNEXTMESSAGE);
-
- this.browserID = browserID;
- }
-
- public SessionBrowserHasNextMessageMessage()
- {
- super(SESS_BROWSER_HASNEXTMESSAGE);
- }
-
- // Public --------------------------------------------------------
-
- public long getBrowserID()
- {
- return browserID;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putLong(browserID);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- browserID = buffer.getLong();
- }
-
- @Override
- public String toString()
- {
- return getParentString() + ", browserID=" + browserID + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,88 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class SessionBrowserNextMessageMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long browserID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionBrowserNextMessageMessage(final long browserID)
- {
- super(SESS_BROWSER_NEXTMESSAGE);
-
- this.browserID = browserID;
- }
-
- public SessionBrowserNextMessageMessage()
- {
- super(SESS_BROWSER_NEXTMESSAGE);
- }
-
- // Public --------------------------------------------------------
-
- public long getBrowserID()
- {
- return browserID;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putLong(browserID);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- browserID = buffer.getLong();
- }
-
- @Override
- public String toString()
- {
- return getParentString() + ", browserID=" + browserID + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,89 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class SessionBrowserResetMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long browserID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionBrowserResetMessage(final long browserID)
- {
- super(SESS_BROWSER_RESET);
-
- this.browserID = browserID;
- }
-
- public SessionBrowserResetMessage()
- {
- super(SESS_BROWSER_RESET);
- }
-
- // Public --------------------------------------------------------
-
- public long getBrowserID()
- {
- return browserID;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putLong(browserID);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- browserID = buffer.getLong();
- }
-
- @Override
- public String toString()
- {
- return getParentString() + ", browserID=" + browserID + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class SessionConsumerStartMessage extends PacketImpl
+{
+ private long consumerId;
+
+ public SessionConsumerStartMessage(long consumerId)
+ {
+ super(SESS_CONSUMER_START);
+ this.consumerId = consumerId;
+ }
+
+ public SessionConsumerStartMessage()
+ {
+ super(SESS_CONSUMER_START);
+ }
+
+ public long getConsumerId()
+ {
+ return consumerId;
+ }
+
+ public void encodeBody(MessagingBuffer buffer)
+ {
+ buffer.putLong(consumerId);
+ }
+
+ public void decodeBody(MessagingBuffer buffer)
+ {
+ consumerId = buffer.getLong();
+ }
+}
\ No newline at end of file
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class SessionConsumerStopMessage extends PacketImpl
+{
+   /** Id of the consumer this packet refers to; the only field carried in the body. */
+   private long consumerId;
+
+   /**
+    * Creates a packet of type SESS_CONSUMER_STOP with no consumer id set yet.
+    * The id is populated later by {@link #decodeBody(MessagingBuffer)}.
+    */
+   public SessionConsumerStopMessage()
+   {
+      super(SESS_CONSUMER_STOP);
+   }
+
+   /**
+    * Creates a packet of type SESS_CONSUMER_STOP for the given consumer.
+    *
+    * @param consumerId id of the consumer the packet refers to
+    */
+   public SessionConsumerStopMessage(final long consumerId)
+   {
+      this();
+      this.consumerId = consumerId;
+   }
+
+   /**
+    * Writes the body: a single long holding the consumer id.
+    *
+    * @param buffer buffer to write into
+    */
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(consumerId);
+   }
+
+   /**
+    * Reads the body: a single long holding the consumer id.
+    *
+    * @param buffer buffer to read from
+    */
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      consumerId = buffer.getLong();
+   }
+
+   /**
+    * @return the consumer id given at construction or read by the last decode
+    */
+   public long getConsumerId()
+   {
+      return consumerId;
+   }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,113 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class SessionCreateBrowserMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private SimpleString queueName;
-
- private SimpleString filterString;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionCreateBrowserMessage(final SimpleString queueName, final SimpleString filterString)
- {
- super(SESS_CREATEBROWSER);
-
- this.queueName = queueName;
- this.filterString = filterString;
- }
-
- public SessionCreateBrowserMessage()
- {
- super(SESS_CREATEBROWSER);
- }
-
- // Public --------------------------------------------------------
-
- public SimpleString getQueueName()
- {
- return queueName;
- }
-
- public SimpleString getFilterString()
- {
- return filterString;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putSimpleString(queueName);
- buffer.putNullableSimpleString(filterString);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- queueName = buffer.getSimpleString();
- filterString = buffer.getNullableSimpleString();
- }
-
- @Override
- public String toString()
- {
- return getParentString() + ", queueName=" + queueName + ", filterString="
- + filterString + "]";
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SessionCreateBrowserMessage == false)
- {
- return false;
- }
-
- SessionCreateBrowserMessage r = (SessionCreateBrowserMessage)other;
-
- return super.equals(other) && this.queueName.equals(r.queueName) &&
- this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -28,6 +28,7 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
* @version <tt>$Revision$</tt>
*/
@@ -44,13 +45,15 @@
private int windowSize;
private int maxRate;
-
+
+ private boolean isBrowser;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public SessionCreateConsumerMessage(final SimpleString queueName, final SimpleString filterString,
- final int windowSize, final int maxRate)
+ final int windowSize, final int maxRate, final boolean isBrowser)
{
super(SESS_CREATECONSUMER);
@@ -58,6 +61,7 @@
this.filterString = filterString;
this.windowSize = windowSize;
this.maxRate = maxRate;
+ this.isBrowser = isBrowser;
}
public SessionCreateConsumerMessage()
@@ -98,13 +102,19 @@
{
return maxRate;
}
-
+
+   /**
+    * @return the browser flag passed to the constructor or read from the wire by decodeBody
+    */
+   public boolean isBrowser()
+   {
+      return this.isBrowser;
+   }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putSimpleString(queueName);
buffer.putNullableSimpleString(filterString);
buffer.putInt(windowSize);
buffer.putInt(maxRate);
+ buffer.putBoolean(isBrowser);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -113,6 +123,7 @@
filterString = buffer.getNullableSimpleString();
windowSize = buffer.getInt();
maxRate = buffer.getInt();
+ isBrowser = buffer.getBoolean();
}
public boolean equals(Object other)
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class SessionDeliveryCompleteMessage extends PacketImpl
+{
+   /** Id of the consumer whose delivery has completed; the only field carried in the body. */
+   private long consumerID;
+
+   /**
+    * Creates a packet of type SESS_DELIVERY_COMPLETE with no consumer id set yet.
+    * The id is populated later by {@link #decodeBody(MessagingBuffer)}.
+    */
+   public SessionDeliveryCompleteMessage()
+   {
+      super(SESS_DELIVERY_COMPLETE);
+   }
+
+   /**
+    * Creates a packet of type SESS_DELIVERY_COMPLETE for the given consumer.
+    *
+    * @param consumerID id of the consumer the packet refers to
+    */
+   public SessionDeliveryCompleteMessage(final long consumerID)
+   {
+      this();
+      this.consumerID = consumerID;
+   }
+
+   /**
+    * @return the consumer id given at construction or read by the last decode
+    */
+   public long getConsumerID()
+   {
+      return consumerID;
+   }
+
+   /**
+    * Writes the body: a single long holding the consumer id.
+    *
+    * @param buffer buffer to write into
+    */
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(consumerID);
+   }
+
+   /**
+    * Reads the body: a single long holding the consumer id.
+    *
+    * @param buffer buffer to read from
+    */
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      consumerID = buffer.getLong();
+   }
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,12 +22,15 @@
package org.jboss.messaging.core.server;
+import java.util.concurrent.Executor;
+
/**
*
* A ServerConsumer
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public interface ServerConsumer extends Consumer
@@ -49,4 +52,10 @@
void failedOver();
void deliver(final long messageID) throws Exception;
+
+ void deliver(Executor executor);
+
+ void stop() throws Exception;
+
+ void start();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,10 +22,6 @@
package org.jboss.messaging.core.server;
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
@@ -33,9 +29,11 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.server.impl.ServerBrowserImpl;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.Xid;
+import java.util.List;
+
/**
*
* A ServerSession
@@ -52,8 +50,6 @@
String getPassword();
- void removeBrowser(ServerBrowserImpl browser) throws Exception;
-
void removeConsumer(ServerConsumer consumer) throws Exception;
void removeProducer(ServerProducer producer) throws Exception;
@@ -113,7 +109,8 @@
SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
SimpleString filterString,
int windowSize,
- int maxRate) throws Exception;
+ int maxRate,
+ boolean isBrowser) throws Exception;
SessionCreateProducerResponseMessage createProducer(SimpleString address,
int windowSize,
@@ -124,26 +121,16 @@
SessionBindingQueryResponseMessage executeBindingQuery(SimpleString address) throws Exception;
- void createBrowser(SimpleString queueName, SimpleString filterString) throws Exception;
-
void closeConsumer(long consumerID) throws Exception;
void closeProducer(long producerID) throws Exception;
- void closeBrowser(long browserID) throws Exception;
-
void receiveConsumerCredits(long consumerID, int credits) throws Exception;
void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
- boolean browserHasNextMessage(long browserID) throws Exception;
-
- ServerMessage browserNextMessage(long browserID) throws Exception;
-
- void browserReset(long browserID) throws Exception;
-
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
@@ -151,4 +138,10 @@
void failedOver() throws Exception;
void handleReplicatedDelivery(long consumerID, long messageID) throws Exception;
+
+ void promptDelivery(ServerConsumer browser);
+
+ void resetConsumer(long consumerID) throws Exception;
+
+ void reStartConsumer(long consumerId);
}
Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,191 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * Concrete implementation of BrowserEndpoint.
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 3778 $</tt>
- *
- * $Id: ServerBrowserImpl.java 3778 2008-02-24 12:15:29Z timfox $
- */
-public class ServerBrowserImpl
-{
- // Constants ------------------------------------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ServerBrowserImpl.class);
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- private final long id;
- private final ServerSession session;
- private final Queue destination;
- private final Filter filter;
- private Iterator<ServerMessage> iterator;
-
- // Constructors ---------------------------------------------------------------------------------
-
- public ServerBrowserImpl(final long id, final ServerSession session,
- final Queue destination, final String messageFilter) throws MessagingException
- {
- this.id = id;
-
- this.session = session;
-
- this.destination = destination;
-
- if (messageFilter != null)
- {
- filter = new FilterImpl(new SimpleString(messageFilter));
- }
- else
- {
- filter = null;
- }
- }
-
- // BrowserEndpoint implementation ---------------------------------------------------------------
-
- public long getID()
- {
- return id;
- }
-
- public void reset() throws Exception
- {
- iterator = createIterator();
- }
-
- public boolean hasNextMessage() throws Exception
- {
- if (iterator == null)
- {
- iterator = createIterator();
- }
-
- boolean has = iterator.hasNext();
-
- return has;
- }
-
- public ServerMessage nextMessage() throws Exception
- {
- if (iterator == null)
- {
- iterator = createIterator();
- }
-
- ServerMessage r = iterator.next();
-
- return r;
- }
-
- public Message[] nextMessageBlock(int maxMessages) throws Exception
- {
- if (maxMessages < 2)
- {
- throw new IllegalArgumentException("maxMessages must be >=2 otherwise use nextMessage");
- }
-
- if (iterator == null)
- {
- iterator = createIterator();
- }
-
- List<ServerMessage> messages = new ArrayList<ServerMessage>(maxMessages);
- int i = 0;
- while (i < maxMessages)
- {
- if (iterator.hasNext())
- {
- ServerMessage m = iterator.next();
- messages.add(m);
- i++;
- }
- else
- {
- break;
- }
- }
- return messages.toArray(new Message[messages.size()]);
- }
-
- public void close() throws Exception
- {
- iterator = null;
-
- session.removeBrowser(this);
-
- log.trace(this + " closed");
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- @Override
- public String toString()
- {
- return "BrowserEndpoint[" + id + "]";
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private Iterator<ServerMessage> createIterator()
- {
- List<MessageReference> refs = destination.list(filter);
-
- List<ServerMessage> msgs = new ArrayList<ServerMessage>();
-
- for (MessageReference ref: refs)
- {
- msgs.add(ref.getMessage());
- }
-
- return msgs.iterator();
- }
-
- // Inner classes --------------------------------------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,14 +22,12 @@
package org.jboss.messaging.core.server.impl;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeliveryCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
import org.jboss.messaging.core.server.HandleStatus;
@@ -43,11 +41,18 @@
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Concrete implementation of a ClientConsumer.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
* @version <tt>$Revision: 3783 $</tt> $Id: ServerConsumerImpl.java 3783 2008-02-25 12:15:14Z timfox $
*/
@@ -70,13 +75,13 @@
private final Queue messageQueue;
- private final Filter filter;
+ protected final Filter filter;
- private final ServerSession session;
+ protected final ServerSession session;
private final Object startStopLock = new Object();
- private final AtomicInteger availableCredits;
+ protected final AtomicInteger availableCredits;
private boolean started;
@@ -88,7 +93,17 @@
private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
- private final Channel channel;
+ protected final Channel channel;
+
+ private boolean browseOnly;
+
+ private Iterator<MessageReference> iterator;
+
+ private DeliveryRunner deliveryRunner = new DeliveryRunner();
+
+ private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
+
+ private boolean delivering = false;
// Constructors
// ---------------------------------------------------------------------------------
@@ -103,10 +118,13 @@
final StorageManager storageManager,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice,
- final Channel channel)
+ final Channel channel,
+ final boolean browseOnly)
{
this.id = id;
-
+
+ this.browseOnly = browseOnly;
+
this.messageQueue = messageQueue;
this.filter = filter;
@@ -131,8 +149,11 @@
this.postOffice = postOffice;
this.channel = channel;
-
- messageQueue.addConsumer(this);
+
+ if(!browseOnly)
+ {
+ messageQueue.addConsumer(this);
+ }
}
// ServerConsumer implementation
@@ -294,6 +315,26 @@
}
}
+   /**
+    * Clears the delivering flag. While the flag is false, deliver(Executor) schedules nothing
+    * and a running DeliveryRunner leaves its send loop and reports delivery as complete.
+    *
+    * @throws Exception declared by the ServerConsumer interface; nothing is thrown here
+    */
+   public void stop() throws Exception
+   {
+      this.delivering = false;
+   }
+
+   /**
+    * Begins (or restarts) delivery: obtains a fresh iterator over the references returned by
+    * getQueue().list(filter), raises the delivering flag and then prompts delivery.
+    */
+   public void start()
+   {
+      // The iterator must be in place before the flag is raised, because deliver(Executor)
+      // only schedules a run once delivering is true.
+      this.iterator = getQueue().list(filter).iterator();
+      this.delivering = true;
+
+      promptDelivery();
+   }
+
+   /**
+    * Schedules the delivery runner on the given executor, unless delivery is not active or a
+    * run is already waiting to execute.
+    *
+    * @param executor executor on which the delivery runner is run
+    */
+   public void deliver(Executor executor)
+   {
+      if (!delivering)
+      {
+         return;
+      }
+
+      // Only the caller that flips the flag from false to true submits the task, so at most
+      // one run is pending at a time; the runner resets the flag when it starts.
+      final boolean scheduled = waitingToDeliver.compareAndSet(false, true);
+
+      if (scheduled)
+      {
+         executor.execute(deliveryRunner);
+      }
+   }
+
// Public
// -----------------------------------------------------------------------------
@@ -302,10 +343,44 @@
private void promptDelivery()
{
- session.promptDelivery(messageQueue);
+ if(browseOnly)
+ {
+ session.promptDelivery(this);
+ }
+ else
+ {
+ session.promptDelivery(messageQueue);
+ }
}
// Inner classes
// ------------------------------------------------------------------------
+   /**
+    * Task that sends the references of the current iterator to the client while delivery is
+    * active and credits remain, and then tells the client when there is nothing more to send.
+    */
+   private class DeliveryRunner implements Runnable
+   {
+      /**
+       * Sends references until the iterator is exhausted, delivery is stopped or credits run
+       * out. When the iterator is exhausted or delivery was stopped, a
+       * SessionDeliveryCompleteMessage is sent and the iterator is discarded.
+       */
+      public void run()
+      {
+         // From here on deliver(Executor) may queue another run.
+         waitingToDeliver.set(false);
+
+         synchronized (ServerConsumerImpl.this)
+         {
+            // A run queued while an earlier one was still looping can execute after that
+            // earlier run has finished and discarded the iterator. There is nothing left to
+            // send then, and the completion message has already gone out, so return instead
+            // of dereferencing null.
+            if (iterator == null)
+            {
+               return;
+            }
+
+            // availableCredits == null means no flow control; otherwise stop at zero or below.
+            while (delivering && iterator.hasNext() && !(availableCredits != null && availableCredits.get() <= 0))
+            {
+               MessageReference ref = iterator.next();
+               if (availableCredits != null)
+               {
+                  availableCredits.addAndGet(-ref.getMessage().getEncodeSize());
+               }
+               channel.send(new SessionReceiveMessage(id, ref.getMessage(), 1));
+            }
+            //inform the client there are no more messages
+            if (!iterator.hasNext() || !delivering)
+            {
+               channel.send(new SessionDeliveryCompleteMessage(id));
+               iterator = null;
+            }
+         }
+      }
+   }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -12,21 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
@@ -69,6 +54,20 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.SimpleStringIdGenerator;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
/*
* Session implementation
*
@@ -104,8 +103,6 @@
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
- private final Map<Long, ServerBrowserImpl> browsers = new ConcurrentHashMap<Long, ServerBrowserImpl>();
-
private final Map<Long, ServerProducer> producers = new ConcurrentHashMap<Long, ServerProducer>();
private final Executor executor;
@@ -222,14 +219,6 @@
return id;
}
- public void removeBrowser(final ServerBrowserImpl browser) throws Exception
- {
- if (browsers.remove(browser.getID()) == null)
- {
- throw new IllegalStateException("Cannot find browser with id " + browser.getID() + " to remove");
- }
- }
-
public void removeConsumer(final ServerConsumer consumer) throws Exception
{
if (consumers.remove(consumer.getID()) == null)
@@ -285,15 +274,6 @@
consumers.clear();
- Set<ServerBrowserImpl> browsersClone = new HashSet<ServerBrowserImpl>(browsers.values());
-
- for (ServerBrowserImpl browser : browsersClone)
- {
- browser.close();
- }
-
- browsers.clear();
-
Set<ServerProducer> producersClone = new HashSet<ServerProducer>(producers.values());
for (ServerProducer producer : producersClone)
@@ -313,6 +293,21 @@
queue.deliverAsync(executor);
}
+   /**
+    * Asks the given consumer to schedule its delivery on this session's executor.
+    *
+    * @param consumer consumer whose deliver(Executor) is invoked
+    */
+   public void promptDelivery(final ServerConsumer consumer)
+   {
+      consumer.deliver(this.executor);
+   }
+
+   /**
+    * Stops the consumer registered under the given id.
+    *
+    * @param consumerID id of the consumer to stop
+    * @throws IllegalStateException if no consumer is registered under that id
+    * @throws Exception if the consumer's stop() fails
+    */
+   public void resetConsumer(long consumerID) throws Exception
+   {
+      final ServerConsumer consumer = consumers.get(consumerID);
+
+      // An id that was never registered, or was already removed, yields null from the map;
+      // report it explicitly instead of failing with a bare NullPointerException.
+      if (consumer == null)
+      {
+         throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to stop");
+      }
+
+      consumer.stop();
+   }
+
+   /**
+    * Starts the consumer registered under the given id.
+    *
+    * @param consumerID id of the consumer to start
+    * @throws IllegalStateException if no consumer is registered under that id
+    */
+   public void reStartConsumer(long consumerID)
+   {
+      final ServerConsumer consumer = consumers.get(consumerID);
+
+      // An id that was never registered, or was already removed, yields null from the map;
+      // report it explicitly instead of failing with a bare NullPointerException.
+      if (consumer == null)
+      {
+         throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to start");
+      }
+
+      consumer.start();
+   }
+
public void send(final ServerMessage msg) throws Exception
{
// check the user has write access to this address.
@@ -906,7 +901,8 @@
public SessionCreateConsumerResponseMessage createConsumer(final SimpleString queueName,
final SimpleString filterString,
int windowSize,
- int maxRate) throws Exception
+ int maxRate,
+ boolean isBrowser) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
@@ -947,7 +943,7 @@
storageManager,
queueSettingsRepository,
postOffice,
- channel);
+ channel, isBrowser);
SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
@@ -1013,25 +1009,6 @@
return new SessionBindingQueryResponseMessage(exists, queueNames);
}
- public void createBrowser(final SimpleString queueName, final SimpleString filterString) throws Exception
- {
- Binding binding = postOffice.getBinding(queueName);
-
- if (binding == null)
- {
- throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
- }
-
- securityStore.check(binding.getAddress(), CheckType.READ, this);
-
- ServerBrowserImpl browser = new ServerBrowserImpl(idGenerator.generateID(),
- this,
- binding.getQueue(),
- filterString == null ? null : filterString.toString());
-
- browsers.put(browser.getID(), browser);
- }
-
/**
* Create a producer for the specified address
*
@@ -1083,26 +1060,6 @@
return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
}
- public boolean browserHasNextMessage(final long browserID) throws Exception
- {
- return browsers.get(browserID).hasNextMessage();
- }
-
- public ServerMessage browserNextMessage(final long browserID) throws Exception
- {
- return browsers.get(browserID).nextMessage();
- }
-
- public void browserReset(final long browserID) throws Exception
- {
- browsers.get(browserID).reset();
- }
-
- public void closeBrowser(final long browserID) throws Exception
- {
- browsers.get(browserID).close();
- }
-
public void closeConsumer(final long consumerID) throws Exception
{
consumers.get(consumerID).close();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -12,16 +12,21 @@
package org.jboss.messaging.core.server.impl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_STOP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
@@ -33,6 +38,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
@@ -50,31 +56,12 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStopMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
@@ -102,6 +89,9 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
+import javax.transaction.xa.Xid;
+import java.util.List;
+
/**
* A ServerSessionPacketHandler
*
@@ -180,7 +170,8 @@
response = session.createConsumer(request.getQueueName(),
request.getFilterString(),
request.getWindowSize(),
- request.getMaxRate());
+ request.getMaxRate(),
+ request.isBrowser());
break;
}
case SESS_CREATEQUEUE:
@@ -213,13 +204,6 @@
response = session.executeBindingQuery(request.getAddress());
break;
}
- case SESS_CREATEBROWSER:
- {
- SessionCreateBrowserMessage request = (SessionCreateBrowserMessage)packet;
- session.createBrowser(request.getQueueName(), request.getFilterString());
- response = new NullResponseMessage();
- break;
- }
case SESS_CREATEPRODUCER:
{
SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
@@ -229,6 +213,20 @@
request.isAutoGroupId());
break;
}
+ case SESS_CONSUMER_STOP:
+ {
+ SessionConsumerStopMessage request = (SessionConsumerStopMessage) packet;
+ session.resetConsumer(request.getConsumerId());
+ response = new NullResponseMessage();
+ break;
+ }
+ case SESS_CONSUMER_START:
+ {
+ SessionConsumerStartMessage request = (SessionConsumerStartMessage) packet;
+ session.reStartConsumer(request.getConsumerId());
+ response = new NullResponseMessage();
+ break;
+ }
case SESS_PROCESSED:
{
SessionProcessedMessage message = (SessionProcessedMessage)packet;
@@ -371,13 +369,6 @@
response = new NullResponseMessage();
break;
}
- case SESS_BROWSER_CLOSE:
- {
- SessionBrowserCloseMessage message = (SessionBrowserCloseMessage)packet;
- session.closeBrowser(message.getBrowserID());
- response = new NullResponseMessage();
- break;
- }
case SESS_FLOWTOKEN:
{
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
@@ -406,33 +397,13 @@
}
break;
}
- case SESS_BROWSER_HASNEXTMESSAGE:
- {
- SessionBrowserHasNextMessageMessage message = (SessionBrowserHasNextMessageMessage)packet;
- response = new SessionBrowserHasNextMessageResponseMessage(session.browserHasNextMessage(message.getBrowserID()));
- break;
- }
- case SESS_BROWSER_NEXTMESSAGE:
- {
- SessionBrowserNextMessageMessage message = (SessionBrowserNextMessageMessage)packet;
- ServerMessage smsg = session.browserNextMessage(message.getBrowserID());
- response = new SessionBrowseMessage(smsg);
- break;
- }
- case SESS_BROWSER_RESET:
- {
- SessionBrowserResetMessage message = (SessionBrowserResetMessage)packet;
- session.browserReset(message.getBrowserID());
- response = new NullResponseMessage();
- break;
- }
case SESS_MANAGEMENT_SEND:
{
SessionSendManagementMessage message = (SessionSendManagementMessage)packet;
session.handleManagementMessage(message);
break;
}
- case PacketImpl.SESS_REPLICATE_DELIVERY:
+ case SESS_REPLICATE_DELIVERY:
{
SessionReplicateDeliveryMessage message = (SessionReplicateDeliveryMessage)packet;
session.handleReplicatedDelivery(message.getConsumerID(), message.getMessageID());
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,19 +22,20 @@
package org.jboss.messaging.jms.client;
-import java.util.Enumeration;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
* $Id$
*/
@@ -44,19 +45,22 @@
private static final Logger log = Logger.getLogger(JBossQueueBrowser.class);
+ private static final long NEXT_MESSAGE_TIMEOUT = 5000;
+
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
- private ClientBrowser browser;
+ private ClientConsumer consumer;
private Queue queue;
private String messageSelector;
+ private boolean firstTime = true;
// Constructors ---------------------------------------------------------------------------------
- public JBossQueueBrowser(Queue queue, String messageSelector, ClientBrowser browser)
+ public JBossQueueBrowser(Queue queue, String messageSelector, ClientConsumer consumer)
{
- this.browser = browser;
+ this.consumer = consumer;
this.queue = queue;
this.messageSelector = messageSelector;
}
@@ -67,7 +71,7 @@
{
try
{
- browser.close();
+ consumer.close();
}
catch (MessagingException e)
{
@@ -79,12 +83,20 @@
{
try
{
- browser.reset();
+ if(firstTime)
+ {
+ consumer.start();
+ firstTime = false;
+ }
+ else
+ {
+ consumer.restart();
+ }
return new BrowserEnumeration();
}
catch (MessagingException e)
{
- throw JMSExceptionHelper.convertFromMessagingException(e);
+ throw JMSExceptionHelper.convertFromMessagingException(e);
}
}
@@ -102,7 +114,7 @@
public String toString()
{
- return "JBossQueueBrowser->" + browser;
+ return "JBossQueueBrowser->" + consumer;
}
// Package protected ----------------------------------------------------------------------------
@@ -119,9 +131,9 @@
{
try
{
- return browser.hasNextMessage();
+ return consumer.awaitMessage(NEXT_MESSAGE_TIMEOUT);
}
- catch (MessagingException e)
+ catch (Exception e)
{
throw new IllegalStateException(e.getMessage());
}
@@ -131,8 +143,18 @@
{
try
{
- ClientMessage message = browser.nextMessage();
+ if(!hasMoreElements())
+ {
+ throw new NoSuchElementException();
+ }
+ ClientMessage message = consumer.receiveImmediate();
+
+ if(message == null)
+ {
+ throw new NoSuchElementException();
+ }
+
JBossMessage jbm = JBossMessage.createMessage(message, null);
jbm.doBeforeReceive();
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,10 +22,19 @@
package org.jboss.messaging.jms.client;
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.JBossTemporaryQueue;
+import org.jboss.messaging.jms.JBossTemporaryTopic;
+import org.jboss.messaging.jms.JBossTopic;
+import org.jboss.messaging.util.SimpleString;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@@ -58,22 +67,11 @@
import javax.jms.XASession;
import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.jms.JBossDestination;
-import org.jboss.messaging.jms.JBossQueue;
-import org.jboss.messaging.jms.JBossTemporaryQueue;
-import org.jboss.messaging.jms.JBossTemporaryTopic;
-import org.jboss.messaging.jms.JBossTopic;
-import org.jboss.messaging.util.SimpleString;
-
/**
*
* Note that we *do not* support JMS ASF (Application Server Facilities) optional
@@ -81,7 +79,7 @@
*
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
* @version <tt>$Revision$</tt>
*
@@ -640,7 +638,7 @@
{
String coreSelector = SelectorTranslator.convertToJBMFilterString(filterString);
- ClientBrowser browser = session.createBrowser(jbq.getSimpleAddress(),
+ ClientConsumer browser = session.createBrowser(jbq.getSimpleAddress(),
coreSelector == null ? null : new SimpleString(coreSelector));
return new JBossQueueBrowser(queue, filterString, browser);
Added: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -0,0 +1,162 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.consumer;
+
+import junit.framework.TestCase;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration tests for a {@link ClientConsumer} created with {@code ClientSession.createBrowser(...)}:
+ * the messages of a queue must be readable in send order, and {@code restart()} must make the
+ * consumer deliver them again starting from the first one.
+ * <p>
+ * The in-VM server, the session and the pre-filled queue are created in {@link #setUp()} and released
+ * in {@link #tearDown()}, so that a failing assertion does not leave a started server behind.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ConsumerTest extends TestCase
+{
+   private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+   private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+   private static final SimpleString QUEUE = new SimpleString("CoreClientTestQueue");
+   private static final int NUM_MESSAGES = 100;
+
+   private MessagingService messagingService;
+   private ClientSession session;
+
+   /**
+    * Starts a null-storage in-VM server, opens a session, creates the test queue and sends
+    * {@link #NUM_MESSAGES} messages whose bodies are "m0", "m1", ...
+    */
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(ACCEPTOR_FACTORY));
+
+      messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+      messagingService.start();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      session = sf.createSession(false, true, true, false);
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         producer.send(createMessage(session, "m" + i));
+      }
+   }
+
+   /**
+    * Closes the session and stops the server; the server is stopped even if closing the session fails.
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+      }
+      finally
+      {
+         session = null;
+         try
+         {
+            if (messagingService != null)
+            {
+               messagingService.stop();
+            }
+         }
+         finally
+         {
+            messagingService = null;
+            super.tearDown();
+         }
+      }
+   }
+
+   public void testConsumerAsSimpleBrowser() throws Exception
+   {
+      ClientConsumer consumer = session.createBrowser(QUEUE);
+      consumer.start();
+      assertEquals(NUM_MESSAGES, browse(consumer, Integer.MAX_VALUE));
+      // after a restart the whole queue must be delivered a second time
+      consumer.restart();
+      assertEquals(NUM_MESSAGES, browse(consumer, Integer.MAX_VALUE));
+   }
+
+   public void testConsumerAsSimpleBrowserReset() throws Exception
+   {
+      ClientConsumer consumer = session.createBrowser(QUEUE);
+      consumer.start();
+      // read only the first half of the queue ...
+      assertEquals(NUM_MESSAGES / 2, browse(consumer, NUM_MESSAGES / 2));
+      // ... a restart must nevertheless begin again at the first message
+      consumer.restart();
+      assertEquals(NUM_MESSAGES, browse(consumer, Integer.MAX_VALUE));
+   }
+
+   /**
+    * Reads messages from the consumer until {@code awaitMessage(0)} returns false or {@code limit}
+    * messages have been read, asserting that the n-th message read (counting from 0) has body "m" + n.
+    * @param consumer the started consumer to read from
+    * @param limit maximum number of messages to read
+    * @return the number of messages read
+    */
+   private int browse(ClientConsumer consumer, int limit) throws Exception
+   {
+      int count = 0;
+      while (count < limit && consumer.awaitMessage(0))
+      {
+         ClientMessage m = consumer.receiveImmediate();
+         assertNotNull("receiveImmediate() returned null although awaitMessage(0) returned true", m);
+         assertEquals("m" + count, m.getBody().getString());
+         count++;
+      }
+      return count;
+   }
+
+   private ClientMessage createMessage(ClientSession session, String body)
+   {
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString(body);
+      message.getBody().flip();
+      return message;
+   }
+}
Deleted: trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -1,230 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.timing.core.client;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.util.id.GUID;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.util.Calendar;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ScheduledMessageTest extends UnitTestCase
-{
- private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
-
- private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
-
- private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
-
- private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
-
- private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
-
- private SimpleString atestq = new SimpleString("ascheduledtestq");
-
- private MessagingService messagingService;
-
- private ConfigurationImpl configuration;
-
- protected void setUp() throws Exception
- {
- File file = new File(journalDir);
- File file2 = new File(bindingsDir);
- File file3 = new File(pageDir);
- deleteDirectory(file);
- file.mkdirs();
- deleteDirectory(file2);
- file2.mkdirs();
- deleteDirectory(file3);
- file3.mkdirs();
- configuration = new ConfigurationImpl();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setPagingDirectory(pageDir);
- }
-
- protected void tearDown() throws Exception
- {
- if (messagingService != null)
- {
- try
- {
- messagingService.stop();
- }
- catch (Exception e)
- {
- //ignore
- }
- }
- new File(journalDir).delete();
- new File(bindingsDir).delete();
- new File(pageDir).delete();
- }
-
- public void testRecoveredMessageDeliveredCorrectly() throws Exception
- {
-
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
- //start the server
- messagingService.start();
- //then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
- ClientSession session = sessionFactory.createSession(false, true, false, false);
- session.createQueue(atestq, atestq, null, true, true);
- ClientProducer producer = session.createProducer(atestq);
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().putString("testINVMCoreClient");
- message.getBody().flip();
- message.setDurable(true);
- Calendar cal = Calendar.getInstance();
- cal.roll(Calendar.SECOND, 10);
- producer.send(message, cal.getTimeInMillis());
-
- producer.close();
- session.close();
- messagingService.stop();
- messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
- messagingService.start();
-
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
-
- session = sessionFactory.createSession(false, true, true, false);
-
- ClientConsumer consumer = session.createConsumer(atestq);
-
- session.start();
-
- ClientMessage message2 = consumer.receive(10000);
- assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
- assertEquals("testINVMCoreClient", message2.getBody().getString());
-
- message2.processed();
- session.close();
- }
-
- public void testRecoveredTxMessageDeliveredCorrectly() throws Exception
- {
- Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
- //start the server
- messagingService.start();
- //then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
- ClientSession session = sessionFactory.createSession(true, false, false, false);
- session.createQueue(atestq, atestq, null, true, true);
- session.start(xid, XAResource.TMNOFLAGS);
- ClientProducer producer = session.createProducer(atestq);
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().putString("testINVMCoreClient");
- message.getBody().flip();
- message.setDurable(true);
- Calendar cal = Calendar.getInstance();
- cal.roll(Calendar.SECOND, 10);
- producer.send(message, cal.getTimeInMillis());
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- producer.close();
- session.close();
- messagingService.stop();
- messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
- messagingService.start();
-
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
-
- session = sessionFactory.createSession(true, false, false, false);
- session.commit(xid, true);
- ClientConsumer consumer = session.createConsumer(atestq);
-
- session.start();
-
- ClientMessage message2 = consumer.receive(10000);
- assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
- assertEquals("testINVMCoreClient", message2.getBody().getString());
-
- message2.processed();
- session.close();
- }
-
- public void testPagedMessageDeliveredCorrectly() throws Exception
- {
-
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- configuration.setPagingMaxGlobalSizeBytes(0);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
- //start the server
- messagingService.start();
- //then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
- ClientSession session = sessionFactory.createSession(false, true, false, false);
- session.createQueue(atestq, atestq, null, true, true);
- ClientProducer producer = session.createProducer(atestq);
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().putString("testINVMCoreClient");
- message.getBody().flip();
- message.setDurable(true);
- Calendar cal = Calendar.getInstance();
- cal.roll(Calendar.SECOND, 10);
- producer.send(message, cal.getTimeInMillis());
-
- producer.close();
-
-
- ClientConsumer consumer = session.createConsumer(atestq);
-
- session.start();
-
- ClientMessage message2 = consumer.receive(10000);
- assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
- assertEquals("testINVMCoreClient", message2.getBody().getString());
-
- message2.processed();
- session.close();
- }
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,29 +22,26 @@
package org.jboss.messaging.tests.unit.jms.client;
+import junit.framework.TestCase;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-
-import java.util.Enumeration;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.client.JBossMessage;
import org.jboss.messaging.jms.client.JBossQueueBrowser;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import java.util.Enumeration;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -67,7 +64,7 @@
{
String messageSelector = "color = 'green'";
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
replay(clientBrowser);
JBossQueueBrowser browser = new JBossQueueBrowser(queue, messageSelector,
@@ -80,7 +77,7 @@
public void testGetQueue() throws Exception
{
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
replay(clientBrowser);
JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -93,7 +90,7 @@
public void testClose() throws Exception
{
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
clientBrowser.close();
replay(clientBrowser);
@@ -108,7 +105,7 @@
public void testCloseThrowsException() throws Exception
{
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
clientBrowser.close();
expectLastCall().andThrow(new MessagingException());
@@ -131,8 +128,8 @@
public void testGetEnumeration() throws Exception
{
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
- clientBrowser.reset();
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+ clientBrowser.start();
replay(clientBrowser);
JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -144,40 +141,18 @@
verify(clientBrowser);
}
- public void testGetEnumerationThrowsException() throws Exception
- {
- Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
- clientBrowser.reset();
- expectLastCall().andThrow(new MessagingException());
- replay(clientBrowser);
-
- JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
- clientBrowser);
-
- try
- {
- browser.getEnumeration();
- fail("JMSException");
- } catch (JMSException e)
- {
- }
-
- verify(clientBrowser);
- }
-
public void testGetEnumerationWithOneMessage() throws Exception
{
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
ClientMessage clientMessage = createStrictMock(ClientMessage.class);
MessagingBuffer buffer = createStrictMock(MessagingBuffer.class);
- clientBrowser.reset();
- expect(clientBrowser.hasNextMessage()).andReturn(true);
+ clientBrowser.start();
+ expect(clientBrowser.awaitMessage(5000)).andStubReturn(true);
expect(clientMessage.getType()).andReturn(JBossMessage.TYPE);
expect(clientMessage.getBody()).andStubReturn(buffer);
- expect(clientBrowser.nextMessage()).andReturn(clientMessage);
- expect(clientBrowser.hasNextMessage()).andReturn(false);
+ expect(clientBrowser.receiveImmediate()).andReturn(clientMessage);
+ expect(clientBrowser.awaitMessage(5000)).andReturn(false);
replay(clientMessage, clientBrowser);
JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -196,9 +171,9 @@
throws Exception
{
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
- clientBrowser.reset();
- expect(clientBrowser.hasNextMessage()).andThrow(new MessagingException());
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+ clientBrowser.start();
+ expect(clientBrowser.awaitMessage(5000)).andThrow(new MessagingException());
replay(clientBrowser);
JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -221,10 +196,10 @@
public void testGetEnumerationWithNextThrowsException() throws Exception
{
Queue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
- clientBrowser.reset();
- expect(clientBrowser.hasNextMessage()).andReturn(true);
- expect(clientBrowser.nextMessage()).andThrow(new MessagingException());
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+ clientBrowser.start();
+ expect(clientBrowser.awaitMessage(5000)).andStubReturn(true);
+ expect(clientBrowser.receiveImmediate()).andThrow(new MessagingException());
replay(clientBrowser);
JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java 2008-10-16 17:20:40 UTC (rev 5121)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java 2008-10-16 17:23:43 UTC (rev 5122)
@@ -22,6 +22,8 @@
package org.jboss.messaging.tests.unit.jms.client;
+import junit.framework.TestCase;
+import org.easymock.EasyMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -30,13 +32,28 @@
import static org.easymock.EasyMock.isNull;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientMessageImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.JBossTemporaryQueue;
+import org.jboss.messaging.jms.JBossTemporaryTopic;
+import org.jboss.messaging.jms.JBossTopic;
+import org.jboss.messaging.jms.client.JBossConnection;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.jms.client.JBossSession;
import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+import org.jboss.messaging.util.SimpleString;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@@ -63,31 +80,10 @@
import javax.jms.TopicSession;
import javax.jms.TransactionInProgressException;
import javax.transaction.xa.XAResource;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
-import junit.framework.TestCase;
-
-import org.easymock.EasyMock;
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientMessageImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.jms.JBossDestination;
-import org.jboss.messaging.jms.JBossQueue;
-import org.jboss.messaging.jms.JBossTemporaryQueue;
-import org.jboss.messaging.jms.JBossTemporaryTopic;
-import org.jboss.messaging.jms.JBossTopic;
-import org.jboss.messaging.jms.client.JBossConnection;
-import org.jboss.messaging.jms.client.JBossMessage;
-import org.jboss.messaging.jms.client.JBossSession;
-import org.jboss.messaging.util.SimpleString;
-
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -1219,7 +1215,7 @@
public void testCreateBrowser() throws Exception
{
JBossQueue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
expect(mockClientSession.createBrowser(queue.getSimpleAddress(), null))
.andReturn(clientBrowser);
@@ -1240,7 +1236,7 @@
public void testCreateBrowserThrowsException() throws Exception
{
JBossQueue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
expect(mockClientSession.createBrowser(queue.getSimpleAddress(), null))
.andThrow(new MessagingException());
@@ -1266,7 +1262,7 @@
public void testCreateBrowserWithEmptyFilter() throws Exception
{
JBossQueue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
expect(mockClientSession.createBrowser(queue.getSimpleAddress(), null))
.andReturn(clientBrowser);
@@ -1288,7 +1284,7 @@
{
String filter = "color = 'red'";
JBossQueue queue = new JBossQueue(randomString());
- ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+ ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
expect(
mockClientSession.createBrowser(queue.getSimpleAddress(),
new SimpleString(filter))).andReturn(clientBrowser);
More information about the jboss-cvs-commits mailing list