[jboss-cvs] JBoss Messaging SVN: r7763 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 17 13:36:53 EDT 2009


Author: timfox
Date: 2009-08-17 13:36:52 -0400 (Mon, 17 Aug 2009)
New Revision: 7763

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/DelegatingSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageHeaderTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/SessionCloseOnGCTest.java
Log:
 mainly factory gc fix plus a few other bits and pieces

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -50,6 +50,8 @@
    /**
     * Queues created by this method are <em>not</em> temporary
     */
+   void createQueue(String address, String queueName) throws MessagingException;
+   
    void createQueue(String address, String queueName, boolean durable) throws MessagingException;
 
    void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable) throws MessagingException;

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.core.client;
 
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -406,9 +406,9 @@
 
       // Flow control for the first packet, we will have others
       flowControl(packet.getPacketSize(), false);
+            
+      currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
 
-      currentChunkMessage = new ClientMessageImpl();
-
       currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
 
       currentChunkMessage.setLargeMessage(true);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.LargeMessageBuffer;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.message.impl.MessageImpl;
@@ -52,7 +53,6 @@
 
    private int flowControlSize = -1;
 
-
    /*
     * Constructor for when reading from network
     */
@@ -85,9 +85,10 @@
    {
       super((byte)0, durable, 0, System.currentTimeMillis(), (byte)4, body);
    }
-
-   public ClientMessageImpl()
+   
+   public ClientMessageImpl(final boolean durable)
    {
+      super((byte)0, durable, 0, System.currentTimeMillis(), (byte)4, ChannelBuffers.dynamicBuffer(1024));
    }
 
    public void onReceipt(final ClientConsumerInternal consumer)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -108,7 +108,7 @@
    public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
 
    public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
-   
+
    public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
 
    // Attributes
@@ -131,7 +131,7 @@
    private boolean readOnly;
 
    // Settable attributes:
-   
+
    private boolean cacheLargeMessagesClient = DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
 
    private List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors;
@@ -249,9 +249,9 @@
    }
 
    private void initialise() throws Exception
-   {            
+   {
       setThreadPools();
-      
+
       instantiateLoadBalancingPolicy();
 
       if (discoveryAddress != null)
@@ -399,8 +399,7 @@
    {
       this.cacheLargeMessagesClient = cached;
    }
-   
-   
+
    public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
    {
       return staticConnectors;
@@ -809,11 +808,6 @@
          }
       }
 
-      for (ConnectionManager connectionManager : connectionManagerMap.values())
-      {
-         connectionManager.causeExit();
-      }
-
       connectionManagerMap.clear();
 
       if (!useGlobalPools)
@@ -911,6 +905,14 @@
 
    // Protected ------------------------------------------------------------------------------
 
+   @Override
+   protected void finalize() throws Throwable
+   {
+      close();
+
+      super.finalize();
+   }
+
    // Private --------------------------------------------------------------------------------
 
    private void checkWrite()

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -237,6 +237,11 @@
    // ClientSession implementation
    // -----------------------------------------------------------------
 
+   public void createQueue(final SimpleString address, final SimpleString queueName) throws MessagingException
+   {
+      internalCreateQueue(address, queueName, null, false, false);
+   }
+   
    public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws MessagingException
    {
       internalCreateQueue(address, queueName, null, durable, false);
@@ -325,7 +330,12 @@
    {
       return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, false);
    }
-
+   
+   public void createQueue(final String address, final String queueName) throws MessagingException
+   {
+      internalCreateQueue(toSimpleString(address), toSimpleString(queueName), null, false, false);
+   }
+   
    public ClientConsumer createConsumer(final String queueName, final String filterString) throws MessagingException
    {
       return createConsumer(toSimpleString(queueName), toSimpleString(filterString));

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -62,12 +62,8 @@
 
    int numSessions();
    
-   void causeExit();
-   
    RemotingConnection getConnection(final int initialRefCount);
    
-   void close();
-   
    void addFailureListener(FailureListener listener);
 
    boolean removeFailureListener(FailureListener listener);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -24,6 +24,7 @@
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
 
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -89,9 +90,8 @@
    // Attributes
    // -----------------------------------------------------------------------------------
 
-   // We need to keep the reference to prevent the factory getting gc'd before the sessions are finished being used
-   private final ClientSessionFactory factory;
-
+   private final ClientSessionFactory sessionFactory;
+   
    private final TransportConfiguration connectorConfig;
 
    private final TransportConfiguration backupConfig;
@@ -142,8 +142,6 @@
 
    private boolean failoverOnServerShutdown;
 
-   private volatile boolean closed;
-
    private Set<FailureListener> listeners = new ConcurrentHashSet<FailureListener>();
 
    private Connector connector;
@@ -171,7 +169,7 @@
    // Constructors
    // ---------------------------------------------------------------------------------
 
-   public ConnectionManagerImpl(final ClientSessionFactory factory,
+   public ConnectionManagerImpl(final ClientSessionFactory sessionFactory,
                                 final TransportConfiguration connectorConfig,
                                 final TransportConfiguration backupConfig,
                                 final boolean failoverOnServerShutdown,
@@ -185,8 +183,8 @@
                                 final ExecutorService threadPool,
                                 final ScheduledExecutorService scheduledThreadPool)
    {
-      this.factory = factory;
-
+      this.sessionFactory = sessionFactory;
+      
       this.connectorConfig = connectorConfig;
 
       this.backupConfig = backupConfig;
@@ -452,26 +450,6 @@
       return sessions.size();
    }
 
-   public void causeExit()
-   {
-      closed = true;
-   }
-
-   public void close()
-   {
-      synchronized (failoverLock)
-      {
-         synchronized (createSessionLock)
-         {
-            refCount = 0;
-
-            checkCloseConnections();
-         }
-      }
-
-      closed = true;
-   }
-
    public void addFailureListener(FailureListener listener)
    {
       listeners.add(listener);
@@ -598,7 +576,6 @@
                oldConnections.add(entry.connection);
             }
 
-            // closePingers();
 
             connections.clear();
 
@@ -792,11 +769,6 @@
 
       while (true)
       {
-         if (closed)
-         {
-            return null;
-         }
-
          RemotingConnection connection = getConnection(initialRefCount);
 
          if (connection == null)
@@ -849,8 +821,8 @@
          {
             pingRunnable.cancel();
 
-            pingerFuture.cancel(false);
-
+            boolean ok = pingerFuture.cancel(false);
+            
             pingRunnable = null;
 
             pingerFuture = null;
@@ -881,12 +853,13 @@
          catch (Throwable ignore)
          {
          }
+         
 
          connector = null;
       }
 
    }
-
+   
    public RemotingConnection getConnection(final int initialRefCount)
    {
       RemotingConnection conn;
@@ -987,7 +960,7 @@
          {
             pingRunnable = new PingRunnable();
 
-            pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(pingRunnable,
+            pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduled(pingRunnable),
                                                                       0,
                                                                       clientFailureCheckPeriod,
                                                                       TimeUnit.MILLISECONDS);
@@ -1219,13 +1192,34 @@
          }
       }
    }
+   
+   private static final class ActualScheduled implements Runnable
+   {
+      private final WeakReference<PingRunnable> pingRunnable;
+      
+      ActualScheduled(final PingRunnable runnable)
+      {
+         this.pingRunnable = new WeakReference<PingRunnable>(runnable);
+      }
+      
+      public void run()
+      {
+         PingRunnable runnable = pingRunnable.get();
+         
+         if (runnable != null)
+         {
+            runnable.run();
+         }
+      }
+      
+   }
 
-   private class PingRunnable implements Runnable
+   private final class PingRunnable implements Runnable
    {
       private boolean cancelled;
 
       private boolean first;
-
+      
       public synchronized void run()
       {
          if (cancelled || (stopPingingAfterOne && !first))
@@ -1277,7 +1271,7 @@
             }
          }
       }
-
+      
       public synchronized void cancel()
       {
          cancelled = true;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/DelegatingSession.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/DelegatingSession.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -20,7 +20,6 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.core.client.impl;
 
 import javax.transaction.xa.XAException;
@@ -55,31 +54,30 @@
 public class DelegatingSession implements ClientSessionInternal
 {
    private static final Logger log = Logger.getLogger(DelegatingSession.class);
-   
+
    private final ClientSessionInternal session;
-   
+
    private Exception creationStack;
-   
+
    @Override
    protected void finalize() throws Throwable
    {
       if (!session.isClosed())
       {
          log.warn("I'm closing a core ClientSession you left open. Please make sure you close all ClientSessions explicitly " + "before letting them go out of scope!");
-         
+
          log.warn("The ClientSession you didn't close was created here:", creationStack);
-         
+
          close();
       }
 
       super.finalize();
    }
 
-   
    public DelegatingSession(final ClientSessionInternal session)
    {
       this.session = session;
-      
+
       this.creationStack = new Exception();
    }
 
@@ -237,6 +235,11 @@
       return session.createProducer(address);
    }
 
+   public void createQueue(String address, String queueName) throws MessagingException
+   {
+      session.createQueue(address, queueName);
+   }
+   
    public void createQueue(SimpleString address, SimpleString queueName, boolean durable) throws MessagingException
    {
       session.createQueue(address, queueName, durable);
@@ -470,5 +473,5 @@
    public void setForceNotSameRM(boolean force)
    {
       session.setForceNotSameRM(force);
-   }   
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -174,11 +174,11 @@
    private RemotingService remotingService;
 
    private ManagementService managementService;
-   
+
    private MemoryManager memoryManager;
 
    private DeploymentManager deploymentManager;
-      
+
    private Deployer basicUserCredentialsDeployer;
 
    private Deployer addressSettingsDeployer;
@@ -196,13 +196,12 @@
    private final Object initialiseLock = new Object();
 
    private boolean initialised;
-   
+
    private ConnectionManager replicatingConnectionManager;
 
    private int managementConnectorID;
-   
+
    private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
-   
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -226,7 +225,7 @@
    {
       this(configuration, null, securityManager);
    }
-   
+
    public MessagingServerImpl(Configuration configuration,
                               MBeanServer mbeanServer,
                               final JBMSecurityManager securityManager)
@@ -255,16 +254,16 @@
       this.addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
 
       addressSettingsRepository.setDefault(new AddressSettings());
-      
+
       this.managementConnectorID = managementConnectorSequence.decrementAndGet();
    }
-   
+
    // lifecycle methods
    // ----------------------------------------------------------------
 
    public synchronized void start() throws Exception
    {
-      log.info((configuration.isBackup() ? "backup":"live") + " server is starting..");
+      log.info((configuration.isBackup() ? "backup" : "live") + " server is starting..");
 
       if (started)
       {
@@ -289,24 +288,22 @@
 
       log.info("JBoss Messaging Server version " + getVersion().getFullVersion() + " started");
    }
-   
+
    @Override
    protected void finalize() throws Throwable
    {
       if (started)
       {
-         log.warn("MessagingServer is being finalized and has not been stopped. Please remember to stop the " +
-                  "server before letting it go out of scope");
-         
-         stop();         
+         log.warn("MessagingServer is being finalized and has not been stopped. Please remember to stop the " + "server before letting it go out of scope");
+
+         stop();
       }
-      
+
       super.finalize();
    }
-    
 
    public synchronized void stop() throws Exception
-   {      
+   {
       if (!started)
       {
          return;
@@ -369,8 +366,7 @@
 
          replicatingConnection = null;
          replicatingChannel = null;
-         
-         replicatingConnectionManager.close();
+         replicatingConnectionManager = null;
       }
 
       resourceManager.stop();
@@ -402,7 +398,7 @@
       threadPool = null;
 
       pagingManager.stop();
-      
+
       memoryManager.stop();
 
       pagingManager = null;
@@ -562,7 +558,7 @@
                                                      final int sendWindowSize) throws Exception
    {
       checkActivate(connection);
-      
+
       return doCreateSession(name,
                              channelID,
                              replicatedChannelID,
@@ -623,7 +619,7 @@
       {
          throw new IllegalArgumentException("node id is null");
       }
-      
+
       synchronized (initialiseLock)
       {
          if (initialised)
@@ -643,9 +639,12 @@
          {
             initialised = false;
 
-            throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID + ":" + backupID + "). You're probably trying to restart a live backup pair after a crash");
+            throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
+                                            ":" +
+                                            backupID +
+                                            "). You're probably trying to restart a live backup pair after a crash");
          }
-         
+
          log.info("Backup server is now operational");
       }
    }
@@ -659,7 +658,7 @@
    {
       return remotingService.getConnections().size();
    }
-   
+
    public PostOffice getPostOffice()
    {
       return postOffice;
@@ -770,7 +769,7 @@
    {
       activateCallbacks.remove(callback);
    }
-   
+
    public ExecutorFactory getExecutorFactory()
    {
       return executorFactory;
@@ -790,7 +789,7 @@
       return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
                                    storageManager,
                                    addressSettingsRepository,
-                                    configuration.isJournalSyncNonTransactional(),
+                                   configuration.isJournalSyncNonTransactional(),
                                    configuration.isBackup());
    }
 
@@ -810,7 +809,7 @@
       if (configuration.isBackup())
       {
          log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed.");
-         
+
          synchronized (this)
          {
             freezeBackupConnection();
@@ -830,7 +829,7 @@
             {
                clusterManager.activate();
             }
-            
+
             if (configuration.isFileDeploymentEnabled())
             {
                queueDeployer = new QueueDeployer(deploymentManager, messagingServerControl);
@@ -900,14 +899,19 @@
                                                                                                      false));
 
       managementService = new ManagementServiceImpl(mbeanServer, configuration, managementConnectorID);
-      
-      remotingService = new RemotingServiceImpl(configuration, this, managementService, threadPool, scheduledPool, managementConnectorID);      
-      
+
+      remotingService = new RemotingServiceImpl(configuration,
+                                                this,
+                                                managementService,
+                                                threadPool,
+                                                scheduledPool,
+                                                managementConnectorID);
+
       memoryManager = new MemoryManagerImpl();
-      
+
       memoryManager.start();
    }
-   
+
    private void initialisePart2() throws Exception
    {
       // Create the hard-wired components
@@ -1017,8 +1021,9 @@
       deployQueuesFromConfiguration();
 
       // Deploy any predefined queues
-      
-      // We don't activate queue deployer on the backup - all queues deployed on live are deployed on backup by replicating them
+
+      // We don't activate queue deployer on the backup - all queues deployed on live are deployed on backup by
+      // replicating them
       if (configuration.isFileDeploymentEnabled() && !configuration.isBackup())
       {
          queueDeployer = new QueueDeployer(deploymentManager, messagingServerControl);
@@ -1152,7 +1157,7 @@
                   }
                });
 
-               //This may take a while especially if the journal is large
+               // This may take a while especially if the journal is large
                boolean ok = future.await(60000);
 
                if (!ok)
@@ -1164,12 +1169,12 @@
             {
                log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
 
-               return false; 
+               return false;
             }
          }
       }
 
-      return true;      
+      return true;
    }
 
    private void loadJournal() throws Exception
@@ -1224,15 +1229,15 @@
    }
 
    private void setNodeID() throws Exception
-   {      
+   {
       if (!configuration.isBackup())
-      {         
+      {
          if (uuid == null)
-         {            
+         {
             uuid = storageManager.getPersistentID();
-            
+
             if (uuid == null)
-            {               
+            {
                uuid = UUIDGenerator.getInstance().generateUUID();
 
                storageManager.setPersistentID(uuid);
@@ -1242,9 +1247,9 @@
          }
       }
       else
-      {         
+      {
          UUID currentUUID = storageManager.getPersistentID();
-         
+
          if (currentUUID != null)
          {
             if (!currentUUID.equals(uuid))
@@ -1381,10 +1386,12 @@
    {
       if (version.getIncrementingVersion() != incrementingVersion)
       {
-         log.warn("Client with version " + incrementingVersion + " is not compatible with server version " +
-                                      version.getFullVersion()  + ". " +
-                                      "Please ensure all clients and servers are upgraded to the same version for them to " +
-                                      "interoperate");
+         log.warn("Client with version " + incrementingVersion +
+                  " is not compatible with server version " +
+                  version.getFullVersion() +
+                  ". " +
+                  "Please ensure all clients and servers are upgraded to the same version for them to " +
+                  "interoperate");
          return null;
       }
 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageHeaderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageHeaderTest.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageHeaderTest.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -829,6 +829,12 @@
 
    class FakeSession implements ClientSession
    {
+      public void createQueue(String address, String queueName) throws MessagingException
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
       private final ClientMessage message;
 
       public FakeSession(ClientMessage message)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/SessionCloseOnGCTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/SessionCloseOnGCTest.java	2009-08-17 16:48:11 UTC (rev 7762)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/SessionCloseOnGCTest.java	2009-08-17 17:36:52 UTC (rev 7763)
@@ -26,8 +26,11 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory;
+import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
@@ -45,6 +48,9 @@
       super.setUp();
 
       server = createServer(false);
+      server.getConfiguration()
+            .getAcceptorConfigurations()
+            .add(new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName()));
 
       server.start();
    }
@@ -58,36 +64,162 @@
 
       super.tearDown();
    }
-   
+
    /** Make sure Sessions are not leaking after closed..
     *  Also... we want to make sure the SessionFactory will close itself when there are not references into it */
-   public void testValidateLeakWithClosedSessions() throws Exception
+   public void testValidateFactoryGC1() throws Exception
    {
       try
       {
-         ClientSessionFactory factory = createInVMFactory();
+         ClientSessionFactory factory = this.createInVMFactory();
          
          ClientSession s1 = factory.createSession();
          ClientSession s2 = factory.createSession();
-         
+
          s1.close();
          s2.close();
+
+         WeakReference<ClientSession> wrs1 = new WeakReference<ClientSession>(s1);
+         WeakReference<ClientSession> wrs2 = new WeakReference<ClientSession>(s2);
+
+         s1 = null;
+         s2 = null;
+
+         checkWeakReferences(wrs1, wrs2);
+
+         WeakReference<ClientSessionFactory> fref = new WeakReference<ClientSessionFactory>(factory);
+
+         factory.close();
+
+         factory = null;
+
+         checkWeakReferences(fref, wrs1, wrs2);
+
+         assertNull(fref.get());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+
+         }
+      }
+   }
+
+   public void testValidateFactoryGC2() throws Exception
+   {
+      try
+      {
+         ClientSessionFactory factory = this.createInVMFactory();
+
+         factory.setUseGlobalPools(false);
+
+         ClientSession s1 = factory.createSession();
+         ClientSession s2 = factory.createSession();
+
+         s1.close();
+         s2.close();
+
+         WeakReference<ClientSession> wrs1 = new WeakReference<ClientSession>(s1);
+         WeakReference<ClientSession> wrs2 = new WeakReference<ClientSession>(s2);
+
+         s1 = null;
+         s2 = null;
+
+         checkWeakReferences(wrs1, wrs2);
+
+         WeakReference<ClientSessionFactory> fref = new WeakReference<ClientSessionFactory>(factory);
+
+         factory.close();
+
+         factory = null;
+
+         checkWeakReferences(fref, wrs1, wrs2);
+
+         assertNull(fref.get());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+
+         }
+      }
+   }
+
+   public void testValidateFactoryGC3() throws Exception
+   {
+      try
+      {
+         ClientSessionFactory factory = this.createInVMFactory();
+
+         ClientSession s1 = factory.createSession();
+         ClientSession s2 = factory.createSession();
+
+         s1.close();
+         s2.close();
+
+         WeakReference<ClientSession> wrs1 = new WeakReference<ClientSession>(s1);
+         WeakReference<ClientSession> wrs2 = new WeakReference<ClientSession>(s2);
+
+         s1 = null;
+         s2 = null;
+
+         checkWeakReferences(wrs1, wrs2);
+
+         WeakReference<ClientSessionFactory> fref = new WeakReference<ClientSessionFactory>(factory);
+
+         factory = null;
+
+         checkWeakReferences(fref, wrs1, wrs2);
+
+         assertNull(fref.get());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+
+         }
+      }
+   }
+   
+   public void testValidateFactoryGC4() throws Exception
+   {
+      try
+      {
+         ClientSessionFactory factory = this.createInVMFactory();
+
+         ClientSession s1 = factory.createSession();
+         ClientSession s2 = factory.createSession();
          
          WeakReference<ClientSession> wrs1 = new WeakReference<ClientSession>(s1);
          WeakReference<ClientSession> wrs2 = new WeakReference<ClientSession>(s2);
-         
+
          s1 = null;
-         
          s2 = null;
-         
+
          checkWeakReferences(wrs1, wrs2);
-         
+
          WeakReference<ClientSessionFactory> fref = new WeakReference<ClientSessionFactory>(factory);
-         
+
          factory = null;
-         
+
          checkWeakReferences(fref, wrs1, wrs2);
-         
+
+         assertNull(fref.get());
       }
       finally
       {
@@ -97,35 +229,59 @@
          }
          catch (Throwable ignored)
          {
-            
+
          }
       }
-      
    }
    
-   
+   public void testValidateFactoryGC5() throws Exception
+   {
+      try
+      {
+         ClientSessionFactory factory = this.createInVMFactory();
+         
+         WeakReference<ClientSessionFactory> fref = new WeakReference<ClientSessionFactory>(factory);
 
+         factory = null;
+
+         checkWeakReferences(fref);
+
+         assertNull(fref.get());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+
+         }
+      }
+   }
+
    public void testCloseOneSessionOnGC() throws Exception
    {
       ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true);
-      
+
       assertEquals(1, server.getRemotingService().getConnections().size());
-      
+
       session = null;
 
       System.gc();
       System.gc();
       System.gc();
-      
+
       Thread.sleep(2000);
-            
+
       assertEquals(0, sf.numSessions());
-      assertEquals(0, sf.numConnections());           
+      assertEquals(0, sf.numConnections());
       assertEquals(0, server.getRemotingService().getConnections().size());
    }
-   
+
    public void testCloseSeveralSessionOnGC() throws Exception
    {
       ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)createInVMFactory();
@@ -133,9 +289,9 @@
       ClientSession session1 = sf.createSession(false, true, true);
       ClientSession session2 = sf.createSession(false, true, true);
       ClientSession session3 = sf.createSession(false, true, true);
-      
+
       assertEquals(3, server.getRemotingService().getConnections().size());
-      
+
       session1 = null;
       session2 = null;
       session3 = null;
@@ -143,11 +299,11 @@
       System.gc();
       System.gc();
       System.gc();
-      
+
       Thread.sleep(2000);
-            
+
       assertEquals(0, sf.numSessions());
-      assertEquals(0, sf.numConnections());           
+      assertEquals(0, sf.numConnections());
       assertEquals(0, server.getRemotingService().getConnections().size());
    }
 




More information about the jboss-cvs-commits mailing list