[jboss-cvs] JBoss Messaging SVN: r4452 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 12 14:21:55 EDT 2008
Author: timfox
Date: 2008-06-12 14:21:54 -0400 (Thu, 12 Jun 2008)
New Revision: 4452
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
Log:
More tests
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -78,7 +78,5 @@
int getLazyAckBatchSize();
- boolean isXA();
-
- ClientConnection getConnection();
+ boolean isXA();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -139,8 +139,7 @@
//For testing only
private boolean forceNotSameRM;
-
-
+
// Constructors ---------------------------------------------------------------------------------
public ClientSessionImpl(final ClientConnectionInternal connection, final long serverTargetID,
@@ -247,12 +246,16 @@
public ClientConsumer createConsumer(final SimpleString queueName) throws MessagingException
{
+ checkClosed();
+
return createConsumer(queueName, null, false, false, false);
}
public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final boolean noLocal,
final boolean autoDeleteQueue, final boolean direct) throws MessagingException
{
+ checkClosed();
+
return createConsumer(queueName, filterString, noLocal, autoDeleteQueue, direct,
connection.getConnectionFactory().getDefaultConsumerWindowSize(),
connection.getConnectionFactory().getDefaultConsumerMaxRate());
@@ -335,17 +338,23 @@
public ClientProducer createProducer(final SimpleString address) throws MessagingException
{
+ checkClosed();
+
return createProducer(address, connection.getConnectionFactory().getDefaultProducerWindowSize(),
connection.getConnectionFactory().getDefaultProducerMaxRate());
}
public ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException
{
+ checkClosed();
+
return createProducer(address, -1, rate);
}
public ClientProducer createProducerWithWindowSize(SimpleString address, int windowSize) throws MessagingException
{
+ checkClosed();
+
return createProducer(address, windowSize, -1);
}
@@ -407,6 +416,7 @@
{
checkClosed();
+ //Flush any acks to the server
acknowledgeInternal(false);
remotingConnection.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.SESS_COMMIT));
@@ -431,8 +441,7 @@
consumer.recover(lastCommittedID + 1);
}
- //We flush any remaining acks
-
+ //Flush any acks to the server
acknowledgeInternal(false);
toAckCount = 0;
@@ -492,10 +501,9 @@
producerCache.clear();
}
- //Make sure any remaining acks make it to the server
+ //Flush any acks to the server
+ acknowledgeInternal(false);
- acknowledgeInternal(false);
-
remotingConnection.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CLOSE));
}
finally
@@ -632,6 +640,8 @@
checkXA();
try
{
+ //Note - don't need to flush acks since the previous end would have done this
+
SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase);
SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
@@ -754,6 +764,8 @@
checkXA();
try
{
+ //Note - don't need to flush acks since the previous end would have done this
+
SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid);
SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
@@ -775,18 +787,25 @@
}
}
- public Xid[] recover(final int flag) throws XAException
+ public Xid[] recover(final int flags) throws XAException
{
checkXA();
try
{
- SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.SESS_XA_INDOUBT_XIDS));
-
- List<Xid> xids = response.getXids();
-
- Xid[] xidArray = xids.toArray(new Xid[xids.size()]);
-
- return xidArray;
+ if ((flags & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN)
+ {
+ SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.SESS_XA_INDOUBT_XIDS));
+
+ List<Xid> xids = response.getXids();
+
+ Xid[] xidArray = xids.toArray(new Xid[xids.size()]);
+
+ return xidArray;
+ }
+ else
+ {
+ return new Xid[0];
+ }
}
catch (MessagingException e)
{
@@ -800,6 +819,8 @@
checkXA();
try
{
+ //Note - don't need to flush acks since the previous end would have done this
+
SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
@@ -857,6 +878,7 @@
}
else if (flags == XAResource.TMNOFLAGS)
{
+ //Don't need to flush since the previous end will have done this
packet = new SessionXAStartMessage(xid);
}
else
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -6,14 +6,10 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.apache.mina.common.IdleStatus.BOTH_IDLE;
-import static org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler.EXCEPTION;
-
import javax.net.ssl.SSLContext;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -73,6 +73,19 @@
return getParentString() + ", queueName=" + queueName + ", filterString="
+ filterString + "]";
}
+
+   /**
+    * Compares this message with another object by queue name and filter string.
+    *
+    * @param other the object to compare with
+    * @return true only if other is a SessionCreateBrowserMessage whose queueName
+    *         equals this one's and whose filterString is either equal to this
+    *         one's or null on both sides
+    */
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionCreateBrowserMessage == false)
+      {
+         return false;
+      }
+
+      SessionCreateBrowserMessage r = (SessionCreateBrowserMessage)other;
+
+      // The conditional has to be parenthesised: '&&' binds tighter than '?:', so
+      // without the parentheses the queue name test becomes part of the condition
+      // and a queue name mismatch falls through to comparing only the filters
+      // (or dereferences a null filterString).
+      return this.queueName.equals(r.queueName) &&
+             (this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString));
+   }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -75,6 +75,19 @@
return getParentString() + ", address=" + address + ", temp=" + temporary + "]";
}
+   /**
+    * Two remove-destination messages are considered equal when both their
+    * address and their temporary flag match.
+    *
+    * @param other the object to compare with
+    * @return true if other is a SessionRemoveDestinationMessage with the same
+    *         address and temporary flag
+    */
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof SessionRemoveDestinationMessage))
+      {
+         return false;
+      }
+
+      final SessionRemoveDestinationMessage that = (SessionRemoveDestinationMessage)other;
+
+      // The address is compared first, exactly as before.
+      final boolean sameAddress = this.address.equals(that.address);
+
+      return sameAddress && this.temporary == that.temporary;
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -57,6 +57,18 @@
xid = XidCodecSupport.decodeXid(buffer);
}
+   /**
+    * Join messages are equal when they carry equal xids.
+    *
+    * @param other the object to compare with
+    * @return true if other is a SessionXAJoinMessage with an equal xid
+    */
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionXAJoinMessage)
+      {
+         final SessionXAJoinMessage that = (SessionXAJoinMessage)other;
+
+         return this.xid.equals(that.xid);
+      }
+
+      return false;
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -56,6 +56,18 @@
{
xid = XidCodecSupport.decodeXid(buffer);
}
+
+   /**
+    * Prepare messages are equal when they carry equal xids.
+    *
+    * @param other the object to compare with
+    * @return true if other is a SessionXAPrepareMessage with an equal xid
+    */
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionXAPrepareMessage)
+      {
+         final SessionXAPrepareMessage that = (SessionXAPrepareMessage)other;
+
+         return this.xid.equals(that.xid);
+      }
+
+      return false;
+   }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -57,6 +57,18 @@
xid = XidCodecSupport.decodeXid(buffer);
}
+   /**
+    * Resume messages are equal when they carry equal xids.
+    *
+    * @param other the object to compare with
+    * @return true if other is a SessionXAResumeMessage with an equal xid
+    */
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionXAResumeMessage)
+      {
+         final SessionXAResumeMessage that = (SessionXAResumeMessage)other;
+
+         return this.xid.equals(that.xid);
+      }
+
+      return false;
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -57,6 +57,18 @@
xid = XidCodecSupport.decodeXid(buffer);
}
+   /**
+    * Rollback messages are equal when they carry equal xids.
+    *
+    * @param other the object to compare with
+    * @return true if other is a SessionXARollbackMessage with an equal xid
+    */
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionXARollbackMessage)
+      {
+         final SessionXARollbackMessage that = (SessionXARollbackMessage)other;
+
+         return this.xid.equals(that.xid);
+      }
+
+      return false;
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -56,6 +56,18 @@
{
timeoutSeconds = buffer.getInt();
}
+
+   /**
+    * Set-timeout messages are equal when they carry the same timeout value.
+    *
+    * @param other the object to compare with
+    * @return true if other is a SessionXASetTimeoutMessage with the same
+    *         timeoutSeconds
+    */
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionXASetTimeoutMessage)
+      {
+         final SessionXASetTimeoutMessage that = (SessionXASetTimeoutMessage)other;
+
+         return this.timeoutSeconds == that.timeoutSeconds;
+      }
+
+      return false;
+   }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -57,6 +57,17 @@
xid = XidCodecSupport.decodeXid(buffer);
}
+   /**
+    * Start messages are equal when they carry equal xids.
+    *
+    * @param other the object to compare with
+    * @return true if other is a SessionXAStartMessage with an equal xid
+    */
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionXAStartMessage)
+      {
+         final SessionXAStartMessage that = (SessionXAStartMessage)other;
+
+         return this.xid.equals(that.xid);
+      }
+
+      return false;
+   }
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2008-06-12 17:10:40 UTC (rev 4451)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2008-06-12 18:21:54 UTC (rev 4452)
@@ -23,6 +23,8 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
+import java.util.Arrays;
+
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -31,6 +33,7 @@
import org.jboss.messaging.core.client.ClientBrowser;
import org.jboss.messaging.core.client.ClientConnectionFactory;
import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
import org.jboss.messaging.core.client.impl.ClientConsumerPacketHandler;
@@ -38,11 +41,13 @@
import org.jboss.messaging.core.client.impl.ClientProducerPacketHandler;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.client.impl.ClientSessionInternal;
+import org.jboss.messaging.core.client.impl.LocationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.TransportType;
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -50,6 +55,8 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
@@ -62,7 +69,16 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -572,6 +588,14 @@
assertFalse(producer1 == producer3);
}
+   /**
+    * Runs the parameterised testCreateBrowser(boolean) helper for both values
+    * of its argument, true first and then false.
+    */
+   public void testCreateBrowser() throws Exception
+   {
+      for (boolean arg : new boolean[] { true, false })
+      {
+         testCreateBrowser(arg);
+      }
+   }
+
+
+
public void testGetXAResource() throws Exception
{
ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
@@ -836,9 +860,190 @@
EasyMock.verify(conn, rc, prod1, prod2, cons1, cons2, browser1, browser2);
- assertTrue(session.isClosed());
+ assertTrue(session.isClosed());
+
+ try
+ {
+ session.createQueue(new SimpleString("trtr"), new SimpleString("iuasij"), null, false, false);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.deleteQueue(new SimpleString("trtr"));
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.addDestination(new SimpleString("trtr"), false);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.removeDestination(new SimpleString("trtr"), false);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.queueQuery(new SimpleString("trtr"));
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.bindingQuery(new SimpleString("trtr"));
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createConsumer(new SimpleString("trtr"));
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createConsumer(new SimpleString("iasjq"), null, false, false, false);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createConsumer(new SimpleString("husuhsuh"), null, false, false, false, 8787, 7162761);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createBrowser(new SimpleString("husuhsuh"));
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createBrowser(new SimpleString("husuhsuh"), null);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createProducer(new SimpleString("husuhsuh"));
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createProducer(new SimpleString("iashi"), 878778, 8778, false, false);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createRateLimitedProducer(new SimpleString("uhsuhs"), 78676);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.createProducerWithWindowSize(new SimpleString("uhsuhs"), 78676);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.commit();
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.rollback();
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ session.acknowledge();
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
}
-
+
public void testAddRemoveConsumer() throws Exception
{
testAddRemoveConsumer(true);
@@ -947,13 +1152,504 @@
testXAForget(true);
}
+ public void testGetTransactionTimeout() throws Exception // Checks that getTransactionTimeout() returns the value carried by the response message.
+ {
+ ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ //In ClientSessionImpl constructor
+ EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+ final long sessionTargetID = 9121892;
+ // The request the session is expected to send.
+ Packet packet = new EmptyPacket(EmptyPacket.SESS_XA_GET_TIMEOUT);
+
+ final int timeout = 1098289;
+ // Canned response carrying the timeout above.
+ SessionXAGetTimeoutResponseMessage resp = new SessionXAGetTimeoutResponseMessage(timeout);
+ // Exactly one blocking send, addressed to the session target id, is expected.
+ EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, packet)).andReturn(resp);
+
+ EasyMock.replay(conn, rc);
+ // NOTE(review): the third constructor argument (true here, false in notXA) is presumably the xa flag - confirm.
+ ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, true, -1, false, false, false, false);
+
+ int timeout2 = session.getTransactionTimeout();
+
+ EasyMock.verify(conn, rc);
+ // The value returned to the caller must be the one from the response.
+ assertEquals(timeout, timeout2);
+ }
+   /**
+    * Checks isSameRM() for every ordered pair of three sessions: sessions 1 and 2
+    * report location1 through their remoting connections, session 3 reports
+    * location2. Only the pair sharing a location is expected to be the same RM.
+    */
+   public void testIsSameRM() throws Exception
+   {
+      Location location1 = new LocationImpl(TransportType.TCP, "blah1");
+      Location location2 = new LocationImpl(TransportType.TCP, "blah2");
+
+      // One connection / remoting connection pair per session; the first
+      // getRemotingConnection() on each is consumed by the ClientSessionImpl constructor.
+      ClientConnectionInternal conn1 = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc1 = EasyMock.createStrictMock(RemotingConnection.class);
+      EasyMock.expect(conn1.getRemotingConnection()).andReturn(rc1);
+
+      ClientConnectionInternal conn2 = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc2 = EasyMock.createStrictMock(RemotingConnection.class);
+      EasyMock.expect(conn2.getRemotingConnection()).andReturn(rc2);
+
+      ClientConnectionInternal conn3 = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc3 = EasyMock.createStrictMock(RemotingConnection.class);
+      EasyMock.expect(conn3.getRemotingConnection()).andReturn(rc3);
+
+      // session1.isSameRM(session2)
+      EasyMock.expect(conn1.getRemotingConnection()).andReturn(rc1);
+      EasyMock.expect(rc1.getLocation()).andReturn(location1);
+      EasyMock.expect(conn2.getRemotingConnection()).andReturn(rc2);
+      EasyMock.expect(rc2.getLocation()).andReturn(location1);
+
+      // session2.isSameRM(session1)
+      EasyMock.expect(conn2.getRemotingConnection()).andReturn(rc2);
+      EasyMock.expect(rc2.getLocation()).andReturn(location1);
+      EasyMock.expect(conn1.getRemotingConnection()).andReturn(rc1);
+      EasyMock.expect(rc1.getLocation()).andReturn(location1);
+
+      // session1.isSameRM(session3)
+      EasyMock.expect(conn1.getRemotingConnection()).andReturn(rc1);
+      EasyMock.expect(rc1.getLocation()).andReturn(location1);
+      EasyMock.expect(conn3.getRemotingConnection()).andReturn(rc3);
+      EasyMock.expect(rc3.getLocation()).andReturn(location2);
+
+      // session3.isSameRM(session1)
+      EasyMock.expect(conn3.getRemotingConnection()).andReturn(rc3);
+      EasyMock.expect(rc3.getLocation()).andReturn(location2);
+      EasyMock.expect(conn1.getRemotingConnection()).andReturn(rc1);
+      EasyMock.expect(rc1.getLocation()).andReturn(location1);
+
+      // session2.isSameRM(session3)
+      EasyMock.expect(conn2.getRemotingConnection()).andReturn(rc2);
+      EasyMock.expect(rc2.getLocation()).andReturn(location1);
+      EasyMock.expect(conn3.getRemotingConnection()).andReturn(rc3);
+      EasyMock.expect(rc3.getLocation()).andReturn(location2);
+
+      // session3.isSameRM(session2)
+      EasyMock.expect(conn3.getRemotingConnection()).andReturn(rc3);
+      EasyMock.expect(rc3.getLocation()).andReturn(location2);
+      EasyMock.expect(conn2.getRemotingConnection()).andReturn(rc2);
+      EasyMock.expect(rc2.getLocation()).andReturn(location1);
+
+      EasyMock.replay(conn1, conn2, conn3, rc1, rc2, rc3);
+
+      ClientSessionInternal session1 = new ClientSessionImpl(conn1, 4343, true, -1, false, false, false, false);
+
+      ClientSessionInternal session2 = new ClientSessionImpl(conn2, 4343, true, -1, false, false, false, false);
+
+      ClientSessionInternal session3 = new ClientSessionImpl(conn3, 4343, true, -1, false, false, false, false);
+
+      assertTrue(session1.isSameRM(session2));
+      assertTrue(session2.isSameRM(session1));
+
+      assertFalse(session1.isSameRM(session3));
+      assertFalse(session3.isSameRM(session1));
+
+      assertFalse(session2.isSameRM(session3));
+      assertFalse(session3.isSameRM(session2));
+
+      // Verify every mock that was replayed; conn2 was previously left out
+      // because conn1 was listed twice.
+      EasyMock.verify(conn1, conn2, conn3, rc1, rc2, rc3);
+   }
+   /**
+    * Runs the parameterised testXAPrepare(boolean, boolean) helper for all four
+    * combinations of its two arguments, varying the second argument fastest.
+    */
+   public void testXAPrepare() throws Exception
+   {
+      final boolean[] bothValues = { false, true };
+
+      for (boolean first : bothValues)
+      {
+         for (boolean second : bothValues)
+         {
+            testXAPrepare(first, second);
+         }
+      }
+   }
+   /**
+    * Runs the parameterised testXARecover(int) helper with no flags, with each
+    * of the two scan flags on its own, and with both scan flags combined.
+    */
+   public void testXARecover() throws Exception
+   {
+      final int[] scanFlags =
+      {
+         XAResource.TMNOFLAGS,
+         XAResource.TMSTARTRSCAN,
+         XAResource.TMENDRSCAN,
+         XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN
+      };
+
+      for (int flags : scanFlags)
+      {
+         testXARecover(flags);
+      }
+   }
+   /**
+    * Runs the parameterised testXARollback(boolean) helper for both values of
+    * its argument, true first and then false.
+    */
+   public void testXARollback() throws Exception
+   {
+      for (boolean arg : new boolean[] { true, false })
+      {
+         testXARollback(arg);
+      }
+   }
+   /**
+    * Runs the parameterised testXASetTransactionTimeout(boolean) helper first
+    * without and then with the error flag set.
+    */
+   public void testXASetTransactionTimeout() throws Exception
+   {
+      for (boolean error : new boolean[] { false, true })
+      {
+         testXASetTransactionTimeout(error);
+      }
+   }
+
+   /**
+    * Runs the parameterised testXAStart(int, boolean) helper for the join,
+    * resume and no-flags cases, first without and then with the error flag set.
+    */
+   public void testXAStart() throws Exception
+   {
+      final int[] startFlags = { XAResource.TMJOIN, XAResource.TMRESUME, XAResource.TMNOFLAGS };
+
+      for (boolean error : new boolean[] { false, true })
+      {
+         for (int flags : startFlags)
+         {
+            testXAStart(flags, error);
+         }
+      }
+   }
+
+ public void notXA() throws Exception // Expects every XAResource-style call on this session to fail with XAER_RMERR. NOTE(review): unlike the other tests the name has no "test" prefix, so a name-based runner would presumably skip it - confirm this is intended.
+ {
+ ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ //In ClientSessionImpl constructor
+ EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+ final long sessionTargetID = 9121892;
+
+ EasyMock.replay(conn, rc);
+ // NOTE(review): the third constructor argument is false here and true in the XA tests; presumably it is the xa flag - confirm.
+ ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+
+ EasyMock.verify(conn, rc);
+ // No further expectations are recorded after the reset below.
+ EasyMock.reset(conn, rc);
+ try
+ {
+ session.commit(randomXid(), false);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // end
+ try
+ {
+ session.end(randomXid(), 8778);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // forget
+ try
+ {
+ session.forget(randomXid());
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // getTransactionTimeout
+ try
+ {
+ session.getTransactionTimeout();
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // isSameRM, called with the session itself as the argument
+ try
+ {
+ session.isSameRM(session);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // prepare
+ try
+ {
+ session.prepare(randomXid());
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // recover, with an arbitrary flags value
+ try
+ {
+ session.recover(89787);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // rollback
+ try
+ {
+ session.rollback(randomXid());
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // setTransactionTimeout
+ try
+ {
+ session.setTransactionTimeout(767);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ // start, with an arbitrary flags value
+ try
+ {
+ session.start(randomXid(), 8768);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ }
+
// Private -------------------------------------------------------------------------------------------
+ /**
+  * Exercises {@code start(xid, flags)} on an XA {@code ClientSessionImpl} backed by strict mocks.
+  *
+  * The test records, in order: the constructor's lookup of the remoting connection, a one-way
+  * acknowledge (only when {@code flags} is not {@code TMNOFLAGS}) and the blocking XA request.
+  *
+  * @param flags the XAResource flag handed to {@code start}; TMJOIN, TMRESUME and TMNOFLAGS select
+  *              the join, resume and start packets, any other value leaves the expected packet null
+  * @param error whether the mocked XA response is built as an error response (XAER_RMERR), in which
+  *              case {@code start} must throw an XAException carrying that code
+  */
+ private void testXAStart(int flags, boolean error) throws Exception
+ {
+    final long targetID = 9121892;
+    final int deliveryCount = 10;
+
+    ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+    RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+    //In ClientSessionImpl constructor
+    EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+    final Xid xid = randomXid();
+
+    //Pick the request the session is expected to send for this flag
+    final Packet expectedRequest;
+    switch (flags)
+    {
+       case XAResource.TMJOIN:
+          expectedRequest = new SessionXAJoinMessage(xid);
+          break;
+       case XAResource.TMRESUME:
+          expectedRequest = new SessionXAResumeMessage(xid);
+          break;
+       case XAResource.TMNOFLAGS:
+          expectedRequest = new SessionXAStartMessage(xid);
+          break;
+       default:
+          expectedRequest = null;
+          break;
+    }
+
+    //For anything other than a plain start the test expects the pending acks to be sent first
+    if (flags != XAResource.TMNOFLAGS)
+    {
+       rc.sendOneWay(targetID, targetID, new SessionAcknowledgeMessage(deliveryCount - 1, true));
+    }
+
+    SessionXAResponseMessage response = new SessionXAResponseMessage(error, XAException.XAER_RMERR, "blah");
+    EasyMock.expect(rc.sendBlocking(targetID, targetID, expectedRequest)).andReturn(response);
+
+    EasyMock.replay(conn, rc);
+
+    ClientSessionInternal session = new ClientSessionImpl(conn, targetID, true, -1, false, false, false, false);
+
+    //Simulate some unflushed messages
+    for (int delivery = 0; delivery < deliveryCount; delivery++)
+    {
+       session.delivered(delivery, false);
+       session.acknowledge();
+    }
+
+    if (!error)
+    {
+       session.start(xid, flags);
+    }
+    else
+    {
+       try
+       {
+          session.start(xid, flags);
+          fail("Should throw exception");
+       }
+       catch (XAException expected)
+       {
+          assertEquals(XAException.XAER_RMERR, expected.errorCode);
+       }
+    }
+
+    EasyMock.verify(conn, rc);
+ }
+
+ /**
+  * Exercises {@code setTransactionTimeout} on an XA {@code ClientSessionImpl} backed by strict mocks.
+  *
+  * The mocked remoting connection answers the expected {@code SessionXASetTimeoutMessage} with a
+  * {@code SessionXASetTimeoutResponseMessage} constructed from {@code !error}; the value returned by
+  * the session must equal {@code !error}.
+  *
+  * @param error when true the mocked response is built with {@code false} and the session is
+  *              expected to return {@code false}; otherwise {@code true} is expected
+  */
+ private void testXASetTransactionTimeout(boolean error) throws Exception
+ {
+    ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+    RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+    //In ClientSessionImpl constructor
+    EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+    final long sessionTargetID = 9121892;
+
+    final int timeout = 1897217;
+
+    SessionXASetTimeoutMessage packet = new SessionXASetTimeoutMessage(timeout);
+
+    SessionXASetTimeoutResponseMessage resp = new SessionXASetTimeoutResponseMessage(!error);
+
+    EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, packet)).andReturn(resp);
+
+    EasyMock.replay(conn, rc);
+
+    ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, true, -1, false, false, false, false);
+
+    boolean ok = session.setTransactionTimeout(timeout);
+
+    //assertEquals reports expected and actual on failure, unlike assertTrue(ok == !error)
+    assertEquals(!error, ok);
+
+    EasyMock.verify(conn, rc);
+ }
+
+ private void testXARollback(boolean error) throws Exception
+ {
+ ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ //In ClientSessionImpl constructor
+ EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+ final long sessionTargetID = 9121892;
+
+ Xid xid = randomXid();
+
+ SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
+
+ SessionXAResponseMessage resp = new SessionXAResponseMessage(error, XAException.XAER_RMERR, "blah");
+
+ EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, packet)).andReturn(resp);
+
+ EasyMock.replay(conn, rc);
+
+ ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, true, -1, false, false, false, false);
+
+ if (error)
+ {
+ try
+ {
+ session.rollback(xid);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ }
+ else
+ {
+ session.rollback(xid);
+ }
+
+ EasyMock.verify(conn, rc);
+ }
+
+ /**
+  * Exercises {@code recover(flags)} on an XA {@code ClientSessionImpl} backed by strict mocks.
+  *
+  * When {@code flags} has the TMSTARTRSCAN bit set, the test expects one blocking
+  * SESS_XA_INDOUBT_XIDS request and checks that the Xids carried by the mocked response are
+  * returned in the same order. Otherwise no remoting call is recorded and an empty array is expected.
+  *
+  * @param flags the XAResource recovery flags handed to {@code recover}
+  */
+ private void testXARecover(final int flags) throws Exception
+ {
+    ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+    RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+    //In ClientSessionImpl constructor
+    EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+    final long sessionTargetID = 9121892;
+
+    final Xid[] xids = new Xid[] { randomXid(), randomXid(), randomXid() };
+
+    //Evaluated once so the expectation and the assertions cannot drift apart
+    final boolean startScan = (flags & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN;
+
+    if (startScan)
+    {
+       EmptyPacket packet = new EmptyPacket(EmptyPacket.SESS_XA_INDOUBT_XIDS);
+
+       SessionXAGetInDoubtXidsResponseMessage resp = new SessionXAGetInDoubtXidsResponseMessage(Arrays.asList(xids));
+
+       EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, packet)).andReturn(resp);
+    }
+
+    EasyMock.replay(conn, rc);
+
+    ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, true, -1, false, false, false, false);
+
+    Xid[] xids2 = session.recover(flags);
+
+    if (startScan)
+    {
+       assertEquals(xids.length, xids2.length);
+
+       for (int i = 0; i < xids.length; i++)
+       {
+          assertEquals(xids[i], xids2[i]);
+       }
+    }
+    else
+    {
+       //assertEquals reports the actual length on failure, unlike assertTrue(length == 0)
+       assertEquals(0, xids2.length);
+    }
+
+    EasyMock.verify(conn, rc);
+ }
+
+ private void testXAPrepare(boolean error, boolean readOnly) throws Exception
+ {
+ ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ //In ClientSessionImpl constructor
+ EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+ final long sessionTargetID = 9121892;
+
+ Xid xid = randomXid();
+
+ SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid);
+
+ SessionXAResponseMessage resp = new SessionXAResponseMessage(error, error ? XAException.XAER_RMERR : readOnly ? XAResource.XA_RDONLY : XAResource.XA_OK, "blah");
+
+ EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, packet)).andReturn(resp);
+
+ EasyMock.replay(conn, rc);
+
+ ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, true, -1, false, false, false, false);
+
+ if (error)
+ {
+ try
+ {
+ session.prepare(xid);
+ fail("Should throw exception");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_RMERR, e.errorCode);
+ }
+ }
+ else
+ {
+ int res = session.prepare(xid);
+
+ if (readOnly)
+ {
+ assertEquals(XAResource.XA_RDONLY, res);
+ }
+ else
+ {
+ assertEquals(XAResource.XA_OK, res);
+ }
+ }
+
+ EasyMock.verify(conn, rc);
+ }
+
private void testXAForget(final boolean error) throws Exception
{
ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
@@ -1362,6 +2058,43 @@
EasyMock.verify(conn, rc, pd, cons1, cons2);
}
+ /**
+  * Exercises {@code createBrowser} on a non-XA {@code ClientSessionImpl} backed by strict mocks.
+  *
+  * The test expects a single blocking {@code SessionCreateBrowserMessage} carrying the queue name
+  * and, when requested, the filter (null otherwise), and checks that a browser is handed back.
+  *
+  * @param filter when true the two-argument {@code createBrowser(queueName, filter)} overload is
+  *               called with a filter string; when false the single-argument overload is called
+  */
+ private void testCreateBrowser(boolean filter) throws Exception
+ {
+    ClientConnectionInternal conn = EasyMock.createStrictMock(ClientConnectionInternal.class);
+
+    RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+    //In ClientSessionImpl constructor
+    EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+
+    final long sessionTargetID = 7617622;
+    final SimpleString queueName = new SimpleString("gyugg");
+    final SimpleString sfilter = filter ? new SimpleString("ygyg") : null;
+
+    SessionCreateBrowserMessage request =
+       new SessionCreateBrowserMessage(queueName, sfilter);
+
+    SessionCreateBrowserResponseMessage resp =
+       new SessionCreateBrowserResponseMessage(76675765);
+
+    EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, request)).andReturn(resp);
+
+    EasyMock.replay(conn, rc);
+
+    ClientSessionInternal session = new ClientSessionImpl(conn, sessionTargetID, false, -1, false, false, false, false);
+
+    final ClientBrowser browser;
+    if (filter)
+    {
+       browser = session.createBrowser(queueName, sfilter);
+    }
+    else
+    {
+       browser = session.createBrowser(queueName);
+    }
+
+    //Previously the result was assigned and dropped; make sure something is actually returned
+    assertNotNull(browser);
+
+    EasyMock.verify(conn, rc);
+ }
+
private void testCreateProducerWithWindowSizeMethod(final SimpleString address,
final int windowSize, final int initialCredits,
final int serverMaxRate,
@@ -1869,7 +2602,7 @@
EasyMock.replay(conn);
EasyMock.replay(rc);
- ClientSession session = new ClientSessionImpl(conn, serverTargetID, xa,
+ ClientSessionInternal session = new ClientSessionImpl(conn, serverTargetID, xa,
lazyAckBatchSize, cacheProducers, autoCommitSends, autoCommitAcks, blockOnAcknowledge);
EasyMock.verify(conn);
More information about the jboss-cvs-commits
mailing list