[jboss-cvs] JBoss Messaging SVN: r6067 - in trunk: tests/jms-tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 11 16:01:08 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-11 16:01:08 -0400 (Wed, 11 Mar 2009)
New Revision: 6067
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java
trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
Log:
New tests and fixes
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-11 17:16:35 UTC (rev 6066)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-11 20:01:08 UTC (rev 6067)
@@ -49,6 +49,7 @@
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -59,7 +60,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -125,7 +125,7 @@
private final boolean xa;
- private ClientXAState state = null;
+ private final ClientXAState state = null;
private final Executor executor;
@@ -173,9 +173,9 @@
private volatile boolean started;
private SendAcknowledgementHandler sendAckHandler;
-
+
private volatile boolean closedSent;
-
+
// Constructors ----------------------------------------------------------------------------
public ClientSessionImpl(final ConnectionManager connectionManager,
@@ -317,7 +317,7 @@
return response;
}
-
+
public ClientConsumer createConsumer(final SimpleString queueName) throws MessagingException
{
return createConsumer(queueName, null, false);
@@ -528,12 +528,12 @@
{
consumer.clear();
}
-
- //Acks must be flushed here *after connection is stopped and all onmessages finished executing
+
+ // Acks must be flushed here *after connection is stopped and all onmessages finished executing
flushAcks();
channel.sendBlocking(new RollbackMessage(isLastMessageAsDelived));
-
+
if (wasStarted)
{
start();
@@ -564,16 +564,15 @@
return new ClientMessageImpl(durable, body);
}
-
+
/* (non-Javadoc)
* @see org.jboss.messaging.core.client.impl.ClientSessionInternal#createBuffer(int)
*/
- public MessagingBuffer createBuffer(int size)
+ public MessagingBuffer createBuffer(final int size)
{
- return ChannelBuffers.dynamicBuffer(size);
+ return ChannelBuffers.dynamicBuffer(size);
}
-
public ClientFileMessage createFileMessage(final boolean durable)
{
return new ClientFileMessageImpl(durable);
@@ -659,7 +658,7 @@
{
return;
}
-
+
checkClosed();
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
@@ -710,9 +709,9 @@
if (consumer != null)
{
ClientMessageInternal clMessage = message.getClientMessage();
-
+
clMessage.setFlowControlSize(clMessage.getEncodeSize());
-
+
consumer.handleMessage(message.getClientMessage());
}
}
@@ -737,27 +736,27 @@
consumer.handleLargeMessageContinuation(continuation);
}
}
-
+
public void close() throws MessagingException
{
if (closed)
{
return;
}
-
+
try
{
closeChildren();
-
+
closedSent = true;
-
- channel.sendBlocking(new SessionCloseMessage());
+
+ channel.sendBlocking(new SessionCloseMessage());
}
catch (Throwable ignore)
{
// Session close should always return without exception
}
-
+
doCleanup();
}
@@ -767,7 +766,7 @@
{
return;
}
-
+
cleanUpChildren();
doCleanup();
@@ -775,9 +774,9 @@
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
{
- this.channel.setCommandConfirmationHandler(this);
+ channel.setCommandConfirmationHandler(this);
- this.sendAckHandler = handler;
+ sendAckHandler = handler;
}
// Needs to be synchronized to prevent issues with occurring concurrently with close()
@@ -787,7 +786,7 @@
{
return true;
}
-
+
boolean ok = false;
// We lock the channel to prevent any packets to be added to the resend
@@ -815,9 +814,9 @@
ok = true;
}
else
- {
+ {
if (closedSent)
- {
+ {
// a session re-attach may fail, if the session close was sent before failover started, hit the server,
// processed, then before the response was received back, failover occurred, re-attach was attempted. in
// this case it's ok - we don't want to call any failure listeners and we don't want to halt the rest of
@@ -828,11 +827,11 @@
ok = true;
}
else
- {
+ {
log.warn(System.identityHashCode(this) + " Session not found on server when attempting to re-attach");
}
-
- channel.returnBlocking();
+
+ channel.returnBlocking();
}
}
catch (Throwable t)
@@ -848,7 +847,7 @@
return ok;
}
-
+
public void returnBlocking()
{
channel.returnBlocking();
@@ -987,7 +986,7 @@
ClientSessionImpl other = (ClientSessionImpl)xares;
- return remotingConnection == other.remotingConnection;
+ return connectionManager == other.connectionManager;
}
public int prepare(final Xid xid) throws XAException
@@ -1052,7 +1051,7 @@
public void rollback(final Xid xid) throws XAException
{
checkXA();
-
+
try
{
boolean wasStarted = started;
@@ -1067,12 +1066,12 @@
{
consumer.clear();
}
-
+
flushAcks();
SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet);
+ SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
if (wasStarted)
{
@@ -1361,7 +1360,7 @@
int state;
- public ClientXAState(Xid xid)
+ public ClientXAState(final Xid xid)
{
this.xid = xid;
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java 2009-03-11 17:16:35 UTC (rev 6066)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java 2009-03-11 20:01:08 UTC (rev 6067)
@@ -2171,6 +2171,24 @@
}
+
+ public void testIsSamRM() throws Exception
+ {
+ XAConnection conn = null;
+
+ conn = xacf.createXAConnection();
+
+ //Create a session
+ XASession sess1 = conn.createXASession();
+ XAResource res1 = sess1.getXAResource();
+
+ //Create a session
+ XASession sess2 = conn.createXASession();
+ XAResource res2 = sess2.getXAResource();
+
+ assertTrue(res1.isSameRM(res2));
+ }
+
public void testOneSessionTwoTransactionsRollbackSend() throws Exception
{
XAConnection conn = null;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java 2009-03-11 17:16:35 UTC (rev 6066)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java 2009-03-11 20:01:08 UTC (rev 6067)
@@ -30,8 +30,6 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
-
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -74,7 +72,7 @@
clearData();
addressSettings.clear();
- configuration = createDefaultConfig();
+ configuration = createDefaultConfig(true);
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
configuration.setPagingDirectory(getPageDir());
@@ -121,16 +119,35 @@
super.tearDown();
}
- public void transactionManagerIntegration() throws Exception
+ public void testIsSameRM() throws Exception
{
- TransactionManagerImple tm = new TransactionManagerImple();
- tm.begin();
+ ClientSessionFactory nettyFactory = createNettyFactory();
+ validateRM(nettyFactory, nettyFactory);
+ validateRM(sessionFactory, sessionFactory);
+ validateRM(nettyFactory, sessionFactory);
+ }
+
+ private void validateRM(ClientSessionFactory factory1, ClientSessionFactory factory2) throws Exception
+ {
+ ClientSession session1 = factory1.createSession(true, false, false);
+ ClientSession session2 = factory2.createSession(true, false, false);
+ if (factory1 == factory2)
+ {
+ assertTrue(session1.isSameRM(session2));
+ }
+ else
+ {
+ assertFalse(session1.isSameRM(session2));
+ }
+
+ session1.close();
+ session2.close();
}
public void testSendPrepareDoesntRollbackOnClose() throws Exception
{
- Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ Xid xid = newXID();
ClientMessage m1 = createTextMessage(clientSession, "m1");
ClientMessage m2 = createTextMessage(clientSession, "m2");
@@ -168,7 +185,7 @@
public void testReceivePrepareDoesntRollbackOnClose() throws Exception
{
- Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ Xid xid = newXID();
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
@@ -249,30 +266,61 @@
public void testSendMultipleQueues() throws Exception
{
- multipleQueuesInternalTest(false, false);
+ multipleQueuesInternalTest(true, false, false, false);
}
+ public void testSendMultipleQueuesOnePhase() throws Exception
+ {
+ multipleQueuesInternalTest(true, false, false, true);
+ multipleQueuesInternalTest(false, false, true, true);
+ }
+
public void testSendMultipleQueuesRecreate() throws Exception
{
- multipleQueuesInternalTest(false, true);
+ multipleQueuesInternalTest(true, false, true, false);
}
public void testSendMultipleSuspend() throws Exception
{
- multipleQueuesInternalTest(true, false);
+ multipleQueuesInternalTest(true, true, false, false);
}
public void testSendMultipleSuspendRecreate() throws Exception
{
- multipleQueuesInternalTest(true, true);
+ multipleQueuesInternalTest(true, true, true, false);
}
+ public void testSendMultipleSuspendErrorCheck() throws Exception
+ {
+ ClientSession session = null;
+
+ session = sessionFactory.createSession(true, false, false);
+
+ Xid xid = newXID();
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ try
+ {
+ session.start(xid, XAResource.TMRESUME);
+ fail("XAException expected");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_PROTO, e.errorCode);
+ }
+
+ session.close();
+ }
+
/**
* @throws MessagingException
* @throws XAException
*/
- protected void multipleQueuesInternalTest(boolean suspend, boolean recreateSession) throws MessagingException,
- XAException
+ protected void multipleQueuesInternalTest(boolean createQueues,
+ boolean suspend,
+ boolean recreateSession,
+ boolean onePhase) throws MessagingException, XAException
{
int NUMBER_OF_MSGS = 100;
int NUMBER_OF_QUEUES = 10;
@@ -285,15 +333,18 @@
session = sessionFactory.createSession(true, false, false);
- for (int i = 0; i < NUMBER_OF_QUEUES; i++)
+ if (createQueues)
{
- session.createQueue(ADDRESS, ADDRESS.concat(Integer.toString(i)), true);
+ for (int i = 0; i < NUMBER_OF_QUEUES; i++)
+ {
+ session.createQueue(ADDRESS, ADDRESS.concat(Integer.toString(i)), true);
+ }
}
for (int tr = 0; tr < 2; tr++)
{
- Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ Xid xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
@@ -316,7 +367,10 @@
session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
+ if (!onePhase)
+ {
+ session.prepare(xid);
+ }
if (recreateSession)
{
@@ -330,7 +384,7 @@
}
else
{
- session.commit(xid, false);
+ session.commit(xid, onePhase);
}
}
@@ -338,7 +392,7 @@
for (int i = 0; i < 2; i++)
{
- Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ Xid xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
@@ -362,6 +416,7 @@
ClientMessage msg = consumer.receive(1000);
assertNotNull(msg);
+ msg.acknowledge();
if (suspend)
{
@@ -406,6 +461,14 @@
}
}
+ /**
+ * @return
+ */
+ private XidImpl newXID()
+ {
+ return new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
class TxMessageHandler implements MessageHandler
{
boolean failedToAck = false;
More information about the jboss-cvs-commits
mailing list