[jboss-cvs] JBoss Messaging SVN: r2141 - in trunk: src/main/org/jboss/jms/client/delegate and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 2 04:45:42 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-02-02 04:45:42 -0500 (Fri, 02 Feb 2007)
New Revision: 2141
Added:
trunk/src/main/org/jboss/jms/wireformat/BrowserResetRequest.java
Modified:
trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java
trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
trunk/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-802
http://jira.jboss.org/jira/browse/JBMESSAGING-803
Modified: trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java 2007-02-02 09:44:23 UTC (rev 2140)
+++ trunk/src/main/org/jboss/jms/client/JBossQueueBrowser.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -37,22 +37,20 @@
*/
public class JBossQueueBrowser implements QueueBrowser, Serializable
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
private static final long serialVersionUID = 4245650830082712281L;
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
private BrowserDelegate delegate;
private Queue queue;
private String messageSelector;
- private BrowserEnumeration enumeration = new BrowserEnumeration();
+ // Constructors ---------------------------------------------------------------------------------
- // Constructors --------------------------------------------------
-
JBossQueueBrowser(Queue queue, String messageSelector, BrowserDelegate delegate)
{
this.delegate = delegate;
@@ -60,7 +58,7 @@
this.messageSelector = messageSelector;
}
- // QueueBrowser implementation ------------------------------------
+ // QueueBrowser implementation -------------------------------------------------------------------
public void close() throws JMSException
{
@@ -69,8 +67,9 @@
}
public Enumeration getEnumeration() throws JMSException
- {
- return enumeration;
+ {
+ delegate.reset();
+ return new BrowserEnumeration();
}
public String getMessageSelector() throws JMSException
@@ -83,7 +82,7 @@
return queue;
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public String toString()
{
@@ -95,13 +94,13 @@
return delegate;
}
- // Package protected ---------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
- // Inner classes -------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
private class BrowserEnumeration implements Enumeration
{
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-02-02 09:44:23 UTC (rev 2140)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -36,6 +36,7 @@
import org.jboss.jms.wireformat.CloseRequest;
import org.jboss.jms.wireformat.ClosingRequest;
import org.jboss.jms.wireformat.RequestSupport;
+import org.jboss.jms.wireformat.BrowserResetRequest;
/**
* The client-side Browser delegate class.
@@ -64,9 +65,9 @@
super(objectID);
this.channelID = channelID;
}
-
+
public ClientBrowserDelegate()
- {
+ {
}
// DelegateSupport overrides --------------------------------------------------------------------
@@ -82,72 +83,78 @@
// synchronize (recursively) the client-side state
state.synchronizeWith(newDelegate.getState());
-
+
client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
getRemotingClient();
}
-
+
public void setState(HierarchicalState state)
{
super.setState(state);
-
+
client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
getRemotingClient();
}
-
-
+
+
// Closeable implementation ---------------------------------------------------------------------
-
+
public void close() throws JMSException
{
RequestSupport req = new CloseRequest(id, version);
-
+
doInvoke(client, req);
}
-
+
public void closing() throws JMSException
{
RequestSupport req = new ClosingRequest(id, version);
-
+
doInvoke(client, req);
}
// BrowserDelegate implementation ---------------------------------------------------------------
+ public void reset() throws JMSException
+ {
+ RequestSupport req = new BrowserResetRequest(id, version);
+ doInvoke(client, req);
+ }
+
public boolean hasNextMessage() throws JMSException
{
RequestSupport req = new BrowserHasNextMessageRequest(id, version);
-
- return ((Boolean)doInvoke(client, req)).booleanValue();
+
+ return ((Boolean)doInvoke(client, req)).booleanValue();
}
public JBossMessage nextMessage() throws JMSException
{
RequestSupport req = new BrowserNextMessageRequest(id, version);
-
- return (JBossMessage)doInvoke(client, req);
+
+ return (JBossMessage)doInvoke(client, req);
}
public JBossMessage[] nextMessageBlock(int maxMessages) throws JMSException
{
RequestSupport req = new BrowserNextMessageBlockRequest(id, version, maxMessages);
-
+
return (JBossMessage[])doInvoke(client, req);
}
-
+
// Streamable implementation ----------------------------------------------------------
-
+
public void read(DataInputStream in) throws Exception
{
super.read(in);
-
+
channelID = in.readLong();
}
public void write(DataOutputStream out) throws Exception
{
super.write(out);
-
+
out.writeLong(channelID);
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java 2007-02-02 09:44:23 UTC (rev 2140)
+++ trunk/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -31,12 +31,20 @@
* of the methods are handled in the advice stack.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
*
* $Id$
*/
public interface BrowserEndpoint extends Closeable
-{
+{
+ /**
+ * Reset the internal state of the browser endpoint so the following
+ * nextMessage()/hasNextMessage()/nextMessageBlock() invocations would reflect the state of the
+ * queue at the moment of the reset.
+ */
+ void reset() throws JMSException;
+
JBossMessage nextMessage() throws JMSException;
boolean hasNextMessage() throws JMSException;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-02-02 09:44:23 UTC (rev 2140)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -41,6 +41,7 @@
* Concrete implementation of BrowserEndpoint.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -59,31 +60,47 @@
private int id;
private boolean closed;
+ private ServerSessionEndpoint session;
+ private Channel destination;
+ private Filter filter;
private Iterator iterator;
- private ServerSessionEndpoint session;
// Constructors ---------------------------------------------------------------------------------
ServerBrowserEndpoint(ServerSessionEndpoint session, int id,
- Channel destination, String messageSelector)
- throws JMSException
+ Channel destination, String messageSelector) throws JMSException
{
this.session = session;
-
this.id = id;
-
- Filter filter = null;
-
+ this.destination = destination;
+
if (messageSelector != null)
{
filter = new Selector(messageSelector);
}
-
- iterator = destination.browse(filter).iterator();
}
// BrowserEndpoint implementation ---------------------------------------------------------------
+ public void reset() throws JMSException
+ {
+ try
+ {
+ if (closed)
+ {
+ throw new IllegalStateException("Browser is closed");
+ }
+
+ log.debug(this + " is being resetted");
+
+ iterator = createIterator();
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " hasNextMessage");
+ }
+ }
+
public boolean hasNextMessage() throws JMSException
{
try
@@ -93,6 +110,11 @@
throw new IllegalStateException("Browser is closed");
}
+ if (iterator == null)
+ {
+ iterator = createIterator();
+ }
+
boolean has = iterator.hasNext();
if (trace) { log.trace(this + (has ? " has": " DOESN'T have") + " a next message"); }
return has;
@@ -112,6 +134,11 @@
throw new IllegalStateException("Browser is closed");
}
+ if (iterator == null)
+ {
+ iterator = createIterator();
+ }
+
Routable r = (Routable)iterator.next();
if (trace) { log.trace(this + " returning " + r); }
@@ -140,7 +167,12 @@
{
throw new IllegalArgumentException("maxMessages must be >=2 otherwise use nextMessage");
}
-
+
+ if (iterator == null)
+ {
+ iterator = createIterator();
+ }
+
ArrayList messages = new ArrayList(maxMessages);
int i = 0;
while (i < maxMessages)
@@ -207,6 +239,11 @@
// Private --------------------------------------------------------------------------------------
+ private Iterator createIterator()
+ {
+ return destination.browse(filter).iterator();
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2007-02-02 09:44:23 UTC (rev 2140)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -38,22 +38,22 @@
*/
public class BrowserAdvised extends AdvisedSupport implements BrowserEndpoint
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
protected BrowserEndpoint endpoint;
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public BrowserAdvised(BrowserEndpoint endpoint)
{
this.endpoint = endpoint;
}
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // BrowserAdvised implementation ---------------------------------
+ // BrowserAdvised implementation ----------------------------------------------------------------
public void close() throws JMSException
{
@@ -65,6 +65,11 @@
endpoint.closing();
}
+ public void reset() throws JMSException
+ {
+ endpoint.reset();
+ }
+
public boolean hasNextMessage() throws JMSException
{
return endpoint.hasNextMessage();
@@ -80,25 +85,25 @@
return endpoint.nextMessageBlock(maxMessages);
}
- // AdvisedSupport overrides --------------------------------------
+ // AdvisedSupport overrides ---------------------------------------------------------------------
public Object getEndpoint()
{
return endpoint;
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public String toString()
{
return "BrowserAdvised->" + endpoint;
}
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- // Package Private -----------------------------------------------
+ // Package Private ------------------------------------------------------------------------------
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
- // Inner Classes -------------------------------------------------
+ // Inner Classes --------------------------------------------------------------------------------
}
Added: trunk/src/main/org/jboss/jms/wireformat/BrowserResetRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/BrowserResetRequest.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/BrowserResetRequest.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -0,0 +1,74 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.wireformat;
+
+import org.jboss.jms.server.endpoint.BrowserEndpoint;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class BrowserResetRequest extends RequestSupport
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public BrowserResetRequest()
+ {
+ }
+
+ public BrowserResetRequest(int objectId, byte version)
+ {
+ super(objectId, PacketSupport.REQ_BROWSER_RESET, version);
+ }
+
+ // RequestSupport overrides ---------------------------------------------------------------------
+
+ public ResponseSupport serverInvoke() throws Exception
+ {
+ BrowserEndpoint endpoint = (BrowserEndpoint)Dispatcher.instance.getTarget(objectId);
+
+ if (endpoint == null)
+ {
+ throw new IllegalStateException("Cannot find object in dispatcher with ID " + objectId);
+ }
+
+ endpoint.reset();
+ return null;
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ super.read(is);
+ }
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+ os.flush();
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/wireformat/BrowserResetRequest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-02-02 09:44:23 UTC (rev 2140)
+++ trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -118,6 +118,7 @@
public static final int REQ_BROWSER_NEXTMESSAGE = 501;
public static final int REQ_BROWSER_HASNEXTMESSAGE = 502;
public static final int REQ_BROWSER_NEXTMESSAGEBLOCK = 503;
+ public static final int REQ_BROWSER_RESET = 504;
// Closeable
// ---------
@@ -277,6 +278,10 @@
case REQ_BROWSER_NEXTMESSAGEBLOCK:
packet = new BrowserNextMessageBlockRequest();
break;
+ case REQ_BROWSER_RESET:
+ packet = new BrowserResetRequest();
+ break;
+
// Closeable
@@ -403,9 +408,7 @@
}
else
{
- String s = is.readUTF();
-
- return s;
+ return is.readUTF();
}
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java 2007-02-02 09:44:23 UTC (rev 2140)
+++ trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java 2007-02-02 09:45:42 UTC (rev 2141)
@@ -32,6 +32,7 @@
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.jboss.jms.client.JBossConnectionFactory;
@@ -50,15 +51,14 @@
public class BrowserTest extends MessagingTestCase
{
- // Constants -----------------------------------------------------
+ // Constants -----------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
protected InitialContext initialContext;
-
-
+
protected JBossConnectionFactory cf;
protected Queue queue;
protected Topic topic;
@@ -66,48 +66,15 @@
protected Session session;
protected MessageProducer producer;
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public BrowserTest(String name)
{
super(name);
}
- // TestCase overrides -------------------------------------------
-
- public void setUp() throws Exception
- {
+ // Public ---------------------------------------------------------------------------------------
- super.setUp();
- ServerManagement.start("all");
-
-
- initialContext = new InitialContext(ServerManagement.getJNDIEnvironment());
- cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
-
- ServerManagement.undeployQueue("Queue");
-
- ServerManagement.deployQueue("Queue");
- queue = (Queue)initialContext.lookup("/queue/Queue");
-
- connection = cf.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(queue);
-
- }
-
- public void tearDown() throws Exception
- {
- ServerManagement.undeployQueue("Queue");
-
- connection.close();
-
- super.tearDown();
- }
-
- // Public --------------------------------------------------------
-
-
public void testCreateBrowserOnNullDestination() throws Exception
{
try
@@ -147,9 +114,7 @@
public void testBrowse() throws Exception
{
-
- log.trace("Starting testBrowse()");
-
+ log.trace("Starting testBrowse()");
final int numMessages = 10;
@@ -189,8 +154,8 @@
assertNotNull(m);
}
- //Need to pause here since delivery is done on a different thread
- //Message might not be removed from in memory state by this point
+ // Need to pause here since delivery is done on a different thread. Message might not be
+ // removed from in memory state by this point.
Thread.sleep(2000);
@@ -210,9 +175,7 @@
assertEquals(0, count);
}
-
-
-
+
public void testBrowseWithSelector() throws Exception
{
@@ -240,14 +203,80 @@
}
assertEquals(70, count);
}
+
+ public void testGetEnumeration() throws Exception
+ {
+ // send a message to the queue
+
+ Message m = session.createTextMessage("A");
+ producer.send(m);
+
+ // make sure we can browse it
+
+ QueueBrowser browser = session.createBrowser(queue);
+
+ Enumeration en = browser.getEnumeration();
+
+ assertTrue(en.hasMoreElements());
+
+ TextMessage rm = (TextMessage)en.nextElement();
+
+ assertNotNull(rm);
+ assertEquals("A", rm.getText());
+
+ assertFalse(en.hasMoreElements());
+
+ // create a *new* enumeration, that should reset it
+
+ en = browser.getEnumeration();
+
+ assertTrue(en.hasMoreElements());
+
+ rm = (TextMessage)en.nextElement();
+
+ assertNotNull(rm);
+ assertEquals("A", rm.getText());
+
+ assertFalse(en.hasMoreElements());
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+
+ super.setUp();
+ ServerManagement.start("all");
+
+ initialContext = new InitialContext(ServerManagement.getJNDIEnvironment());
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ ServerManagement.undeployQueue("Queue");
+
+ ServerManagement.deployQueue("Queue");
+ queue = (Queue)initialContext.lookup("/queue/Queue");
+
+ drainDestination(cf, queue);
+
+ connection = cf.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(queue);
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ ServerManagement.undeployQueue("Queue");
+
+ connection.close();
+
+ super.tearDown();
+ }
+
+ // Private --------------------------------------------------------------------------------------
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list