[jboss-cvs] JBoss Messaging SVN: r4423 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 10 10:36:20 EDT 2008


Author: timfox
Date: 2008-06-10 10:36:20 -0400 (Tue, 10 Jun 2008)
New Revision: 4423

Added:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
Log:
New test and tweak to queue concurrency


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -65,5 +65,11 @@
    
    boolean isAutoCommitAcks();
    
+   boolean isBlockOnAcknowledge();
+   
+   boolean isCacheProducers();
+   
    int getLazyAckBatchSize();
+   
+   boolean isXA();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -64,38 +64,40 @@
    private final RemotingConnectionFactory remotingConnectionFactory;
       
    private final Location location;
+   
+   //These attributes are mutable and can be updated by different threads so must be volatile
 
-   private ConnectionParams connectionParams;
+   private volatile ConnectionParams connectionParams;
  
-   private int defaultConsumerWindowSize;
+   private volatile int defaultConsumerWindowSize;
    
-   private int defaultConsumerMaxRate;
+   private volatile int defaultConsumerMaxRate;
 
-   private int defaultProducerWindowSize;
+   private volatile int defaultProducerWindowSize;
    
-   private int defaultProducerMaxRate;
+   private volatile int defaultProducerMaxRate;
    
-   private boolean defaultBlockOnAcknowledge;
+   private volatile boolean defaultBlockOnAcknowledge;
    
-   private boolean defaultBlockOnPersistentSend;
+   private volatile boolean defaultBlockOnPersistentSend;
    
-   private boolean defaultBlockOnNonPersistentSend;
+   private volatile boolean defaultBlockOnNonPersistentSend;
         
    // Static ---------------------------------------------------------------------------------------
    
-   public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
+   public static final int DEFAULT_DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
    
-   public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
+   public static final int DEFAULT_DEFAULT_CONSUMER_MAX_RATE = -1;
    
-   public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
+   public static final int DEFAULT_DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
    
-   public static final int DEFAULT_PRODUCER_MAX_RATE = -1;
+   public static final int DEFAULT_DEFAULT_PRODUCER_MAX_RATE = -1;
    
-   public static final boolean DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
+   public static final boolean DEFAULT_DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
    
-   public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
+   public static final boolean DEFAULT_DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
    
-   public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
+   public static final boolean DEFAULT_DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
    
    // Constructors ---------------------------------------------------------------------------------
 
@@ -145,13 +147,13 @@
    private ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams,
                                        final boolean dummy)
    {
-      defaultConsumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
-      defaultConsumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
-      defaultProducerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
-      defaultProducerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
-      defaultBlockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-      defaultBlockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-      defaultBlockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;      
+      defaultConsumerWindowSize = DEFAULT_DEFAULT_CONSUMER_WINDOW_SIZE;
+      defaultConsumerMaxRate = DEFAULT_DEFAULT_CONSUMER_MAX_RATE;
+      defaultProducerWindowSize = DEFAULT_DEFAULT_PRODUCER_WINDOW_SIZE;
+      defaultProducerMaxRate = DEFAULT_DEFAULT_PRODUCER_MAX_RATE;
+      defaultBlockOnAcknowledge = DEFAULT_DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+      defaultBlockOnPersistentSend = DEFAULT_DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+      defaultBlockOnNonPersistentSend = DEFAULT_DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;      
       this.location = location;
       this.connectionParams = connectionParams;
       this.remotingConnectionFactory = new RemotingConnectionFactoryImpl();
@@ -183,11 +185,7 @@
          CreateConnectionResponse response =
             (CreateConnectionResponse)remotingConnection.sendBlocking(0, 0, request);
 
-         return new ClientConnectionImpl(response.getConnectionTargetID(), remotingConnection,
-               defaultConsumerWindowSize, defaultConsumerMaxRate,
-               defaultProducerWindowSize, defaultProducerMaxRate,
-               defaultBlockOnAcknowledge, defaultBlockOnNonPersistentSend,
-               defaultBlockOnPersistentSend, response.getServerVersion());
+         return new ClientConnectionImpl(this, response.getConnectionTargetID(), remotingConnection, response.getServerVersion());
       }
       catch (Throwable t)
       {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -24,6 +24,7 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import org.jboss.messaging.core.client.ClientConnectionFactory;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.RemotingSessionListener;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -63,58 +64,27 @@
 
    private final Set<ClientSession> sessions = new ConcurrentHashSet<ClientSession>();
 
-   private volatile boolean closed;
-   
-   private final int defaultConsumerWindowSize;
-   
-   private final int defaultConsumerMaxRate;
-   
-   private final int defaultProducerWindowSize;
-   
-   private final int defaultProducerMaxRate;
-   
-   private final boolean defaultBlockOnAcknowledge;
-   
-   private final boolean defaultSendNonPersistentMessagesBlocking;
-   
-   private final boolean defaultSendPersistentMessagesBlocking;
-
    private final Version serverVersion;
    
-
+   private final ClientConnectionFactory connectionFactory;
+   
+   private volatile boolean closed;
+      
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ClientConnectionImpl(final long serverTargetID,
+   public ClientConnectionImpl(final ClientConnectionFactory connectionFactory,
+                               final long serverTargetID,
                                final RemotingConnection connection,
-                               final int defaultConsumerWindowSize,     
-                               final int defaultConsumerMaxRate,
-                               final int defaultProducerWindowSize,
-                               final int defaultProducerMaxRate,
-                               final boolean defaultBlockOnAcknowledge,
-                               final boolean defaultSendNonPersistentMessagesBlocking,
-                               final boolean defaultSendPersistentMessagesBlocking,
                                final Version serverVersion)
    {
+      this.connectionFactory = connectionFactory;
+      
       this.serverTargetID = serverTargetID;
       
       this.remotingConnection = connection;
       
-      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
-      
-      this.defaultConsumerMaxRate = defaultConsumerMaxRate;
-      
-      this.defaultProducerWindowSize = defaultProducerWindowSize;
-      
-      this.defaultProducerMaxRate = defaultProducerMaxRate;
-      
-      this.defaultBlockOnAcknowledge = defaultBlockOnAcknowledge;
-      
-      this.defaultSendNonPersistentMessagesBlocking = defaultSendNonPersistentMessagesBlocking;
-      
-      this.defaultSendPersistentMessagesBlocking = defaultSendPersistentMessagesBlocking;
-
       this.serverVersion = serverVersion;
    }
    
@@ -132,11 +102,12 @@
          (ConnectionCreateSessionResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);   
 
       ClientSession session =
-      	new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers,
-      			autoCommitSends, autoCommitAcks, blockOnAcknowledge, defaultSendNonPersistentMessagesBlocking,
-      			defaultSendPersistentMessagesBlocking, 
-      			defaultConsumerWindowSize, defaultConsumerMaxRate, defaultProducerWindowSize,
-      			defaultProducerMaxRate);
+      	new ClientSessionImpl(this, response.getSessionID(), xa, ackBatchSize, cacheProducers,
+      			autoCommitSends, autoCommitAcks, blockOnAcknowledge, connectionFactory.isDefaultBlockOnNonPersistentSend(),
+      			connectionFactory.isDefaultBlockOnPersistentSend(), 
+      			connectionFactory.getDefaultConsumerWindowSize(), connectionFactory.getDefaultConsumerMaxRate(),
+      			connectionFactory.getDefaultProducerWindowSize(),
+      			connectionFactory.getDefaultProducerMaxRate());
 
       sessions.add(session);
 
@@ -144,10 +115,11 @@
    }
    
    public ClientSession createClientSession(final boolean xa, final boolean autoCommitSends,
-         final boolean autoCommitAcks,
-         final int ackBatchSize) throws MessagingException
+                                            final boolean autoCommitAcks,
+                                            final int ackBatchSize) throws MessagingException
    {
-      return createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize, defaultBlockOnAcknowledge, false);
+      return createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize,
+                                 connectionFactory.isDefaultBlockOnAcknowledge(), false);
    }
    
    public void start() throws MessagingException
@@ -209,6 +181,11 @@
    {
       sessions.remove(session);
    }
+   
+   public Set<ClientSession> getSessions()
+   {
+      return new HashSet<ClientSession>(this.sessions);
+   }
 
    public Version getServerVersion()
    {
@@ -236,7 +213,7 @@
       //We copy the set of sessions to prevent ConcurrentModificationException which would occur
       //when the child trues to remove itself from its parent
       Set<ClientSession> childrenClone = new HashSet<ClientSession>(sessions);
-      
+       
       for (ClientSession session: childrenClone)
       {
          session.close(); 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -6,6 +6,8 @@
  */
 package org.jboss.messaging.core.client.impl;
 
+import java.util.Set;
+
 import org.jboss.messaging.core.client.ClientConnection;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -22,4 +24,6 @@
    RemotingConnection getRemotingConnection();
 
    void removeSession(ClientSession session);
+   
+   Set<ClientSession> getSessions();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -97,6 +97,8 @@
       
    private final long serverTargetID;
    
+   private final boolean xa;
+   
    private final int lazyAckBatchSize;
    
    private final boolean cacheProducers;
@@ -153,6 +155,7 @@
    // Constructors ---------------------------------------------------------------------------------
    
    public ClientSessionImpl(final ClientConnectionInternal connection, final long serverTargetID,
+                            final boolean xa,
                             final int lazyAckBatchSize, final boolean cacheProducers,                            
                             final boolean autoCommitSends, final boolean autoCommitAcks,
                             final boolean blockOnAcknowledge,
@@ -186,6 +189,8 @@
       
       executor = Executors.newSingleThreadExecutor();
       
+      this.xa = xa;
+      
       this.lazyAckBatchSize = lazyAckBatchSize;
       
       if (cacheProducers)
@@ -523,11 +528,26 @@
    	return autoCommitAcks;   	   	
    }
    
+   public boolean isBlockOnAcknowledge()
+   {
+      return blockOnAcknowledge;
+   }
+   
+   public boolean isCacheProducers()
+   {
+      return cacheProducers;
+   }
+   
    public int getLazyAckBatchSize()
    {
    	return lazyAckBatchSize;
    }
    
+   public boolean isXA()
+   {
+      return xa;
+   }
+   
    // ClientSessionInternal implementation ------------------------------------------------------------
    
    public long getServerTargetID()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConnectionCreateSessionMessage.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -76,6 +76,22 @@
       autoCommitSends = buffer.getBoolean();
       autoCommitAcks = buffer.getBoolean();
    }
+   
+   public boolean equals(Object other)
+   {
+      if (other instanceof ConnectionCreateSessionMessage == false)
+      {
+         return false;
+      }
+            
+      ConnectionCreateSessionMessage r = (ConnectionCreateSessionMessage)other;
+      
+      boolean matches = this.xa == r.xa &&
+                        this.autoCommitSends == r.autoCommitSends &&
+                        this.autoCommitAcks == r.autoCommitAcks;
+      
+      return matches;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/EmptyPacket.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -197,6 +197,18 @@
       return getParentString() + "]";
    }
    
+   public boolean equals(Object other)
+   {
+      if (other instanceof EmptyPacket == false)
+      {
+         return false;
+      }
+            
+      EmptyPacket r = (EmptyPacket)other;
+      
+      return r.type == this.type;      
+   }
+   
    // Package protected ---------------------------------------------
 
    protected String getParentString()

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -162,7 +162,7 @@
       return name;
    }
    
-   public synchronized HandleStatus addLast(final MessageReference ref)
+   public HandleStatus addLast(final MessageReference ref)
    {
       if (locked)
       {
@@ -181,7 +181,7 @@
       }
    }
 
-   public synchronized HandleStatus addFirst(final MessageReference ref)
+   public HandleStatus addFirst(final MessageReference ref)
    {
       if (locked)
       {
@@ -535,7 +535,7 @@
    // Private
    // ------------------------------------------------------------------------------
 
-   private HandleStatus add(final MessageReference ref, final boolean first)
+   private synchronized HandleStatus add(final MessageReference ref, final boolean first)
    {
       if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().encodeSize() >= maxSizeBytes)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java	2008-06-10 14:21:36 UTC (rev 4422)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -304,13 +304,13 @@
    
    private void checkDefaults(final ClientConnectionFactory cf) throws Exception
    {
-      assertEquals(ClientConnectionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE, cf.getDefaultConsumerWindowSize());
-      assertEquals(ClientConnectionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE, cf.getDefaultConsumerMaxRate());
-      assertEquals(ClientConnectionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE, cf.getDefaultProducerWindowSize());
-      assertEquals(ClientConnectionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE, cf.getDefaultProducerMaxRate());
-      assertEquals(ClientConnectionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE, cf.isDefaultBlockOnAcknowledge());
-      assertEquals(ClientConnectionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND, cf.isDefaultBlockOnNonPersistentSend());
-      assertEquals(ClientConnectionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND, cf.isDefaultBlockOnPersistentSend());      
+      assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_CONSUMER_WINDOW_SIZE, cf.getDefaultConsumerWindowSize());
+      assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_CONSUMER_MAX_RATE, cf.getDefaultConsumerMaxRate());
+      assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_PRODUCER_WINDOW_SIZE, cf.getDefaultProducerWindowSize());
+      assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_PRODUCER_MAX_RATE, cf.getDefaultProducerMaxRate());
+      assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_BLOCK_ON_ACKNOWLEDGE, cf.isDefaultBlockOnAcknowledge());
+      assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_BLOCK_ON_PERSISTENT_SEND, cf.isDefaultBlockOnNonPersistentSend());
+      assertEquals(ClientConnectionFactoryImpl.DEFAULT_DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND, cf.isDefaultBlockOnPersistentSend());      
    }
    
    private void checkGetSetAttributes(ClientConnectionFactory cf,

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java	2008-06-10 14:36:20 UTC (rev 4423)
@@ -0,0 +1,411 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.client.impl;
+
+import java.util.Set;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.client.ClientConnection;
+import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientConnectionImpl;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
+import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
+import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.core.version.impl.VersionImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * 
+ * A ClientConnectionImplTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ClientConnectionImplTest extends UnitTestCase
+{
+   private static final Logger log = Logger.getLogger(ClientConnectionImplTest.class);
+
+   public void testGetAttributes() throws Exception
+   {
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
+
+      Version version = new VersionImpl("blah132", 1, 1, 1, 12, "blah1652");
+
+      Location location = new LocationImpl(TransportType.TCP, "sausages");
+
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, 23, rc, version);
+
+      assertTrue(conn.getServerVersion() == version);
+      assertTrue(conn.getRemotingConnection() == rc);      
+   }
+
+   public void testCreateSession() throws Exception
+   {
+      testCreateSession(false, false, false, 14526512, false, false, true);
+      testCreateSession(true, true, true, 14526512, true, true, true);
+
+      testCreateSession(false, false, false, 14526512, false, false, false);
+      testCreateSession(true, true, true, 14526512, true, true, false);
+   }
+   
+   public void testStartStop() throws Exception
+   {
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
+      
+      Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+      Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+      final int serverTargetID = 23;
+      
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+      
+      rc.sendOneWay(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CONN_START));
+      
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CONN_STOP))).andReturn(null);
+      
+      EasyMock.replay(rc);
+      
+      conn.start();
+      
+      conn.stop();
+      
+      EasyMock.verify(rc);            
+   }
+   
+   public void testSetRemotingSessionListener() throws Exception
+   {
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
+      
+      Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+      Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+      final int serverTargetID = 23;
+      
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+      
+      RemotingSessionListener listener = new RemotingSessionListener()
+      {
+         public void sessionDestroyed(long sessionID, MessagingException me)
+         {            
+         }
+      };
+      
+      rc.setRemotingSessionListener(listener);
+            
+      EasyMock.replay(rc);
+      
+      conn.setRemotingSessionListener(listener);
+      
+      EasyMock.verify(rc);            
+   }
+   
+   public void testClose() throws Exception
+   {
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
+      
+      Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+      Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+      
+      final int serverTargetID = 23;
+      
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+      
+      assertFalse(conn.isClosed());
+    
+      //Create some sessions
+      
+      ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, false, false);
+
+      ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(1);
+
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      
+      request = new ConnectionCreateSessionMessage(false, false, false);
+
+      response = new ConnectionCreateSessionResponseMessage(2);
+
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      
+      request = new ConnectionCreateSessionMessage(false, false, false);
+
+      response = new ConnectionCreateSessionResponseMessage(3);
+
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      
+      EasyMock.replay(rc);
+      
+      ClientSession sess1 = conn.createClientSession(false, false, false, 23234);
+      
+      ClientSession sess2 = conn.createClientSession(false, false, false, 23234);
+      
+      ClientSession sess3 = conn.createClientSession(false, false, false, 23234);
+      
+      assertFalse(sess1.isClosed());
+      assertFalse(sess2.isClosed());
+      assertFalse(sess3.isClosed());
+      
+      EasyMock.verify(rc);
+      
+      EasyMock.reset(rc);
+            
+      //And the closes of the sessions - this can be in a different order
+      EasyMock.checkOrder(rc, false);
+      EasyMock.expect(rc.sendBlocking(1, 1, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+      EasyMock.expect(rc.sendBlocking(2, 2, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+      EasyMock.expect(rc.sendBlocking(3, 3, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+      EasyMock.checkOrder(rc, true);
+      
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new EmptyPacket(EmptyPacket.CLOSE))).andReturn(null);
+            
+      rc.stop();      
+      
+      EasyMock.replay(rc);
+      
+      conn.close();
+      
+      EasyMock.verify(rc);
+      
+      assertTrue(sess1.isClosed());
+      assertTrue(sess2.isClosed());
+      assertTrue(sess3.isClosed());
+      
+      assertTrue(conn.isClosed());
+      
+      //Close again should do nothing
+      EasyMock.reset(rc);
+      
+      EasyMock.replay(rc);
+      
+      conn.close();
+      
+      EasyMock.verify(rc);
+      
+      try
+      {
+         conn.createClientSession(false, false, false, 65655);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         conn.createClientSession(false, false, false, 545, false, false);
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         conn.start();
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         conn.stop();
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+      
+      try
+      {
+         conn.setRemotingSessionListener(new RemotingSessionListener()
+         {
+            public void sessionDestroyed(long sessionID, MessagingException me)
+            {            
+            }
+         });
+         fail("Should throw exception");
+      }
+      catch (MessagingException e)
+      {
+         assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
+      }
+   }
+   
+   public void testRemoveSession() throws Exception
+   {
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);   
+      
+      Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
+
+      Location location = new LocationImpl(TransportType.TCP, "ftftf");
+
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+      
+      final int serverTargetID = 23;
+      
+      ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version);
+      
+      assertFalse(conn.isClosed());
+    
+      //Create some sessions
+      
+      ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, false, false);
+
+      ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(1);
+
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      
+      request = new ConnectionCreateSessionMessage(false, false, false);
+
+      response = new ConnectionCreateSessionResponseMessage(2);
+
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      
+      request = new ConnectionCreateSessionMessage(false, false, false);
+
+      response = new ConnectionCreateSessionResponseMessage(3);
+
+      EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+      
+      EasyMock.replay(rc);
+      
+      ClientSession sess1 = conn.createClientSession(false, false, false, 23234);
+      
+      ClientSession sess2 = conn.createClientSession(false, false, false, 23234);
+      
+      ClientSession sess3 = conn.createClientSession(false, false, false, 23234);
+      
+      assertFalse(sess1.isClosed());
+      assertFalse(sess2.isClosed());
+      assertFalse(sess3.isClosed());
+            
+      Set<ClientSession> sessions = conn.getSessions();
+      assertEquals(3, sessions.size());
+      assertTrue(sessions.contains(sess1));
+      assertTrue(sessions.contains(sess2));
+      assertTrue(sessions.contains(sess3));
+      
+      EasyMock.verify(rc);
+      
+      EasyMock.reset(rc);
+      
+      conn.removeSession(sess2);
+      
+      sessions = conn.getSessions();
+      assertEquals(2, sessions.size());
+      assertTrue(sessions.contains(sess1));     
+      assertTrue(sessions.contains(sess3));
+      
+      conn.removeSession(sess1);
+      
+      sessions = conn.getSessions();
+      assertEquals(1, sessions.size());   
+      assertTrue(sessions.contains(sess3));
+   }
+               
+   // Private -----------------------------------------------------------------------------------------------------------
+
+
+   private void testCreateSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks,
+         final int ackBatchSize, final boolean blockOnAcknowledge,
+         final boolean cacheProducers, final boolean useDefaults) throws Exception
+   {       
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class); 
+
+      Location location = new LocationImpl(TransportType.TCP, "oranges");
+
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+
+      if (useDefaults)
+      {
+         cf.setDefaultBlockOnAcknowledge(blockOnAcknowledge); 
+      }
+      else
+      {
+         cf.setDefaultBlockOnAcknowledge(!blockOnAcknowledge); // Should be ignored
+      }
+
+      final int connTargetID = 17267162;
+
+      Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
+
+      ClientConnection conn = new ClientConnectionImpl(cf, connTargetID, rc, version);
+
+      ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(xa, autoCommitSends, autoCommitAcks);
+
+      final int sessionTargetID = 12127162;
+
+      ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(sessionTargetID);
+
+      EasyMock.expect(rc.sendBlocking(connTargetID, connTargetID, request)).andReturn(response);
+
+      EasyMock.replay(rc);
+
+      ClientSession session;
+
+      if (useDefaults)
+      {
+         session = conn.createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize);
+      }
+      else
+      {
+         session = conn.createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize, blockOnAcknowledge,
+               cacheProducers);
+      }
+
+
+      assertEquals(ackBatchSize, session.getLazyAckBatchSize());
+      assertEquals(xa, session.isXA());
+      assertEquals(autoCommitSends, session.isAutoCommitSends());
+      assertEquals(autoCommitAcks, session.isAutoCommitAcks());
+      assertEquals(blockOnAcknowledge, session.isBlockOnAcknowledge());
+
+      EasyMock.verify(rc);
+   }
+}




More information about the jboss-cvs-commits mailing list