[jboss-cvs] JBoss Messaging SVN: r4423 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 10 10:36:20 EDT 2008
Author: timfox
Date: 2008-06-10 10:36:20 -0400 (Tue, 10 Jun 2008)
New Revision: 4423
Added:
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
Log:
New test and tweak to queue concurrency
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -65,5 +65,11 @@
boolean isAutoCommitAcks();
+ boolean isBlockOnAcknowledge();
+
+ boolean isCacheProducers();
+
int getLazyAckBatchSize();
+
+ boolean isXA();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -64,38 +64,40 @@
private final RemotingConnectionFactory remotingConnectionFactory;
private final Location location;
+
+ //These attributes are mutable and can be updated by different threads so must be volatile
- private ConnectionParams connectionParams;
+ private volatile ConnectionParams connectionParams;
- private int defaultConsumerWindowSize;
+ private volatile int defaultConsumerWindowSize;
- private int defaultConsumerMaxRate;
+ private volatile int defaultConsumerMaxRate;
- private int defaultProducerWindowSize;
+ private volatile int defaultProducerWindowSize;
- private int defaultProducerMaxRate;
+ private volatile int defaultProducerMaxRate;
- private boolean defaultBlockOnAcknowledge;
+ private volatile boolean defaultBlockOnAcknowledge;
- private boolean defaultBlockOnPersistentSend;
+ private volatile boolean defaultBlockOnPersistentSend;
- private boolean defaultBlockOnNonPersistentSend;
+ private volatile boolean defaultBlockOnNonPersistentSend;
// Static ---------------------------------------------------------------------------------------
- public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
+ public static final int DEFAULT_DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
- public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
+ public static final int DEFAULT_DEFAULT_CONSUMER_MAX_RATE = -1;
- public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
+ public static final int DEFAULT_DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
- public static final int DEFAULT_PRODUCER_MAX_RATE = -1;
+ public static final int DEFAULT_DEFAULT_PRODUCER_MAX_RATE = -1;
- public static final boolean DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
+ public static final boolean DEFAULT_DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
- public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
+ public static final boolean DEFAULT_DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
- public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
+ public static final boolean DEFAULT_DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
// Constructors ---------------------------------------------------------------------------------
@@ -145,13 +147,13 @@
private ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams,
final boolean dummy)
{
- defaultConsumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
- defaultConsumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
- defaultProducerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
- defaultProducerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
- defaultBlockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
- defaultBlockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
- defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+ defaultConsumerWindowSize = DEFAULT_DEFAULT_CONSUMER_WINDOW_SIZE;
+ defaultConsumerMaxRate = DEFAULT_DEFAULT_CONSUMER_MAX_RATE;
+ defaultProducerWindowSize = DEFAULT_DEFAULT_PRODUCER_WINDOW_SIZE;
+ defaultProducerMaxRate = DEFAULT_DEFAULT_PRODUCER_MAX_RATE;
+ defaultBlockOnAcknowledge = DEFAULT_DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+ defaultBlockOnPersistentSend = DEFAULT_DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+ defaultBlockOnNonPersistentSend = DEFAULT_DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
this.location = location;
this.connectionParams = connectionParams;
this.remotingConnectionFactory = new RemotingConnectionFactoryImpl();
@@ -183,11 +185,7 @@
CreateConnectionResponse response =
(CreateConnectionResponse)remotingConnection.sendBlocking(0, 0, request);
- return new ClientConnectionImpl(response.getConnectionTargetID(), remotingConnection,
- defaultConsumerWindowSize, defaultConsumerMaxRate,
- defaultProducerWindowSize, defaultProducerMaxRate,
- defaultBlockOnAcknowledge, defaultBlockOnNonPersistentSend,
- defaultBlockOnPersistentSend, response.getServerVersion());
+ return new ClientConnectionImpl(this, response.getConnectionTargetID(), remotingConnection, response.getServerVersion());
}
catch (Throwable t)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Set;
+import org.jboss.messaging.core.client.ClientConnectionFactory;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.RemotingSessionListener;
import org.jboss.messaging.core.exception.MessagingException;
@@ -63,58 +64,27 @@
private final Set<ClientSession> sessions = new ConcurrentHashSet<ClientSession>();
- private volatile boolean closed;
-
- private final int defaultConsumerWindowSize;
-
- private final int defaultConsumerMaxRate;
-
- private final int defaultProducerWindowSize;
-
- private final int defaultProducerMaxRate;
-
- private final boolean defaultBlockOnAcknowledge;
-
- private final boolean defaultSendNonPersistentMessagesBlocking;
-
- private final boolean defaultSendPersistentMessagesBlocking;
-
private final Version serverVersion;
-
+ private final ClientConnectionFactory connectionFactory;
+
+ private volatile boolean closed;
+
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
- public ClientConnectionImpl(final long serverTargetID,
+ public ClientConnectionImpl(final ClientConnectionFactory connectionFactory,
+ final long serverTargetID,
final RemotingConnection connection,
- final int defaultConsumerWindowSize,
- final int defaultConsumerMaxRate,
- final int defaultProducerWindowSize,
- final int defaultProducerMaxRate,
- final boolean defaultBlockOnAcknowledge,
- final boolean defaultSendNonPersistentMessagesBlocking,
- final boolean defaultSendPersistentMessagesBlocking,
final Version serverVersion)
{
+ this.connectionFactory = connectionFactory;
+
this.serverTargetID = serverTargetID;
this.remotingConnection = connection;
- this.defaultConsumerWindowSize = defaultConsumerWindowSize;
-
- this.defaultConsumerMaxRate = defaultConsumerMaxRate;
-
- this.defaultProducerWindowSize = defaultProducerWindowSize;
-
- this.defaultProducerMaxRate = defaultProducerMaxRate;
-
- this.defaultBlockOnAcknowledge = defaultBlockOnAcknowledge;
-
- this.defaultSendNonPersistentMessagesBlocking = defaultSendNonPersistentMessagesBlocking;
-
- this.defaultSendPersistentMessagesBlocking = defaultSendPersistentMessagesBlocking;
-
this.serverVersion = serverVersion;
}
@@ -132,11 +102,12 @@
(ConnectionCreateSessionResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
ClientSession session =
- new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers,
- autoCommitSends, autoCommitAcks, blockOnAcknowledge, defaultSendNonPersistentMessagesBlocking,
- defaultSendPersistentMessagesBlocking,
- defaultConsumerWindowSize, defaultConsumerMaxRate, defaultProducerWindowSize,
- defaultProducerMaxRate);
+ new ClientSessionImpl(this, response.getSessionID(), xa, ackBatchSize, cacheProducers,
+ autoCommitSends, autoCommitAcks, blockOnAcknowledge, connectionFactory.isDefaultBlockOnNonPersistentSend(),
+ connectionFactory.isDefaultBlockOnPersistentSend(),
+ connectionFactory.getDefaultConsumerWindowSize(), connectionFactory.getDefaultConsumerMaxRate(),
+ connectionFactory.getDefaultProducerWindowSize(),
+ connectionFactory.getDefaultProducerMaxRate());
sessions.add(session);
@@ -144,10 +115,11 @@
}
public ClientSession createClientSession(final boolean xa, final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final int ackBatchSize) throws MessagingException
+ final boolean autoCommitAcks,
+ final int ackBatchSize) throws MessagingException
{
- return createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize, defaultBlockOnAcknowledge, false);
+ return createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize,
+ connectionFactory.isDefaultBlockOnAcknowledge(), false);
}
public void start() throws MessagingException
@@ -209,6 +181,11 @@
{
sessions.remove(session);
}
+
+ public Set<ClientSession> getSessions()
+ {
+ return new HashSet<ClientSession>(this.sessions);
+ }
public Version getServerVersion()
{
@@ -236,7 +213,7 @@
//We copy the set of sessions to prevent ConcurrentModificationException which would occur
//when the child trues to remove itself from its parent
Set<ClientSession> childrenClone = new HashSet<ClientSession>(sessions);
-
+
for (ClientSession session: childrenClone)
{
session.close();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -6,6 +6,8 @@
*/
package org.jboss.messaging.core.client.impl;
+import java.util.Set;
+
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -22,4 +24,6 @@
RemotingConnection getRemotingConnection();
void removeSession(ClientSession session);
+
+ Set<ClientSession> getSessions();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -97,6 +97,8 @@
private final long serverTargetID;
+ private final boolean xa;
+
private final int lazyAckBatchSize;
private final boolean cacheProducers;
@@ -153,6 +155,7 @@
// Constructors ---------------------------------------------------------------------------------
public ClientSessionImpl(final ClientConnectionInternal connection, final long serverTargetID,
+ final boolean xa,
final int lazyAckBatchSize, final boolean cacheProducers,
final boolean autoCommitSends, final boolean autoCommitAcks,
final boolean blockOnAcknowledge,
@@ -186,6 +189,8 @@
executor = Executors.newSingleThreadExecutor();
+ this.xa = xa;
+
this.lazyAckBatchSize = lazyAckBatchSize;
if (cacheProducers)
@@ -523,11 +528,26 @@
return autoCommitAcks;
}
+ public boolean isBlockOnAcknowledge()
+ {
+ return blockOnAcknowledge;
+ }
+
+ public boolean isCacheProducers()
+ {
+ return cacheProducers;
+ }
+
public int getLazyAckBatchSize()
{
return lazyAckBatchSize;
}
+ public boolean isXA()
+ {
+ return xa;
+ }
+
// ClientSessionInternal implementation ------------------------------------------------------------
public long getServerTargetID()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -76,6 +76,22 @@
autoCommitSends = buffer.getBoolean();
autoCommitAcks = buffer.getBoolean();
}
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof ConnectionCreateSessionMessage == false)
+ {
+ return false;
+ }
+
+ ConnectionCreateSessionMessage r = (ConnectionCreateSessionMessage)other;
+
+ boolean matches = this.xa == r.xa &&
+ this.autoCommitSends == r.autoCommitSends &&
+ this.autoCommitAcks == r.autoCommitAcks;
+
+ return matches;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -197,6 +197,18 @@
return getParentString() + "]";
}
+ public boolean equals(Object other)
+ {
+ if (other instanceof EmptyPacket == false)
+ {
+ return false;
+ }
+
+ EmptyPacket r = (EmptyPacket)other;
+
+ return r.type == this.type;
+ }
+
// Package protected ---------------------------------------------
protected String getParentString()
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -162,7 +162,7 @@
return name;
}
- public synchronized HandleStatus addLast(final MessageReference ref)
+ public HandleStatus addLast(final MessageReference ref)
{
if (locked)
{
@@ -181,7 +181,7 @@
}
}
- public synchronized HandleStatus addFirst(final MessageReference ref)
+ public HandleStatus addFirst(final MessageReference ref)
{
if (locked)
{
@@ -535,7 +535,7 @@
// Private
// ------------------------------------------------------------------------------
- private HandleStatus add(final MessageReference ref, final boolean first)
+ private synchronized HandleStatus add(final MessageReference ref, final boolean first)
{
if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().encodeSize() >= maxSizeBytes)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java 2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -304,13 +304,13 @@
private void checkDefaults(final ClientConnectionFactory cf) throws Exception
{
- assertEquals(ClientConnectionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE, cf.getDefaultConsumerWindowSize());
- assertEquals(ClientConnectionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE, cf.getDefaultConsumerMaxRate());
- assertEquals(ClientConnectionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE, cf.getDefaultProducerWindowSize());
- assertEquals(ClientConnectionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE, cf.getDefaultProducerMaxRate());
- assertEquals(ClientConnectionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE, cf.isDefaultBlockOnAcknowledge());
- assertEquals(ClientConnectionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND, cf.isDefaultBlockOnNonPersistentSend());
- assertEquals(ClientConnectionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND, cf.isDefaultBlockOnPersistentSend());
+ assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_CONSUMER_WINDOW_SIZE, cf.getDefaultConsumerWindowSize());
+ assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_CONSUMER_MAX_RATE, cf.getDefaultConsumerMaxRate());
+ assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_PRODUCER_WINDOW_SIZE, cf.getDefaultProducerWindowSize());
+ assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_PRODUCER_MAX_RATE, cf.getDefaultProducerMaxRate());
+ assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_BLOCK_ON_ACKNOWLEDGE, cf.isDefaultBlockOnAcknowledge());
+ assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_BLOCK_ON_PERSISTENT_SEND, cf.isDefaultBlockOnNonPersistentSend());
+ assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND, cf.isDefaultBlockOnPersistentSend());
}
private void checkGetSetAttributes(ClientConnectionFactory cf,
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java 2008-06-10 14:36:20 UTC (rev 4423)
@@ -0,0 +1,411 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.client.impl;
+
+import java.util.Set;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.client.ClientConnection;
+import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientConnectionImpl;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
+import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
+import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.core.version.impl.VersionImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ *
+ * A ClientConnectionImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ClientConnectionImplTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(ClientConnectionImplTest.class);
+
+ public void testGetAttributes() throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ Version version = new VersionImpl("blah132", 1, 1, 1, 12, "blah1652");
+
+ Location location = new LocationImpl(TransportType.TCP, "sausages");
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, 23, rc, version);
+
+ assertTrue(conn.getServerVersion() == version);
+ assertTrue(conn.getRemotingConnection() == rc);
+ }
+
+ public void testCreateSession() throws Exception
+ {
+ testCreateSession(false, false, false, 14526512, false, false, true);
+ testCreateSession(true, true, true, 14526512, true, true, true);
+
+ testCreateSession(false, false, false, 14526512, false, false, false);
+ testCreateSession(true, true, true, 14526512, true, true, false);
+ }
+
+ public void testStartStop() throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+ Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+ final int serverTargetID = 23;
+
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+
+ rc.sendOneWay(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CONN_START));
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CONN_STOP))).andReturn(null);
+
+ EasyMock.replay(rc);
+
+ conn.start();
+
+ conn.stop();
+
+ EasyMock.verify(rc);
+ }
+
+ public void testSetRemotingSessionListener() throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+ Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+ final int serverTargetID = 23;
+
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+
+ RemotingSessionListener listener = new RemotingSessionListener()
+ {
+ public void sessionDestroyed(long sessionID, MessagingException me)
+ {
+ }
+ };
+
+ rc.setRemotingSessionListener(listener);
+
+ EasyMock.replay(rc);
+
+ conn.setRemotingSessionListener(listener);
+
+ EasyMock.verify(rc);
+ }
+
+ public void testClose() throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+ Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+ final int serverTargetID = 23;
+
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+
+ assertFalse(conn.isClosed());
+
+ //Create some sessions
+
+ ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, false, false);
+
+ ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(1);
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+ request = new ConnectionCreateSessionMessage(false, false, false);
+
+ response = new ConnectionCreateSessionResponseMessage(2);
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+ request = new ConnectionCreateSessionMessage(false, false, false);
+
+ response = new ConnectionCreateSessionResponseMessage(3);
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+ EasyMock.replay(rc);
+
+ ClientSession sess1 = conn.createClientSession(false, false, false, 23234);
+
+ ClientSession sess2 = conn.createClientSession(false, false, false, 23234);
+
+ ClientSession sess3 = conn.createClientSession(false, false, false, 23234);
+
+ assertFalse(sess1.isClosed());
+ assertFalse(sess2.isClosed());
+ assertFalse(sess3.isClosed());
+
+ EasyMock.verify(rc);
+
+ EasyMock.reset(rc);
+
+ //And the closes of the sessions - this can be in a different order
+ EasyMock.checkOrder(rc, false);
+ EasyMock.expect(rc.sendBlocking(1, 1, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+ EasyMock.expect(rc.sendBlocking(2, 2, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+ EasyMock.expect(rc.sendBlocking(3, 3, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+ EasyMock.checkOrder(rc, true);
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+
+ rc.stop();
+
+ EasyMock.replay(rc);
+
+ conn.close();
+
+ EasyMock.verify(rc);
+
+ assertTrue(sess1.isClosed());
+ assertTrue(sess2.isClosed());
+ assertTrue(sess3.isClosed());
+
+ assertTrue(conn.isClosed());
+
+ //Close again should do nothing
+ EasyMock.reset(rc);
+
+ EasyMock.replay(rc);
+
+ conn.close();
+
+ EasyMock.verify(rc);
+
+ try
+ {
+ conn.createClientSession(false, false, false, 65655);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ conn.createClientSession(false, false, false, 545, false, false);
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ conn.start();
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ conn.stop();
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ conn.setRemotingSessionListener(new RemotingSessionListener()
+ {
+ public void sessionDestroyed(long sessionID, MessagingException me)
+ {
+ }
+ });
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+ }
+ }
+
+ public void testRemoveSession() throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+ Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+ final int serverTargetID = 23;
+
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+
+ assertFalse(conn.isClosed());
+
+ //Create some sessions
+
+ ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, false, false);
+
+ ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(1);
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+ request = new ConnectionCreateSessionMessage(false, false, false);
+
+ response = new ConnectionCreateSessionResponseMessage(2);
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+ request = new ConnectionCreateSessionMessage(false, false, false);
+
+ response = new ConnectionCreateSessionResponseMessage(3);
+
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+ EasyMock.replay(rc);
+
+ ClientSession sess1 = conn.createClientSession(false, false, false, 23234);
+
+ ClientSession sess2 = conn.createClientSession(false, false, false, 23234);
+
+ ClientSession sess3 = conn.createClientSession(false, false, false, 23234);
+
+ assertFalse(sess1.isClosed());
+ assertFalse(sess2.isClosed());
+ assertFalse(sess3.isClosed());
+
+ Set<ClientSession> sessions = conn.getSessions();
+ assertEquals(3, sessions.size());
+ assertTrue(sessions.contains(sess1));
+ assertTrue(sessions.contains(sess2));
+ assertTrue(sessions.contains(sess3));
+
+ EasyMock.verify(rc);
+
+ EasyMock.reset(rc);
+
+ conn.removeSession(sess2);
+
+ sessions = conn.getSessions();
+ assertEquals(2, sessions.size());
+ assertTrue(sessions.contains(sess1));
+ assertTrue(sessions.contains(sess3));
+
+ conn.removeSession(sess1);
+
+ sessions = conn.getSessions();
+ assertEquals(1, sessions.size());
+ assertTrue(sessions.contains(sess3));
+ }
+
+ // Private -----------------------------------------------------------------------------------------------------------
+
+
+ private void testCreateSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks,
+ final int ackBatchSize, final boolean blockOnAcknowledge,
+ final boolean cacheProducers, final boolean useDefaults) throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
+ Location location = new LocationImpl(TransportType.TCP, "oranges");
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+ if (useDefaults)
+ {
+ cf.setDefaultBlockOnAcknowledge(blockOnAcknowledge);
+ }
+ else
+ {
+ cf.setDefaultBlockOnAcknowledge(!blockOnAcknowledge); // Should be ignored
+ }
+
+ final int connTargetID = 17267162;
+
+ Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
+
+ ClientConnection conn = new ClientConnectionImpl(cf, connTargetID, rc, version);
+
+ ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(xa, autoCommitSends, autoCommitAcks);
+
+ final int sessionTargetID = 12127162;
+
+ ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(sessionTargetID);
+
+ EasyMock.expect(rc.sendBlocking(connTargetID, connTargetID, request)).andReturn(response);
+
+ EasyMock.replay(rc);
+
+ ClientSession session;
+
+ if (useDefaults)
+ {
+ session = conn.createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize);
+ }
+ else
+ {
+ session = conn.createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize, blockOnAcknowledge,
+ cacheProducers);
+ }
+
+
+ assertEquals(ackBatchSize, session.getLazyAckBatchSize());
+ assertEquals(xa, session.isXA());
+ assertEquals(autoCommitSends, session.isAutoCommitSends());
+ assertEquals(autoCommitAcks, session.isAutoCommitAcks());
+ assertEquals(blockOnAcknowledge, session.isBlockOnAcknowledge());
+
+ EasyMock.verify(rc);
+ }
+}
More information about the jboss-cvs-commits
mailing list