[hornetq-commits] JBoss hornetq SVN: r8230 - in trunk: src/main/org/hornetq/core/journal/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 5 21:01:23 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-05 21:01:22 -0500 (Thu, 05 Nov 2009)
New Revision: 8230

Modified:
   trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Fixing PageStoreTest and a few other minor tweaks

Modified: trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -39,9 +39,9 @@
    /** The factory may need to do some initialization before the file is activated.
     *  this was added as a hook for AIO to initialize the Observer on TimedBuffer.
     *  It could be eventually done the same on NIO if we implement TimedBuffer on NIO */
-   void activate(SequentialFile file);
+   void activateBuffer(SequentialFile file);
    
-   void deactivate(SequentialFile file);
+   void deactivateBuffer();
 
    // To be used in tests only
    ByteBuffer wrapBuffer(byte[] bytes);

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -36,10 +36,10 @@
  */
 public class AIOSequentialFileFactory extends AbstractSequentialFactory
 {
-   
+
    // Timeout used to wait executors to shutdown
    private static final int EXECUTOR_TIMEOUT = 60;
-   
+
    private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
 
    private static final boolean trace = log.isTraceEnabled();
@@ -77,21 +77,22 @@
    }
 
    public AIOSequentialFileFactory(final String journalDir,
-                                   int bufferSize,
-                                   long bufferTimeout,
-                                   boolean flushOnSync,
-                                   boolean logRates)
+                                   final int bufferSize,
+                                   final long bufferTimeout,
+                                   final boolean flushOnSync,
+                                   final boolean logRates)
    {
       super(journalDir);
       this.bufferSize = bufferSize;
       this.bufferTimeout = bufferTimeout;
-      this.timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
+      timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
     */
-   public void activate(SequentialFile file)
+   @Override
+   public void activateBuffer(final SequentialFile file)
    {
       final AIOSequentialFile sequentialFile = (AIOSequentialFile)file;
       timedBuffer.disableAutoFlush();
@@ -105,12 +106,14 @@
       }
    }
 
+   @Override
    public void flush()
    {
       timedBuffer.flush();
    }
 
-   public void deactivate(SequentialFile file)
+   @Override
+   public void deactivateBuffer()
    {
       timedBuffer.flush();
       timedBuffer.setObserver(null);
@@ -179,33 +182,37 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
     */
-   public void releaseBuffer(ByteBuffer buffer)
+   @Override
+   public void releaseBuffer(final ByteBuffer buffer)
    {
       AsynchronousFileImpl.destroyBuffer(buffer);
    }
 
+   @Override
    public void start()
    {
       timedBuffer.start();
-      
+
       writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
-                                                                                                         true));
+                                                                                 true));
 
       pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
-                                                                                                      true));
+                                                                              true));
 
-
    }
 
+   @Override
    public void stop()
    {
       buffersControl.stop();
+
       timedBuffer.stop();
-      
-      this.writeExecutor.shutdown();
+
+      writeExecutor.shutdown();
+
       try
       {
-         if (!this.writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+         if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
          {
             log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
          }
@@ -213,12 +220,12 @@
       catch (InterruptedException e)
       {
       }
-      
-      this.pollerExecutor.shutdown();
 
+      pollerExecutor.shutdown();
+
       try
       {
-         if (!this.pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+         if (!pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
          {
             log.warn("Timed out on AIO poller shutdown", new Exception("Timed out on AIO writer shutdown"));
          }
@@ -228,9 +235,10 @@
       }
    }
 
+   @Override
    protected void finalize()
    {
-      this.stop();
+      stop();
    }
 
    /** Class that will control buffer-reuse */
@@ -255,7 +263,9 @@
          if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
          {
             if (trace)
+            {
                trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
+            }
 
             bufferReuseLastTime = System.currentTimeMillis();
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -52,7 +52,7 @@
    {
    }
    
-   public void activate(SequentialFile file)
+   public void activateBuffer(SequentialFile file)
    {
    }
    
@@ -60,7 +60,7 @@
    {
    }
    
-   public void deactivate(SequentialFile file)
+   public void deactivateBuffer()
    {
    }
    

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -213,15 +213,15 @@
     * Commits and rollbacks are also counted as negatives. We need to fix those also.
     * @param dependencies
     */
-   public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies)  throws Exception
+   public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies) throws Exception
    {
       for (JournalFile dependency : dependencies)
       {
          fixDependency(originalFile, dependency);
       }
-      
+
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -256,15 +256,17 @@
    {
       JournalReaderCallback txfix = new JournalReaderCallbackAbstract()
       {
-         public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+         @Override
+         public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
          {
             if (transactionCounter.containsKey(transactionID))
             {
                dependency.incNegCount(originalFile);
             }
          }
-         
-         public void onReadRollbackRecord(long transactionID) throws Exception
+
+         @Override
+         public void onReadRollbackRecord(final long transactionID) throws Exception
          {
             if (transactionCounter.containsKey(transactionID))
             {
@@ -272,11 +274,10 @@
             }
          }
       };
-      
+
       JournalImpl.readJournalFile(fileFactory, dependency, txfix);
    }
 
-
    // Inner classes -------------------------------------------------
 
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -2058,7 +2058,7 @@
          openFile(currentFile, true);
       }
 
-      fileFactory.activate(currentFile.getFile());
+      fileFactory.activateBuffer(currentFile.getFile());
 
       pushOpenedFile();
 
@@ -2559,7 +2559,7 @@
             log.warn("Couldn't stop journal executor after 60 seconds");
          }
 
-         fileFactory.flush();
+         fileFactory.deactivateBuffer();
 
          if (currentFile != null && currentFile.getFile().isOpen())
          {
@@ -3037,7 +3037,7 @@
          trace("moveNextFile: " + currentFile.getFile().getFileName() + " sync: " + synchronous);
       }
 
-      fileFactory.activate(currentFile.getFile());
+      fileFactory.activateBuffer(currentFile.getFile());
    }
 
    /** 
@@ -3171,7 +3171,7 @@
 
    private void closeFile(final JournalFile file, final boolean synchronous)
    {
-      fileFactory.deactivate(file.getFile());
+      fileFactory.deactivateBuffer();
       pendingCloseFiles.add(file);
 
       Runnable run = new Runnable()

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -30,14 +30,14 @@
 public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
 {
    private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
-   
+
    public NIOSequentialFileFactory(final String journalDir)
    {
       super(journalDir);
-      
+
       if (journalDir == null)
       {
-         new Exception ("journalDir is null").printStackTrace();
+         new Exception("journalDir is null").printStackTrace();
       }
    }
 
@@ -46,7 +46,7 @@
    {
       return new NIOSequentialFile(journalDir, fileName);
    }
-   
+
    public boolean isSupportsCallbacks()
    {
       return false;

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -60,7 +60,7 @@
 public class PagingStoreImpl implements TestSupportPageStore
 {
    // Constants -----------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(PagingStoreImpl.class);
 
    // Attributes ----------------------------------------------------
@@ -210,7 +210,7 @@
    public boolean isPaging()
    {
       currentPageLock.readLock().lock();
-      
+
       try
       {
          if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
@@ -271,7 +271,6 @@
       checkReleaseProducerFlowControlCredits(-credits);
    }
 
-   
    public void addSize(final ServerMessage message, final boolean add) throws Exception
    {
       long size = message.getMemoryEstimate();
@@ -307,7 +306,7 @@
          addSize(-size);
       }
    }
-   
+
    public boolean page(final ServerMessage message, final long transactionID, final boolean duplicateDetection) throws Exception
    {
       // The sync on transactions is done on commit only
@@ -320,7 +319,7 @@
       // of crash
       return page(message, -1, syncNonTransactional && message.isDurable(), duplicateDetection);
    }
-   
+
    public void sync() throws Exception
    {
       currentPageLock.readLock().lock();
@@ -482,6 +481,7 @@
       currentPageLock.readLock().lock();
       try
       {
+         // Already paging, nothing to be done
          if (currentPage != null)
          {
             return false;
@@ -515,7 +515,6 @@
       }
    }
 
-   
    public Page getCurrentPage()
    {
       return currentPage;
@@ -597,18 +596,16 @@
             {
                firstPageId = Integer.MAX_VALUE;
 
-               if (currentPage != null)
+               if (currentPage == null)
                {
-                  returnPage = currentPage;
-                  returnPage.close();
-                  currentPage = null;
-               }
-               else
-               {
                   // sanity check... it shouldn't happen!
                   throw new IllegalStateException("CurrentPage is null");
                }
 
+               returnPage = currentPage;
+               returnPage.close();
+               currentPage = null;
+
                // The current page is empty... which means we reached the end of the pages
                if (returnPage.getNumberOfMessages() == 0)
                {
@@ -679,7 +676,6 @@
 
    }
 
-   
    private synchronized void checkReleaseProducerFlowControlCredits(final long size)
    {
       if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
@@ -702,7 +698,6 @@
       }
    }
 
-   
    private void addSize(final long size) throws Exception
    {
       if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
@@ -734,7 +729,6 @@
          }
          else
          {
-            // When in Global mode, we use the default page size as the mark to start depage
             if (maxSize > 0 && currentPage != null && addressSize <= maxSize - pageSize && !depaging.get())
             {
                if (startDepaging())
@@ -750,8 +744,11 @@
          return;
       }
    }
-   
-   private boolean page(final ServerMessage message, final long transactionID, final boolean sync, final boolean duplicateDetection) throws Exception
+
+   private boolean page(final ServerMessage message,
+                        final long transactionID,
+                        final boolean sync,
+                        final boolean duplicateDetection) throws Exception
    {
       if (!running)
       {
@@ -813,7 +810,7 @@
          {
             // We set the duplicate detection header to prevent the message being depaged more than once in case of
             // failure during depage
-            
+
             byte[] bytes = new byte[8];
 
             ByteBuffer buff = ByteBuffer.wrap(bytes);
@@ -823,8 +820,19 @@
             message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
          }
 
-         int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
+         PagedMessage pagedMessage;
 
+         if (transactionID != -1)
+         {
+            pagedMessage = new PagedMessageImpl(message, transactionID);
+         }
+         else
+         {
+            pagedMessage = new PagedMessageImpl(message);
+         }
+
+         int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
+
          if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
          {
             // Make sure nothing is currently validating or using currentPage
@@ -846,32 +854,14 @@
 
          try
          {
-            if (currentPage != null)
-            {
-               PagedMessage pagedMessage;
-               
-               if (transactionID != -1)
-               {
-                  pagedMessage = new PagedMessageImpl(message, transactionID);
-               }
-               else
-               {
-                  pagedMessage = new PagedMessageImpl(message);
-               }
-               
-               currentPage.write(pagedMessage);
+            currentPage.write(pagedMessage);
 
-               if (sync)
-               {
-                  currentPage.sync();
-               }
-               
-               return true;
-            }
-            else
+            if (sync)
             {
-               return false;
+               currentPage.sync();
             }
+
+            return true;
          }
          finally
          {
@@ -884,7 +874,7 @@
       }
 
    }
-   
+
    /**
     * This method will remove files from the page system and and route them, doing it transactionally
     *     

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -83,7 +83,7 @@
       clearData();
 
       Configuration config = createDefaultConfig();
-
+ 
       HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
 
       server.start();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -677,7 +677,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
     */
-   public void activate(SequentialFile file)
+   public void activateBuffer(SequentialFile file)
    {
    }
 
@@ -691,7 +691,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFileFactory#deactivate(org.hornetq.core.journal.SequentialFile)
     */
-   public void deactivate(SequentialFile file)
+   public void deactivateBuffer()
    {
    }
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-06 02:01:22 UTC (rev 8230)
@@ -33,7 +33,6 @@
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
@@ -77,8 +76,6 @@
 {
 
    // Constants -----------------------------------------------------
-   
-   private static final Logger log = Logger.getLogger(PagingStoreImplTest.class);
 
    private final static SimpleString destinationTestName = new SimpleString("test");
 
@@ -168,15 +165,15 @@
 
    }
 
-//   public void testPageWithNIO() throws Exception
-//   {
-//      // This integration test could fail 1 in 100 due to race conditions.
-//      for (int i = 0; i < 100; i++)
-//      {
-//         recreateDirectory(getTestDir());
-//         testConcurrentPaging(new NIOSequentialFileFactory(getTestDir()), 1);
-//      }
-//   }
+   public void testPageWithNIO() throws Exception
+   {
+      // This integration test could fail 1 in 100 due to race conditions.
+      for (int i = 0; i < 100; i++)
+      {
+         recreateDirectory(getTestDir());
+         testConcurrentPaging(new NIOSequentialFileFactory(getTestDir()), 1);
+      }
+   }
 
    public void testStore() throws Exception
    {
@@ -431,13 +428,13 @@
 
    }
 
-//   public void testConcurrentDepage() throws Exception
-//   {
-//      SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
-//
-//      testConcurrentPaging(factory, 10);
-//   }
+   public void testConcurrentDepage() throws Exception
+   {
+      SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
 
+      testConcurrentPaging(factory, 10);
+   }
+
    protected void testConcurrentPaging(final SequentialFileFactory factory, final int numberOfThreads) throws Exception,
                                                                                                       InterruptedException
    {
@@ -474,28 +471,35 @@
 
       assertEquals(0, storeImpl.getNumberOfPages());
 
+      // Mark the store to be paged
       storeImpl.startPaging();
 
       assertEquals(1, storeImpl.getNumberOfPages());
 
       final SimpleString destination = new SimpleString("test");
 
-      class ProducerThread extends Thread
+      class WriterThread extends Thread
       {
+
          Exception e;
 
          @Override
          public void run()
          {
+
             try
             {
                boolean firstTime = true;
                while (true)
                {
                   long id = messageIdGenerator.incrementAndGet();
-                  ServerMessage msg = createMessage(storeImpl, destination, createRandomBuffer(id, 5));                  
-                  if (storeImpl.page(msg, true))
-                  {                     
+
+                  // Each thread will keep paging until all the messages are depaged.
+                  // This is possible because the depage thread is not actually reading the pages.
+                  // It just uses the internal API to remove them from the page file system.
+                  ServerMessage msg = createMessage(storeImpl, destination, createRandomBuffer(id, 5));  
+                  if (storeImpl.page(msg, false))
+                  {
                      buffers.put(id, msg);
                   }
                   else
@@ -505,6 +509,7 @@
 
                   if (firstTime)
                   {
+                     // At least one message has been paged, so we can start depaging now
                      latchStart.countDown();
                      firstTime = false;
                   }
@@ -522,7 +527,7 @@
          }
       }
 
-      class ConsumerThread extends Thread
+      class ReaderThread extends Thread
       {
          Exception e;
 
@@ -533,12 +538,10 @@
             {
                // Wait every producer to produce at least one message
                latchStart.await();
+               
                while (aliveProducers.get() > 0)
                {
                   Page page = storeImpl.depage();
-                  
-                  //log.info("depaged " + page);
-                  
                   if (page != null)
                   {
                      readPages.add(page);
@@ -553,15 +556,15 @@
          }
       }
 
-      ProducerThread producerThread[] = new ProducerThread[numberOfThreads];
+      WriterThread producerThread[] = new WriterThread[numberOfThreads];
 
       for (int i = 0; i < numberOfThreads; i++)
       {
-         producerThread[i] = new ProducerThread();
+         producerThread[i] = new WriterThread();
          producerThread[i].start();
       }
 
-      ConsumerThread consumer = new ConsumerThread();
+      ReaderThread consumer = new ReaderThread();
       consumer.start();
 
       for (int i = 0; i < numberOfThreads; i++)
@@ -611,7 +614,7 @@
       {
          SequentialFile fileTmp = factory.createSequentialFile(file, 1);
          fileTmp.open();
-         assertTrue(fileTmp.size() + " <= " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
+         assertTrue("The page file size (" + fileTmp.size() + ") shouldn't be > " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
          fileTmp.close();
       }
 



More information about the hornetq-commits mailing list