[jboss-cvs] JBoss Messaging SVN: r6059 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 11 08:56:14 EDT 2009
Author: ataylor
Date: 2009-03-11 08:56:14 -0400 (Wed, 11 Mar 2009)
New Revision: 6059
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateQueueTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateConsumerTest.java
Log:
ClientSession tests and improved exception handling
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-10 19:30:29 UTC (rev 6058)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-11 12:56:14 UTC (rev 6059)
@@ -22,19 +22,9 @@
package org.jboss.messaging.core.postoffice.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
@@ -60,14 +50,25 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.Transaction.State;
import org.jboss.messaging.core.transaction.TransactionOperation;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
-import org.jboss.messaging.core.transaction.Transaction.State;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.TypedProperties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
* A PostOfficeImpl
*
@@ -512,6 +513,10 @@
public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
{
Binding binding = addressManager.removeBinding(uniqueName);
+ if(binding == null)
+ {
+ throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ }
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-03-10 19:30:29 UTC (rev 6058)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-03-11 12:56:14 UTC (rev 6059)
@@ -24,12 +24,9 @@
import org.jboss.messaging.core.postoffice.AddressManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
-import org.jboss.messaging.utils.ConcurrentHashSet;
-import org.jboss.messaging.utils.ConcurrentSet;
import org.jboss.messaging.utils.SimpleString;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -61,7 +58,7 @@
if (binding == null)
{
- throw new IllegalStateException("Queue is not bound " + uniqueName);
+ return null;
}
removeBindingInternal(binding.getAddress(), uniqueName);
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java (from rev 6055, trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateQueueTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java 2009-03-11 12:56:14 UTC (rev 6059)
@@ -0,0 +1,163 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.impl.SoloQueueImpl;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Integration tests for creating and deleting queues through a {@link ClientSession}.
+ * <p>
+ * A messaging service is started before every test and stopped afterwards; each test
+ * then inspects the binding that the server's post office holds for {@code queueName}.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientSessionCreateAndDeleteQueueTest extends ServiceTestBase
+{
+   private MessagingService messagingService;
+
+   private final SimpleString address = new SimpleString("address");
+
+   private final SimpleString queueName = new SimpleString("queue");
+
+   /** A queue created with the durable flag {@code false} reports itself as non-durable. */
+   public void testDurableFalse() throws Exception
+   {
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         session.createQueue(address, queueName, false);
+         Queue q = (Queue) lookupBinding().getBindable();
+         assertFalse(q.isDurable());
+      }
+      finally
+      {
+         // close even when an assertion fails so the session does not outlive the test
+         session.close();
+      }
+   }
+
+   /** A queue created with the durable flag {@code true} reports itself as durable. */
+   public void testDurableTrue() throws Exception
+   {
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         session.createQueue(address, queueName, true);
+         Queue q = (Queue) lookupBinding().getBindable();
+         assertTrue(q.isDurable());
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /** A queue created with {@code (false, false)} reports itself as not temporary. */
+   public void testTemporaryFalse() throws Exception
+   {
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         session.createQueue(address, queueName, false, false);
+         Queue q = (Queue) lookupBinding().getBindable();
+         assertFalse(q.isTemporary());
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /** A queue created with {@code (true, true)} reports itself as temporary. */
+   public void testTemporaryTrue() throws Exception
+   {
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         session.createQueue(address, queueName, true, true);
+         Queue q = (Queue) lookupBinding().getBindable();
+         assertTrue(q.isTemporary());
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /** The filter string passed at creation time is the one exposed by the queue's filter. */
+   public void testcreateWithFilter() throws Exception
+   {
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         SimpleString filterString = new SimpleString("x=y");
+         session.createQueue(address, queueName, filterString, false, false);
+         Queue q = (Queue) lookupBinding().getBindable();
+         // expected value first, so a failure message reads the right way round
+         assertEquals(filterString, q.getFilter().getFilterString());
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /** With the solo-queue address setting registered for the address, the bindable is a {@link SoloQueueImpl}. */
+   public void testAddressSettingUSed() throws Exception
+   {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSoloQueue(true);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         SimpleString filterString = new SimpleString("x=y");
+         session.createQueue(address, queueName, filterString, false, false);
+         assertTrue(lookupBinding().getBindable() instanceof SoloQueueImpl);
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /** Deleting an existing queue removes its binding from the post office. */
+   public void testDeleteQueue() throws Exception
+   {
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         session.createQueue(address, queueName, false);
+         assertNotNull(lookupBinding());
+         session.deleteQueue(queueName);
+         assertNull(lookupBinding());
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /** Deleting a queue that was never created fails with {@code QUEUE_DOES_NOT_EXIST}. */
+   public void testDeleteQueueNotExist() throws Exception
+   {
+      ClientSession session = createInVMFactory().createSession(false, true, true);
+      try
+      {
+         session.deleteQueue(queueName);
+         fail("should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.QUEUE_DOES_NOT_EXIST, e.getCode());
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /** Returns the post office binding currently registered under {@code queueName}; the tests treat {@code null} as "no binding". */
+   private Binding lookupBinding() throws Exception
+   {
+      return messagingService.getServer().getPostOffice().getBinding(queueName);
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      messagingService = createService(false);
+      messagingService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (messagingService != null && messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+      finally
+      {
+         // always run the base-class cleanup, even if stopping the service throws
+         super.tearDown();
+      }
+   }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateConsumerTest.java 2009-03-10 19:30:29 UTC (rev 6058)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateConsumerTest.java 2009-03-11 12:56:14 UTC (rev 6059)
@@ -28,6 +28,8 @@
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.tests.util.ServiceTestBase;
+import java.io.File;
+
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
@@ -158,7 +160,7 @@
}
}
- public void testCreateConsumerWithBrowseOnly2() throws Exception
+ public void testCreateConsumerWithOverrides() throws Exception
{
MessagingService service = createService(false);
try
@@ -179,4 +181,149 @@
service.stop();
}
}
+
+   /**
+    * Creates a file consumer on an existing queue and checks that a consumer is returned.
+    */
+   public void testCreateFileConsumerTest() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setProducerMaxRate(99);
+         // NOTE(review): the original issued this identical call twice in a row;
+         // a setBlockOnPersistentSend(true) call may have been intended - confirm.
+         cf.setBlockOnNonPersistentSend(true);
+         ClientSessionInternal clientSession = (ClientSessionInternal) cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue(queueName, queueName, false);
+            ClientConsumer consumer = clientSession.createFileConsumer(new File(""), queueName);
+            assertNotNull(consumer);
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
+
+   /**
+    * Creating a file consumer on a queue that was never created must fail with
+    * {@code QUEUE_DOES_NOT_EXIST}.
+    */
+   public void testCreateFileConsumerNoQ() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setProducerMaxRate(99);
+         // NOTE(review): the original issued this identical call twice in a row;
+         // a setBlockOnPersistentSend(true) call may have been intended - confirm.
+         cf.setBlockOnNonPersistentSend(true);
+         ClientSessionInternal clientSession = (ClientSessionInternal) cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createFileConsumer(new File(""), queueName);
+            fail("should throw exception");
+         }
+         catch (MessagingException e)
+         {
+            // expected value first, actual second
+            assertEquals(MessagingException.QUEUE_DOES_NOT_EXIST, e.getCode());
+         }
+         finally
+         {
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
+
+   /**
+    * Creates a file consumer with the filter {@code "foo=bar"} on an existing queue and
+    * checks that a consumer is returned.
+    */
+   public void testCreateFileConsumerWithFilter() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setProducerMaxRate(99);
+         // NOTE(review): the original issued this identical call twice in a row;
+         // a setBlockOnPersistentSend(true) call may have been intended - confirm.
+         cf.setBlockOnNonPersistentSend(true);
+         ClientSessionInternal clientSession = (ClientSessionInternal) cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue(queueName, queueName, false);
+            ClientConsumer consumer = clientSession.createFileConsumer(new File(""), queueName, "foo=bar");
+            assertNotNull(consumer);
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
+
+   /**
+    * Creating a file consumer with the filter {@code "foobar"} must fail with
+    * {@code INVALID_FILTER_EXPRESSION}.
+    */
+   public void testCreateFileConsumerWithInvalidFilter() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setProducerMaxRate(99);
+         // NOTE(review): the original issued this identical call twice in a row;
+         // a setBlockOnPersistentSend(true) call may have been intended - confirm.
+         cf.setBlockOnNonPersistentSend(true);
+         ClientSessionInternal clientSession = (ClientSessionInternal) cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue(queueName, queueName, false);
+            try
+            {
+               clientSession.createFileConsumer(new File(""), queueName, "foobar");
+               fail("should throw exception");
+            }
+            catch (MessagingException e)
+            {
+               // expected value first, actual second
+               assertEquals(MessagingException.INVALID_FILTER_EXPRESSION, e.getCode());
+            }
+         }
+         finally
+         {
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
+
+   /**
+    * Creates a file consumer with a {@code null} filter and the trailing boolean set to
+    * {@code true} (browse-only, going by the test name) and checks that a consumer is returned.
+    */
+   public void testCreateFileConsumerWithBrowseOnly() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setProducerMaxRate(99);
+         // NOTE(review): the original issued this identical call twice in a row;
+         // a setBlockOnPersistentSend(true) call may have been intended - confirm.
+         cf.setBlockOnNonPersistentSend(true);
+         ClientSessionInternal clientSession = (ClientSessionInternal) cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue(queueName, queueName, false);
+            ClientConsumer consumer = clientSession.createFileConsumer(new File(""), queueName, null, true);
+            assertNotNull(consumer);
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
+
+   /**
+    * Creates a file consumer through the six-argument overload
+    * ({@code null} filter, {@code 100}, {@code 100}, {@code false}) and checks that a
+    * consumer is returned.
+    */
+   public void testCreateFileConsumerWithOverrides() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setProducerMaxRate(99);
+         // NOTE(review): the original issued this identical call twice in a row;
+         // a setBlockOnPersistentSend(true) call may have been intended - confirm.
+         cf.setBlockOnNonPersistentSend(true);
+         ClientSessionInternal clientSession = (ClientSessionInternal) cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue(queueName, queueName, false);
+            ClientConsumer consumer = clientSession.createFileConsumer(new File(""), queueName, null, 100, 100, false);
+            assertNotNull(consumer);
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         service.stop();
+      }
+   }
}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateQueueTest.java 2009-03-10 19:30:29 UTC (rev 6058)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateQueueTest.java 2009-03-11 12:56:14 UTC (rev 6059)
@@ -1,134 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.impl.SoloQueueImpl;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ClientSessionCreateQueueTest extends ServiceTestBase
-{
- private MessagingService messagingService;
-
- private SimpleString address = new SimpleString("address");
-
- private SimpleString queueName = new SimpleString("queue");
-
-
- public void testDurableFalse() throws Exception
- {
- ClientSession session = createInVMFactory().createSession(false, true, true);
- session.createQueue(address, queueName, false);
- Binding binding = messagingService.getServer().getPostOffice().getBinding(queueName);
- Queue q = (Queue) binding.getBindable();
- assertFalse(q.isDurable());
-
- session.close();
- }
-
- public void testDurableTrue() throws Exception
- {
- ClientSession session = createInVMFactory().createSession(false, true, true);
- session.createQueue(address, queueName, true);
- Binding binding = messagingService.getServer().getPostOffice().getBinding(queueName);
- Queue q = (Queue) binding.getBindable();
- assertTrue(q.isDurable());
-
- session.close();
- }
-
- public void testTemporaryFalse() throws Exception
- {
- ClientSession session = createInVMFactory().createSession(false, true, true);
- session.createQueue(address, queueName, false, false);
- Binding binding = messagingService.getServer().getPostOffice().getBinding(queueName);
- Queue q = (Queue) binding.getBindable();
- assertFalse(q.isTemporary());
-
- session.close();
- }
-
- public void testTemporaryTrue() throws Exception
- {
- ClientSession session = createInVMFactory().createSession(false, true, true);
- session.createQueue(address, queueName, true, true);
- Binding binding = messagingService.getServer().getPostOffice().getBinding(queueName);
- Queue q = (Queue) binding.getBindable();
- assertTrue(q.isTemporary());
-
- session.close();
- }
-
- public void testcreateWithFilter() throws Exception
- {
- ClientSession session = createInVMFactory().createSession(false, true, true);
- SimpleString filterString = new SimpleString("x=y");
- session.createQueue(address, queueName, filterString, false, false);
- Binding binding = messagingService.getServer().getPostOffice().getBinding(queueName);
- Queue q = (Queue) binding.getBindable();
- assertEquals(q.getFilter().getFilterString(), filterString);
-
- session.close();
- }
-
- public void testAddressSettingUSed() throws Exception
- {
- AddressSettings addressSettings = new AddressSettings();
- addressSettings.setSoloQueue(true);
- messagingService.getServer().getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
- ClientSession session = createInVMFactory().createSession(false, true, true);
- SimpleString filterString = new SimpleString("x=y");
- session.createQueue(address, queueName, filterString, false, false);
- Binding binding = messagingService.getServer().getPostOffice().getBinding(queueName);
- assertTrue(binding.getBindable() instanceof SoloQueueImpl);
-
- session.close();
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- messagingService = createService(false);
- messagingService.start();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- if(messagingService != null && messagingService.isStarted())
- {
- messagingService.stop();
- }
-
- super.tearDown();
-
- }
-}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionTest.java 2009-03-11 12:56:14 UTC (rev 6059)
@@ -0,0 +1,670 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientFileMessageInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This test covers the API for ClientSession, although the XA tests are covered separately.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientSessionTest extends ServiceTestBase
+{
+ private String queueName = "ClientSessionTestQ";
+
+   /**
+    * Registers a failure listener on a session, stops the service, and expects the
+    * listener to have been invoked within five seconds.
+    */
+   public void testFailureListener() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSession session = createInVMFactory().createSession(false, true, true);
+         final CountDownLatch notified = new CountDownLatch(1);
+         FailureListener listener = new FailureListener()
+         {
+            public boolean connectionFailed(MessagingException me)
+            {
+               notified.countDown();
+               return false;
+            }
+         };
+         session.addFailureListener(listener);
+
+         service.stop();
+         boolean notifiedInTime = notified.await(5, TimeUnit.SECONDS);
+         assertTrue(notifiedInTime);
+      }
+      finally
+      {
+         // only reached in the started state when something failed before the stop above
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /**
+    * Adds and then removes a failure listener, stops the service, and checks that the
+    * removed listener was not invoked.
+    */
+   public void testFailureListenerRemoved() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         class MyFailureListener implements FailureListener
+         {
+            // volatile: if the callback were delivered, it could run on a thread other
+            // than the test thread that reads this flag
+            volatile boolean called = false;
+
+            public boolean connectionFailed(MessagingException me)
+            {
+               called = true;
+               return false;
+            }
+         }
+
+         MyFailureListener listener = new MyFailureListener();
+         clientSession.addFailureListener(listener);
+
+         assertTrue(clientSession.removeFailureListener(listener));
+         service.stop();
+         assertFalse(listener.called);
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /**
+    * Binds q1 and q2 to address a1 and q3, q4 and q5 to address a2, then checks that a
+    * binding query returns no queues for "a", exactly q1/q2 for "a1" and exactly
+    * q3/q4/q5 for "a2".
+    */
+   public void testBindingQuery() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue("a1", "q1", false);
+            clientSession.createQueue("a1", "q2", false);
+            clientSession.createQueue("a2", "q3", false);
+            clientSession.createQueue("a2", "q4", false);
+            clientSession.createQueue("a2", "q5", false);
+
+            SessionBindingQueryResponseMessage resp = clientSession.bindingQuery(new SimpleString("a"));
+            List<SimpleString> queues = resp.getQueueNames();
+            assertTrue(queues.isEmpty());
+
+            resp = clientSession.bindingQuery(new SimpleString("a1"));
+            queues = resp.getQueueNames();
+            // expected value first, actual second
+            assertEquals(2, queues.size());
+            assertTrue(queues.contains(new SimpleString("q1")));
+            assertTrue(queues.contains(new SimpleString("q2")));
+
+            resp = clientSession.bindingQuery(new SimpleString("a2"));
+            queues = resp.getQueueNames();
+            assertEquals(3, queues.size());
+            assertTrue(queues.contains(new SimpleString("q3")));
+            assertTrue(queues.contains(new SimpleString("q4")));
+            assertTrue(queues.contains(new SimpleString("q5")));
+         }
+         finally
+         {
+            // close the session even if an assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /**
+    * Creates a queue on address a1 with two consumers, sends two non-durable messages,
+    * and checks that the queue query reports the address, both counts and no filter.
+    */
+   public void testQueueQuery() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue("a1", queueName, false);
+            clientSession.createConsumer(queueName);
+            clientSession.createConsumer(queueName);
+            ClientProducer cp = clientSession.createProducer("a1");
+            cp.send(clientSession.createClientMessage(false));
+            cp.send(clientSession.createClientMessage(false));
+            SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new SimpleString(queueName));
+            assertEquals(new SimpleString("a1"), resp.getAddress());
+            assertEquals(2, resp.getConsumerCount());
+            assertEquals(2, resp.getMessageCount());
+            assertNull(resp.getFilterString());
+         }
+         finally
+         {
+            // close the session even if an assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /**
+    * Creates a queue with the filter {@code "foo=bar"} and two consumers, and checks
+    * that the queue query reports the address, two consumers, no messages and the filter.
+    */
+   public void testQueueQueryWithFilter() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            clientSession.createQueue("a1", queueName, "foo=bar", false, false);
+            clientSession.createConsumer(queueName);
+            clientSession.createConsumer(queueName);
+            SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new SimpleString(queueName));
+            assertEquals(new SimpleString("a1"), resp.getAddress());
+            assertEquals(2, resp.getConsumerCount());
+            assertEquals(0, resp.getMessageCount());
+            assertEquals(new SimpleString("foo=bar"), resp.getFilterString());
+         }
+         finally
+         {
+            // close the session even if an assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /**
+    * Queries a queue that was never created and checks that the response says it does
+    * not exist and carries no address.
+    */
+   public void testQueueQueryNoQ() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new SimpleString(queueName));
+            assertFalse(resp.isExists());
+            assertNull(resp.getAddress());
+         }
+         finally
+         {
+            // close the session even if an assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /**
+    * Closes a session that owns two producers and two consumers and checks that the
+    * session and all four of them report themselves as closed.
+    */
+   public void testClose() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSession session = createInVMFactory().createSession(false, true, true);
+         session.createQueue(queueName, queueName, false);
+
+         ClientProducer anonymousProducer = session.createProducer();
+         ClientProducer queueProducer = session.createProducer(queueName);
+         ClientConsumer firstConsumer = session.createConsumer(queueName);
+         ClientConsumer secondConsumer = session.createConsumer(queueName);
+
+         session.close();
+
+         assertTrue(session.isClosed());
+         assertTrue(anonymousProducer.isClosed());
+         assertTrue(queueProducer.isClosed());
+         assertTrue(firstConsumer.isClosed());
+         assertTrue(secondConsumer.isClosed());
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /** A client message created with the durable flag {@code false} reports itself as non-durable. */
+   public void testCreateClientMessageNonDurable() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            ClientMessage clientMessage = clientSession.createClientMessage(false);
+            assertFalse(clientMessage.isDurable());
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /** A client message created with the durable flag {@code true} reports itself as durable. */
+   public void testCreateClientMessageDurable() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            ClientMessage clientMessage = clientSession.createClientMessage(true);
+            assertTrue(clientMessage.isDurable());
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /** A client message created with type byte 99 reports that same type. */
+   public void testCreateClientMessageType() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            ClientMessage clientMessage = clientSession.createClientMessage((byte) 99, false);
+            assertEquals((byte) 99, clientMessage.getType());
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /**
+    * Creates a client message through the five-argument overload and checks that type
+    * (88), expiration (100), timestamp (300) and priority (33) are all reported back.
+    */
+   public void testCreateClientMessageOverrides() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            // upper-case L suffix: a lower-case 'l' is easily misread as the digit 1
+            ClientMessage clientMessage = clientSession.createClientMessage((byte) 88, false, 100L, 300L, (byte) 33);
+            assertEquals((byte) 88, clientMessage.getType());
+            assertEquals(100L, clientMessage.getExpiration());
+            assertEquals(300L, clientMessage.getTimestamp());
+            assertEquals((byte) 33, clientMessage.getPriority());
+         }
+         finally
+         {
+            // close the session even if an assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   /** A file message created with the durable flag {@code false} reports itself as non-durable. */
+   public void testCreateClientFileMessageNonDurable() throws Exception
+   {
+      MessagingService service = createService(false);
+      try
+      {
+         service.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession clientSession = cf.createSession(false, true, true);
+         try
+         {
+            ClientFileMessageInternal clientMessage = (ClientFileMessageInternal) clientSession.createFileMessage(false);
+            assertFalse(clientMessage.isDurable());
+         }
+         finally
+         {
+            // close the session even if the assertion above fails
+            clientSession.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+ /**
+  * Checks that a file message obtained through {@code createFileMessage(true)}
+  * reports itself as durable.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testCreateClientFileMessageDurable() throws Exception
+ {
+    MessagingService service = createService(false);
+    try
+    {
+       service.start();
+       ClientSessionFactory cf = createInVMFactory();
+       ClientSession clientSession = cf.createSession(false, true, true);
+       ClientFileMessageInternal clientMessage = (ClientFileMessageInternal) clientSession.createFileMessage(true);
+       // Boolean assertion, matching the non-file createClientMessage tests in this class
+       assertTrue(clientMessage.isDurable());
+       clientSession.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (service.isStarted())
+       {
+          service.stop();
+       }
+    }
+ }
+
+ /**
+  * Checks that the version reported by a client session equals the
+  * incrementing version of the server it was created against.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testGetVersion() throws Exception
+ {
+    final MessagingService server = createService(false);
+    try
+    {
+       server.start();
+       final ClientSession session = createInVMFactory().createSession(false, true, true);
+       assertEquals(server.getServer().getVersion().getIncrementingVersion(),
+                    session.getVersion());
+       session.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (server.isStarted())
+       {
+          server.stop();
+       }
+    }
+ }
+
+ /**
+  * Creates a queue on a fresh session, then starts and closes the session.
+  * The test makes no explicit assertions; it passes when none of the calls throw.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testStart() throws Exception
+ {
+    final MessagingService server = createService(false);
+    try
+    {
+       server.start();
+       final ClientSessionImpl session =
+          (ClientSessionImpl) createInVMFactory().createSession(false, true, true);
+       session.createQueue(queueName, queueName, false);
+       session.start();
+       session.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (server.isStarted())
+       {
+          server.stop();
+       }
+    }
+ }
+
+ /**
+  * Creates a queue on a fresh session, then starts, stops and closes the session.
+  * The test makes no explicit assertions; it passes when none of the calls throw.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testStop() throws Exception
+ {
+    final MessagingService server = createService(false);
+    try
+    {
+       server.start();
+       final ClientSessionImpl session =
+          (ClientSessionImpl) createInVMFactory().createSession(false, true, true);
+       session.createQueue(queueName, queueName, false);
+       session.start();
+       session.stop();
+       session.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (server.isStarted())
+       {
+          server.stop();
+       }
+    }
+ }
+
+ /**
+  * Sends ten non-durable messages on a session created with
+  * {@code createSession(false, false, true)} and checks that the server-side
+  * queue reports 0 messages before {@code commit()} and 10 afterwards.
+  * NOTE(review): the second createSession argument is presumably
+  * autoCommitSends=false - confirm against ClientSessionFactory.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testCommitWithSend() throws Exception
+ {
+    final int numMessages = 10;
+    final MessagingService server = createService(false);
+    try
+    {
+       server.start();
+       final ClientSessionImpl session =
+          (ClientSessionImpl) createInVMFactory().createSession(false, false, true);
+       session.createQueue(queueName, queueName, false);
+       final ClientProducer producer = session.createProducer(queueName);
+       for (int i = 0; i < numMessages; i++)
+       {
+          producer.send(session.createClientMessage(false));
+       }
+       // Look the queue up on the server side to observe its message count directly
+       final Queue queue = (Queue) server.getServer()
+                                         .getPostOffice()
+                                         .getBinding(new SimpleString(queueName))
+                                         .getBindable();
+       assertEquals(0, queue.getMessageCount());
+       session.commit();
+       assertEquals(numMessages, queue.getMessageCount());
+       session.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (server.isStarted())
+       {
+          server.stop();
+       }
+    }
+ }
+
+ /**
+  * Sends ten non-durable messages on a session created with
+  * {@code createSession(false, false, true)}, checks the queue still reports
+  * 0 messages, rolls the session back, sends two more messages, commits, and
+  * checks that the queue then reports exactly 2 messages.
+  * NOTE(review): the second createSession argument is presumably
+  * autoCommitSends=false - confirm against ClientSessionFactory.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testRollbackWithSend() throws Exception
+ {
+    final int rolledBack = 10;
+    final int committed = 2;
+    final MessagingService server = createService(false);
+    try
+    {
+       server.start();
+       final ClientSessionImpl session =
+          (ClientSessionImpl) createInVMFactory().createSession(false, false, true);
+       session.createQueue(queueName, queueName, false);
+       final ClientProducer producer = session.createProducer(queueName);
+       for (int i = 0; i < rolledBack; i++)
+       {
+          producer.send(session.createClientMessage(false));
+       }
+       // Look the queue up on the server side to observe its message count directly
+       final Queue queue = (Queue) server.getServer()
+                                         .getPostOffice()
+                                         .getBinding(new SimpleString(queueName))
+                                         .getBindable();
+       assertEquals(0, queue.getMessageCount());
+       session.rollback();
+       for (int i = 0; i < committed; i++)
+       {
+          producer.send(session.createClientMessage(false));
+       }
+       session.commit();
+       assertEquals(committed, queue.getMessageCount());
+       session.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (server.isStarted())
+       {
+          server.stop();
+       }
+    }
+ }
+
+ /**
+  * Sends ten non-durable messages through a producer on a separate send
+  * session, checks the queue reports 10 messages, then receives and
+  * acknowledges all ten on a session created with
+  * {@code createSession(false, true, false)}. After {@code commit()} the
+  * queue is expected to report 0 messages.
+  * NOTE(review): the third createSession argument is presumably
+  * autoCommitAcks=false, and the receive(5) argument presumably a timeout in
+  * milliseconds - confirm against the client API.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testCommitWithReceive() throws Exception
+ {
+    final int numMessages = 10;
+    final MessagingService server = createService(false);
+    try
+    {
+       server.start();
+       final ClientSessionFactory factory = createInVMFactory();
+       factory.setBlockOnNonPersistentSend(true);
+       factory.setBlockOnPersistentSend(true);
+       final ClientSessionImpl producerSession = (ClientSessionImpl) factory.createSession(false, true, true);
+       final ClientProducer producer = producerSession.createProducer(queueName);
+       final ClientSessionImpl consumerSession = (ClientSessionImpl) factory.createSession(false, true, false);
+       consumerSession.createQueue(queueName, queueName, false);
+       // Messages are created on the consumer session but sent via the send session's producer
+       for (int i = 0; i < numMessages; i++)
+       {
+          producer.send(consumerSession.createClientMessage(false));
+       }
+       final Queue queue = (Queue) server.getServer()
+                                         .getPostOffice()
+                                         .getBinding(new SimpleString(queueName))
+                                         .getBindable();
+       assertEquals(numMessages, queue.getMessageCount());
+       final ClientConsumer consumer = consumerSession.createConsumer(queueName);
+       consumerSession.start();
+       for (int i = 0; i < numMessages; i++)
+       {
+          final ClientMessage received = consumer.receive(5);
+          assertNotNull(received);
+          received.acknowledge();
+       }
+       consumerSession.commit();
+       assertEquals(0, queue.getMessageCount());
+       consumerSession.close();
+       producerSession.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (server.isStarted())
+       {
+          server.stop();
+       }
+    }
+ }
+
+ /**
+  * Sends ten non-durable messages through a producer on a separate send
+  * session, checks the queue reports 10 messages, then receives and
+  * acknowledges all ten on a session created with
+  * {@code createSession(false, true, false)}. After {@code rollback()} the
+  * queue is expected to report 10 messages again.
+  * NOTE(review): the third createSession argument is presumably
+  * autoCommitAcks=false, and the receive(5) argument presumably a timeout in
+  * milliseconds - confirm against the client API.
+  *
+  * @throws Exception propagated from the service or session calls
+  */
+ public void testRollbackWithReceive() throws Exception
+ {
+    final int numMessages = 10;
+    final MessagingService server = createService(false);
+    try
+    {
+       server.start();
+       final ClientSessionFactory factory = createInVMFactory();
+       factory.setBlockOnNonPersistentSend(true);
+       factory.setBlockOnPersistentSend(true);
+       final ClientSessionImpl producerSession = (ClientSessionImpl) factory.createSession(false, true, true);
+       final ClientProducer producer = producerSession.createProducer(queueName);
+       final ClientSessionImpl consumerSession = (ClientSessionImpl) factory.createSession(false, true, false);
+       consumerSession.createQueue(queueName, queueName, false);
+       // Messages are created on the consumer session but sent via the send session's producer
+       for (int i = 0; i < numMessages; i++)
+       {
+          producer.send(consumerSession.createClientMessage(false));
+       }
+       final Queue queue = (Queue) server.getServer()
+                                         .getPostOffice()
+                                         .getBinding(new SimpleString(queueName))
+                                         .getBindable();
+       assertEquals(numMessages, queue.getMessageCount());
+       final ClientConsumer consumer = consumerSession.createConsumer(queueName);
+       consumerSession.start();
+       for (int i = 0; i < numMessages; i++)
+       {
+          final ClientMessage received = consumer.receive(5);
+          assertNotNull(received);
+          received.acknowledge();
+       }
+       consumerSession.rollback();
+       assertEquals(numMessages, queue.getMessageCount());
+       consumerSession.close();
+       producerSession.close();
+    }
+    finally
+    {
+       // stop() is only attempted when the service reports itself as started
+       if (server.isStarted())
+       {
+          server.stop();
+       }
+    }
+ }
+}
More information about the jboss-cvs-commits
mailing list