Author: ataylor
Date: 2010-07-06 12:45:43 -0400 (Tue, 06 Jul 2010)
New Revision: 9380
Added:
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java
Modified:
trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
Log:
HORNETQ-140 - JCA Integration tests and a couple of tweaks
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-07-02 12:01:33 UTC
(rev 9379)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-07-06 16:45:43 UTC
(rev 9380)
@@ -105,10 +105,6 @@
// Whether we are in the failure recovery loop
private AtomicBoolean inFailure = new AtomicBoolean(false);
-
- private final int setupAttempts = 10;
-
- private final long setupInterval = 2 * 1000;
static
{
@@ -520,13 +516,13 @@
return;
try
{
- while (deliveryActive.get() && reconnectCount < setupAttempts)
+ while (deliveryActive.get() && reconnectCount <
spec.getSetupAttempts())
{
teardown();
try
{
- Thread.sleep(setupInterval);
+ Thread.sleep(spec.getSetupInterval());
}
catch (InterruptedException e)
{
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-07-02 12:01:33
UTC (rev 9379)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-07-06 16:45:43
UTC (rev 9380)
@@ -40,6 +40,10 @@
{
private static final int DEFAULT_MAX_SESSION = 15;
+ private static final int DEFAULT_SETUP_ATTEMPTS = 10;
+
+ private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;
+
/** The logger */
private static final Logger log = Logger.getLogger(HornetQActivationSpec.class);
@@ -89,6 +93,10 @@
/* use local tx instead of XA*/
private Boolean localTx;
+ private int setupAttempts;
+
+ private long setupInterval;
+
/**
* Constructor
*/
@@ -110,6 +118,8 @@
password = null;
maxSession = DEFAULT_MAX_SESSION;
transactionTimeout = 0;
+ setupAttempts = DEFAULT_SETUP_ATTEMPTS;
+ setupInterval = DEFAULT_SETUP_INTERVAL;
}
/**
@@ -530,6 +540,26 @@
this.localTx = localTx;
}
+ public int getSetupAttempts()
+ {
+ return setupAttempts;
+ }
+
+ public void setSetupAttempts(int setupAttempts)
+ {
+ this.setupAttempts = setupAttempts;
+ }
+
+ public long getSetupInterval()
+ {
+ return setupInterval;
+ }
+
+ public void setSetupInterval(long setupInterval)
+ {
+ this.setupInterval = setupInterval;
+ }
+
/**
* Validate
* @exception InvalidPropertyException Thrown if a validation exception occurs
Added:
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java 2010-07-06
16:45:43 UTC (rev 9380)
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
+
+import javax.resource.ResourceException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Jul 6, 2010
+ */
+public class HornetQMessageHandlerSecurityTest extends HornetQRATestBase
+{
+ @Override
+ public boolean isSecure()
+ {
+ return true;
+ }
+
+ public void testSimpleMessageReceivedOnQueueWithSecurityFails() throws Exception
+ {
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ spec.setUser("dodgyuser");
+ spec.setPassword("dodgypassword");
+ spec.setSetupAttempts(0);
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+ assertEquals(((LocalQueueBinding)binding).getQueue().getConsumerCount(), 0);
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ }
+
+ public void testSimpleMessageReceivedOnQueueWithSecuritySucceeds() throws Exception
+ {
+ server.getSecurityManager().addUser("testuser",
"testpassword");
+ server.getSecurityManager().addRole("testuser", "arole");
+ Role role = new Role("arole", false, true, false, false, false, false,
false);
+ Set<Role> roles = new HashSet<Role>();
+ roles.add(role);
+ server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ spec.setUser("testuser");
+ spec.setPassword("testpassword");
+ spec.setSetupAttempts(0);
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+ assertEquals(((LocalQueueBinding)binding).getQueue().getConsumerCount(), 15);
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2010-07-02
12:01:33 UTC (rev 9379)
+++
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2010-07-06
16:45:43 UTC (rev 9380)
@@ -15,25 +15,10 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivationSpec;
-import org.hornetq.tests.util.ServiceTestBase;
-
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.resource.ResourceException;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.UnavailableException;
-import javax.resource.spi.XATerminator;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.*;
-import javax.transaction.xa.XAResource;
-import java.lang.reflect.Method;
-import java.util.Timer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -41,65 +26,172 @@
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created May 20, 2010
*/
-public class HornetQMessageHandlerTest extends ServiceTestBase
+public class HornetQMessageHandlerTest extends HornetQRATestBase
{
- private Configuration configuration;
+ @Override
+ public boolean isSecure()
+ {
+ return false;
+ }
- private HornetQServer server;
+ public void testSimpleMessageReceivedOnQueue() throws Exception
+ {
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ ClientSession session = createFactory(false).createSession();
+ ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("teststring");
+ clientProducer.send(message);
+ session.close();
+ latch.await(5, TimeUnit.SECONDS);
- @Override
- protected void setUp() throws Exception
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"teststring");
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ }
+
+ public void testSimpleMessageReceivedOnQueueWithSelector() throws Exception
{
- super.setUp();
- clearData();
- configuration = createDefaultConfig();
- configuration.setSecurityEnabled(false);
- server = createServer(true, configuration);
- server.start();
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ spec.setMessageSelector("color='red'");
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ ClientSession session = createFactory(false).createSession();
+ ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("blue");
+ message.putStringProperty("color", "blue");
+ clientProducer.send(message);
+ message = session.createMessage(true);
+ message.getBodyBuffer().writeString("red");
+ message.putStringProperty("color", "red");
+ clientProducer.send(message);
+ session.close();
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"red");
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
}
- @Override
- protected void tearDown() throws Exception
+ public void testEndpointDeactivated() throws Exception
{
- if (server != null)
- {
- try
- {
- server.stop();
- server = null;
- }
- catch (Exception e)
- {
- // ignore
- }
- }
- super.tearDown();
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+ assertEquals(((LocalQueueBinding) binding).getQueue().getConsumerCount(), 15);
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ assertEquals(((LocalQueueBinding) binding).getQueue().getConsumerCount(), 0);
+ assertTrue(endpoint.released);
}
- public void testSelectorChangedWithTopic() throws Exception
+ public void testMaxSessions() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setMaxSession(1);
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
+ assertEquals(((LocalQueueBinding) binding).getQueue().getConsumerCount(), 1);
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ }
+
+ public void testSimpleTopic() throws Exception
+ {
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Topic");
spec.setDestination("mdbTopic");
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ ClientSession session = createFactory(false).createSession();
+ ClientProducer clientProducer =
session.createProducer("jms.topic.mdbTopic");
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("test");
+ clientProducer.send(message);
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"test");
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ }
+
+ public void testDurableSubscription() throws Exception
+ {
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Topic");
+ spec.setDestination("mdbTopic");
spec.setSubscriptionDurability("Durable");
spec.setSubscriptionName("durable-mdb");
spec.setClientID("id-1");
- spec.setMessageSelector("foo='bar'");
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
- DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = createFactory(false).createSession();
ClientProducer clientProducer =
session.createProducer("jms.topic.mdbTopic");
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("1");
- message.putStringProperty("foo", "bar");
clientProducer.send(message);
latch.await(5, TimeUnit.SECONDS);
@@ -111,18 +203,20 @@
message = session.createMessage(true);
message.getBodyBuffer().writeString("2");
- message.putStringProperty("foo", "bar");
clientProducer.send(message);
latch = new CountDownLatch(1);
endpoint = new DummyMessageEndpoint(latch);
- //change the selector forcing the queue to be recreated
- spec.setMessageSelector("foo='abar'");
- endpointFactory = new DummyMessageEndpointFactory(endpoint);
+ endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"2");
+ latch = new CountDownLatch(1);
+ endpoint.reset(latch);
message = session.createMessage(true);
message.getBodyBuffer().writeString("3");
- message.putStringProperty("foo", "abar");
clientProducer.send(message);
latch.await(5, TimeUnit.SECONDS);
@@ -131,8 +225,8 @@
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
}
- public void testSelectorNotChangedWithTopic() throws Exception
- {
+ public void testNonDurableSubscription() throws Exception
+ {
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
@@ -141,20 +235,15 @@
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Topic");
spec.setDestination("mdbTopic");
- spec.setSubscriptionDurability("Durable");
- spec.setSubscriptionName("durable-mdb");
- spec.setClientID("id-1");
- spec.setMessageSelector("foo='bar'");
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
- DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = createFactory(false).createSession();
ClientProducer clientProducer =
session.createProducer("jms.topic.mdbTopic");
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("1");
- message.putStringProperty("foo", "bar");
clientProducer.send(message);
latch.await(5, TimeUnit.SECONDS);
@@ -166,123 +255,126 @@
message = session.createMessage(true);
message.getBodyBuffer().writeString("2");
- message.putStringProperty("foo", "bar");
clientProducer.send(message);
latch = new CountDownLatch(1);
endpoint = new DummyMessageEndpoint(latch);
- endpointFactory = new DummyMessageEndpointFactory(endpoint);
+ endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
+ message = session.createMessage(true);
+ message.getBodyBuffer().writeString("3");
+ clientProducer.send(message);
latch.await(5, TimeUnit.SECONDS);
assertNotNull(endpoint.lastMessage);
- assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"2");
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"3");
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
-
}
- class DummyMessageEndpointFactory implements MessageEndpointFactory
+ public void testSelectorChangedWithTopic() throws Exception
{
- private DummyMessageEndpoint endpoint;
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Topic");
+ spec.setDestination("mdbTopic");
+ spec.setSubscriptionDurability("Durable");
+ spec.setSubscriptionName("durable-mdb");
+ spec.setClientID("id-1");
+ spec.setMessageSelector("foo='bar'");
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ ClientSession session = createFactory(false).createSession();
+ ClientProducer clientProducer =
session.createProducer("jms.topic.mdbTopic");
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("1");
+ message.putStringProperty("foo", "bar");
+ clientProducer.send(message);
- public DummyMessageEndpointFactory(DummyMessageEndpoint endpoint)
- {
- this.endpoint = endpoint;
- }
+ latch.await(5, TimeUnit.SECONDS);
- public MessageEndpoint createEndpoint(XAResource xaResource) throws
UnavailableException
- {
- return endpoint;
- }
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"1");
- public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException
- {
- return false;
- }
- }
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
- class DummyMessageEndpoint implements MessageEndpoint, MessageListener
- {
- public CountDownLatch latch;
+ message = session.createMessage(true);
+ message.getBodyBuffer().writeString("2");
+ message.putStringProperty("foo", "bar");
+ clientProducer.send(message);
- private HornetQMessage lastMessage;
+ latch = new CountDownLatch(1);
+ endpoint = new DummyMessageEndpoint(latch);
+ //change the selector forcing the queue to be recreated
+ spec.setMessageSelector("foo='abar'");
+ endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ message = session.createMessage(true);
+ message.getBodyBuffer().writeString("3");
+ message.putStringProperty("foo", "abar");
+ clientProducer.send(message);
+ latch.await(5, TimeUnit.SECONDS);
- public DummyMessageEndpoint(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- public void beforeDelivery(Method method) throws NoSuchMethodException,
ResourceException
- {
-
- }
-
- public void afterDelivery() throws ResourceException
- {
- if(latch != null)
- {
- latch.countDown();
- }
- }
-
- public void release()
- {
-
- }
-
- public void onMessage(Message message)
- {
- lastMessage = (HornetQMessage) message;
- }
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"3");
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
}
- class MyBootstrapContext implements BootstrapContext
+ public void testSelectorNotChangedWithTopic() throws Exception
{
- WorkManager workManager = new DummyWorkManager();
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Topic");
+ spec.setDestination("mdbTopic");
+ spec.setSubscriptionDurability("Durable");
+ spec.setSubscriptionName("durable-mdb");
+ spec.setClientID("id-1");
+ spec.setMessageSelector("foo='bar'");
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ ClientSession session = createFactory(false).createSession();
+ ClientProducer clientProducer =
session.createProducer("jms.topic.mdbTopic");
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("1");
+ message.putStringProperty("foo", "bar");
+ clientProducer.send(message);
- public Timer createTimer() throws UnavailableException
- {
- return null;
- }
+ latch.await(5, TimeUnit.SECONDS);
- public WorkManager getWorkManager()
- {
- return workManager;
- }
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"1");
- public XATerminator getXATerminator()
- {
- return null;
- }
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
- class DummyWorkManager implements WorkManager
- {
- public void doWork(Work work) throws WorkException
- {
- }
+ message = session.createMessage(true);
+ message.getBodyBuffer().writeString("2");
+ message.putStringProperty("foo", "bar");
+ clientProducer.send(message);
- public void doWork(Work work, long l, ExecutionContext executionContext,
WorkListener workListener) throws WorkException
- {
- }
+ latch = new CountDownLatch(1);
+ endpoint = new DummyMessageEndpoint(latch);
+ endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ latch.await(5, TimeUnit.SECONDS);
- public long startWork(Work work) throws WorkException
- {
- return 0;
- }
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"2");
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
- public long startWork(Work work, long l, ExecutionContext executionContext,
WorkListener workListener) throws WorkException
- {
- return 0;
- }
+ }
- public void scheduleWork(Work work) throws WorkException
- {
- work.run();
- }
- public void scheduleWork(Work work, long l, ExecutionContext executionContext,
WorkListener workListener) throws WorkException
- {
- }
- }
- }
}
Added: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java 2010-07-06
16:45:43 UTC (rev 9380)
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.utils.UUIDGenerator;
+
+import javax.resource.ResourceException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Jul 6, 2010
+ */
+public class HornetQMessageHandlerXATest extends HornetQRATestBase
+{
+ @Override
+ public boolean isSecure()
+ {
+ return false;
+ }
+
+ public void testXACommit() throws Exception
+ {
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ XADummyEndpoint endpoint = new XADummyEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, true);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ ClientSession session = createFactory(false).createSession();
+ ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("teststring");
+ clientProducer.send(message);
+ session.close();
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"teststring");
+ endpoint.prepare();
+ endpoint.commit();
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ }
+
+ public void testXARollback() throws Exception
+ {
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setMaxSession(1);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ XADummyEndpoint endpoint = new XADummyEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, true);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+ ClientSession session = createFactory(false).createSession();
+ ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("teststring");
+ clientProducer.send(message);
+ session.close();
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"teststring");
+ latch = new CountDownLatch(1);
+ endpoint.reset(latch);
+ endpoint.rollback();
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(),
"teststring");
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ }
+
+ class XADummyEndpoint extends DummyMessageEndpoint
+ {
+ private Xid xid;
+
+ public XADummyEndpoint(CountDownLatch latch)
+ {
+ super(latch);
+ xid = new XidImpl("xa1".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
+ @Override
+ public void beforeDelivery(Method method) throws NoSuchMethodException,
ResourceException
+ {
+ super.beforeDelivery(method);
+ try
+ {
+ xaResource.start(xid, XAResource.TMNOFLAGS);
+ }
+ catch (XAException e)
+ {
+ throw new ResourceException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void afterDelivery() throws ResourceException
+ {
+ try
+ {
+ xaResource.end(xid, XAResource.TMSUCCESS);
+ }
+ catch (XAException e)
+ {
+ throw new ResourceException(e.getMessage(), e);
+ }
+
+ super.afterDelivery();
+ }
+
+ public void rollback() throws XAException
+ {
+ xaResource.rollback(xid);
+ }
+
+ public void prepare() throws XAException
+ {
+ xaResource.prepare(xid);
+ }
+
+ public void commit() throws XAException
+ {
+ xaResource.commit(xid, false);
+ }
+ }
+}
Added: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java 2010-07-06
16:45:43 UTC (rev 9380)
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.resource.ResourceException;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.*;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+import java.util.Timer;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Jul 6, 2010
+ */
+public abstract class HornetQRATestBase extends ServiceTestBase
+{
+ protected Configuration configuration;
+
+ protected HornetQServer server;
+
+ protected static final String MDBQUEUE = "mdbQueue";
+ protected static final String MDBQUEUEPREFIXED = "jms.queue.mdbQueue";
+ protected static final SimpleString MDBQUEUEPREFIXEDSIMPLE = new
SimpleString("jms.queue.mdbQueue");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ clearData();
+ configuration = createDefaultConfig();
+ configuration.setSecurityEnabled(isSecure());
+ server = createServer(true, configuration);
+ server.start();
+ server.createQueue(MDBQUEUEPREFIXEDSIMPLE, MDBQUEUEPREFIXEDSIMPLE, null, true,
false);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (server != null)
+ {
+ try
+ {
+ server.stop();
+ server = null;
+ }
+ catch (Exception e)
+ {
+ // ignore
+ }
+ }
+ super.tearDown();
+ }
+ public abstract boolean isSecure();
+
+ class DummyMessageEndpointFactory implements MessageEndpointFactory
+ {
+ private DummyMessageEndpoint endpoint;
+
+ private final boolean isDeliveryTransacted;
+
+ public DummyMessageEndpointFactory(DummyMessageEndpoint endpoint, boolean
deliveryTransacted)
+ {
+ this.endpoint = endpoint;
+ isDeliveryTransacted = deliveryTransacted;
+ }
+
+ public MessageEndpoint createEndpoint(XAResource xaResource) throws
UnavailableException
+ {
+ if(xaResource != null)
+ {
+ endpoint.setXAResource(xaResource);
+ }
+ return endpoint;
+ }
+
+ public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException
+ {
+ return isDeliveryTransacted;
+ }
+ }
+
+ class DummyMessageEndpoint implements MessageEndpoint, MessageListener
+ {
+ public CountDownLatch latch;
+
+ public HornetQMessage lastMessage;
+
+ public boolean released = false;
+
+ public XAResource xaResource;
+
+ public DummyMessageEndpoint(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void beforeDelivery(Method method) throws NoSuchMethodException,
ResourceException
+ {
+
+ }
+
+ public void afterDelivery() throws ResourceException
+ {
+ if(latch != null)
+ {
+ latch.countDown();
+ }
+ System.out.println("lastMessage = " + lastMessage);
+ }
+
+ public void release()
+ {
+ released = true;
+ }
+
+ public void onMessage(Message message)
+ {
+ lastMessage = (HornetQMessage) message;
+ }
+
+ public void reset(CountDownLatch latch)
+ {
+ this.latch = latch;
+ lastMessage = null;
+ }
+
+ public void setXAResource(XAResource xaResource)
+ {
+ this.xaResource = xaResource;
+ }
+ }
+
+ class MyBootstrapContext implements BootstrapContext
+ {
+ WorkManager workManager = new DummyWorkManager();
+
+ public Timer createTimer() throws UnavailableException
+ {
+ return null;
+ }
+
+ public WorkManager getWorkManager()
+ {
+ return workManager;
+ }
+
+ public XATerminator getXATerminator()
+ {
+ return null;
+ }
+
+ class DummyWorkManager implements WorkManager
+ {
+ public void doWork(Work work) throws WorkException
+ {
+ }
+
+ public void doWork(Work work, long l, ExecutionContext executionContext,
WorkListener workListener) throws WorkException
+ {
+ }
+
+ public long startWork(Work work) throws WorkException
+ {
+ return 0;
+ }
+
+ public long startWork(Work work, long l, ExecutionContext executionContext,
WorkListener workListener) throws WorkException
+ {
+ return 0;
+ }
+
+ public void scheduleWork(Work work) throws WorkException
+ {
+ work.run();
+ }
+
+ public void scheduleWork(Work work, long l, ExecutionContext executionContext,
WorkListener workListener) throws WorkException
+ {
+ }
+ }
+ }
+}