[hornetq-commits] JBoss hornetq SVN: r11424 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 26 06:41:01 EDT 2011


Author: borges
Date: 2011-09-26 06:41:01 -0400 (Mon, 26 Sep 2011)
New Revision: 11424

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-720 Always read using buffers from corresponding SequentialFileFactory

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -1140,7 +1140,7 @@
             {
                continue;
             }
-            replicator.syncPages(sFile, id, getAddress());
+            replicator.syncPages(fileFactory, sFile, id, getAddress());
          }
       }
       finally

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -32,6 +32,7 @@
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.replication.ReplicationManager;
@@ -224,10 +225,11 @@
    long storePageCounterInc(long queueID, int add) throws Exception;
 
    /**
+    * @param journalContent selects which journal's {@link SequentialFileFactory} is checked
     * @return {@code true} if the underlying {@link SequentialFileFactory} has callback support.
     * @see SequentialFileFactory#isSupportsCallbacks()
     */
-   boolean hasCallbackSupport();
+   boolean hasCallbackSupport(JournalContent journalContent);
 
    /**
     * @return the bindings journal

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -183,9 +183,9 @@
    }
 
    private Journal messageJournal;
-
+   private final SequentialFileFactory messageJournalFileFactory;
    private Journal bindingsJournal;
-
+   private final SequentialFileFactory bindingsJournalFileFactory;
    private final SequentialFileFactory largeMessagesFactory;
 
    private volatile boolean started;
@@ -220,8 +220,6 @@
 
    private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
 
-   private final boolean hasCallbackSupport;
-
    public JournalStorageManager(final Configuration config,
                                 final ExecutorFactory executorFactory,
                                 final ReplicationManager replicator)
@@ -248,13 +246,13 @@
 
       journalDir = config.getJournalDirectory();
 
-      SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+      bindingsJournalFileFactory = new NIOSequentialFileFactory(bindingsDir);
 
       Journal localBindings = new JournalImpl(1024 * 1024,
                                               2,
                                               config.getJournalCompactMinFiles(),
                                               config.getJournalCompactPercentage(),
-                                              bindingsFF,
+ bindingsJournalFileFactory,
                                               "hornetq-bindings",
                                               "bindings",
                                               1);
@@ -279,13 +277,12 @@
 
       syncTransactional = config.isJournalSyncTransactional();
 
-      SequentialFileFactory journalFF = null;
-
       if (config.getJournalType() == JournalType.ASYNCIO)
       {
          JournalStorageManager.log.info("Using AIO Journal");
 
-         journalFF = new AIOSequentialFileFactory(journalDir,
+         messageJournalFileFactory =
+                  new AIOSequentialFileFactory(journalDir,
                                                   config.getJournalBufferSize_AIO(),
                                                   config.getJournalBufferTimeout_AIO(),
                                                   config.isLogJournalWriteRate());
@@ -293,7 +290,8 @@
       else if (config.getJournalType() == JournalType.NIO)
       {
          JournalStorageManager.log.info("Using NIO Journal");
-         journalFF = new NIOSequentialFileFactory(journalDir,
+         messageJournalFileFactory =
+                  new NIOSequentialFileFactory(journalDir,
                                                   true,
                                                   config.getJournalBufferSize_NIO(),
                                                   config.getJournalBufferTimeout_NIO(),
@@ -303,7 +301,6 @@
       {
          throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
       }
-      hasCallbackSupport = journalFF.isSupportsCallbacks();
 
       idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
 
@@ -311,7 +308,7 @@
                                              config.getJournalMinFiles(),
                                              config.getJournalCompactMinFiles(),
                                              config.getJournalCompactPercentage(),
-                                             journalFF,
+                               messageJournalFileFactory,
                                              "hornetq-data",
                                              "hq",
                                              config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
@@ -409,8 +406,8 @@
             storageManagerLock.writeLock().unlock();
          }
 
-         sendJournalFile(messageFiles, JournalContent.MESSAGES);
-         sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+         sendJournalFile(messageJournalFileFactory, messageFiles, JournalContent.MESSAGES);
+         sendJournalFile(bindingsJournalFileFactory, bindingsFiles, JournalContent.BINDINGS);
          sendLargeMessageFiles(largeMessageFilesToSync);
          sendPagesToBackup(pageFilesToSync, pagingManager);
 
@@ -474,7 +471,7 @@
          SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
          if (!seqFile.exists())
             continue;
-         replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
+         replicator.syncLargeMessageFile(largeMessagesFactory, seqFile, size, getLargeMessageIdFromFilename(fileName));
       }
    }
 
@@ -507,11 +504,12 @@
    /**
     * Send an entire journal file to a replicating backup server.
     */
-   private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception
+   private void
+      sendJournalFile(SequentialFileFactory factory, JournalFile[] journalFiles, JournalContent type) throws Exception
    {
       for (JournalFile jf : journalFiles)
       {
-         replicator.syncJournalFile(jf, type);
+         replicator.syncJournalFile(factory, jf, type);
          jf.setCanReclaim(true);
       }
    }
@@ -3890,9 +3888,11 @@
       journal.stop();
    }
 
-   public boolean hasCallbackSupport()
+   public boolean hasCallbackSupport(JournalContent journalContent)
    {
-      return hasCallbackSupport;
+      if (journalContent == JournalContent.MESSAGES)
+      return messageJournalFileFactory.isSupportsCallbacks();
+      return bindingsJournalFileFactory.isSupportsCallbacks();
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -37,6 +37,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.replication.ReplicationManager;
@@ -573,7 +574,7 @@
    }
 
    @Override
-   public boolean hasCallbackSupport()
+   public boolean hasCallbackSupport(JournalContent content)
    {
       return false;
    }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -20,6 +20,7 @@
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
@@ -92,7 +93,7 @@
     * @throws HornetQException
     * @throws Exception
     */
-   void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
+   void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent type) throws Exception;
 
    /**
     * Reserve the following fileIDs in the backup server.
@@ -113,7 +114,8 @@
     * @param seqFile
     * @throws Exception
     */
-   void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
+   void syncLargeMessageFile(SequentialFileFactory fctr, SequentialFile seqFile, long size, long id)
+                                                                                                       throws Exception;
 
    /**
     * @param file
@@ -121,5 +123,5 @@
     * @param pageStore
     * @throws Exception
     */
-   void syncPages(SequentialFile file, long id, SimpleString pageStore) throws Exception;
+   void syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString pageStore) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -492,7 +492,7 @@
       {
          sf.open(1, false);
       }
-      sf.writeDirect(ByteBuffer.wrap(data), true);
+      sf.writeDirect(data);
    }
 
    /**
@@ -519,7 +519,7 @@
       Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
       JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(), mapToFill);
       registerJournal(packet.getJournalContentType().typeByte,
-                      new FileWrapperJournal(current, storage.hasCallbackSupport()));
+                      new FileWrapperJournal(current, storage.hasCallbackSupport(packet.getJournalContentType())));
      }
 
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -26,6 +26,7 @@
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
@@ -462,26 +463,28 @@
    }
 
    @Override
-   public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
+   public void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent content) throws Exception
    {
       if (enabled)
       {
          SequentialFile file = jf.getFile().copy();
          log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
-         sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+         sendLargeFile(content, null, jf.getFileID(), file, factory, Long.MAX_VALUE);
       }
    }
 
    @Override
-   public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
+   public void
+      syncLargeMessageFile(SequentialFileFactory factory, SequentialFile file, long size, long id) throws Exception
    {
-      sendLargeFile(null, null, id, file, size);
+      sendLargeFile(null, null, id, file, factory, size);
    }
 
    @Override
-   public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
+   public void
+      syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString queueName) throws Exception
    {
-      sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+      sendLargeFile(null, queueName, id, file, factory, Long.MAX_VALUE);
    }
 
    /**
@@ -493,7 +496,11 @@
     * @param maxBytesToSend maximum number of bytes to read and send from the file
     * @throws Exception
     */
-   private void sendLargeFile(JournalContent content, SimpleString pageStore, final long id, SequentialFile file,
+   private void sendLargeFile(JournalContent content,
+      SimpleString pageStore,
+      final long id,
+      SequentialFile file,
+      SequentialFileFactory factory,
             long maxBytesToSend)
             throws Exception
    {
@@ -501,11 +508,13 @@
          return;
       if (!file.isOpen())
       {
-         file.open(1, false);
+         file.open();
       }
-      final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
-      while (true)
+      final ByteBuffer buffer = factory.newBuffer(1 << 17);
+      try
       {
+         while (true)
+      {
          buffer.rewind();
          int bytesRead = file.read(buffer);
          int toSend = bytesRead;
@@ -528,7 +537,12 @@
          sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
          if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
             break;
+         }
       }
+      finally
+      {
+         factory.releaseBuffer(buffer);
+      }
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -20,12 +20,10 @@
 import org.hornetq.core.journal.impl.TimedBuffer;
 
 /**
- *
  * A SequentialFile
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
  */
 public interface SequentialFile
 {
@@ -65,19 +63,50 @@
 
    void write(EncodingSupport bytes, boolean sync) throws Exception;
 
-   /** Write directly to the file without using any buffer */
+   /**
+    * Write directly to the file without using any buffer
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
    void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
 
-   /** Write directly to the file without using any buffer */
+   /**
+    * Write directly to the file without using any buffer
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
    void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
 
-   /** Write directly to the file.
-    *  This is used by compacting and other places where we write a big buffer in a single shot.
-    *  writeInternal should always block until the entire write is sync on disk */
+   /**
+    * Write directly to the file. This is used by compacting and other places where we write a big
+    * buffer in a single shot. writeInternal should always block until the entire write is synced
+    * to disk.
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
    void writeInternal(ByteBuffer bytes) throws Exception;
 
+   /**
+    * Wraps the bytes using a buffer from the internal {@link SequentialFileFactory} and writes it
+    * directly.
+    */
+   void writeDirect(byte[] bytes) throws Exception;
+
+   /**
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
    int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
 
+   /**
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
    int read(ByteBuffer bytes) throws Exception;
 
    void position(long pos) throws Exception;

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -14,10 +14,10 @@
 package org.hornetq.core.journal.impl;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executor;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.asyncio.AsynchronousFile;
 import org.hornetq.core.asyncio.BufferCallback;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
@@ -286,7 +286,7 @@
       aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
    }
 
-   public void writeInternal(final ByteBuffer bytes) throws Exception
+   public void writeInternal(final ByteBuffer bytes) throws HornetQException
    {
       final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
 

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -234,6 +234,20 @@
       }
    }
 
+   @Override
+   public void writeDirect(byte[] data) throws Exception
+   {
+      ByteBuffer buffer = factory.wrapBuffer(data);
+      try
+      {
+         writeDirect(buffer, true);
+      }
+      finally
+      {
+         factory.releaseBuffer(buffer);
+      }
+   }
+
    public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) throws Exception
    {
       if (timedBuffer != null)

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -338,7 +338,9 @@
     * @throws IOException
     * @throws Exception
     */
-   private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+   private
+            void
+            doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
    {
       channel.write(bytes);
 

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-09-26 10:41:01 UTC (rev 11424)
@@ -31,9 +31,9 @@
 import org.hornetq.core.logging.Logger;
 
 /**
- * 
+ *
  * A FakeSequentialFileFactory
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -313,8 +313,6 @@
 
       public boolean isOpen()
       {
-         // log.debug("is open" + System.identityHashCode(this) +" open is now "
-         // + open);
          return open;
       }
 
@@ -499,7 +497,7 @@
       {
          writeDirect(bytes, sync, null);
       }
-      
+
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.SequentialFile#writeInternal(java.nio.ByteBuffer)
        */
@@ -508,8 +506,8 @@
          writeDirect(bytes, true);
       }
 
-      
 
+
       private void checkAndResize(final int size)
       {
          int oldpos = data == null ? 0 : data.position();
@@ -681,9 +679,15 @@
       public void copyTo(SequentialFile newFileName)
       {
          // TODO Auto-generated method stub
-         
       }
 
+      @Override
+      public void writeDirect(byte[] bytes) throws Exception
+      {
+         ByteBuffer buffer = newBuffer(bytes.length);
+         HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+         write(outbuffer, true);
+      }
    }
 
    /* (non-Javadoc)



More information about the hornetq-commits mailing list