[hornetq-commits] JBoss hornetq SVN: r8209 - in trunk: src/main/org/hornetq/core/journal and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 4 11:20:15 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-04 11:20:14 -0500 (Wed, 04 Nov 2009)
New Revision: 8209

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-198 - AIO Executors shutdown and few other minor tweaks

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -71,7 +71,7 @@
    private final SimpleString groupID;
 
    private final int minLargeMessageSize;
-   
+
    private final ClientProducerCredits credits;
 
    // Static ---------------------------------------------------------------------------------------
@@ -109,7 +109,7 @@
       }
 
       this.minLargeMessageSize = minLargeMessageSize;
-      
+
       if (address != null)
       {
          credits = session.getCredits(address);
@@ -128,7 +128,7 @@
    }
 
    public void send(final Message msg) throws HornetQException
-   {     
+   {
       checkClosed();
 
       doSend(null, msg);
@@ -204,18 +204,18 @@
    private void doSend(final SimpleString address, final Message msg) throws HornetQException
    {
       ClientProducerCredits theCredits;
-      
+
       if (address != null)
       {
          msg.setDestination(address);
-         
-         //Anonymous
+
+         // Anonymous
          theCredits = session.getCredits(address);
       }
       else
       {
          msg.setDestination(this.address);
-         
+
          theCredits = credits;
       }
 
@@ -234,24 +234,24 @@
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
 
       SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
-      
+
       session.workDone();
-      
-      boolean large;
-      
+
+      boolean isLarge;
+
       if (msg.getBodyInputStream() != null || msg.getEncodeSize() >= minLargeMessageSize || msg.isLargeMessage())
       {
-         large = true;
+         isLarge = true;
       }
       else
       {
-         large = false;
+         isLarge = false;
       }
-            
-      if (large)
+
+      if (isLarge)
       {
-         sendMessageInChunks(sendBlocking, msg);
-      }      
+         largeMessageSend(sendBlocking, msg);
+      }
       else if (sendBlocking)
       {
          channel.sendBlocking(message);
@@ -260,36 +260,47 @@
       {
          channel.send(message);
       }
-      
+
       try
       {
-         //This will block if credits are not available
-         
-         //Note, that for a large message, the encode size only includes the properties + headers
-         //Not the continuations, but this is ok since we are only interested in limiting the amount of
-         //data in *memory* and continuations go straight to the disk
-         
-         if (large)
+         // This will block if credits are not available
+
+         // Note, that for a large message, the encode size only includes the properties + headers
+         // Not the continuations, but this is ok since we are only interested in limiting the amount of
+         // data in *memory* and continuations go straight to the disk
+
+         if (isLarge)
          {
-            //TODO this is pretty hacky - we should define consistent meanings of encode size             
-                        
+            // TODO this is pretty hacky - we should define consistent meanings of encode size
+
             theCredits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
          }
          else
-         {         
+         {
             theCredits.acquireCredits(msg.getEncodeSize());
          }
       }
       catch (InterruptedException e)
-      {         
+      {
       }
    }
 
+   private void checkClosed() throws HornetQException
+   {
+      if (closed)
+      {
+         throw new HornetQException(HornetQException.OBJECT_CLOSED, "Producer is closed");
+      }
+   }
+   
+   
+   // Methods to send Large Messages----------------------------------------------------------------
+   
    /**
     * @param msg
     * @throws HornetQException
     */
-   private void sendMessageInChunks(final boolean sendBlocking, final Message msg) throws HornetQException
+   private void largeMessageSend(final boolean sendBlocking, final Message msg) throws HornetQException
    {
       int headerSize = msg.getHeadersAndPropertiesEncodeSize();
 
@@ -313,129 +324,144 @@
       channel.send(initialChunk);
 
       InputStream input = msg.getBodyInputStream();
-      
+
       if (input != null)
       {
-         boolean lastChunk = false;
+         largeMessageSendStreamed(sendBlocking, input);
+      }
+      else
+      {
+         largeMessageSendBuffered(sendBlocking, msg);
+      }
+   }
 
-         while (!lastChunk)
-         {
-            byte[] buff = new byte[minLargeMessageSize];
-            
-            int pos = 0;
-                                   
-            do
-            {               
-               int numberOfBytesRead;
-               
-               int wanted = minLargeMessageSize - pos;
-               
-               try
-               {
-                  numberOfBytesRead = input.read(buff, pos, wanted);
-               }
-               catch (IOException e)
-               {
-                  throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
-                                             "Error reading the LargeMessageBody",
-                                             e);
-               }
-               
-               if (numberOfBytesRead == -1)
-               {                  
-                  lastChunk = true;
-                  
-                  break;
-               }
-                                             
-               pos += numberOfBytesRead;
-            }
-            while (pos < minLargeMessageSize);
-                        
-            if (lastChunk)
-            {
-               byte[] buff2 = new byte[pos];
-               
-               System.arraycopy(buff, 0, buff2, 0, pos);
-               
-               buff = buff2;
-            }
-            
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(buff,                                                                                           
-                                                                                            !lastChunk,
-                                                                                            lastChunk && sendBlocking);
+   /**
+    * @param sendBlocking
+    * @param msg
+    * @throws HornetQException
+    */
+   private void largeMessageSendBuffered(final boolean sendBlocking, final Message msg) throws HornetQException
+   {
+      final long bodySize = msg.getLargeBodySize();
 
-            if (sendBlocking && lastChunk)
-            {
-               // When sending it blocking, only the last chunk will be blocking.
-               channel.sendBlocking(chunk);
-            }
-            else
-            {
-               channel.send(chunk);
-            }
-         }
+      LargeMessageEncodingContext context = new DecodingContext(msg);
 
-         try
+      for (int pos = 0; pos < bodySize;)
+      {
+         final boolean lastChunk;
+
+         final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+
+         final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
+
+         msg.encodeBody(bodyBuffer, context, chunkLength);
+
+         pos += chunkLength;
+
+         lastChunk = pos >= bodySize;
+
+         final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),
+                                                                                         !lastChunk,
+                                                                                         lastChunk && sendBlocking);
+
+         if (sendBlocking && lastChunk)
          {
-            input.close();
+            // When sending it blocking, only the last chunk will be blocking.
+            channel.sendBlocking(chunk);
          }
-         catch (IOException e)
+         else
          {
-            throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
-                                       "Error closing stream from LargeMessageBody",
-                                       e);
+            channel.send(chunk);
          }
       }
-      else
+   }
+
+   /**
+    * @param sendBlocking
+    * @param input
+    * @throws HornetQException
+    */
+   private void largeMessageSendStreamed(final boolean sendBlocking, InputStream input) throws HornetQException
+   {
+      boolean lastPacket = false;
+
+      while (!lastPacket)
       {
-         final long bodySize = msg.getLargeBodySize();
+         byte[] buff = new byte[minLargeMessageSize];
 
-         LargeMessageEncodingContext context = new DecodingContext(msg);
+         int pos = 0;
 
-         for (int pos = 0; pos < bodySize;)
+         do
          {
-            final boolean lastChunk;
+            int numberOfBytesRead;
 
-            final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+            int wanted = minLargeMessageSize - pos;
 
-            final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
+            try
+            {
+               numberOfBytesRead = input.read(buff, pos, wanted);
+            }
+            catch (IOException e)
+            {
+               throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
+                                          "Error reading the LargeMessageBody",
+                                          e);
+            }
 
-            msg.encodeBody(bodyBuffer, context, chunkLength);
+            if (numberOfBytesRead == -1)
+            {
+               lastPacket = true;
 
-            pos += chunkLength;
+               break;
+            }
 
-            lastChunk = pos >= bodySize;
+            pos += numberOfBytesRead;
+         }
+         while (pos < minLargeMessageSize);
 
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),                                                                                            
-                                                                                            !lastChunk,
-                                                                                            lastChunk && sendBlocking);
+         if (lastPacket)
+         {
+            byte[] buff2 = new byte[pos];
 
-            if (sendBlocking && lastChunk)
-            {
-               // When sending it blocking, only the last chunk will be blocking.
-               channel.sendBlocking(chunk);
-            }
-            else
-            {
-               channel.send(chunk);
-            }
+            System.arraycopy(buff, 0, buff2, 0, pos);
+
+            buff = buff2;
          }
+
+         final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(buff,
+                                                                                         !lastPacket,
+                                                                                         lastPacket && sendBlocking);
+
+         if (sendBlocking && lastPacket)
+         {
+            // When sending it blocking, only the last chunk will be blocking.
+            channel.sendBlocking(chunk);
+         }
+         else
+         {
+            channel.send(chunk);
+         }
       }
-   }
 
-   private void checkClosed() throws HornetQException
-   {
-      if (closed)
+      try
       {
-         throw new HornetQException(HornetQException.OBJECT_CLOSED, "Producer is closed");
+         input.close();
       }
+      catch (IOException e)
+      {
+         throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
+                                    "Error closing stream from LargeMessageBody",
+                                    e);
+      }
    }
 
+
+
    // Inner Classes --------------------------------------------------------------------------------
    class DecodingContext implements LargeMessageEncodingContext
    {
       private final Message message;
+
       private int lastPos = 0;
 
       public DecodingContext(Message message)

Modified: trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -61,8 +61,7 @@
     */
    void createDirs() throws Exception;
    
-   // used on tests only
-   void testFlush();
+   void flush();
 
 
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -15,8 +15,9 @@
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.asyncio.BufferCallback;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
@@ -35,6 +36,10 @@
  */
 public class AIOSequentialFileFactory extends AbstractSequentialFactory
 {
+   
+   // Timeout used to wait executors to shutdown
+   private static final int EXECUTOR_TIMEOUT = 60;
+   
    private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
 
    private static final boolean trace = log.isTraceEnabled();
@@ -52,11 +57,9 @@
    /** A single AIO write executor for every AIO File.
     *  This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
     *  And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
-   private final Executor writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
-                                                                                                 true));
+   private ExecutorService writeExecutor;
 
-   private final Executor pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
-                                                                                              true));
+   private ExecutorService pollerExecutor;
 
    private final int bufferSize;
 
@@ -102,7 +105,7 @@
       }
    }
 
-   public void testFlush()
+   public void flush()
    {
       timedBuffer.flush();
    }
@@ -184,12 +187,45 @@
    public void start()
    {
       timedBuffer.start();
+      
+      writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
+                                                                                                         true));
+
+      pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
+                                                                                                      true));
+
+
    }
 
    public void stop()
    {
       buffersControl.stop();
       timedBuffer.stop();
+      
+      this.writeExecutor.shutdown();
+      try
+      {
+         if (!this.writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+         {
+            log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
+         }
+      }
+      catch (InterruptedException e)
+      {
+      }
+      
+      this.pollerExecutor.shutdown();
+
+      try
+      {
+         if (!this.pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+         {
+            log.warn("Timed out on AIO poller shutdown", new Exception("Timed out on AIO writer shutdown"));
+         }
+      }
+      catch (InterruptedException e)
+      {
+      }
    }
 
    protected void finalize()

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -64,7 +64,7 @@
    {
    }
    
-   public void testFlush()
+   public void flush()
    {
    }
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -2407,7 +2407,7 @@
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
    {
-      fileFactory.testFlush();
+      fileFactory.flush();
 
       for (JournalTransaction tx : transactions.values())
       {
@@ -2559,7 +2559,7 @@
             log.warn("Couldn't stop journal executor after 60 seconds");
          }
 
-         fileFactory.stop();
+         fileFactory.flush();
 
          if (currentFile != null && currentFile.getFile().isOpen())
          {
@@ -2570,6 +2570,8 @@
          {
             file.getFile().close();
          }
+         
+         fileFactory.stop();
 
          currentFile = null;
 

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -77,217 +77,6 @@
       return false;
    }
 
-///  Those tests are duplicating ConsumerWindowSizeTest and NettyConsumerWindowSizeTest. Do we need those here?
-// 
-//   public void testFlowControlWithSyncReceiveZeroConsumerWindowSize() throws Exception
-//   {
-//      testFlowControlWithSyncReceive(0);
-//   }
-//
-//   public void testFlowControlWithSyncReceiveSmallConsumerWindowSize() throws Exception
-//   {
-//      testFlowControlWithSyncReceive(1000);
-//   }
-//
-//   private void testFlowControlWithSyncReceive(final int consumerWindowSize) throws Exception
-//   {
-//      ClientSession session = null;
-//
-//      try
-//      {
-//         server = createServer(true, isNetty());
-//
-//         server.start();
-//
-//         ClientSessionFactory sf = createFactory(isNetty());
-//
-//         sf.setConsumerWindowSize(consumerWindowSize);
-//         sf.setMinLargeMessageSize(1000);
-//
-//         int messageSize = 10000;
-//
-//         session = sf.createSession(false, true, true);
-//
-//         session.createTemporaryQueue(ADDRESS, ADDRESS);
-//
-//         ClientProducer producer = session.createProducer(ADDRESS);
-//
-//         final int numMessages = 1000;
-//
-//         for (int i = 0; i < numMessages; i++)
-//         {
-//            Message clientFile = createLargeClientMessage(session, messageSize, true);
-//
-//            producer.send(clientFile);
-//
-//            log.info("Sent message " + i);
-//         }
-//
-//         ClientConsumer consumer = session.createConsumer(ADDRESS);
-//
-//         session.start();
-//
-//         for (int i = 0; i < numMessages; i++)
-//         {
-//            ClientMessage msg = consumer.receive(1000);
-//
-//            int availBytes = msg.getBody().readableBytes();
-//
-//            assertEquals(messageSize, availBytes);
-//
-//            byte[] bytes = new byte[availBytes];
-//
-//            msg.getBody().readBytes(bytes);
-//
-//            msg.acknowledge();
-//
-//            log.info("Received message " + i);
-//         }
-//
-//         session.close();
-//
-//         validateNoFilesOnLargeDir();
-//      }
-//      finally
-//      {
-//         try
-//         {
-//            server.stop();
-//         }
-//         catch (Throwable ignored)
-//         {
-//         }
-//
-//         try
-//         {
-//            session.close();
-//         }
-//         catch (Throwable ignored)
-//         {
-//         }
-//      }
-//   }
-//
-//   public void testFlowControlWithListenerZeroConsumerWindowSize() throws Exception
-//   {
-//      testFlowControlWithListener(0);
-//   }
-//
-//   public void testFlowControlWithListenerSmallConsumerWindowSize() throws Exception
-//   {
-//      testFlowControlWithListener(1000);
-//   }
-//
-//   private void testFlowControlWithListener(final int consumerWindowSize) throws Exception
-//   {
-//      ClientSession session = null;
-//
-//      try
-//      {
-//         server = createServer(true, isNetty());
-//
-//         server.start();
-//
-//         ClientSessionFactory sf;
-//
-//         sf = createFactory(isNetty());
-//
-//         sf.setConsumerWindowSize(consumerWindowSize);
-//         sf.setMinLargeMessageSize(1000);
-//
-//         final int messageSize = 10000;
-//
-//         session = sf.createSession(false, true, true);
-//
-//         session.createTemporaryQueue(ADDRESS, ADDRESS);
-//
-//         ClientProducer producer = session.createProducer(ADDRESS);
-//
-//         final int numMessages = 1000;
-//
-//         for (int i = 0; i < numMessages; i++)
-//         {
-//            Message clientFile = createLargeClientMessage(session, messageSize, false);
-//
-//            producer.send(clientFile);
-//
-//            log.info("Sent message " + i);
-//         }
-//
-//         ClientConsumer consumer = session.createConsumer(ADDRESS);
-//
-//         class MyHandler implements MessageHandler
-//         {
-//            int count = 0;
-//
-//            final CountDownLatch latch = new CountDownLatch(1);
-//
-//            volatile Exception exception;
-//
-//            public void onMessage(ClientMessage message)
-//            {
-//               try
-//               {
-//                  log.info("got message " + count);
-//
-//                  int availBytes = message.getBody().readableBytes();
-//
-//                  assertEquals(messageSize, availBytes);
-//
-//                  byte[] bytes = new byte[availBytes];
-//
-//                  message.getBody().readBytes(bytes);
-//
-//                  message.acknowledge();
-//
-//                  if (++count == numMessages)
-//                  {
-//                     latch.countDown();
-//                  }
-//               }
-//               catch (Exception e)
-//               {
-//                  log.error("Failed to handle message", e);
-//
-//                  this.exception = e;
-//               }
-//            }
-//         }
-//
-//         MyHandler handler = new MyHandler();
-//
-//         consumer.setMessageHandler(handler);
-//
-//         session.start();
-//
-//         handler.latch.await(10000, TimeUnit.MILLISECONDS);
-//
-//         assertNull(handler.exception);
-//
-//         session.close();
-//
-//         validateNoFilesOnLargeDir();
-//      }
-//      finally
-//      {
-//         try
-//         {
-//            server.stop();
-//         }
-//         catch (Throwable ignored)
-//         {
-//         }
-//
-//         try
-//         {
-//            session.close();
-//         }
-//         catch (Throwable ignored)
-//         {
-//         }
-//      }
-//   }
-
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -199,15 +199,15 @@
                                   final int consumerCount,
                                   final boolean local) throws Exception
    {
-      System.out.println("waiting for bindings on node " + node +
-                         " address " +
-                         address +
-                         " count " +
-                         count +
-                         " consumerCount " +
-                         consumerCount +
-                         " local " +
-                         local);
+//      System.out.println("waiting for bindings on node " + node +
+//                         " address " +
+//                         address +
+//                         " count " +
+//                         count +
+//                         " consumerCount " +
+//                         consumerCount +
+//                         " local " +
+//                         local);
       HornetQServer server = this.servers[node];
 
       if (server == null)

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -69,6 +69,8 @@
       super.setUp();
 
       resetFileFactory();
+      
+      fileFactory.start();
 
       transactions.clear();
 
@@ -88,6 +90,8 @@
          {
          }
       }
+      
+      fileFactory.stop();
 
       fileFactory = null;
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -41,6 +41,8 @@
       super.setUp();
       
       factory = createFactory();
+      
+      factory.start();
    }
 
    @Override
@@ -48,6 +50,8 @@
    {
       assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
       
+      factory.stop();
+      
       factory = null;
       
       forceGC();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -698,7 +698,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFileFactory#testFlush()
     */
-   public void testFlush()
+   public void flush()
    {
    }
 

Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-11-04 13:32:04 UTC (rev 8208)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-11-04 16:20:14 UTC (rev 8209)
@@ -170,7 +170,7 @@
       Map<Thread, StackTraceElement[]> stackTrace = Thread.getAllStackTraces();
 
       out.println("*******************************************************************************");
-      out.println("Complete Thread dump" + msg);
+      out.println("Complete Thread dump " + msg);
 
       for (Map.Entry<Thread, StackTraceElement[]> el : stackTrace.entrySet())
       {
@@ -184,7 +184,7 @@
       }
       
       out.println("===============================================================================");
-      out.println("End Thread dump" + msg);
+      out.println("End Thread dump " + msg);
       out.println("*******************************************************************************");
       
       



More information about the hornetq-commits mailing list