[jboss-cvs] JBoss Messaging SVN: r5248 - in trunk: src/main/org/jboss/messaging/core/postoffice/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 3 12:33:11 EST 2008


Author: timfox
Date: 2008-11-03 12:33:11 -0500 (Mon, 03 Nov 2008)
New Revision: 5248

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/SendLock.java
   trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java
   trunk/tests/bin/runtest
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
Log:
More fixing tests, failover etc


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-03 17:33:11 UTC (rev 5248)
@@ -126,7 +126,7 @@
    private final Executor executor;
 
    private volatile RemotingConnection remotingConnection;
-   
+
    private volatile RemotingConnection backupConnection;
 
    private final Map<Long, ClientProducerInternal> producers = new ConcurrentHashMap<Long, ClientProducerInternal>();
@@ -157,7 +157,7 @@
    private final IDGenerator idGenerator = new SimpleIDGenerator(0);
 
    private volatile boolean started;
-   
+
    // Constructors ----------------------------------------------------------------------------
 
    public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
@@ -179,7 +179,7 @@
       this.name = name;
 
       this.remotingConnection = remotingConnection;
-      
+
       this.backupConnection = backupConnection;
 
       this.connectionFactory = connectionFactory;
@@ -209,7 +209,7 @@
 
       this.channel = channel;
 
-      this.version = version;      
+      this.version = version;
    }
 
    // ClientSession implementation
@@ -220,7 +220,7 @@
                            final SimpleString filterString,
                            final boolean durable,
                            final boolean temp) throws MessagingException
-   {   
+   {
       checkClosed();
 
       SessionCreateQueueMessage request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temp);
@@ -229,14 +229,14 @@
    }
 
    public void deleteQueue(final SimpleString queueName) throws MessagingException
-   {    
+   {
       checkClosed();
 
       channel.sendBlocking(new SessionDeleteQueueMessage(queueName));
    }
 
    public SessionQueueQueryResponseMessage queueQuery(final SimpleString queueName) throws MessagingException
-   {    
+   {
       checkClosed();
 
       SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
@@ -247,7 +247,7 @@
    }
 
    public SessionBindingQueryResponseMessage bindingQuery(final SimpleString address) throws MessagingException
-   {    
+   {
       checkClosed();
 
       SessionBindingQueryMessage request = new SessionBindingQueryMessage(address);
@@ -258,7 +258,7 @@
    }
 
    public void addDestination(final SimpleString address, final boolean durable, final boolean temp) throws MessagingException
-   {     
+   {
       checkClosed();
 
       SessionAddDestinationMessage request = new SessionAddDestinationMessage(address, durable, temp);
@@ -267,7 +267,7 @@
    }
 
    public void removeDestination(final SimpleString address, final boolean durable) throws MessagingException
-   {     
+   {
       checkClosed();
 
       SessionRemoveDestinationMessage request = new SessionRemoveDestinationMessage(address, durable);
@@ -276,7 +276,7 @@
    }
 
    public ClientConsumer createConsumer(final SimpleString queueName) throws MessagingException
-   {      
+   {
       checkClosed();
 
       return createConsumer(queueName, null, false);
@@ -285,14 +285,15 @@
    public ClientConsumer createConsumer(final SimpleString queueName,
                                         final SimpleString filterString,
                                         final boolean direct) throws MessagingException
-   {     
+   {
       checkClosed();
 
       return createConsumer(queueName,
                             filterString,
                             direct,
                             connectionFactory.getConsumerWindowSize(),
-                            connectionFactory.getConsumerMaxRate(), false);
+                            connectionFactory.getConsumerMaxRate(),
+                            false);
    }
 
    public ClientConsumer createConsumer(final SimpleString queueName,
@@ -300,11 +301,12 @@
                                         final boolean direct,
                                         final boolean browseOnly) throws MessagingException
    {
-       return createConsumer(queueName,
+      return createConsumer(queueName,
                             filterString,
                             direct,
                             connectionFactory.getConsumerWindowSize(),
-                            connectionFactory.getConsumerMaxRate(), browseOnly);
+                            connectionFactory.getConsumerMaxRate(),
+                            browseOnly);
    }
 
    public ClientConsumer createConsumer(final SimpleString queueName,
@@ -313,7 +315,7 @@
                                         final int windowSize,
                                         final int maxRate,
                                         final boolean browseOnly) throws MessagingException
-   {     
+   {
       checkClosed();
 
       SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(queueName,
@@ -372,7 +374,7 @@
    }
 
    public ClientProducer createProducer(final SimpleString address) throws MessagingException
-   {      
+   {
       checkClosed();
 
       return createProducer(address, connectionFactory.getProducerMaxRate());
@@ -380,7 +382,7 @@
 
    public ClientProducer createProducer(final SimpleString address, final int maxRate) throws MessagingException
    {
-      return createProducer(address,               
+      return createProducer(address,
                             maxRate,
                             connectionFactory.isBlockOnNonPersistentSend(),
                             connectionFactory.isBlockOnPersistentSend());
@@ -390,7 +392,7 @@
                                         final int maxRate,
                                         final boolean blockOnNonPersistentSend,
                                         final boolean blockOnPersistentSend) throws MessagingException
-   {    
+   {
       checkClosed();
 
       ClientProducerInternal producer = null;
@@ -402,9 +404,7 @@
 
       if (producer == null)
       {
-         SessionCreateProducerMessage request = new SessionCreateProducerMessage(address,                                                                             
-                                                                                 maxRate,
-                                                                                 autoGroupId);
+         SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, maxRate, autoGroupId);
 
          SessionCreateProducerResponseMessage response = (SessionCreateProducerResponseMessage)channel.sendBlocking(request);
 
@@ -422,7 +422,7 @@
                                                                                                    false),
                                            autoCommitSends && blockOnNonPersistentSend,
                                            autoCommitSends && blockOnPersistentSend,
-                                           response.getAutoGroupId(),                                          
+                                           response.getAutoGroupId(),
                                            channel);
       }
 
@@ -447,10 +447,10 @@
    {
       checkClosed();
 
-      //We do a "JMS style" rollback where the session is stopped, and the buffer is cancelled back
-      //first before rolling back
-      //This ensures messages are received in the same order after rollback w.r.t. to messages in the buffer
-      //For core we could just do a straight rollback, it really depends if we want JMS style semantics or not...
+      // We do a "JMS style" rollback where the session is stopped, and the buffer is cancelled back
+      // first before rolling back
+      // This ensures messages are received in the same order after rollback w.r.t. to messages in the buffer
+      // For core we could just do a straight rollback, it really depends if we want JMS style semantics or not...
 
       boolean wasStarted = started;
 
@@ -529,7 +529,7 @@
    }
 
    public void start() throws MessagingException
-   {    
+   {
       checkClosed();
 
       if (!started)
@@ -541,7 +541,7 @@
    }
 
    public void stop() throws MessagingException
-   {   
+   {
       checkClosed();
 
       if (started)
@@ -577,7 +577,7 @@
 
    // This acknowledges all messages received by the consumer so far
    public void acknowledge(final long consumerID, final long messageID) throws MessagingException
-   {     
+   {
       checkClosed();
 
       SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
@@ -643,7 +643,7 @@
    }
 
    public void close() throws MessagingException
-   {     
+   {
       if (closed)
       {
          return;
@@ -675,14 +675,14 @@
       doCleanup();
    }
 
-   //Needs to be synchronized to prevent issues with occurring concurrently with close()
+   // Needs to be synchronized to prevent issues with occurring concurrently with close()
    public synchronized void handleFailover()
    {
       if (closed)
       {
-         return ;
+         return;
       }
-      
+
       // We lock the channel to prevent any packets to be added to the resend
       // cache during the failover process
       channel.lock();
@@ -698,9 +698,9 @@
          Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
 
          Channel channel1 = backupConnection.getChannel(1, -1, false);
-        
-         ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);         
 
+         ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+
          if (!response.isRemoved())
          {
             channel.replayCommands(response.getLastReceivedCommandID());
@@ -711,7 +711,7 @@
             // closed on the server so we need to interrupt it
             channel.returnBlocking();
          }
-         
+
          backupConnection = null;
       }
       catch (Throwable t)
@@ -724,8 +724,8 @@
       }
 
       channel.send(new SessionFailoverCompleteMessage(name));
-      
-      //Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
+
+      // Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
       remotingConnection.addFailureListener(this);
    }
 
@@ -996,7 +996,7 @@
    // FailureListener implementation --------------------------------------------
 
    public void connectionFailed(final MessagingException me)
-   { 
+   {
       try
       {
          cleanUp();
@@ -1004,7 +1004,7 @@
       catch (Exception e)
       {
          log.error("Failed to cleanup session");
-      }         
+      }
    }
 
    // Public
@@ -1019,7 +1019,7 @@
    {
       return remotingConnection;
    }
-   
+
    public RemotingConnection getBackupConnection()
    {
       return backupConnection;
@@ -1057,16 +1057,16 @@
       {
          producerCache.clear();
       }
-     
+
       remotingConnection.removeFailureListener(this);
 
       synchronized (this)
       {
          closed = true;
-         
-         channel.close();                                  
-      }  
-      
+
+         channel.close();
+      }
+
       sessionFactory.removeSession(this);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-03 17:33:11 UTC (rev 5248)
@@ -137,6 +137,14 @@
       pagingManager.stop();
 
       addressManager.clear();
+      
+      //Release all the locks
+      for (SendLock lock: addressLocks.values())
+      {
+         lock.close();
+      }
+      
+      addressLocks.clear();
 
       started = false;
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-03 17:33:11 UTC (rev 5248)
@@ -210,17 +210,17 @@
    private boolean idGeneratorSynced = false;
 
    private final Object transferLock = new Object();
-   
+
    private final ChannelHandler ppHandler = new PingPongHandler();
-   
+
    private boolean frozen;
-   
+
    private final Object failLock = new Object();
-      
+
    // debug only stuff
 
    private boolean createdActive;
-  
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -239,22 +239,15 @@
    /*
     * Create a server side connection
     */
-   public RemotingConnectionImpl(final Connection transportConnection,                                                               
+   public RemotingConnectionImpl(final Connection transportConnection,
                                  final List<Interceptor> interceptors,
                                  final RemotingConnection replicatingConnection,
                                  final boolean active)
 
    {
-      this(transportConnection,
-           -1,
-           -1,
-           null,
-           interceptors,
-           replicatingConnection,
-           active,
-           false);
+      this(transportConnection, -1, -1, null, interceptors, replicatingConnection, active, false);
    }
-   
+
    private RemotingConnectionImpl(final Connection transportConnection,
                                   final long blockingCallTimeout,
                                   final long pingPeriod,
@@ -313,9 +306,7 @@
       return transportConnection.getID();
    }
 
-   public synchronized Channel getChannel(final long channelID,
-                                          final int windowSize,
-                                          final boolean block)
+   public synchronized Channel getChannel(final long channelID, final int windowSize, final boolean block)
    {
       ChannelImpl channel = channels.get(channelID);
 
@@ -368,16 +359,16 @@
          }
 
          log.warn(me.getMessage());
-         
+
          // Then call the listeners
          callListeners(me);
-        
+
          internalClose();
 
          for (Channel channel : channels.values())
          {
             channel.fail();
-         }                  
+         }
       }
    }
 
@@ -436,13 +427,13 @@
    public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
    {
       final Packet packet = decode(buffer);
-      
+
       synchronized (transferLock)
       {
          if (!frozen)
          {
             final ChannelImpl channel = channels.get(packet.getChannelID());
-   
+
             if (channel != null)
             {
                channel.handlePacket(packet);
@@ -455,17 +446,17 @@
    {
       active = true;
    }
-   
+
    public void freeze()
    {
-      //Prevent any more packets being handled on this connection
-      
+      // Prevent any more packets being handled on this connection
+
       synchronized (transferLock)
       {
          frozen = true;
       }
    }
-   
+
    // Package protected
    // ----------------------------------------------------------------------------
 
@@ -519,7 +510,7 @@
          channel.close();
       }
    }
-  
+
    private Packet decode(final MessagingBuffer in)
    {
       final byte packetType = in.getByte();
@@ -844,13 +835,13 @@
       private boolean failingOver;
 
       private final Queue<DelayedResult> responseActions = new ConcurrentLinkedQueue<DelayedResult>();
-      
+
       private final int windowSize;
-      
+
       private final int confWindowSize;
-      
+
       private final Semaphore sendSemaphore;
-      
+
       private int receivedBytes;
 
       private ChannelImpl(final RemotingConnectionImpl connection,
@@ -866,22 +857,22 @@
          {
             // Don't want to send confirmations if replicating to backup
             this.windowSize = -1;
-            
+
             this.confWindowSize = -1;
-            
-            //We don't redirect the ping channel
-            
+
+            // We don't redirect the ping channel
+
             if (id != 0)
             {
                replicatingChannel = connection.replicatingConnection.getChannel(id, -1, false);
-   
+
                replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
             }
          }
          else
          {
             this.windowSize = windowSize;
-            
+
             this.confWindowSize = (int)(0.75 * windowSize);
 
             replicatingChannel = null;
@@ -890,7 +881,7 @@
          if (this.windowSize != -1)
          {
             resendCache = new ConcurrentLinkedQueue<Packet>();
-            
+
             if (block)
             {
                sendSemaphore = new Semaphore(windowSize, true);
@@ -903,7 +894,7 @@
          else
          {
             resendCache = null;
-            
+
             sendSemaphore = null;
          }
       }
@@ -917,7 +908,7 @@
       {
          return lastReceivedCommandID;
       }
-      
+
       public Lock getLock()
       {
          return lock;
@@ -945,9 +936,26 @@
          synchronized (sendLock)
          {
             packet.setChannelID(id);
-
+   
+            final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+   
+            int size = packet.encode(buffer);
+   
+            // Must block on semaphore outside the main lock or this can prevent failover from occurring
+            if (sendSemaphore != null)
+            {
+               try
+               {
+                  sendSemaphore.acquire(size);
+               }
+               catch (InterruptedException e)
+               {
+                  throw new IllegalStateException("Semaphore interrupted");
+               }
+            }
+   
             lock.lock();
-
+   
             try
             {
                while (failingOver)
@@ -961,8 +969,16 @@
                   {
                   }
                }
-
-               addToCacheAndWrite(packet, true);
+   
+               if (resendCache != null && packet.isRequiresConfirmations())
+               {
+                  resendCache.add(packet);
+               }
+   
+               if (connection.active || packet.isWriteAlways())
+               {
+                  connection.transportConnection.write(buffer);
+               }
             }
             finally
             {
@@ -978,7 +994,7 @@
          {
             throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
          }
-         
+
          if (connection.blockingCallTimeout == -1)
          {
             throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
@@ -986,6 +1002,23 @@
 
          packet.setChannelID(id);
 
+         final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+
+         int size = packet.encode(buffer);
+
+         // Must block on semaphore outside the main lock or this can prevent failover from occurring
+         if (sendSemaphore != null)
+         {
+            try
+            {
+               sendSemaphore.acquire(size);
+            }
+            catch (InterruptedException e)
+            {
+               throw new IllegalStateException("Semaphore interrupted");
+            }
+         }
+
          lock.lock();
 
          try
@@ -1003,9 +1036,14 @@
             }
 
             response = null;
-            
-            addToCacheAndWrite(packet, false);
-  
+
+            if (resendCache != null && packet.isRequiresConfirmations())
+            {
+               resendCache.add(packet);
+            }
+
+            connection.transportConnection.write(buffer);
+
             long toWait = connection.blockingCallTimeout;
 
             long start = System.currentTimeMillis();
@@ -1017,7 +1055,7 @@
                   sendCondition.await(toWait, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e)
-               {                  
+               {
                }
 
                final long now = System.currentTimeMillis();
@@ -1125,20 +1163,20 @@
       }
 
       public void fail()
-      {         
+      {
       }
 
       public Channel getReplicatingChannel()
       {
          return replicatingChannel;
       }
-            
+
       public void transferConnection(final RemotingConnection newConnection)
       {
          // Needs to synchronize on the connection to make sure no packets from
          // the old connection get processed after transfer has occurred
          synchronized (connection.transferLock)
-         {          
+         {
             connection.channels.remove(id);
 
             // And switch it
@@ -1185,7 +1223,7 @@
 
          lock.unlock();
       }
-      
+
       public RemotingConnection getConnection()
       {
          return connection;
@@ -1265,30 +1303,30 @@
             }
          }
       }
-      
+
       private void doWrite(final Packet packet)
-      {      
+      {
          final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
 
          packet.encode(buffer);
-               
+
          connection.transportConnection.write(buffer);
       }
-      
+
       private void checkConfirmation(final Packet packet)
       {
          if (resendCache != null && packet.isRequiresConfirmations())
          {
             lastReceivedCommandID++;
-            
+
             receivedBytes += packet.getPacketSize();
-                        
+
             if (receivedBytes >= confWindowSize)
             {
                final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
 
                confirmed.setChannelID(id);
-               
+
                receivedBytes = 0;
 
                doWrite(confirmed);
@@ -1296,35 +1334,6 @@
          }
       }
 
-      private void addToCacheAndWrite(final Packet packet, final boolean checkActive)
-      {
-         final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-
-         int size = packet.encode(buffer);
-                           
-         if (resendCache != null && packet.isRequiresConfirmations())
-         {
-            resendCache.add(packet);
-            
-            if (sendSemaphore != null)
-            {
-               try
-               {                                  
-                  sendSemaphore.acquire(size);                                   
-               }
-               catch (InterruptedException e)
-               {
-                  throw new IllegalStateException("Semaphore interrupted");
-               }
-            }
-         }
-         
-         if (!checkActive || connection.active || packet.isWriteAlways())
-         {  
-            connection.transportConnection.write(buffer);
-         }        
-      }
-
       private void clearUpTo(final int lastReceivedCommandID)
       {
          final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
@@ -1333,7 +1342,7 @@
          {
             throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
          }
-         
+
          int sizeToFree = 0;
 
          for (int i = 0; i < numberToClear; i++)
@@ -1357,12 +1366,12 @@
                                                " created active " +
                                                connection.createdActive);
             }
-            
+
             sizeToFree += packet.getPacketSize();
          }
 
          firstStoredCommandID += numberToClear;
-         
+
          if (sendSemaphore != null)
          {
             sendSemaphore.release(sizeToFree);
@@ -1370,21 +1379,21 @@
       }
 
       private class ReplicatedPacketsConfirmedChannelHandler implements ChannelHandler
-      {  
+      {
          public void handlePacket(final Packet packet)
          {
             switch (packet.getType())
-            {                          
+            {
                case PACKETS_CONFIRMED:
                {
                   doWrite(packet);
-                  
+
                   break;
                }
                case REPLICATION_RESPONSE:
                {
                   replicateResponseReceived();
-                  
+
                   break;
                }
                default:
@@ -1404,8 +1413,9 @@
          {
             // Error - didn't get pong back
             final MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED,
-                                                                 "Did not receive pong from server, active " + 
-                                                                 createdActive + " client " + client);
+                                                                 "Did not receive pong from server, active " + createdActive +
+                                                                          " client " +
+                                                                          client);
 
             fail(me);
          }
@@ -1430,7 +1440,7 @@
          if (type == PONG)
          {
             gotPong = true;
-            
+
             if (stopPinging)
             {
                future.cancel(true);
@@ -1442,7 +1452,7 @@
 
             // Parameter is placeholder for future
             final Packet pong = new Pong(-1);
- 
+
             pingChannel.send(pong);
          }
          else

Modified: trunk/src/main/org/jboss/messaging/core/server/SendLock.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/SendLock.java	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/server/SendLock.java	2008-11-03 17:33:11 UTC (rev 5248)
@@ -41,4 +41,6 @@
    void beforeSend();
    
    void afterSend();
+   
+   void close();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java	2008-11-03 17:33:11 UTC (rev 5248)
@@ -40,14 +40,26 @@
    private boolean locked;
    
    private int count;
+   
+   private boolean closed;
       
    public synchronized void lock()
    {
+      if (closed)
+      {
+         return;
+      }
+      
       while (count > 0 || locked)
       {
          try
          {
             wait();
+            
+            if (closed)
+            {
+               return;
+            }
          }
          catch (InterruptedException e)
          {            
@@ -59,6 +71,11 @@
    
    public synchronized void unlock()
    {
+      if (closed)
+      {
+         return;
+      }
+      
       locked = false;
       
       notifyAll();
@@ -66,11 +83,21 @@
      
    public synchronized void beforeSend()   
    {
+      if (closed)
+      {
+         return;
+      }
+      
       while (locked)
       {
          try
          {
             wait();
+            
+            if (closed)
+            {
+               return;
+            }
          }
          catch (InterruptedException e)
          {            
@@ -82,6 +109,11 @@
    
    public synchronized void afterSend()
    {
+      if (closed)
+      {
+         return;
+      }
+      
       count--;
       
       if (count < 0)
@@ -94,5 +126,12 @@
          notifyAll();
       }
    }
+   
+   public synchronized void close()
+   {
+      closed = true;
+      
+      notifyAll();
+   }
 
 }

Modified: trunk/tests/bin/runtest
===================================================================
--- trunk/tests/bin/runtest	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/tests/bin/runtest	2008-11-03 17:33:11 UTC (rev 5248)
@@ -133,7 +133,7 @@
 fi
 
 JAVA_OPTS="-Xmx1024M $JAVA_OPTS \
--Dmodule.output=$reldir/../../ \
+-Dcom.sun.management.jmxremote -Dmodule.output=$reldir/../../ \
 $REMOTE_TEST \
 -Dtest.database=$TEST_DATABASE \
 -Dtest.serialization=$TEST_SERIALIZATION \

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-11-03 17:33:11 UTC (rev 5248)
@@ -1194,7 +1194,7 @@
 
    protected int getNumIterations()
    {
-      return 20;
+      return 50;
    }
 
    protected void setUp() throws Exception

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-11-03 17:33:11 UTC (rev 5248)
@@ -67,136 +67,8 @@
 
    // Public --------------------------------------------------------
 
-   public void testGetStore() throws Exception
-   {
-      HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
-      queueSettings.setDefault(new QueueSettings());
+   
 
-      PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings, -1);
-
-      SimpleString destination = new SimpleString("some-destination");
-
-      try
-      {
-         manager.getPageStore(destination);
-         fail("supposed to throw an exception");
-      }
-      catch (Exception ignored)
-      {
-      }
-
-      manager.start();
-
-      PagingStore store = EasyMock.createNiceMock(PagingStore.class);
-
-      EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andReturn(store);
-
-      store.start();
-
-      EasyMock.replay(spi, store);
-
-      assertEquals(store, manager.getPageStore(destination));
-
-      EasyMock.verify(spi, store);
-
-      EasyMock.reset(spi, store);
-
-      EasyMock.replay(spi, store);
-
-      // it should use the cached store, so nothing else should be called on any
-      // SPI
-      assertEquals(store, manager.getPageStore(destination));
-
-      EasyMock.verify(spi, store);
-
-      EasyMock.reset(spi, store);
-
-      store.stop();
-
-      EasyMock.replay(spi, store);
-
-      manager.stop();
-
-      EasyMock.verify(spi, store);
-
-   }
-
-   public void testMultipleThreadsGetStore() throws Exception
-   {
-      PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings, -1);
-
-      final SimpleString destination = new SimpleString("some-destination");
-
-      final SequentialFileFactory factory = EasyMock.createNiceMock(SequentialFileFactory.class);
-
-      EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
-
-      PagingStoreImpl storeImpl = new PagingStoreImpl(manager,
-                                                      factory,
-                                                      destination,
-                                                      new QueueSettings(),
-                                                      Executors.newSingleThreadExecutor());
-
-      EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class)))
-              .andStubReturn(storeImpl);
-
-      EasyMock.replay(spi, factory);
-
-      manager.start();
-
-      int NUMBER_OF_THREADS = 100;
-
-      final CountDownLatch latchPositioned = new CountDownLatch(NUMBER_OF_THREADS);
-      final CountDownLatch latchReady = new CountDownLatch(1);
-
-      class GetPageThread extends Thread
-      {
-         Exception e;
-
-         @Override
-         public void run()
-         {
-            try
-            {
-               latchPositioned.countDown();
-               latchReady.await();
-               manager.getPageStore(destination);
-
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-               this.e = e;
-            }
-
-         }
-      }
-
-      GetPageThread threads[] = new GetPageThread[NUMBER_OF_THREADS];
-      for (int i = 0; i < NUMBER_OF_THREADS; i++)
-      {
-         threads[i] = new GetPageThread();
-         threads[i].start();
-      }
-
-      latchPositioned.await();
-      latchReady.countDown();
-
-      for (GetPageThread thread : threads)
-      {
-         thread.join();
-         if (thread.e != null)
-         {
-            throw thread.e;
-         }
-      }
-
-      EasyMock.verify(spi, factory);
-
-   }
-
    public void testOnDepage() throws Exception
    {
       long time = System.currentTimeMillis() + 10000;




More information about the jboss-cvs-commits mailing list