Author: borges
Date: 2011-09-26 06:41:01 -0400 (Mon, 26 Sep 2011)
New Revision: 11424
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-720 Always read using buffers from corresponding SequentialFileFactory
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -1140,7 +1140,7 @@
{
continue;
}
- replicator.syncPages(sFile, id, getAddress());
+ replicator.syncPages(fileFactory, sFile, id, getAddress());
}
}
finally
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -32,6 +32,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -224,10 +225,11 @@
long storePageCounterInc(long queueID, int add) throws Exception;
/**
+ * @param journalContent
* @return {@code true} if the underlying {@link SequentialFileFactory} has callback support.
* @see SequentialFileFactory#isSupportsCallbacks()
*/
- boolean hasCallbackSupport();
+ boolean hasCallbackSupport(JournalContent journalContent);
/**
* @return the bindings journal
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -183,9 +183,9 @@
}
private Journal messageJournal;
-
+ private final SequentialFileFactory messageJournalFileFactory;
private Journal bindingsJournal;
-
+ private final SequentialFileFactory bindingsJournalFileFactory;
private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
@@ -220,8 +220,6 @@
private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
- private final boolean hasCallbackSupport;
-
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ReplicationManager replicator)
@@ -248,13 +246,13 @@
journalDir = config.getJournalDirectory();
- SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+ bindingsJournalFileFactory = new NIOSequentialFileFactory(bindingsDir);
Journal localBindings = new JournalImpl(1024 * 1024,
2,
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
- bindingsFF,
+ bindingsJournalFileFactory,
"hornetq-bindings",
"bindings",
1);
@@ -279,13 +277,12 @@
syncTransactional = config.isJournalSyncTransactional();
- SequentialFileFactory journalFF = null;
-
if (config.getJournalType() == JournalType.ASYNCIO)
{
JournalStorageManager.log.info("Using AIO Journal");
- journalFF = new AIOSequentialFileFactory(journalDir,
+ messageJournalFileFactory =
+ new AIOSequentialFileFactory(journalDir,
config.getJournalBufferSize_AIO(),
config.getJournalBufferTimeout_AIO(),
config.isLogJournalWriteRate());
@@ -293,7 +290,8 @@
else if (config.getJournalType() == JournalType.NIO)
{
JournalStorageManager.log.info("Using NIO Journal");
- journalFF = new NIOSequentialFileFactory(journalDir,
+ messageJournalFileFactory =
+ new NIOSequentialFileFactory(journalDir,
true,
config.getJournalBufferSize_NIO(),
config.getJournalBufferTimeout_NIO(),
@@ -303,7 +301,6 @@
{
throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
}
- hasCallbackSupport = journalFF.isSupportsCallbacks();
idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
@@ -311,7 +308,7 @@
config.getJournalMinFiles(),
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
- journalFF,
+ messageJournalFileFactory,
"hornetq-data",
"hq",
config.getJournalType() ==
JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
@@ -409,8 +406,8 @@
storageManagerLock.writeLock().unlock();
}
- sendJournalFile(messageFiles, JournalContent.MESSAGES);
- sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+ sendJournalFile(messageJournalFileFactory, messageFiles, JournalContent.MESSAGES);
+ sendJournalFile(bindingsJournalFileFactory, bindingsFiles, JournalContent.BINDINGS);
sendLargeMessageFiles(largeMessageFilesToSync);
sendPagesToBackup(pageFilesToSync, pagingManager);
@@ -474,7 +471,7 @@
SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
if (!seqFile.exists())
continue;
- replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
+ replicator.syncLargeMessageFile(largeMessagesFactory, seqFile, size, getLargeMessageIdFromFilename(fileName));
}
}
@@ -507,11 +504,12 @@
/**
* Send an entire journal file to a replicating backup server.
*/
- private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception
+ private void
+ sendJournalFile(SequentialFileFactory factory, JournalFile[] journalFiles, JournalContent type) throws Exception
{
for (JournalFile jf : journalFiles)
{
- replicator.syncJournalFile(jf, type);
+ replicator.syncJournalFile(factory, jf, type);
jf.setCanReclaim(true);
}
}
@@ -3890,9 +3888,11 @@
journal.stop();
}
- public boolean hasCallbackSupport()
+ public boolean hasCallbackSupport(JournalContent journalContent)
{
- return hasCallbackSupport;
+ if (journalContent == JournalContent.MESSAGES)
+ return messageJournalFileFactory.isSupportsCallbacks();
+ return bindingsJournalFileFactory.isSupportsCallbacks();
}
@Override
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -37,6 +37,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -573,7 +574,7 @@
}
@Override
- public boolean hasCallbackSupport()
+ public boolean hasCallbackSupport(JournalContent content)
{
return false;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -20,6 +20,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
@@ -92,7 +93,7 @@
* @throws HornetQException
* @throws Exception
*/
- void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
+ void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent type) throws Exception;
/**
* Reserve the following fileIDs in the backup server.
@@ -113,7 +114,8 @@
* @param seqFile
* @throws Exception
*/
- void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
+ void syncLargeMessageFile(SequentialFileFactory fctr, SequentialFile seqFile, long size, long id)
+ throws Exception;
/**
* @param file
@@ -121,5 +123,5 @@
* @param pageStore
* @throws Exception
*/
- void syncPages(SequentialFile file, long id, SimpleString pageStore) throws Exception;
+ void syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString pageStore) throws Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -492,7 +492,7 @@
{
sf.open(1, false);
}
- sf.writeDirect(ByteBuffer.wrap(data), true);
+ sf.writeDirect(data);
}
/**
@@ -519,7 +519,7 @@
Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(), mapToFill);
registerJournal(packet.getJournalContentType().typeByte,
- new FileWrapperJournal(current, storage.hasCallbackSupport()));
+ new FileWrapperJournal(current, storage.hasCallbackSupport(packet.getJournalContentType())));
}
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -26,6 +26,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -462,26 +463,28 @@
}
@Override
- public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
+ public void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent content) throws Exception
{
if (enabled)
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" +
file.size() + ") to backup. " + file);
- sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+ sendLargeFile(content, null, jf.getFileID(), file, factory, Long.MAX_VALUE);
}
}
@Override
- public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
+ public void
+ syncLargeMessageFile(SequentialFileFactory factory, SequentialFile file, long size, long id) throws Exception
{
- sendLargeFile(null, null, id, file, size);
+ sendLargeFile(null, null, id, file, factory, size);
}
@Override
- public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
+ public void
+ syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString queueName) throws Exception
{
- sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+ sendLargeFile(null, queueName, id, file, factory, Long.MAX_VALUE);
}
/**
@@ -493,7 +496,11 @@
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
- private void sendLargeFile(JournalContent content, SimpleString pageStore, final long id, SequentialFile file,
+ private void sendLargeFile(JournalContent content,
+ SimpleString pageStore,
+ final long id,
+ SequentialFile file,
+ SequentialFileFactory factory,
long maxBytesToSend)
throws Exception
{
@@ -501,11 +508,13 @@
return;
if (!file.isOpen())
{
- file.open(1, false);
+ file.open();
}
- final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
- while (true)
+ final ByteBuffer buffer = factory.newBuffer(1 << 17);
+ try
{
+ while (true)
+ {
buffer.rewind();
int bytesRead = file.read(buffer);
int toSend = bytesRead;
@@ -528,7 +537,12 @@
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
+ }
}
+ finally
+ {
+ factory.releaseBuffer(buffer);
+ }
}
@Override
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -20,12 +20,10 @@
import org.hornetq.core.journal.impl.TimedBuffer;
/**
- *
* A SequentialFile
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
*/
public interface SequentialFile
{
@@ -65,19 +63,50 @@
void write(EncodingSupport bytes, boolean sync) throws Exception;
- /** Write directly to the file without using any buffer */
+ /**
+ * Write directly to the file without using any buffer
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
- /** Write directly to the file without using any buffer */
+ /**
+ * Write directly to the file without using any buffer
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
- /** Write directly to the file.
- * This is used by compacting and other places where we write a big buffer in a single shot.
- * writeInternal should always block until the entire write is sync on disk */
+ /**
+ * Write directly to the file. This is used by compacting and other places where we write a big
+ * buffer in a single shot. writeInternal should always block until the entire write is sync on
+ * disk.
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
void writeInternal(ByteBuffer bytes) throws Exception;
+ /**
+ * Wraps the bytes using a buffer from the internal {@link SequentialFileFactory} and writes it
+ * directly.
+ */
+ void writeDirect(byte[] bytes) throws Exception;
+
+ /**
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
+ /**
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
int read(ByteBuffer bytes) throws Exception;
void position(long pos) throws Exception;
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -14,10 +14,10 @@
package org.hornetq.core.journal.impl;
import java.io.File;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
@@ -286,7 +286,7 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
- public void writeInternal(final ByteBuffer bytes) throws Exception
+ public void writeInternal(final ByteBuffer bytes) throws HornetQException
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -234,6 +234,20 @@
}
}
+ @Override
+ public void writeDirect(byte[] data) throws Exception
+ {
+ ByteBuffer buffer = factory.wrapBuffer(data);
+ try
+ {
+ writeDirect(buffer, true);
+ }
+ finally
+ {
+ factory.releaseBuffer(buffer);
+ }
+ }
+
public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) throws Exception
{
if (timedBuffer != null)
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -338,7 +338,9 @@
* @throws IOException
* @throws Exception
*/
- private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+ private
+ void
+ doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
{
channel.write(bytes);
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-26
10:39:02 UTC (rev 11423)
+++
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-26
10:41:01 UTC (rev 11424)
@@ -31,9 +31,9 @@
import org.hornetq.core.logging.Logger;
/**
- *
+ *
* A FakeSequentialFileFactory
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -313,8 +313,6 @@
public boolean isOpen()
{
- // log.debug("is open" + System.identityHashCode(this) +" open is now "
- // + open);
return open;
}
@@ -499,7 +497,7 @@
{
writeDirect(bytes, sync, null);
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#writeInternal(java.nio.ByteBuffer)
*/
@@ -508,8 +506,8 @@
writeDirect(bytes, true);
}
-
+
private void checkAndResize(final int size)
{
int oldpos = data == null ? 0 : data.position();
@@ -681,9 +679,15 @@
public void copyTo(SequentialFile newFileName)
{
// TODO Auto-generated method stub
-
}
+ @Override
+ public void writeDirect(byte[] bytes) throws Exception
+ {
+ ByteBuffer buffer = newBuffer(bytes.length);
+ HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+ write(outbuffer, true);
+ }
}
/* (non-Javadoc)