[hornetq-commits] JBoss hornetq SVN: r10372 - in trunk: src/main/org/hornetq/core/paging/cursor and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 25 21:03:02 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-25 21:03:01 -0400 (Fri, 25 Mar 2011)
New Revision: 10372

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
   trunk/src/main/org/hornetq/core/server/MessageReference.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
   trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
   trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
merge -r10363:10371 from branch_2_2

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -1062,8 +1062,6 @@
 
                channel.returnBlocking();
             }
-
-            channel.setTransferring(false);
          }
          catch (Throwable t)
          {
@@ -1071,6 +1069,7 @@
          }
          finally
          {
+            channel.setTransferring(false);
             channel.unlock();
          }
 

Modified: trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -37,9 +37,11 @@
    private final PagePosition position;
 
    private WeakReference<PagedMessage> message;
-   
+
    private Long deliveryTime = null;
-   
+
+   private int persistedCount;
+
    private final PageSubscription subscription;
 
    public ServerMessage getMessage()
@@ -50,10 +52,10 @@
    public synchronized PagedMessage getPagedMessage()
    {
       PagedMessage returnMessage = message != null ? message.get() : null;
-      
+
       // We only keep a few references on the Queue from paging...
       // Besides those references are SoftReferenced on page cache...
-      // So, this will unlikely be null, 
+      // So, this will unlikely be null,
       // unless the Queue has stalled for some time after paging
       if (returnMessage == null)
       {
@@ -69,7 +71,9 @@
       return position;
    }
 
-   public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
+   public PagedReferenceImpl(final PagePosition position,
+                             final PagedMessage message,
+                             final PageSubscription subscription)
    {
       this.position = position;
       this.message = new WeakReference<PagedMessage>(message);
@@ -80,6 +84,16 @@
    {
       return true;
    }
+   
+   public void setPersistedCount(int count)
+   {
+      this.persistedCount = count;
+   }
+   
+   public int getPersistedCount()
+   {
+      return persistedCount;
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -744,16 +744,22 @@
 
    public void updateDeliveryCount(final MessageReference ref) throws Exception
    {
-      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
-                                                                               ref.getDeliveryCount());
+      // no need to store if it's the same value
+      // otherwise the journal will get OME in case of lots of redeliveries
+      if (ref.getDeliveryCount() != ref.getPersistedCount())
+      {
+         ref.setPersistedCount(ref.getDeliveryCount());
+         DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
+                                                                                  ref.getDeliveryCount());
+   
+         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+                                           JournalStorageManager.UPDATE_DELIVERY_COUNT,
+                                           updateInfo,
+   
+                                           syncNonTransactional,
+                                           getContext(syncNonTransactional));
+      }
 
-      messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
-                                        JournalStorageManager.UPDATE_DELIVERY_COUNT,
-                                        updateInfo,
-
-                                        syncNonTransactional,
-                                        getContext(syncNonTransactional));
-
    }
 
    public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -114,8 +114,6 @@
 
    private final ServerSession session;
 
-   private final OperationContext sessionContext;
-
    // Storagemanager here is used to set the Context
    private final StorageManager storageManager;
 
@@ -126,7 +124,6 @@
    private final boolean direct;
 
    public ServerSessionPacketHandler(final ServerSession session,
-                                     final OperationContext sessionContext,
                                      final StorageManager storageManager,
                                      final Channel channel)
    {
@@ -134,8 +131,6 @@
 
       this.storageManager = storageManager;
 
-      this.sessionContext = sessionContext;
-
       this.channel = channel;
 
       this.remotingConnection = channel.getConnection();
@@ -197,7 +192,7 @@
    {
       byte type = packet.getType();
 
-      storageManager.setContext(sessionContext);
+      storageManager.setContext(session.getSessionContext());
 
       Packet response = null;
       boolean flush = false;

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -129,9 +129,9 @@
          Version version = server.getVersion();
          int[] compatibleList = version.getCompatibleVersionList();
          boolean isCompatibleClient = false;
-         for(int i=0; i<compatibleList.length; i++)
+         for (int i = 0; i < compatibleList.length; i++)
          {
-            if(compatibleList[i] == request.getVersion())
+            if (compatibleList[i] == request.getVersion())
             {
                isCompatibleClient = true;
                break;
@@ -165,22 +165,23 @@
 
          Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
 
-         ServerSession session = server.createSession(request.getName(),                                                      
+         ServerSession session = server.createSession(request.getName(),
                                                       request.getUsername(),
                                                       request.getPassword(),
-                                                      request.getMinLargeMessageSize(),                                                    
+                                                      request.getMinLargeMessageSize(),
                                                       connection,
                                                       request.isAutoCommitSends(),
                                                       request.isAutoCommitAcks(),
                                                       request.isPreAcknowledge(),
                                                       request.isXA(),
                                                       request.getDefaultAddress(),
-                                                      new CoreSessionCallback(request.getName(), protocolManager, channel));
+                                                      new CoreSessionCallback(request.getName(),
+                                                                              protocolManager,
+                                                                              channel));
 
+         session.setSessionContext(server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
+
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
-                                                                             server.getStorageManager()
-                                                                                   .newContext(server.getExecutorFactory()
-                                                                                                     .getExecutor()),
                                                                              server.getStorageManager(),
                                                                              channel);
          channel.setHandler(handler);
@@ -201,11 +202,11 @@
          }
       }
       catch (Exception e)
-      {  
+      {
          log.error("Failed to create session ", e);
-         
+
          HornetQPacketHandler.log.error("Failed to create session", e);
-         
+
          response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
       }
 
@@ -225,22 +226,22 @@
    private void handleReattachSession(final ReattachSessionMessage request)
    {
       Packet response = null;
-      
+
       try
       {
-   
+
          if (!server.isStarted())
          {
             response = new ReattachSessionResponseMessage(-1, false);
          }
-   
+
          ServerSessionPacketHandler sessionHandler = protocolManager.getSessionHandler(request.getName());
-         
+
          if (!server.checkActivate())
          {
             response = new ReattachSessionResponseMessage(-1, false);
          }
-   
+
          if (sessionHandler == null)
          {
             response = new ReattachSessionResponseMessage(-1, false);
@@ -252,9 +253,9 @@
                // Even though session exists, we can't reattach since confi window size == -1,
                // i.e. we don't have a resend cache for commands, so we just close the old session
                // and let the client recreate
-   
+
                sessionHandler.close();
-   
+
                response = new ReattachSessionResponseMessage(-1, false);
             }
             else
@@ -262,7 +263,7 @@
                // Reconnect the channel to the new connection
                int serverLastConfirmedCommandID = sessionHandler.transferConnection(connection,
                                                                                     request.getLastConfirmedCommandID());
-   
+
                response = new ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
             }
          }
@@ -270,7 +271,7 @@
       catch (Exception e)
       {
          HornetQPacketHandler.log.error("Failed to reattach session", e);
-         
+
          response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
       }
 

Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -47,6 +47,10 @@
    int getDeliveryCount();
 
    void setDeliveryCount(int deliveryCount);
+   
+   void setPersistedCount(int deliveryCount);
+   
+   int getPersistedCount();
 
    void incrementDeliveryCount();
 

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -23,6 +23,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.utils.json.JSONArray;
 
 /**
@@ -114,6 +115,8 @@
    void requestProducerCredits(SimpleString address, int credits) throws Exception;
 
    void close(boolean failed) throws Exception;
+   
+   void waitContextCompletion() throws Exception;
 
    void setTransferring(boolean transferring);
 
@@ -136,4 +139,10 @@
    String getLastSentMessageID(String address);
 
    long getCreationTime();
+   
+
+   OperationContext getSessionContext();
+   
+   void setSessionContext(OperationContext context);
+
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -63,11 +63,13 @@
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -649,8 +651,8 @@
       // But at least we will do our best to avoid it on regular shutdowns
       for (ServerSession session : sessions.values())
       {
-    	 log.info("closing a session" );
          session.close(true);
+         session.waitContextCompletion();
       }
 
       remotingService.stop();

Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -230,6 +230,16 @@
          ref.setScheduledDeliveryTime(scheduledDeliveryTime);
       }
       
+      public void setPersistedCount(int count)
+      {
+         ref.setPersistedCount(count);
+      }
+      
+      public int getPersistedCount()
+      {
+         return ref.getPersistedCount();
+      }
+      
       public boolean isPaged()
       {
          return false;

Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -36,6 +36,8 @@
 
    private volatile int deliveryCount;
 
+   private volatile int persistedCount;
+
    private volatile long scheduledDeliveryTime;
 
    private final ServerMessage message;
@@ -91,6 +93,23 @@
    }
 
    // MessageReference implementation -------------------------------
+
+   /**
+    * @return the persistedCount
+    */
+   public int getPersistedCount()
+   {
+      return persistedCount;
+   }
+
+   /**
+    * @param persistedCount the persistedCount to set
+    */
+   public void setPersistedCount(int persistedCount)
+   {
+      this.persistedCount = persistedCount;
+   }
+
    public MessageReference copy(final Queue queue)
    {
       return new MessageReferenceImpl(this, queue);
@@ -109,6 +128,7 @@
    public void setDeliveryCount(final int deliveryCount)
    {
       this.deliveryCount = deliveryCount;
+      this.persistedCount = deliveryCount;
    }
 
    public void incrementDeliveryCount()
@@ -145,7 +165,7 @@
    {
       queue.referenceHandled();
    }
-   
+
    public boolean isPaged()
    {
       return false;
@@ -167,7 +187,6 @@
       queue.acknowledge(tx, this);
    }
 
-
    // Public --------------------------------------------------------
 
    @Override
@@ -175,7 +194,9 @@
    {
       return "Reference[" + getMessage().getMessageID() +
              "]:" +
-             (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") + ":" + getMessage() ;
+             (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") +
+             ":" +
+             getMessage();
    }
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -42,6 +42,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.BindingType;
@@ -146,6 +147,8 @@
    private volatile int timeoutSeconds;
    
    private Map<String, String> metaData;
+   
+   private OperationContext sessionContext;
 
    // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
    private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString,  Pair<UUID, AtomicLong>>();
@@ -223,7 +226,23 @@
    }
 
    // ServerSession implementation ----------------------------------------------------------------------------
+   /**
+    * @return the sessionContext
+    */
+   public OperationContext getSessionContext()
+   {
+      return sessionContext;
+   }
 
+   /**
+    * @param sessionContext the sessionContext to set
+    */
+   public void setSessionContext(OperationContext sessionContext)
+   {
+      this.sessionContext = sessionContext;
+   }
+
+
    public String getUsername()
    {
       return username;
@@ -931,27 +950,62 @@
    {
       setStarted(false);
    }
-
-   public void close(final boolean failed)
+   
+   public void waitContextCompletion()
    {
-      storageManager.afterCompleteOperations(new IOAsyncTask()
+      OperationContext formerCtx = storageManager.getContext();
+      
+      try
       {
-         public void onError(int errorCode, String errorMessage)
+         try
          {
+            if (!storageManager.waitOnOperations(10000))
+            {
+               log.warn("Couldn't finish context execution in 10 seconds", new Exception ("warning"));
+            }
          }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
+      }
+      finally
+      {
+         storageManager.setContext(formerCtx);
+      }
+   }
 
-         public void done()
+   public void close(final boolean failed)
+   {
+      OperationContext formerCtx = storageManager.getContext();
+      
+      try
+      {
+         storageManager.setContext(sessionContext);
+
+         storageManager.afterCompleteOperations(new IOAsyncTask()
          {
-            try
+            public void onError(int errorCode, String errorMessage)
             {
-               doClose(failed);
             }
-            catch (Exception e)
+   
+            public void done()
             {
-               log.error("Failed to close session", e);
+               try
+               {
+                  doClose(failed);
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to close session", e);
+               }
             }
-         }
-      });
+         });
+      }
+      finally
+      {
+         storageManager.setContext(formerCtx);
+      }
    }
 
    public void closeConsumer(final long consumerID) throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -13,13 +13,27 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.tests.util.ServiceTestBase;
 
@@ -223,6 +237,111 @@
       session.close();
    }
 
+
+   public void testInfiniteDedeliveryMessageOnPersistent() throws Exception
+   {
+      internaltestInfiniteDedeliveryMessageOnPersistent(false);
+   }
+   
+   private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean strict) throws Exception
+   {
+      setUp(strict);
+      ClientSession session = factory.createSession(false, false, false);
+
+      RedeliveryConsumerTest.log.info("created");
+
+      ClientProducer prod = session.createProducer(ADDRESS);
+      prod.send(createTextMessage(session, "Hello"));
+      session.commit();
+      session.close();
+
+      
+      int expectedCount = 1;
+      for (int i = 0 ; i < 700; i++)
+      {
+         session = factory.createSession(false, false, false);
+         session.start();
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         assertEquals(expectedCount, msg.getDeliveryCount());
+
+         if (i % 100 == 0)
+         {
+            expectedCount++;
+            msg.acknowledge();
+            session.rollback();
+         }
+         session.close();
+      }
+
+      factory.close();
+      server.stop();
+      
+      setUp(false);
+      
+      for (int i = 0 ; i < 700; i++)
+      {
+         session = factory.createSession(false, false, false);
+         session.start();
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         assertEquals(expectedCount, msg.getDeliveryCount());
+         session.close();
+      }
+
+      server.stop();
+
+      
+      JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 
+                                            2, 
+                                            0,
+                                            0, 
+                                            new NIOSequentialFileFactory(server.getConfiguration().getJournalDirectory()),
+                                            "hornetq-data",
+                                            "hq",
+                                            1);
+      
+      
+      final AtomicInteger updates = new AtomicInteger();
+      
+      journal.start();
+      journal.load(new LoaderCallback()
+      {
+         
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+         }
+         
+         public void updateRecord(RecordInfo info)
+         {
+            if (info.userRecordType == JournalStorageManager.UPDATE_DELIVERY_COUNT)
+            {
+               updates.incrementAndGet();
+            }
+         }
+         
+         public void deleteRecord(long id)
+         {
+         }
+         
+         public void addRecord(RecordInfo info)
+         {
+         }
+         
+         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+         {
+         }
+      });
+      
+      journal.stop();
+      
+      
+      assertEquals(7, updates.get());
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -244,7 +363,14 @@
       factory = locator.createSessionFactory();
 
       ClientSession session = factory.createSession(false, false, false);
-      session.createQueue(ADDRESS, ADDRESS, true);
+      try
+      {
+         session.createQueue(ADDRESS, ADDRESS, true);
+      }
+      catch (HornetQException expected)
+      {
+         // in case of restart
+      }
 
       session.close();
    }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -682,8 +682,7 @@
          server0.getConfiguration().setQueueConfigurations(queueConfigs0);
 
          server0.start();
-         
-         
+
          locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
          ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
 
@@ -710,30 +709,30 @@
             producer0.send(message);
          }
 
+         server1.start();
 
-         server1.start();
-         
          // Inserting the duplicateIDs so the bridge will fail in a few
          {
             long ids[] = new long[100];
-            
+
             Queue queue = server0.locateQueue(new SimpleString(queueName0));
             LinkedListIterator<MessageReference> iterator = queue.iterator();
-            
-            for (int i = 0 ; i < 100; i++)
+
+            for (int i = 0; i < 100; i++)
             {
                iterator.hasNext();
                ids[i] = iterator.next().getMessage().getMessageID();
             }
-            
+
             iterator.close();
 
-            DuplicateIDCache duplicateTargetCache = server1.getPostOffice().getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
-            
+            DuplicateIDCache duplicateTargetCache = server1.getPostOffice()
+                                                           .getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
+
             TransactionImpl tx = new TransactionImpl(server1.getStorageManager());
             for (long id : ids)
             {
-               byte [] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
+               byte[] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
                duplicateTargetCache.addToCache(duplicateArray, tx);
             }
             tx.commit();
@@ -946,9 +945,23 @@
             locator.close();
          }
 
-         server0.stop();
+         try
+         {
+             server0.stop();
+         }
+         catch(Exception ignored)
+         {
+            
+         }
 
-         server1.stop();
+         try
+         {
+             server1.stop();
+         }
+         catch(Exception ignored)
+         {
+            
+         }
       }
 
    }
@@ -1255,7 +1268,7 @@
    protected void tearDown() throws Exception
    {
       clearData();
-      super.setUp();
+      super.tearDown();
    }
 
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -25,8 +25,14 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
 import org.hornetq.core.server.HornetQServer;
@@ -256,11 +262,6 @@
 
    protected abstract boolean checkSize(ClientMessage message);
 
-   protected int getNumThreads()
-   {
-      return 10;
-   }
-
    protected ClientSession createAutoCommitSession(final ClientSessionFactory sf) throws Exception
    {
       return sf.createSession(false, true, true);
@@ -1197,6 +1198,11 @@
    {
       return 2;
    }
+   
+   protected int getNumThreads()
+   {
+      return 10;
+   }
 
    @Override
    protected void setUp() throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -16,10 +16,7 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.server.HornetQServers;
 
 /**

Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -94,7 +94,7 @@
       checkFreePort(5447);
       if (InVMRegistry.instance.size() > 0)
       {
-         System.exit(0);
+         fail("InVMREgistry size > 0");
       }
    }
 

Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-03-26 00:38:27 UTC (rev 10371)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-03-26 01:03:01 UTC (rev 10372)
@@ -875,26 +875,8 @@
    @Override
    protected void tearDown() throws Exception
    {
-      OperationContextImpl.clearContext();
+      cleanupPools();
 
-      deleteDirectory(new File(getTestDir()));
-
-      int invmSize = InVMRegistry.instance.size();
-      if (invmSize > 0)
-      {
-         InVMRegistry.instance.clear();
-         fail("invm registry still had acceptors registered");
-      }
-
-      if (AsynchronousFileImpl.getTotalMaxIO() != 0)
-      {
-         AsynchronousFileImpl.resetMaxAIO();
-         Assert.fail("test did not close all its files " + AsynchronousFileImpl.getTotalMaxIO());
-      }
-      
-      // We shutdown the global pools to give a better isolation between tests
-      ServerLocatorImpl.clearThreadPools();
-
       Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
       for (Thread thread : threadMap.keySet())
       {
@@ -967,6 +949,32 @@
       super.tearDown();
    }
 
+   /**
+    * 
+    */
+   protected void cleanupPools()
+   {
+      OperationContextImpl.clearContext();
+
+      deleteDirectory(new File(getTestDir()));
+
+      int invmSize = InVMRegistry.instance.size();
+      if (invmSize > 0)
+      {
+         InVMRegistry.instance.clear();
+         fail("invm registry still had acceptors registered");
+      }
+
+      if (AsynchronousFileImpl.getTotalMaxIO() != 0)
+      {
+         AsynchronousFileImpl.resetMaxAIO();
+         Assert.fail("test did not close all its files " + AsynchronousFileImpl.getTotalMaxIO());
+      }
+      
+      // We shutdown the global pools to give a better isolation between tests
+      ServerLocatorImpl.clearThreadPools();
+   }
+
    protected byte[] autoEncode(final Object... args)
    {
 



More information about the hornetq-commits mailing list