[hornetq-commits] JBoss hornetq SVN: r9380 - in trunk: tests/src/org/hornetq/tests/integration/ra and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jul 6 12:45:44 EDT 2010


Author: ataylor
Date: 2010-07-06 12:45:43 -0400 (Tue, 06 Jul 2010)
New Revision: 9380

Added:
   trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
   trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
   trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java
Modified:
   trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
   trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
Log:
HORNETQ-140 - JCA Integration tests and a couple of tweaks


Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-07-02 12:01:33 UTC (rev 9379)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-07-06 16:45:43 UTC (rev 9380)
@@ -105,10 +105,6 @@
 
    // Whether we are in the failure recovery loop
    private AtomicBoolean inFailure = new AtomicBoolean(false);
-
-   private final int setupAttempts = 10;
-
-   private final long setupInterval = 2 * 1000;
    
    static
    {
@@ -520,13 +516,13 @@
          return;
       try
       {
-         while (deliveryActive.get() && reconnectCount < setupAttempts)
+         while (deliveryActive.get() && reconnectCount < spec.getSetupAttempts())
          {
             teardown();
 
             try
             {
-               Thread.sleep(setupInterval);
+               Thread.sleep(spec.getSetupInterval());
             }
             catch (InterruptedException e)
             {

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java	2010-07-02 12:01:33 UTC (rev 9379)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java	2010-07-06 16:45:43 UTC (rev 9380)
@@ -40,6 +40,10 @@
 {
    private static final int DEFAULT_MAX_SESSION = 15;
 
+   private static final int DEFAULT_SETUP_ATTEMPTS = 10;
+
+   private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;
+
    /** The logger */
    private static final Logger log = Logger.getLogger(HornetQActivationSpec.class);
 
@@ -89,6 +93,10 @@
    /* use local tx instead of XA*/
    private Boolean localTx;
 
+   private int setupAttempts;
+
+   private long setupInterval;
+
    /**
     * Constructor
     */
@@ -110,6 +118,8 @@
       password = null;
       maxSession = DEFAULT_MAX_SESSION;
       transactionTimeout = 0;
+      setupAttempts = DEFAULT_SETUP_ATTEMPTS;
+      setupInterval = DEFAULT_SETUP_INTERVAL;
    }
 
    /**
@@ -530,6 +540,26 @@
       this.localTx = localTx;
    }
 
+   public int getSetupAttempts()
+   {
+      return setupAttempts; // bound checked by HornetQActivation's reconnect loop; constructor default is 10
+   }
+
+   public void setSetupAttempts(int setupAttempts)
+   {
+      this.setupAttempts = setupAttempts; // stored as given; no range check is applied here
+   }
+
+   public long getSetupInterval()
+   {
+      return setupInterval; // milliseconds; HornetQActivation passes it to Thread.sleep, default is 2 * 1000
+   }
+
+   public void setSetupInterval(long setupInterval)
+   {
+      this.setupInterval = setupInterval; // NOTE(review): not validated; a negative value would make Thread.sleep throw
+   }
+
    /**
     * Validate
     * @exception InvalidPropertyException Thrown if a validation exception occurs

Added: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java	2010-07-06 16:45:43 UTC (rev 9380)
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
+
+import javax.resource.ResourceException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Jul 6, 2010
+ */
+public class HornetQMessageHandlerSecurityTest extends HornetQRATestBase
+{
+   /** Always true. NOTE(review): presumably read by HornetQRATestBase to enable server security - confirm. */
+   @Override
+   public boolean isSecure()
+   {
+      return true;
+   }
+
+   /** Activating with credentials that were never registered must leave the MDB queue without consumers. */
+   public void testSimpleMessageReceivedOnQueueWithSecurityFails() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.start(new MyBootstrapContext());
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      spec.setUser("dodgyuser");
+      spec.setPassword("dodgypassword");
+      // HornetQActivation retries while reconnectCount < getSetupAttempts(); 0 presumably rules retries out
+      spec.setSetupAttempts(0);
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(new CountDownLatch(1));
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+      assertEquals(0, ((LocalQueueBinding)binding).getQueue().getConsumerCount());
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+   }
+
+   /** A registered user whose role is matched to the MDB queue gets 15 consumers, the spec's default maxSession. */
+   public void testSimpleMessageReceivedOnQueueWithSecuritySucceeds() throws Exception
+   {
+      server.getSecurityManager().addUser("testuser", "testpassword");
+      server.getSecurityManager().addRole("testuser", "arole");
+      // NOTE(review): only the second Role flag is true - presumably the consume permission; confirm
+      Set<Role> roles = new HashSet<Role>();
+      roles.add(new Role("arole", false, true, false, false, false, false, false));
+      server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.start(new MyBootstrapContext());
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      spec.setUser("testuser");
+      spec.setPassword("testpassword");
+      spec.setSetupAttempts(0);
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(new CountDownLatch(1));
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+      assertEquals(15, ((LocalQueueBinding)binding).getQueue().getConsumerCount());
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+   }
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java	2010-07-02 12:01:33 UTC (rev 9379)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java	2010-07-06 16:45:43 UTC (rev 9380)
@@ -15,25 +15,10 @@
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.ra.HornetQResourceAdapter;
 import org.hornetq.ra.inflow.HornetQActivationSpec;
-import org.hornetq.tests.util.ServiceTestBase;
-
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.resource.ResourceException;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.UnavailableException;
-import javax.resource.spi.XATerminator;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.*;
-import javax.transaction.xa.XAResource;
-import java.lang.reflect.Method;
-import java.util.Timer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -41,65 +26,172 @@
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *         Created May 20, 2010
  */
-public class HornetQMessageHandlerTest  extends ServiceTestBase
+public class HornetQMessageHandlerTest extends HornetQRATestBase
 {
-   private Configuration configuration;
+   @Override
+   public boolean isSecure()
+   {
+      return false;
+   }
 
-   private HornetQServer server;
+   public void testSimpleMessageReceivedOnQueue() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      ClientSession session = createFactory(false).createSession();
+      ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString("teststring");
+      clientProducer.send(message);
+      session.close();
+      latch.await(5, TimeUnit.SECONDS);
 
-   @Override
-   protected void setUp() throws Exception
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
+
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+   }
+
+   public void testSimpleMessageReceivedOnQueueWithSelector() throws Exception
    {
-      super.setUp();
-      clearData();
-      configuration = createDefaultConfig();
-      configuration.setSecurityEnabled(false);
-      server = createServer(true, configuration);
-      server.start();
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      spec.setMessageSelector("color='red'");
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      ClientSession session = createFactory(false).createSession();
+      ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString("blue");
+      message.putStringProperty("color", "blue");
+      clientProducer.send(message);
+      message = session.createMessage(true);
+      message.getBodyBuffer().writeString("red");
+      message.putStringProperty("color", "red");
+      clientProducer.send(message);
+      session.close();
+      latch.await(5, TimeUnit.SECONDS);
+
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "red");
+
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
    }
 
-   @Override
-   protected void tearDown() throws Exception
+   public void testEndpointDeactivated() throws Exception
    {
-      if (server != null)
-      {
-         try
-         {
-            server.stop();
-            server = null;
-         }
-         catch (Exception e)
-         {
-            // ignore
-         }
-      }
-      super.tearDown();
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+      assertEquals(((LocalQueueBinding) binding).getQueue().getConsumerCount(), 15);
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+      assertEquals(((LocalQueueBinding) binding).getQueue().getConsumerCount(), 0);
+      assertTrue(endpoint.released);
    }
 
-   public void testSelectorChangedWithTopic() throws Exception
+   public void testMaxSessions() throws Exception
    {
       HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
       MyBootstrapContext ctx = new MyBootstrapContext();
       qResourceAdapter.start(ctx);
       HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setMaxSession(1);
       spec.setResourceAdapter(qResourceAdapter);
       spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+      assertEquals(((LocalQueueBinding) binding).getQueue().getConsumerCount(), 1);
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+   }
+
+   /** Sends one message to the mdbTopic address and checks that the activated endpoint receives it. */
+   public void testSimpleTopic() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.start(new MyBootstrapContext());
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Topic");
+      spec.setDestination("mdbTopic");
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      ClientSession session = createFactory(false).createSession();
+      ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString("test");
+      clientProducer.send(message);
+      // close the producer session once the message is sent, as the queue tests in this class do
+      session.close();
+      latch.await(5, TimeUnit.SECONDS);
+
+      assertNotNull(endpoint.lastMessage);
+      assertEquals("test", endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString());
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+   }
+
+   public void testDurableSubscription() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Topic");
+      spec.setDestination("mdbTopic");
       spec.setSubscriptionDurability("Durable");
       spec.setSubscriptionName("durable-mdb");
       spec.setClientID("id-1");
-      spec.setMessageSelector("foo='bar'");
       qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
       CountDownLatch latch = new CountDownLatch(1);
       DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
-      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
       qResourceAdapter.endpointActivation(endpointFactory, spec);
       ClientSession session = createFactory(false).createSession();
       ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
       ClientMessage message = session.createMessage(true);
       message.getBodyBuffer().writeString("1");
-      message.putStringProperty("foo", "bar");
       clientProducer.send(message);
 
       latch.await(5, TimeUnit.SECONDS);
@@ -111,18 +203,20 @@
 
       message = session.createMessage(true);
       message.getBodyBuffer().writeString("2");
-      message.putStringProperty("foo", "bar");
       clientProducer.send(message);
 
       latch = new CountDownLatch(1);
       endpoint = new DummyMessageEndpoint(latch);
-      //change the selector forcing the queue to be recreated
-      spec.setMessageSelector("foo='abar'");
-      endpointFactory = new DummyMessageEndpointFactory(endpoint);
+      endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
       qResourceAdapter.endpointActivation(endpointFactory, spec);
+      latch.await(5, TimeUnit.SECONDS);
+
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "2");
+      latch = new CountDownLatch(1);
+      endpoint.reset(latch);
       message = session.createMessage(true);
       message.getBodyBuffer().writeString("3");
-      message.putStringProperty("foo", "abar");
       clientProducer.send(message);
       latch.await(5, TimeUnit.SECONDS);
 
@@ -131,8 +225,8 @@
       qResourceAdapter.endpointDeactivation(endpointFactory, spec);
    }
 
-   public void testSelectorNotChangedWithTopic() throws Exception
-   {      
+   public void testNonDurableSubscription() throws Exception
+   {
       HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
       MyBootstrapContext ctx = new MyBootstrapContext();
       qResourceAdapter.start(ctx);
@@ -141,20 +235,15 @@
       spec.setUseJNDI(false);
       spec.setDestinationType("javax.jms.Topic");
       spec.setDestination("mdbTopic");
-      spec.setSubscriptionDurability("Durable");
-      spec.setSubscriptionName("durable-mdb");
-      spec.setClientID("id-1");
-      spec.setMessageSelector("foo='bar'");
       qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
       CountDownLatch latch = new CountDownLatch(1);
       DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
-      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
       qResourceAdapter.endpointActivation(endpointFactory, spec);
       ClientSession session = createFactory(false).createSession();
       ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
       ClientMessage message = session.createMessage(true);
       message.getBodyBuffer().writeString("1");
-      message.putStringProperty("foo", "bar");
       clientProducer.send(message);
 
       latch.await(5, TimeUnit.SECONDS);
@@ -166,123 +255,126 @@
 
       message = session.createMessage(true);
       message.getBodyBuffer().writeString("2");
-      message.putStringProperty("foo", "bar");
       clientProducer.send(message);
 
       latch = new CountDownLatch(1);
       endpoint = new DummyMessageEndpoint(latch);
-      endpointFactory = new DummyMessageEndpointFactory(endpoint);
+      endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
       qResourceAdapter.endpointActivation(endpointFactory, spec);
+      message = session.createMessage(true);
+      message.getBodyBuffer().writeString("3");
+      clientProducer.send(message);
       latch.await(5, TimeUnit.SECONDS);
 
       assertNotNull(endpoint.lastMessage);
-      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "2");
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "3");
       qResourceAdapter.endpointDeactivation(endpointFactory, spec);
-
    }
 
-   class DummyMessageEndpointFactory implements MessageEndpointFactory
+   public void testSelectorChangedWithTopic() throws Exception
    {
-      private DummyMessageEndpoint endpoint;
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Topic");
+      spec.setDestination("mdbTopic");
+      spec.setSubscriptionDurability("Durable");
+      spec.setSubscriptionName("durable-mdb");
+      spec.setClientID("id-1");
+      spec.setMessageSelector("foo='bar'");
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      ClientSession session = createFactory(false).createSession();
+      ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString("1");
+      message.putStringProperty("foo", "bar");
+      clientProducer.send(message);
 
-      public DummyMessageEndpointFactory(DummyMessageEndpoint endpoint)
-      {
-         this.endpoint = endpoint;
-      }
+      latch.await(5, TimeUnit.SECONDS);
 
-      public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException
-      {
-         return endpoint;
-      }
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "1");
 
-      public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException
-      {
-         return false;
-      }
-   }
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
 
-   class DummyMessageEndpoint implements MessageEndpoint, MessageListener
-   {
-      public CountDownLatch latch;
+      message = session.createMessage(true);
+      message.getBodyBuffer().writeString("2");
+      message.putStringProperty("foo", "bar");
+      clientProducer.send(message);
 
-      private HornetQMessage lastMessage;
+      latch = new CountDownLatch(1);
+      endpoint = new DummyMessageEndpoint(latch);
+      //change the selector forcing the queue to be recreated
+      spec.setMessageSelector("foo='abar'");
+      endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      message = session.createMessage(true);
+      message.getBodyBuffer().writeString("3");
+      message.putStringProperty("foo", "abar");
+      clientProducer.send(message);
+      latch.await(5, TimeUnit.SECONDS);
 
-      public DummyMessageEndpoint(CountDownLatch latch)
-      {
-         this.latch = latch;
-      }
-
-      public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException
-      {
-
-      }
-
-      public void afterDelivery() throws ResourceException
-      {      
-         if(latch != null)
-         {
-            latch.countDown();
-         }
-      }
-
-      public void release()
-      {
-
-      }
-
-      public void onMessage(Message message)
-      {
-         lastMessage = (HornetQMessage) message;
-      }
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "3");
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
    }
 
-   class MyBootstrapContext implements BootstrapContext
+   public void testSelectorNotChangedWithTopic() throws Exception
    {
-      WorkManager workManager = new DummyWorkManager();
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Topic");
+      spec.setDestination("mdbTopic");
+      spec.setSubscriptionDurability("Durable");
+      spec.setSubscriptionName("durable-mdb");
+      spec.setClientID("id-1");
+      spec.setMessageSelector("foo='bar'");
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      ClientSession session = createFactory(false).createSession();
+      ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString("1");
+      message.putStringProperty("foo", "bar");
+      clientProducer.send(message);
 
-      public Timer createTimer() throws UnavailableException
-      {
-         return null;
-      }
+      latch.await(5, TimeUnit.SECONDS);
 
-      public WorkManager getWorkManager()
-      {
-         return workManager;
-      }
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "1");
 
-      public XATerminator getXATerminator()
-      {
-         return null;
-      }
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
 
-      class DummyWorkManager implements WorkManager
-      {
-         public void doWork(Work work) throws WorkException
-         {
-         }
+      message = session.createMessage(true);
+      message.getBodyBuffer().writeString("2");
+      message.putStringProperty("foo", "bar");
+      clientProducer.send(message);
 
-         public void doWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
-         {
-         }
+      latch = new CountDownLatch(1);
+      endpoint = new DummyMessageEndpoint(latch);
+      endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      latch.await(5, TimeUnit.SECONDS);
 
-         public long startWork(Work work) throws WorkException
-         {
-            return 0;
-         }
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "2");
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
 
-         public long startWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
-         {
-            return 0;
-         }
+   }
 
-         public void scheduleWork(Work work) throws WorkException
-         {
-            work.run();
-         }
 
-         public void scheduleWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
-         {
-         }
-      }
-   }
 }

Added: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java	2010-07-06 16:45:43 UTC (rev 9380)
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.utils.UUIDGenerator;
+
+import javax.resource.ResourceException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for XA delivery through the HornetQ resource adapter: a message
+ * delivered inside an XA transaction branch is prepared and committed, and a rolled back
+ * delivery is expected to be delivered again.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Jul 6, 2010
+ */
+public class HornetQMessageHandlerXATest extends HornetQRATestBase
+{
+   /** Body of the message every test sends to the MDB queue. */
+   private static final String BODY = "teststring";
+
+   /** Seconds to wait for a delivery before the test fails. */
+   private static final long TIMEOUT_SECONDS = 5;
+
+   /** The server for these tests is configured with security switched off. */
+   @Override
+   public boolean isSecure()
+   {
+      return false;
+   }
+
+   /**
+    * Delivers one message inside an XA transaction branch, then prepares and commits it.
+    */
+   public void testXACommit() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.start(new MyBootstrapContext());
+      // NOTE(review): unlike testXARollback the spec is not limited to one session, and the
+      // endpoint only keeps the last XAResource it was given - confirm that the committed
+      // branch really belongs to the session that delivered the message.
+      HornetQActivationSpec spec = createSpec(qResourceAdapter);
+      CountDownLatch latch = new CountDownLatch(1);
+      XADummyEndpoint endpoint = new XADummyEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      try
+      {
+         sendMessage();
+         assertDelivered(latch, endpoint);
+         endpoint.prepare();
+         endpoint.commit();
+      }
+      finally
+      {
+         // Deactivate even when an assertion fails so the endpoint does not stay active.
+         qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+      }
+   }
+
+   /**
+    * Rolls back the XA transaction branch of a delivered message and expects the same
+    * body to be delivered a second time.
+    */
+   public void testXARollback() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.start(new MyBootstrapContext());
+      HornetQActivationSpec spec = createSpec(qResourceAdapter);
+      spec.setMaxSession(1);
+      CountDownLatch latch = new CountDownLatch(1);
+      XADummyEndpoint endpoint = new XADummyEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      try
+      {
+         sendMessage();
+         assertDelivered(latch, endpoint);
+
+         // Install the new latch before rolling back so the redelivery is not missed.
+         latch = new CountDownLatch(1);
+         endpoint.reset(latch);
+         endpoint.rollback();
+         assertDelivered(latch, endpoint);
+      }
+      finally
+      {
+         qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+      }
+   }
+
+   /**
+    * Creates the activation spec shared by the tests: the MDB queue, looked up without
+    * JNDI. Also sets the adapter's connector class to INVM_CONNECTOR_FACTORY.
+    */
+   private HornetQActivationSpec createSpec(HornetQResourceAdapter adapter) throws Exception
+   {
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(adapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      adapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      return spec;
+   }
+
+   /** Sends a single message with {@link #BODY} as its body to the MDB queue. */
+   private void sendMessage() throws Exception
+   {
+      ClientSession session = createFactory(false).createSession();
+      try
+      {
+         ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+         ClientMessage message = session.createMessage(true);
+         message.getBodyBuffer().writeString(BODY);
+         clientProducer.send(message);
+      }
+      finally
+      {
+         // Close the session even if sending fails.
+         session.close();
+      }
+   }
+
+   /**
+    * Waits for the endpoint to complete a delivery and verifies the delivered body.
+    * Fails when the latch is not released within {@link #TIMEOUT_SECONDS} seconds.
+    */
+   private void assertDelivered(CountDownLatch latch, XADummyEndpoint endpoint) throws Exception
+   {
+      assertTrue("message was not delivered in time", latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+      assertNotNull(endpoint.lastMessage);
+      // Expected value first, so a failure reports the values the right way round.
+      assertEquals(BODY, endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString());
+   }
+
+   /**
+    * Endpoint that wraps every delivery in an XA transaction branch on the
+    * {@link XAResource} it was given, and lets the test complete that branch.
+    */
+   class XADummyEndpoint extends DummyMessageEndpoint
+   {
+      private final Xid xid;
+
+      public XADummyEndpoint(CountDownLatch latch)
+      {
+         super(latch);
+         xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+      }
+
+      /** Starts the transaction branch for the upcoming delivery. */
+      @Override
+      public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException
+      {
+         super.beforeDelivery(method);
+         try
+         {
+            xaResource.start(xid, XAResource.TMNOFLAGS);
+         }
+         catch (XAException e)
+         {
+            throw new ResourceException(e.getMessage(), e);
+         }
+      }
+
+      /** Ends the transaction branch, then counts down the latch via the superclass. */
+      @Override
+      public void afterDelivery() throws ResourceException
+      {
+         try
+         {
+            xaResource.end(xid, XAResource.TMSUCCESS);
+         }
+         catch (XAException e)
+         {
+            throw new ResourceException(e.getMessage(), e);
+         }
+
+         super.afterDelivery();
+      }
+
+      public void rollback() throws XAException
+      {
+         xaResource.rollback(xid);
+      }
+
+      public void prepare() throws XAException
+      {
+         xaResource.prepare(xid);
+      }
+
+      public void commit() throws XAException
+      {
+         xaResource.commit(xid, false);
+      }
+   }
+}

Added: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java	2010-07-06 16:45:43 UTC (rev 9380)
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.resource.ResourceException;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.*;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+import java.util.Timer;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Base class for the resource adapter integration tests. Before each test it starts a
+ * HornetQ server with a queue named {@link #MDBQUEUEPREFIXED}; it also provides dummy
+ * implementations of the JCA pieces the tests need (endpoint factory, endpoint,
+ * bootstrap context and work manager).
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Jul 6, 2010
+ */
+public abstract class HornetQRATestBase extends ServiceTestBase
+{
+   protected Configuration configuration;
+
+   protected HornetQServer server;
+
+   protected static final String MDBQUEUE = "mdbQueue";
+   protected static final String MDBQUEUEPREFIXED = "jms.queue.mdbQueue";
+   protected static final SimpleString MDBQUEUEPREFIXEDSIMPLE = new SimpleString(MDBQUEUEPREFIXED);
+
+   /**
+    * Calls clearData(), then starts a server whose security setting is taken from
+    * {@link #isSecure()} and creates the MDB queue on it.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      clearData();
+      configuration = createDefaultConfig();
+      configuration.setSecurityEnabled(isSecure());
+      server = createServer(true, configuration);
+      server.start();
+      server.createQueue(MDBQUEUEPREFIXEDSIMPLE, MDBQUEUEPREFIXEDSIMPLE, null, true, false);
+   }
+
+   /**
+    * Stops the server if one was started. The server reference is cleared and the
+    * superclass tear-down runs whether or not the stop succeeds.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+      catch (Exception e)
+      {
+         // ignore: a failure to stop the server is deliberately not reported
+      }
+      finally
+      {
+         server = null;
+         super.tearDown();
+      }
+   }
+
+   /**
+    * @return whether the server started by {@link #setUp()} has security enabled
+    */
+   public abstract boolean isSecure();
+
+   /**
+    * Endpoint factory that always returns the one endpoint it was created with.
+    */
+   class DummyMessageEndpointFactory implements MessageEndpointFactory
+   {
+      private final DummyMessageEndpoint endpoint;
+
+      private final boolean isDeliveryTransacted;
+
+      public DummyMessageEndpointFactory(DummyMessageEndpoint endpoint, boolean deliveryTransacted)
+      {
+         this.endpoint = endpoint;
+         isDeliveryTransacted = deliveryTransacted;
+      }
+
+      /**
+       * Returns the shared endpoint. A non-null XAResource replaces the one the endpoint
+       * currently holds; a null one leaves it untouched.
+       */
+      public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException
+      {
+         if (xaResource != null)
+         {
+            endpoint.setXAResource(xaResource);
+         }
+         return endpoint;
+      }
+
+      /** Returns the flag given to the constructor, whatever the method. */
+      public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException
+      {
+         return isDeliveryTransacted;
+      }
+   }
+
+   /**
+    * Endpoint that remembers the last message it received and counts down a latch
+    * after each delivery. Its fields are volatile because the tests read them from
+    * the test thread while deliveries may arrive on a different thread.
+    */
+   class DummyMessageEndpoint implements MessageEndpoint, MessageListener
+   {
+      public volatile CountDownLatch latch;
+
+      public volatile HornetQMessage lastMessage;
+
+      public volatile boolean released = false;
+
+      public volatile XAResource xaResource;
+
+      public DummyMessageEndpoint(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException
+      {
+         // nothing to prepare
+      }
+
+      /** Counts down the current latch, if any, and prints the last message. */
+      public void afterDelivery() throws ResourceException
+      {
+         // Read the field once: reset() may swap the latch from another thread.
+         CountDownLatch current = latch;
+         if (current != null)
+         {
+            current.countDown();
+         }
+         System.out.println("lastMessage = " + lastMessage);
+      }
+
+      public void release()
+      {
+         released = true;
+      }
+
+      public void onMessage(Message message)
+      {
+         lastMessage = (HornetQMessage) message;
+      }
+
+      /** Installs a new latch and forgets the last message. */
+      public void reset(CountDownLatch latch)
+      {
+         this.latch = latch;
+         lastMessage = null;
+      }
+
+      public void setXAResource(XAResource xaResource)
+      {
+         this.xaResource = xaResource;
+      }
+   }
+
+   /**
+    * Bootstrap context that only supplies a work manager; it has no timer and no
+    * XATerminator (both getters return null).
+    */
+   class MyBootstrapContext implements BootstrapContext
+   {
+      WorkManager workManager = new DummyWorkManager();
+
+      public Timer createTimer() throws UnavailableException
+      {
+         return null;
+      }
+
+      public WorkManager getWorkManager()
+      {
+         return workManager;
+      }
+
+      public XATerminator getXATerminator()
+      {
+         return null;
+      }
+
+      /**
+       * Work manager in which only {@link #scheduleWork(Work)} does anything: it runs the
+       * work on the calling thread. Every other method ignores the work it is given.
+       */
+      class DummyWorkManager implements WorkManager
+      {
+         public void doWork(Work work) throws WorkException
+         {
+         }
+
+         public void doWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
+         {
+         }
+
+         public long startWork(Work work) throws WorkException
+         {
+            return 0;
+         }
+
+         public long startWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
+         {
+            return 0;
+         }
+
+         public void scheduleWork(Work work) throws WorkException
+         {
+            work.run();
+         }
+
+         public void scheduleWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
+         {
+         }
+      }
+   }
+}



More information about the hornetq-commits mailing list