JBoss hornetq SVN: r11450 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-30 05:08:42 -0400 (Fri, 30 Sep 2011)
New Revision: 11450
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
Log:
Insert place holders to make sure warnings are turned on again before merging
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-30 08:58:28 UTC (rev 11449)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-30 09:08:42 UTC (rev 11450)
@@ -425,6 +425,7 @@
paused = false;
+ // HORNETQ-720
if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()) && false)
{
NettyAcceptor.log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion()
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-30 08:58:28 UTC (rev 11449)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-30 09:08:42 UTC (rev 11450)
@@ -378,6 +378,7 @@
batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
}
+ // HORNETQ-720
if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()) && false)
{
NettyConnector.log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion()
13 years, 2 months
JBoss hornetq SVN: r11449 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence and 10 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-30 04:58:28 -0400 (Fri, 30 Sep 2011)
New Revision: 11449
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-720 Do not use AIO during backup synchronization
Also fix the logic at BackupSyncJournalTest for testing fileID reservation.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -1140,7 +1140,7 @@
{
continue;
}
- replicator.syncPages(fileFactory, sFile, id, getAddress());
+ replicator.syncPages(sFile, id, getAddress());
}
}
finally
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -24,7 +24,6 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -32,7 +31,6 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -225,13 +223,6 @@
long storePageCounterInc(long queueID, int add) throws Exception;
/**
- * @param journalContent
- * @return {@code true} if the underlying {@link SequentialFileFactory} has callback support.
- * @see SequentialFileFactory#isSupportsCallbacks()
- */
- boolean hasCallbackSupport(JournalContent journalContent);
-
- /**
* @return the bindings journal
*/
Journal getBindingsJournal();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -441,7 +441,6 @@
PagingStore store = manager.getPageStore(entry.getKey());
store.sendPages(replicator, entry.getValue());
}
-
}
/**
@@ -471,7 +470,7 @@
SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
if (!seqFile.exists())
continue;
- replicator.syncLargeMessageFile(largeMessagesFactory, seqFile, size, getLargeMessageIdFromFilename(fileName));
+ replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
}
}
@@ -509,7 +508,7 @@
{
for (JournalFile jf : journalFiles)
{
- replicator.syncJournalFile(factory, jf, type);
+ replicator.syncJournalFile(jf, type);
jf.setCanReclaim(true);
}
}
@@ -3888,13 +3887,6 @@
journal.stop();
}
- public boolean hasCallbackSupport(JournalContent journalContent)
- {
- if (journalContent == JournalContent.MESSAGES)
- return messageJournalFileFactory.isSupportsCallbacks();
- return bindingsJournalFileFactory.isSupportsCallbacks();
- }
-
@Override
public boolean addToPage(PagingManager pagingManager,
SimpleString address,
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -37,7 +37,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -574,12 +573,6 @@
}
@Override
- public boolean hasCallbackSupport(JournalContent content)
- {
- return false;
- }
-
- @Override
public Journal getBindingsJournal()
{
return null;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -20,7 +20,6 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
@@ -89,13 +88,6 @@
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
/**
- * Sends the whole content of the file to be duplicated.
- * @throws HornetQException
- * @throws Exception
- */
- void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent type) throws Exception;
-
- /**
* Reserve the following fileIDs in the backup server.
* @param datafiles
* @param contentType
@@ -111,11 +103,17 @@
void sendSynchronizationDone();
/**
+ * Sends the whole content of the file to be duplicated.
+ * @throws HornetQException
+ * @throws Exception
+ */
+ void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
+
+ /**
* @param seqFile
* @throws Exception
*/
- void syncLargeMessageFile(SequentialFileFactory fctr, SequentialFile seqFile, long size, long id)
- throws Exception;
+ void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
/**
* @param file
@@ -123,5 +121,5 @@
* @param pageStore
* @throws Exception
*/
- void syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString pageStore) throws Exception;
+ void syncPages(SequentialFile file, long id, SimpleString pageStore) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -23,8 +23,10 @@
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
@@ -579,7 +581,7 @@
}
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+ public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
{
throw new UnsupportedOperationException("This method should only be called at a replicating backup");
}
@@ -620,6 +622,18 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ throw new UnsupportedOperationException();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,7 +13,11 @@
package org.hornetq.core.replication.impl;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -85,8 +89,8 @@
private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
/** Files reserved in each journal for synchronization of existing data from the 'live' server. */
- private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
- new HashMap<JournalContent, Map<Long, JournalFile>>();
+ private final Map<JournalContent, Map<Long, JournalSyncFile>> filesReservedForSync =
+ new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long, LargeServerMessage>();
/**
@@ -240,7 +244,7 @@
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+ filesReservedForSync.put(jc, new HashMap<Long, JournalSyncFile>());
// We only need to load internal structures on the backup...
journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
}
@@ -446,7 +450,7 @@
{
Long id = Long.valueOf(msg.getId());
byte[] data = msg.getData();
- SequentialFile sf;
+ SequentialFile channel;
switch (msg.getFileType())
{
case LARGE_MESSAGE:
@@ -461,38 +465,44 @@
largeMessage.setMessageID(id);
largeMessagesOnSync.put(id, largeMessage);
}
- sf = largeMessage.getFile();
+ channel = largeMessage.getFile();
}
break;
}
- case JOURNAL:
- {
- JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
- sf = journalFile.getFile();
- break;
- }
case PAGE:
{
Page page = getPage(msg.getPageStore(), (int)msg.getId());
- sf = page.getFile();
+ channel = page.getFile();
break;
}
+ case JOURNAL:
+ {
+ JournalSyncFile journalSyncFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
+ FileChannel channel2 = journalSyncFile.getChannel();
+ if (data == null)
+ {
+ channel2.close();
+ return;
+ }
+ channel2.write(ByteBuffer.wrap(data));
+ return;
+ }
default:
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled file type " + msg.getFileType());
}
if (data == null)
{
- sf.close();
+ channel.close();
return;
}
- if (!sf.isOpen())
+ if (!channel.isOpen())
{
- sf.open(1, false);
+ channel.open(1, false);
}
- sf.writeDirect(data);
+ channel.writeDirect(ByteBuffer.wrap(data), true);
}
/**
@@ -516,10 +526,12 @@
final Journal journal = journalsHolder.get(packet.getJournalContentType());
- Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
- JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(), mapToFill);
- registerJournal(packet.getJournalContentType().typeByte,
- new FileWrapperJournal(current, storage.hasCallbackSupport(packet.getJournalContentType())));
+ Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
+ for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
+ {
+ mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+ }
+ registerJournal(packet.getJournalContentType().typeByte, new FileWrapperJournal(journal));
}
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
@@ -808,4 +820,38 @@
{
return journals[journalID];
}
+
+ public static class JournalSyncFile
+ {
+
+ private FileChannel channel;
+ private final File file;
+
+ public JournalSyncFile(JournalFile jFile) throws Exception
+ {
+ SequentialFile seqFile = jFile.getFile();
+ file = seqFile.getJavaFile();
+ seqFile.close();
+ }
+
+ FileChannel getChannel() throws Exception
+ {
+ if (channel == null)
+ {
+ channel = new FileOutputStream(file).getChannel();
+ }
+ return channel;
+ }
+
+ void close() throws IOException
+ {
+ channel.close();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JournalSyncFile(file=" + file.getAbsolutePath() + ")";
+ }
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,7 +13,9 @@
package org.hornetq.core.replication.impl;
+import java.io.FileInputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
@@ -26,7 +28,6 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -463,28 +464,26 @@
}
@Override
- public void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent content) throws Exception
+ public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
{
if (enabled)
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
- sendLargeFile(content, null, jf.getFileID(), file, factory, Long.MAX_VALUE);
+ sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
}
}
@Override
- public void
- syncLargeMessageFile(SequentialFileFactory factory, SequentialFile file, long size, long id) throws Exception
+ public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
{
- sendLargeFile(null, null, id, file, factory, size);
+ sendLargeFile(null, null, id, file, size);
}
@Override
- public void
- syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString queueName) throws Exception
+ public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
{
- sendLargeFile(null, queueName, id, file, factory, Long.MAX_VALUE);
+ sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}
/**
@@ -500,9 +499,7 @@
SimpleString pageStore,
final long id,
SequentialFile file,
- SequentialFileFactory factory,
- long maxBytesToSend)
- throws Exception
+ long maxBytesToSend) throws Exception
{
if (!enabled)
return;
@@ -510,16 +507,17 @@
{
file.open();
}
- final ByteBuffer buffer = factory.newBuffer(1 << 17);
+ final FileChannel channel = (new FileInputStream(file.getJavaFile())).getChannel();
try
{
+ final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
while (true)
- {
- buffer.rewind();
- int bytesRead = file.read(buffer);
- int toSend = bytesRead;
- if (bytesRead > 0)
{
+ buffer.clear();
+ int bytesRead = channel.read(buffer);
+ int toSend = bytesRead;
+ if (bytesRead > 0)
+ {
if (bytesRead >= maxBytesToSend)
{
toSend = (int)maxBytesToSend;
@@ -541,7 +539,7 @@
}
finally
{
- factory.releaseBuffer(buffer);
+ channel.close();
}
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -17,6 +17,7 @@
import java.util.Map;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -148,13 +149,11 @@
* During the synchronization between a live server and backup, we reserve in the backup the
* journal file IDs used in the live server. This call also makes sure the files are created
* empty without any kind of headers added.
- * @param fileIds ids to reserve for synchronization
- * @param mapToFill map to be filled with id and journal file pairs for <b>synchronization</b>.
- * @return a new {@link JournalFile} to be used for regular <b>replication</b> during
- * synchronization
+ * @param fileIds IDs to reserve for synchronization
+ * @return map to be filled with id and journal file pairs for <b>synchronization</b>.
* @throws Exception
*/
- JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception;
+ Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception;
/**
* @return whether automatic reclaiming of Journal files is enabled
@@ -190,4 +189,9 @@
*/
JournalFile[] getDataFiles();
+ SequentialFileFactory getFileFactory();
+
+ JournalFilesRepository getFilesRepository();
+
+ int getFileSize();
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,6 +13,7 @@
package org.hornetq.core.journal;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -128,4 +129,9 @@
void copyTo(SequentialFile newFileName) throws Exception;
void setTimedBuffer(TimedBuffer buffer);
+
+ /**
+ * Returns a native File of the file underlying this sequential file.
+ */
+ File getJavaFile();
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -37,8 +37,6 @@
void debugWait() throws Exception;
- int getFileSize();
-
int getMinFiles();
String getFilePrefix();
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -24,7 +24,6 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
/**
*
@@ -35,8 +34,6 @@
*/
public class AIOSequentialFile extends AbstractSequentialFile
{
- private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
-
private boolean opened = false;
private final int maxIO;
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -389,4 +389,9 @@
}
+ @Override
+ public File getJavaFile()
+ {
+ return getFile().getAbsoluteFile();
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -15,6 +15,8 @@
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
@@ -36,15 +38,14 @@
private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
- private final JournalFile currentFile;
-
/**
- * @param file
+ * @param journal
+ * @throws Exception
*/
- public FileWrapperJournal(JournalFile file, boolean hasCallbackSupport)
+ public FileWrapperJournal(Journal journal) throws Exception
{
- super(hasCallbackSupport);
- currentFile = file;
+ super(journal.getFileFactory(), journal.getFilesRepository(), journal.getFileSize());
+ setUpCurrentFile(JournalImpl.SIZE_HEADER);
}
@Override
@@ -56,7 +57,11 @@
@Override
public void stop() throws Exception
{
- currentFile.getFile().close();
+ SequentialFile seqFile = currentFile.getFile();
+ long pos = seqFile.position();
+ seqFile.close();
+ seqFile.open();
+ seqFile.position(pos);
}
@Override
@@ -92,7 +97,7 @@
{
callback.storeLineUp();
}
-
+ switchFileIfNecessary(encoder.getEncodeSize());
encoder.setFileID(currentFile.getRecordID());
if (callback != null)
@@ -189,6 +194,12 @@
return defaultValue.get();
}
+ @Override
+ public String toString()
+ {
+ return FileWrapperJournal.class.getName() + "(currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
+ }
+
// UNSUPPORTED STUFF
@Override
@@ -260,7 +271,7 @@
}
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+ public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
{
throw new UnsupportedOperationException();
}
@@ -300,4 +311,22 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ void scheduleReclaim()
+ {
+ // no-op
+ }
+
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -3,16 +3,35 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.hornetq.core.logging.Logger;
abstract class JournalBase
{
- private final boolean hasCallbackSupport;
+ protected final JournalFilesRepository filesRepository;
+ protected final SequentialFileFactory fileFactory;
+ protected volatile JournalFile currentFile;
+ protected final int fileSize;
- public JournalBase(boolean hasCallbackSupport)
+ private static final Logger log = Logger.getLogger(JournalBase.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ public JournalBase(SequentialFileFactory fileFactory, JournalFilesRepository journalFilesRepository, int fileSize)
{
- this.hasCallbackSupport = hasCallbackSupport;
+ if (fileSize < JournalImpl.MIN_FILE_SIZE)
+ {
+ throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
+ }
+ if (fileSize % fileFactory.getAlignment() != 0)
+ {
+ throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
+ fileFactory.getAlignment());
+ }
+ this.fileFactory = fileFactory;
+ this.filesRepository = journalFilesRepository;
+ this.fileSize = fileSize;
}
abstract public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record,
@@ -179,9 +198,56 @@
}
}
+ /**
+ * @param size
+ * @throws Exception
+ */
+ protected void switchFileIfNecessary(int size) throws Exception
+ {
+ // We take into account the fileID used on the Header
+ if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
+ {
+ throw new IllegalArgumentException("Record is too large to store " + size);
+ }
+
+ if (!currentFile.getFile().fits(size))
+ {
+ moveNextFile(true);
+
+ // The same check needs to be done at the new file also
+ if (!currentFile.getFile().fits(size))
+ {
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Invalid logic on buffer allocation");
+ }
+ }
+ }
+
+ abstract void scheduleReclaim();
+
+ // You need to guarantee lock.acquire() before calling this method
+ protected void moveNextFile(final boolean scheduleReclaim) throws Exception
+ {
+ filesRepository.closeFile(currentFile);
+
+ currentFile = filesRepository.openFile();
+
+ if (scheduleReclaim)
+ {
+ scheduleReclaim();
+ }
+
+ if (trace)
+ {
+ log.info("moveNextFile: " + currentFile);
+ }
+
+ fileFactory.activateBuffer(currentFile.getFile());
+ }
+
protected SyncIOCompletion getSyncCallback(final boolean sync)
{
- if (hasCallbackSupport)
+ if (fileFactory.isSupportsCallbacks())
{
if (sync)
{
@@ -213,4 +279,40 @@
}
}
+ /**
+ * @param lastDataPos
+ * @throws Exception
+ */
+ protected void setUpCurrentFile(int lastDataPos) throws Exception
+ {
+ // Create any more files we need
+
+ filesRepository.ensureMinFiles();
+
+ // The current file is the last one that has data
+
+ currentFile = filesRepository.pollLastDataFile();
+
+ if (currentFile != null)
+ {
+ currentFile.getFile().open();
+
+ currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
+ }
+ else
+ {
+ currentFile = filesRepository.getFreeFile();
+
+ filesRepository.openFile(currentFile, true);
+ }
+
+ fileFactory.activateBuffer(currentFile.getFile());
+
+ filesRepository.pushOpenedFile();
+ }
+
+ public int getFileSize()
+ {
+ return fileSize;
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -33,8 +33,6 @@
* Guaranteeing that they will be delivered in order to the Journal
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public class JournalFilesRepository
{
@@ -91,6 +89,18 @@
final int fileSize,
final int minFiles)
{
+ if (filePrefix == null)
+ {
+ throw new IllegalArgumentException("filePrefix cannot be null");
+ }
+ if (fileExtension == null)
+ {
+ throw new IllegalArgumentException("fileExtension cannot be null");
+ }
+ if (maxAIO <= 0)
+ {
+ throw new IllegalArgumentException("maxAIO must be a positive number");
+ }
this.fileFactory = fileFactory;
this.maxAIO = maxAIO;
this.filePrefix = filePrefix;
@@ -447,9 +457,9 @@
* Creates files for journal synchronization of a replicated backup.
* @param isCurrent a current file is initialized and kept open.
*/
- public JournalFile createRemoteBackupSyncFile(long fileID, boolean isCurrent) throws Exception
+ public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
{
- return createFile(isCurrent, false, isCurrent, false, fileID);
+ return createFile(false, false, true, false, fileID);
}
// Package protected ---------------------------------------------
@@ -576,4 +586,11 @@
return jf;
}
+
+ @Override
+ public String toString()
+ {
+ return "JournalFilesRepository(dataFiles=" + dataFiles + ", freeFiles=" + freeFiles + ", openedFiles=" +
+ openedFiles + ")";
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -18,6 +18,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -171,18 +172,12 @@
private final int userVersion;
- private final int fileSize;
-
private final int minFiles;
private final float compactPercentage;
private final int compactMinFiles;
- private final SequentialFileFactory fileFactory;
-
- private final JournalFilesRepository filesRepository;
-
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -212,8 +207,6 @@
*/
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
- private volatile JournalFile currentFile;
-
private volatile JournalState state = JournalState.STOPPED;
private final Reclaimer reclaimer = new Reclaimer();
@@ -229,47 +222,12 @@
final String fileExtension,
final int maxAIO)
{
- this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
- }
-
- public JournalImpl(final int fileSize,
- final int minFiles,
- final int compactMinFiles,
- final int compactPercentage,
- final SequentialFileFactory fileFactory,
- final String filePrefix,
- final String fileExtension,
- final int maxAIO,
- final int userVersion)
- {
- super(fileFactory.isSupportsCallbacks());
- if (fileSize < JournalImpl.MIN_FILE_SIZE)
- {
- throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
- }
- if (fileSize % fileFactory.getAlignment() != 0)
- {
- throw new IllegalArgumentException("Invalid journal-file-size " + fileSize +
- ", It should be multiple of " +
- fileFactory.getAlignment());
- }
+ super(fileFactory, new JournalFilesRepository(fileFactory, filePrefix, fileExtension, 0, maxAIO, fileSize,
+ minFiles), fileSize);
if (minFiles < 2)
{
throw new IllegalArgumentException("minFiles cannot be less than 2");
}
- if (filePrefix == null)
- {
- throw new NullPointerException("filePrefix is null");
- }
- if (fileExtension == null)
- {
- throw new NullPointerException("fileExtension is null");
- }
- if (maxAIO <= 0)
- {
- throw new IllegalStateException("maxAIO should aways be a positive number");
- }
-
if (compactPercentage < 0 || compactPercentage > 100)
{
throw new IllegalArgumentException("Compact Percentage out of range");
@@ -285,28 +243,14 @@
}
this.compactMinFiles = compactMinFiles;
-
- this.fileSize = fileSize;
-
this.minFiles = minFiles;
-
- this.fileFactory = fileFactory;
-
- filesRepository = new JournalFilesRepository(fileFactory,
- filePrefix,
- fileExtension,
- userVersion,
- maxAIO,
- fileSize,
- minFiles);
-
- this.userVersion = userVersion;
+ this.userVersion = 0;
}
@Override
public String toString()
{
- return super.toString() + " " + state;
+ return "JournalImpl(state=" + state + ", currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
}
public void runDirectJournalBlast() throws Exception
@@ -2041,31 +1985,8 @@
return new JournalLoadInformation(0, -1);
}
- // Create any more files we need
+ setUpCurrentFile(lastDataPos);
- filesRepository.ensureMinFiles();
-
- // The current file is the last one that has data
-
- currentFile = filesRepository.pollLastDataFile();
-
- if (currentFile != null)
- {
- currentFile.getFile().open();
-
- currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
- }
- else
- {
- currentFile = filesRepository.getFreeFile();
-
- filesRepository.openFile(currentFile, true);
- }
-
- fileFactory.activateBuffer(currentFile.getFile());
-
- filesRepository.pushOpenedFile();
-
setJournalState(JournalState.LOADED);
for (TransactionHolder transaction : loadTransactions.values())
@@ -2162,7 +2083,7 @@
totalLiveSize += file.getLiveSize();
}
- long totalBytes = (long)dataFiles.length * (long)fileSize;
+ long totalBytes = dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
@@ -2224,7 +2145,7 @@
// TestableJournal implementation
// --------------------------------------------------------------
- public void setAutoReclaim(final boolean autoReclaim)
+ public synchronized void setAutoReclaim(final boolean autoReclaim)
{
this.autoReclaim = autoReclaim;
}
@@ -2329,6 +2250,7 @@
return records.size();
}
+ @Override
public int getFileSize()
{
return fileSize;
@@ -2775,26 +2697,10 @@
final IOAsyncTask callback;
- int size = encoder.getEncodeSize();
+ final int size = encoder.getEncodeSize();
- // We take into account the fileID used on the Header
- if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
- {
- throw new IllegalArgumentException("Record is too large to store " + size);
- }
+ switchFileIfNecessary(size);
- if (!currentFile.getFile().fits(size))
- {
- moveNextFile(true);
-
- // The same check needs to be done at the new file also
- if (!currentFile.getFile().fits(size))
- {
- // Sanity check, this should never happen
- throw new IllegalStateException("Invalid logic on buffer allocation");
- }
- }
-
if (tx != null)
{
// The callback of a transaction has to be taken inside the lock,
@@ -2842,28 +2748,9 @@
return currentFile;
}
- // You need to guarantee lock.acquire() before calling this method
- private void moveNextFile(final boolean scheduleReclaim) throws Exception
+ @Override
+ void scheduleReclaim()
{
- filesRepository.closeFile(currentFile);
-
- currentFile = filesRepository.openFile();
-
- if (scheduleReclaim)
- {
- scheduleReclaim();
- }
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("moveNextFile: " + currentFile);
- }
-
- fileFactory.activateBuffer(currentFile.getFile());
- }
-
- private void scheduleReclaim()
- {
if (state != JournalState.LOADED)
{
return;
@@ -3102,21 +2989,21 @@
* @throws Exception
*/
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
+ public synchronized Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
{
writeLock();
try
{
+ Map<Long, JournalFile> map = new HashMap<Long, JournalFile>();
log.info("Reserving fileIDs before synchronization: " + Arrays.toString(fileIds));
long maxID = -1;
for (long id : fileIds)
{
maxID = Math.max(maxID, id);
- map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id, false));
+ map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
}
- maxID += 1;
filesRepository.setNextFileID(maxID);
- return filesRepository.createRemoteBackupSyncFile(maxID, true);
+ return map;
}
finally
{
@@ -3128,4 +3015,16 @@
{
return autoReclaim;
}
+
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ return fileFactory;
+ }
+
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ return filesRepository;
+ }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -28,7 +28,7 @@
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- protected int n_msgs = 10;
+ protected int n_msgs = 20;
@Override
protected void setUp() throws Exception
@@ -64,12 +64,19 @@
}
backupServer.start();
- syncDelay.deliverUpToDateMsg();
+
+ // Deliver messages with Backup in-sync
waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
+ sendMessages(session, producer, n_msgs);
+ // Deliver messages with Backup up-to-date
+ syncDelay.deliverUpToDateMsg();
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, n_msgs);
+
Set<Long> liveIds = getFileIds(messageJournal);
+ int size = messageJournal.getFileSize();
PagingStore ps = liveServer.getServer().getPagingManager().getPageStore(ADDRESS);
if (ps.getPageSizeBytes() == PAGE_SIZE)
{
@@ -79,11 +86,14 @@
finishSyncAndFailover();
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ System.out.println("backup journal " + backupMsgJournal);
+ System.out.println("live journal " + messageJournal);
+ assertEquals("file sizes must be the same", size, backupMsgJournal.getFileSize());
Set<Long> backupIds = getFileIds(backupMsgJournal);
assertEquals("File IDs must match!", liveIds, backupIds);
// "+ 2": there two other calls that send N_MSGS.
- for (int i = 0; i < totalRounds + 2; i++)
+ for (int i = 0; i < totalRounds + 3; i++)
{
receiveMsgsInRange(0, n_msgs);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -1571,7 +1571,7 @@
ClientMessage message = consumer.receiveImmediate();
- Assert.assertNull("Null message", message);
+ Assert.assertNull("expecting null message", message);
session2.close();
@@ -1610,7 +1610,7 @@
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", latch.await(5, TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -1772,7 +1772,7 @@
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", latch.await(5, TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -84,6 +84,7 @@
private Packet onHold;
private Channel channel;
public volatile boolean deliver;
+ private volatile boolean delivered;
private boolean receivedUpToDate;
private boolean mustHold = true;
@@ -97,6 +98,8 @@
deliver = true;
if (!receivedUpToDate)
return;
+ if (delivered)
+ return;
if (onHold == null)
{
@@ -108,6 +111,7 @@
try
{
handler.handlePacket(onHold);
+ delivered = true;
}
finally
{
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -51,8 +51,10 @@
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
@@ -853,7 +855,7 @@
}
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+ public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
{
return null;
}
@@ -894,5 +896,17 @@
return null;
}
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ return null;
+ }
+
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ return null;
+ }
+
}
}
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.journal.impl.fakes;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -28,7 +29,6 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.TimedBuffer;
-import org.hornetq.core.logging.Logger;
/**
*
@@ -40,8 +40,6 @@
*/
public class FakeSequentialFileFactory implements SequentialFileFactory
{
- private static final Logger log = Logger.getLogger(FakeSequentialFileFactory.class);
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -285,11 +283,6 @@
}
}
- public boolean isSendError()
- {
- return sendError;
- }
-
public void setSendError(final boolean sendError)
{
this.sendError = sendError;
@@ -688,6 +681,12 @@
HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
write(outbuffer, true);
}
+
+ @Override
+ public File getJavaFile()
+ {
+ throw new UnsupportedOperationException();
+ }
}
/* (non-Javadoc)
13 years, 2 months
JBoss hornetq SVN: r11448 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-30 04:55:35 -0400 (Fri, 30 Sep 2011)
New Revision: 11448
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
currentFile cannot be null at this point
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-30 08:54:55 UTC (rev 11447)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-30 08:55:35 UTC (rev 11448)
@@ -2795,11 +2795,6 @@
}
}
- if (currentFile == null)
- {
- throw new NullPointerException("Current file = null");
- }
-
if (tx != null)
{
// The callback of a transaction has to be taken inside the lock,
13 years, 2 months
JBoss hornetq SVN: r11447 - branches/HORNETQ-720_Replication.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-30 04:54:55 -0400 (Fri, 30 Sep 2011)
New Revision: 11447
Modified:
branches/HORNETQ-720_Replication/.gitignore
Log:
Update gitignore
Modified: branches/HORNETQ-720_Replication/.gitignore
===================================================================
--- branches/HORNETQ-720_Replication/.gitignore 2011-09-30 04:16:23 UTC (rev 11446)
+++ branches/HORNETQ-720_Replication/.gitignore 2011-09-30 08:54:55 UTC (rev 11447)
@@ -10,6 +10,8 @@
org.maven.ide.eclipse.prefs
.classpath
/bin
+tests/*-tests/hs_err_pid*.log
+tests/**/server.lock
# /
/build
13 years, 2 months
JBoss hornetq SVN: r11446 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-30 00:16:23 -0400 (Fri, 30 Sep 2011)
New Revision: 11446
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
Log:
Isolating Stomp Messages on topics
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-29 22:03:44 UTC (rev 11445)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-30 04:16:23 UTC (rev 11446)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.protocol.stomp;
+import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -94,14 +95,56 @@
{
StompSubscription subscription = subscriptions.get(consumerID);
+ StompFrame frame = createFrame(serverMessage, deliveryCount, subscription);
+
+ int length = frame.getEncodedSize();
+
+ if (subscription.isAutoACK())
+ {
+ session.acknowledge(consumerID, serverMessage.getMessageID());
+ session.commit();
+ }
+ else
+ {
+ messagesToAck.put(serverMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
+ }
+
+ // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
+ manager.send(connection, frame);
+
+ return length;
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return 0;
+ }
+
+ }
+
+ /**
+ * @param serverMessage
+ * @param deliveryCount
+ * @param subscription
+ * @return
+ * @throws UnsupportedEncodingException
+ * @throws Exception
+ */
+ private StompFrame createFrame(ServerMessage serverMessage, int deliveryCount, StompSubscription subscription) throws UnsupportedEncodingException,
+ Exception
+ {
+ synchronized (serverMessage)
+ {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(Stomp.Headers.Message.DESTINATION, serverMessage.getAddress().toString());
if (subscription.getID() != null)
{
headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
+
HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
+
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
: serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
@@ -127,31 +170,8 @@
serverMessage.getBodyBuffer().resetReaderIndex();
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
-
- int length = frame.getEncodedSize();
-
- if (subscription.isAutoACK())
- {
- session.acknowledge(consumerID, serverMessage.getMessageID());
- session.commit();
- }
- else
- {
- messagesToAck.put(serverMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
- }
-
- // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
- manager.send(connection, frame);
-
- return length;
-
+ return frame;
}
- catch (Exception e)
- {
- e.printStackTrace();
- return 0;
- }
-
}
public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
13 years, 2 months
JBoss hornetq SVN: r11445 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-29 18:03:44 -0400 (Thu, 29 Sep 2011)
New Revision: 11445
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java
Log:
JBPAPP-7256 & SOA-3363 - added a test for this on SecurityTest
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java 2011-09-29 22:02:54 UTC (rev 11444)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java 2011-09-29 22:03:44 UTC (rev 11445)
@@ -538,8 +538,9 @@
roles.add(role);
+
+ // This was added to validate https://issues.jboss.org/browse/SOA-3363
securityRepository.addMatch(SecurityTest.addressA, roles);
-
boolean failed = false;
try
{
@@ -549,6 +550,7 @@
{
failed = true;
}
+ // This was added to validate https://issues.jboss.org/browse/SOA-3363 ^^^^^
assertTrue("Failure expected on send after removing the match", failed);
13 years, 2 months
JBoss hornetq SVN: r11444 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/security/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-29 18:02:54 -0400 (Thu, 29 Sep 2011)
New Revision: 11444
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/SecurityStore.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java
Log:
JBPAPP-7256 & SOA-3363 - added a test for this on
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/SecurityStore.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/SecurityStore.java 2011-09-29 19:53:05 UTC (rev 11443)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/SecurityStore.java 2011-09-29 22:02:54 UTC (rev 11444)
@@ -28,4 +28,6 @@
void authenticate(String user, String password) throws Exception;
void check(SimpleString address, CheckType checkType, ServerSession session) throws Exception;
+
+ void stop();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java 2011-09-29 19:53:05 UTC (rev 11443)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java 2011-09-29 22:02:54 UTC (rev 11444)
@@ -97,9 +97,15 @@
this.managementClusterUser = managementClusterUser;
this.managementClusterPassword = managementClusterPassword;
this.notificationService = notificationService;
+ this.securityRepository.registerListener(this);
}
// SecurityManager implementation --------------------------------
+
+ public void stop()
+ {
+ securityRepository.unRegisterListener(this);
+ }
public void authenticate(final String user, final String password) throws Exception
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-29 19:53:05 UTC (rev 11443)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-29 22:02:54 UTC (rev 11444)
@@ -493,7 +493,10 @@
for (ServerSession session : sessions.values())
{
session.close(true);
- session.waitContextCompletion();
+ if (!criticalIOError)
+ {
+ session.waitContextCompletion();
+ }
}
remotingService.stop();
@@ -601,7 +604,9 @@
{
// Ignore
}
-
+
+ securityStore.stop();
+
threadPool = null;
scheduledPool = null;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java 2011-09-29 19:53:05 UTC (rev 11443)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/security/SecurityTest.java 2011-09-29 22:02:54 UTC (rev 11444)
@@ -493,21 +493,68 @@
try
{
server.start();
+
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
+
HornetQSecurityManager securityManager = server.getSecurityManager();
+
securityManager.addUser("auser", "pass");
- Role role = new Role("arole", true, false, true, false, false, false, false);
+
+ Role role = new Role("arole", true, true, true, false, false, false, false);
+
Set<Role> roles = new HashSet<Role>();
+
roles.add(role);
+
securityRepository.addMatch(SecurityTest.addressA, roles);
+
securityManager.addRole("auser", "arole");
+
locator.setBlockOnNonDurableSend(true);
+
ClientSessionFactory cf = locator.createSessionFactory();
+
ClientSession session = cf.createSession("auser", "pass", false, true, true, false, -1);
+
session.createQueue(SecurityTest.addressA, SecurityTest.queueA, true);
+
ClientProducer cp = session.createProducer(SecurityTest.addressA);
+
cp.send(session.createMessage(false));
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(queueA);
+
+ ClientMessage receivedMessage = cons.receive(5000);
+
+ assertNotNull(receivedMessage);
+
+ receivedMessage.acknowledge();
+
+ role = new Role("arole", false, false, true, false, false, false, false);
+
+ roles = new HashSet<Role>();
+
+ roles.add(role);
+
+ securityRepository.addMatch(SecurityTest.addressA, roles);
+
+ boolean failed = false;
+ try
+ {
+ cp.send(session.createMessage(true));
+ }
+ catch (HornetQException e)
+ {
+ failed = true;
+ }
+
+ assertTrue("Failure expected on send after removing the match", failed);
+
+
session.close();
+
}
finally
{
13 years, 2 months
JBoss hornetq SVN: r11443 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/remoting/impl/invm and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-29 15:53:05 -0400 (Thu, 29 Sep 2011)
New Revision: 11443
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Log:
improvements on stomp
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -25,6 +25,7 @@
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -56,6 +57,8 @@
private final long creationTime;
private StompDecoder decoder = new StompDecoder();
+
+ private final Acceptor acceptorUsed;
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
@@ -70,13 +73,15 @@
return decoder;
}
- StompConnection(final Connection transportConnection, final StompProtocolManager manager)
+ StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager)
{
this.transportConnection = transportConnection;
this.manager = manager;
this.creationTime = System.currentTimeMillis();
+
+ this.acceptorUsed = acceptorUsed;
}
public void addFailureListener(final FailureListener listener)
@@ -186,6 +191,11 @@
callClosingListeners();
}
+
+ Acceptor getAcceptorUsed()
+ {
+ return acceptorUsed;
+ }
private void internalClose()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -112,7 +112,7 @@
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
- StompConnection conn = new StompConnection(connection, this);
+ StompConnection conn = new StompConnection(acceptorUsed, connection, this);
// Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -20,17 +20,21 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.remoting.ReadyListener;
+import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UUIDGenerator;
@@ -54,15 +58,20 @@
private final Map<Long, StompSubscription> subscriptions = new ConcurrentHashMap<Long, StompSubscription>();
// key = message ID, value = consumer ID
- private final Map<Long, Long> messagesToAck = new ConcurrentHashMap<Long, Long>();
+ private final Map<Long, Pair<Long, Integer>> messagesToAck = new ConcurrentHashMap<Long, Pair<Long, Integer>>();
private volatile boolean noLocal = false;
+ private final int consumerCredits;
+
StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext)
{
this.connection = connection;
this.manager = manager;
this.sessionContext = sessionContext;
+ this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT,
+ TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT,
+ connection.getAcceptorUsed().getConfiguration());
}
void setServerSession(ServerSession session)
@@ -119,19 +128,20 @@
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
- if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
+ int length = frame.getEncodedSize();
+
+ if (subscription.isAutoACK())
{
session.acknowledge(consumerID, serverMessage.getMessageID());
session.commit();
}
else
{
- messagesToAck.put(serverMessage.getMessageID(), consumerID);
+ messagesToAck.put(serverMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
}
// Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
manager.send(connection, frame);
- int length = frame.getEncodedSize();
return length;
@@ -157,10 +167,10 @@
public void closed()
{
}
-
+
public void addReadyListener(final ReadyListener listener)
{
- connection.getTransportConnection().addReadyListener(listener);
+ connection.getTransportConnection().addReadyListener(listener);
}
public void removeReadyListener(final ReadyListener listener)
@@ -171,9 +181,21 @@
public void acknowledge(String messageID) throws Exception
{
long id = Long.parseLong(messageID);
- long consumerID = messagesToAck.remove(id);
- session.acknowledge(consumerID, id);
- session.commit();
+ Pair<Long, Integer> pair = messagesToAck.remove(id);
+
+ if (pair != null)
+ {
+ long consumerID = pair.a;
+ int credits = pair.b;
+
+ if (this.consumerCredits != -1)
+ {
+ session.receiveConsumerCredits(consumerID, credits);
+ }
+
+ session.acknowledge(consumerID, id);
+ session.commit();
+ }
}
public void addSubscription(long consumerID,
@@ -200,14 +222,6 @@
{
session.createQueue(SimpleString.toSimpleString(destination), queue, null, false, true);
}
- else
- {
- // Already exists
- if (query.getConsumerCount() > 0)
- {
- throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has a subscriber: " + queue);
- }
- }
}
else
{
@@ -216,11 +230,19 @@
}
}
session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false);
- session.receiveConsumerCredits(consumerID, -1);
- StompSubscription subscription = new StompSubscription(subscriptionID, ack);
+
+ StompSubscription subscription = new StompSubscription(subscriptionID, ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO));
subscriptions.put(consumerID, subscription);
- // FIXME not very smart: since we can't start the consumer, we start the session
- // every time to start the new consumer (and all previous consumers...)
+
+ if (subscription.isAutoACK())
+ {
+ session.receiveConsumerCredits(consumerID, -1);
+ }
+ else
+ {
+ session.receiveConsumerCredits(consumerID, consumerCredits);
+ }
+
session.start();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -27,24 +27,24 @@
// Attributes ----------------------------------------------------
private final String subID;
+
+ private final boolean autoACK;
- private final String ack;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public StompSubscription(String subID, String ack)
+ public StompSubscription(String subID, boolean ack)
{
this.subID = subID;
- this.ack = ack;
+ this.autoACK = ack;
}
// Public --------------------------------------------------------
- public String getAck()
+ public boolean isAutoACK()
{
- return ack;
+ return autoACK;
}
public String getID()
@@ -55,7 +55,7 @@
@Override
public String toString()
{
- return "StompSubscription[id=" + subID + ", ack=" + ack + "]";
+ return "StompSubscription[id=" + subID + ", autoACK=" + autoACK + "]";
}
// Package protected ---------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -61,6 +61,8 @@
private boolean paused;
private NotificationService notificationService;
+
+ private final Map<String, Object> configuration;
public InVMAcceptor(final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@@ -70,6 +72,8 @@
{
this.clusterConnection = clusterConnection;
+ this.configuration = configuration;
+
this.handler = handler;
this.listener = listener;
@@ -78,6 +82,11 @@
executorFactory = new OrderedExecutorFactory(threadPool);
}
+
+ public Map<String, Object> getConfiguration()
+ {
+ return configuration;
+ }
public ClusterConnection getClusterConnection()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -143,6 +143,8 @@
private HttpAcceptorHandler httpHandler = null;
private final ConcurrentMap<Object, NettyConnection> connections = new ConcurrentHashMap<Object, NettyConnection>();
+
+ private final Map<String, Object> configuration;
private final Executor threadPool;
@@ -185,6 +187,8 @@
this.clusterConnection = clusterConnection;
+ this.configuration = configuration;
+
this.handler = handler;
this.decoder = decoder;
@@ -505,6 +509,11 @@
serverChannelGroup.add(serverChannel);
}
}
+
+ public Map<String, Object> getConfiguration()
+ {
+ return this.configuration;
+ }
public synchronized void stop()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -76,6 +76,10 @@
public static final String CLUSTER_CONNECTION = "cluster-connection";
+ public static final String STOMP_CONSUMERS_CREDIT = "stomp-consumer-credits";
+
+ public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
+
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -154,6 +158,7 @@
allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
+ allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -13,6 +13,8 @@
package org.hornetq.spi.core.remoting;
+import java.util.Map;
+
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.NotificationService;
@@ -35,6 +37,8 @@
* @return the cluster connection associated with this Acceptor
*/
ClusterConnection getClusterConnection();
+
+ Map<String, Object> getConfiguration();
/**
* Set the notification service for this acceptor to use.
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-29 03:26:28 UTC (rev 11442)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-29 19:53:05 UTC (rev 11443)
@@ -117,6 +117,7 @@
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
@@ -168,7 +169,7 @@
protected Socket createSocket() throws IOException
{
- return new Socket("127.0.0.1", port);
+ return new Socket("localhost", port);
}
protected String getQueueName()
13 years, 2 months
JBoss hornetq SVN: r11442 - in branches/Branch_2_2_EAP/tests/src/org/hornetq/tests: unit/core/journal/impl/fakes and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-09-28 23:26:28 -0400 (Wed, 28 Sep 2011)
New Revision: 11442
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/opt/DisconnectDiskTest.java
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
JBPAPP-7205 - disconnected journal
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/opt/DisconnectDiskTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/opt/DisconnectDiskTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/opt/DisconnectDiskTest.java 2011-09-29 03:26:28 UTC (rev 11442)
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.opt;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A test where we validate the server being shutdown when the disk crashed
+ *
+ * It's not possible to automate this test, for this reason follow these steps:
+ *
+ * - you will need any sort of USB disk. I would recommend a real disk using ext4 (or any other linux file system)
+ * - Change getTestDir() to the mounted directory
+ * - Run the test, and when the test prompts so, disconnect that disk
+ *
+ *
+ * @author clebert
+ *
+ *
+ */
+public class DisconnectDiskTest extends ServiceTestBase
+{
+
+ Logger log = Logger.getLogger(DisconnectDiskTest.class);
+
+ protected String getTestDir()
+ {
+ return "/media/tstClebert/hqtest";
+ }
+
+ public void testIOError() throws Exception
+ {
+
+ String ADDRESS = "testAddress";
+ String QUEUE = "testQueue";
+ HornetQServer server = createServer(true, false);
+
+ server.getConfiguration().setJournalType(JournalType.NIO);
+
+ try
+ {
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ session.createQueue(ADDRESS, QUEUE, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientSession session2 = sf.createSession(true, true, 0);
+ session2.start();
+
+ ClientConsumer consumer = session2.createConsumer(QUEUE);
+
+ int count = 0;
+ int countReceive = 0;
+
+ int loopCount = 0;
+
+ while (true)
+ {
+
+ loopCount++;
+
+ if (loopCount == 10)
+ {
+ // it wasn't possible to just get a notification when the file is deleted, on either AIO or NIO, for that
+ // reason you have to actually disconnect the disk
+ // deleteDirectory(new File(getTestDir()));
+ System.out.println("Disconnect disk now!");
+ Thread.sleep(5000);
+ }
+
+ try
+ {
+ for (int i = 0; i < 20; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("tst", count++);
+ producer.send(msg);
+ }
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ if (loopCount != 10)
+ {
+ throw e;
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ if (loopCount >= 10)
+ {
+ fail("Exception expected");
+ }
+
+ System.out.println("Sent 20 messages");
+
+ for (int i = 0; i < 20; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ Assert.assertEquals(countReceive++, msg.getIntProperty("tst").intValue());
+ msg.acknowledge();
+ }
+ System.out.println("Received 20 messages");
+ }
+
+ }
+ finally
+ {
+ Thread.sleep(1000);
+ AsynchronousFileImpl.resetMaxAIO();
+
+ disableCheckThread();
+ assertFalse(server.isStarted());
+
+ }
+
+ }
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-29 03:25:40 UTC (rev 11441)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-29 03:26:28 UTC (rev 11442)
@@ -757,4 +757,11 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#onIOError(java.lang.Exception, java.lang.String, org.hornetq.core.journal.SequentialFile)
+ */
+ public void onIOError(int errorCode, String message, SequentialFile file)
+ {
+ }
+
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-29 03:25:40 UTC (rev 11441)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-29 03:26:28 UTC (rev 11442)
@@ -1697,6 +1697,15 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#stop(boolean)
+ */
+ public void stop(boolean ioCriticalError) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-29 03:25:40 UTC (rev 11441)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-29 03:26:28 UTC (rev 11442)
@@ -115,6 +115,13 @@
// There is a verification about thread leakages. We only fail a single thread when this happens
private static Set<Thread> alreadyFailedThread = new HashSet<Thread>();
+
+ private boolean checkThread = true;
+
+ protected void disableCheckThread()
+ {
+ checkThread = false;
+ }
// Static --------------------------------------------------------
@@ -941,36 +948,46 @@
}
}
- StringBuffer buffer = null;
+ if (checkThread)
+ {
+ StringBuffer buffer = null;
- boolean failed = true;
+ boolean failed = true;
+
- long timeout = System.currentTimeMillis() + 60000;
- while (failed && timeout > System.currentTimeMillis())
- {
- buffer = new StringBuffer();
+ long timeout = System.currentTimeMillis() + 60000;
+ while (failed && timeout > System.currentTimeMillis())
+ {
+ buffer = new StringBuffer();
+
+ failed = checkThread(buffer);
+
+ if (failed)
+ {
+ forceGC();
+ Thread.sleep(500);
+ log.info("There are still threads running, trying again");
+ }
+ }
- failed = checkThread(buffer);
-
if (failed)
{
- forceGC();
- Thread.sleep(500);
- log.info("There are still threads running, trying again");
+ logAndSystemOut("Thread leaked on test " + this.getClass().getName() +
+ "::" +
+ this.getName() +
+ "\n" +
+ buffer.toString());
+ logAndSystemOut("Thread leakage");
+
+ fail("Thread leaked");
}
+
}
-
- if (failed)
+ else
{
- logAndSystemOut("Thread leaked on test " + this.getClass().getName() +
- "::" +
- this.getName() +
- "\n" +
- buffer.toString());
- logAndSystemOut("Thread leakage");
-
- fail("Thread leaked");
+ checkThread = true;
}
+
super.tearDown();
}
13 years, 2 months
JBoss hornetq SVN: r11441 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: asyncio/impl and 10 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-09-28 23:25:40 -0400 (Wed, 28 Sep 2011)
New Revision: 11441
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
JBPAPP-7205 - disconnected journal
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.asyncio;
+
+/**
+ * A IOExceptionListener
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface IOExceptionListener
+{
+ void onIOException(int code, String message);
+}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -29,6 +29,7 @@
import org.hornetq.core.asyncio.AIOCallback;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.asyncio.IOExceptionListener;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.ReusableLatch;
@@ -160,6 +161,9 @@
private Semaphore maxIOSemaphore;
private BufferCallback bufferCallback;
+
+ /** A callback for IO errors when they happen */
+ private final IOExceptionListener ioExceptionListener;
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -180,12 +184,18 @@
* @param writeExecutor It needs to be a single Thread executor. If null it will use the user thread to execute write operations
* @param pollerExecutor The thread pool that will initialize poller handlers
*/
- public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
+ public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor, final IOExceptionListener ioExceptionListener )
{
this.writeExecutor = writeExecutor;
this.pollerExecutor = pollerExecutor;
+ this.ioExceptionListener = ioExceptionListener;
}
+ public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
+ {
+ this(writeExecutor, pollerExecutor, null);
+ }
+
public void open(final String fileName, final int maxIO) throws HornetQException
{
writeLock.lock();
@@ -276,7 +286,15 @@
public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException
{
- writeInternal(handler, positionToWrite, size, bytes);
+ try
+ {
+ writeInternal(handler, positionToWrite, size, bytes);
+ }
+ catch (HornetQException e)
+ {
+ fireExceptionListener(e.getCode(), e.getMessage());
+ throw e;
+ }
if (bufferCallback != null)
{
bufferCallback.bufferDone(bytes);
@@ -522,6 +540,8 @@
final String errorMessage)
{
AsynchronousFileImpl.log.warn("CallbackError: " + errorMessage);
+
+ fireExceptionListener(errorCode, errorMessage);
maxIOSemaphore.release();
@@ -561,6 +581,18 @@
}
}
+ /**
+ * @param errorCode
+ * @param errorMessage
+ */
+ private void fireExceptionListener(final int errorCode, final String errorMessage)
+ {
+ if (ioExceptionListener != null)
+ {
+ ioExceptionListener.onIOException(errorCode, errorMessage);
+ }
+ }
+
private void pollEvents()
{
if (!opened)
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+/**
+ * A IOCriticalErrorListener
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface IOCriticalErrorListener
+{
+ void onIOException(int code, String message, SequentialFile file);
+}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -31,6 +31,9 @@
List<String> listFiles(String extension) throws Exception;
boolean isSupportsCallbacks();
+
+ /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */
+ void onIOError(int errorCode, String message, SequentialFile file);
/**
* Note: You need to release the buffer if is used for reading operations.
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -17,8 +17,10 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.asyncio.IOExceptionListener;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
@@ -32,7 +34,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public class AIOSequentialFile extends AbstractSequentialFile
+public class AIOSequentialFile extends AbstractSequentialFile implements IOExceptionListener
{
private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
@@ -185,9 +187,17 @@
{
opened = true;
- aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor);
+ aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor, this);
- aioFile.open(getFile().getAbsolutePath(), maxIO);
+ try
+ {
+ aioFile.open(getFile().getAbsolutePath(), maxIO);
+ }
+ catch (HornetQException e)
+ {
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+ throw e;
+ }
position.set(0);
@@ -251,6 +261,15 @@
// Public methods
// -----------------------------------------------------------------------------------------------------
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.IOExceptionListener#onException(int, java.lang.String)
+ */
+ public void onIOException(int code, String message)
+ {
+ factory.onIOError(code, message, this);
+ }
+
+
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
if (sync)
@@ -313,4 +332,5 @@
throw new IllegalStateException("File not opened");
}
}
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -25,6 +25,7 @@
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.HornetQThreadFactory;
@@ -60,17 +61,36 @@
this(journalDir,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
- false);
+ false,
+ null);
}
+ public AIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener)
+ {
+ this(journalDir,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
+ false,
+ listener);
+ }
+
public AIOSequentialFileFactory(final String journalDir,
final int bufferSize,
final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, true, bufferSize, bufferTimeout, logRates);
+ this(journalDir, bufferSize, bufferTimeout, logRates, null);
}
+ public AIOSequentialFileFactory(final String journalDir,
+ final int bufferSize,
+ final int bufferTimeout,
+ final boolean logRates,
+ final IOCriticalErrorListener listener)
+ {
+ super(journalDir, true, bufferSize, bufferTimeout, logRates, listener);
+ }
+
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
return new AIOSequentialFile(this,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -52,8 +53,10 @@
protected final TimedBuffer timedBuffer;
protected final int bufferSize;
-
+
protected final long bufferTimeout;
+
+ private final IOCriticalErrorListener critialErrorListener;
/**
* Asynchronous writes need to be done at another executor.
@@ -66,7 +69,8 @@
final boolean buffered,
final int bufferSize,
final int bufferTimeout,
- final boolean logRates)
+ final boolean logRates,
+ final IOCriticalErrorListener criticalErrorListener)
{
this.journalDir = journalDir;
@@ -80,6 +84,7 @@
}
this.bufferSize = bufferSize;
this.bufferTimeout = bufferTimeout;
+ this.critialErrorListener = criticalErrorListener;
}
public void stop()
@@ -124,6 +129,19 @@
}
/* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#onIOError(java.lang.Exception, java.lang.String, org.hornetq.core.journal.SequentialFile)
+ */
+ public void onIOError(int errorCode, String message, SequentialFile file)
+ {
+ if (critialErrorListener != null)
+ {
+ critialErrorListener.onIOException(errorCode, message, file);
+ }
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
*/
public void activateBuffer(final SequentialFile file)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -13,6 +13,8 @@
package org.hornetq.core.journal.impl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
+
/**
* This is an undocumented class, that will open a journal and force compacting on it.
* It may be used under special cases, but it shouldn't be needed under regular circumstances as the system should detect
@@ -37,7 +39,7 @@
try
{
- CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+ CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), null);
}
catch (Exception e)
{
@@ -50,9 +52,10 @@
final String journalPrefix,
final String journalSuffix,
final int minFiles,
- final int fileSize) throws Exception
+ final int fileSize,
+ final IOCriticalErrorListener listener) throws Exception
{
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -92,7 +92,7 @@
final int fileSize,
final PrintStream out) throws Exception
{
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -105,7 +105,7 @@
journalDir.mkdirs();
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -94,12 +94,20 @@
public void open(final int maxIO, final boolean useExecutor) throws Exception
{
- rfile = new RandomAccessFile(getFile(), "rw");
+ try
+ {
+ rfile = new RandomAccessFile(getFile(), "rw");
+
+ channel = rfile.getChannel();
+
+ fileSize = channel.size();
+ }
+ catch (IOException e)
+ {
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+ throw e;
+ }
- channel = rfile.getChannel();
-
- fileSize = channel.size();
-
if (writerExecutor != null && useExecutor)
{
maxIOSemaphore = new Semaphore(maxIO);
@@ -193,15 +201,21 @@
return bytesRead;
}
- catch (Exception e)
+ catch (IOException e)
{
if (callback != null)
{
callback.onError(HornetQException.IO_ERROR, e.getLocalizedMessage());
}
+
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
throw e;
}
+ catch (Exception e)
+ {
+ throw e;
+ }
}
@@ -297,9 +311,17 @@
position.addAndGet(bytes.limit());
- if (maxIOSemaphore == null)
+ if (maxIOSemaphore == null || callback == null)
{
- doInternalWrite(bytes, sync, callback);
+ // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
+ try
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ catch (IOException e)
+ {
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+ }
}
else
{
@@ -316,6 +338,12 @@
{
doInternalWrite(bytes, sync, callback);
}
+ catch (IOException e)
+ {
+ NIOSequentialFile.log.warn("Exception on submitting write", e);
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), NIOSequentialFile.this);
+ callback.onError(HornetQException.IO_ERROR, e.getMessage());
+ }
catch (Throwable e)
{
NIOSequentialFile.log.warn("Exception on submitting write", e);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -34,20 +35,34 @@
public NIOSequentialFileFactory(final String journalDir)
{
+ this(journalDir, null);
+ }
+
+ public NIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener)
+ {
this(journalDir,
false,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- false);
+ false,
+ listener);
}
public NIOSequentialFileFactory(final String journalDir, final boolean buffered)
{
+ this(journalDir, buffered, null);
+ }
+
+ public NIOSequentialFileFactory(final String journalDir,
+ final boolean buffered,
+ final IOCriticalErrorListener listener)
+ {
this(journalDir,
buffered,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- false);
+ false,
+ listener);
}
public NIOSequentialFileFactory(final String journalDir,
@@ -56,9 +71,19 @@
final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, buffered, bufferSize, bufferTimeout, logRates);
+ this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null);
}
+ public NIOSequentialFileFactory(final String journalDir,
+ final boolean buffered,
+ final int bufferSize,
+ final int bufferTimeout,
+ final boolean logRates,
+ final IOCriticalErrorListener listener)
+ {
+ super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener);
+ }
+
public SequentialFile createSequentialFile(final String fileName, int maxIO)
{
if (maxIO < 1)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -60,11 +60,11 @@
{
if (AIO)
{
- fileFactory = new AIOSequentialFileFactory(".", 0, 0, false);
+ fileFactory = new AIOSequentialFileFactory(".", 0, 0, false, null);
}
else
{
- fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false);
+ fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false, null);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -93,7 +93,7 @@
return executor;
}
};
- PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false);
+ PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false, null);
HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
StorageManager sm = new NullStorageManager();
@@ -176,7 +176,7 @@
*/
protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(final String journalLocation) throws Exception
{
- SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation, null);
// Will use only default values. The load function should adapt to anything different
ConfigurationImpl defaultValues = new ConfigurationImpl();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -62,15 +63,17 @@
protected final boolean syncNonTransactional;
private PagingManager pagingManager;
-
+
private final ScheduledExecutorService scheduledExecutor;
-
+
private final long syncTimeout;
private StorageManager storageManager;
private PostOffice postOffice;
+ private final IOCriticalErrorListener critialErrorListener;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -81,15 +84,27 @@
final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
+ this(directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, null);
+ }
+
+ public PagingStoreFactoryNIO(final String directory,
+ final long syncTimeout,
+ final ScheduledExecutorService scheduledExecutor,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional,
+ final IOCriticalErrorListener critialErrorListener)
+ {
this.directory = directory;
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
this.scheduledExecutor = scheduledExecutor;
-
+
this.syncTimeout = syncTimeout;
+
+ this.critialErrorListener = critialErrorListener;
}
// Public --------------------------------------------------------
@@ -231,24 +246,24 @@
protected SequentialFileFactory newFileFactory(final String directoryName)
{
- return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
+ return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false, critialErrorListener);
}
-
+
protected PagingManager getPagingManager()
{
return pagingManager;
}
-
+
protected StorageManager getStorageManager()
{
return storageManager;
}
-
+
protected PostOffice getPostOffice()
{
return postOffice;
}
-
+
protected ExecutorFactory getExecutorFactory()
{
return executorFactory;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -66,6 +66,12 @@
/** Set the context back to the thread */
void setContext(OperationContext context);
+
+ /**
+ *
+ * @param ioCriticalError is the server being stopped due to an IO critical error
+ */
+ void stop(boolean ioCriticalError) throws Exception;
// Message related operations
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -45,6 +45,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -128,7 +129,7 @@
public static final byte SECURITY_RECORD = 26;
// Message journal record types
-
+
// This is used when a large message is created but not yet stored on the system.
// We use this to avoid temporary files missing
public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
@@ -206,8 +207,16 @@
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
- final ReplicationManager replicator)
+ final IOCriticalErrorListener criticalErrorListener)
{
+ this(config, executorFactory, null, criticalErrorListener);
+ }
+
+ public JournalStorageManager(final Configuration config,
+ final ExecutorFactory executorFactory,
+ final ReplicationManager replicator,
+ final IOCriticalErrorListener criticalErrorListener)
+ {
this.executorFactory = executorFactory;
executor = executorFactory.getExecutor();
@@ -230,7 +239,7 @@
journalDir = config.getJournalDirectory();
- SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+ SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, criticalErrorListener);
Journal localBindings = new JournalImpl(1024 * 1024,
2,
@@ -270,7 +279,8 @@
journalFF = new AIOSequentialFileFactory(journalDir,
config.getJournalBufferSize_AIO(),
config.getJournalBufferTimeout_AIO(),
- config.isLogJournalWriteRate());
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else if (config.getJournalType() == JournalType.NIO)
{
@@ -279,7 +289,8 @@
true,
config.getJournalBufferSize_NIO(),
config.getJournalBufferTimeout_NIO(),
- config.isLogJournalWriteRate());
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else
{
@@ -315,7 +326,7 @@
largeMessagesDirectory = config.getLargeMessagesDirectory();
- largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false);
+ largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
perfBlastPages = config.getJournalPerfBlastPages();
}
@@ -470,7 +481,7 @@
{
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
-
+
largeMessage.setPendingRecordID(pendingRecordID);
}
@@ -478,33 +489,34 @@
}
// Non transactional operations
-
+
public long storePendingLargeMessage(final long messageID) throws Exception
{
long recordID = generateUniqueID();
-
+
messageJournal.appendAddRecord(recordID,
ADD_LARGE_MESSAGE_PENDING,
new PendingLargeMessageEncoding(messageID),
true,
getContext(true));
-
+
return recordID;
}
-
+
public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
{
installLargeMessageConfirmationOnTX(tx, recordID);
- messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ messageJournal.appendDeleteRecordTransactional(tx.getID(),
+ recordID,
+ new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
}
-
-
+
/** We don't need messageID now but we are likely to need it we ever decide to support a database */
public void confirmPendingLargeMessage(long recordID) throws Exception
{
messageJournal.appendDeleteRecord(recordID, true, getContext());
}
-
+
public void storeMessage(final ServerMessage message) throws Exception
{
if (message.getMessageID() <= 0)
@@ -739,7 +751,8 @@
messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
if (!lineUpContext && !syncTransactional)
{
- // if lineUpContext == false, we have previously lined up a context, hence we need to mark it as done even if syncTransactional = false
+ // if lineUpContext == false, we have previously lined up a context, hence we need to mark it as done even if
+ // syncTransactional = false
getContext(true).done();
}
}
@@ -785,11 +798,11 @@
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
-
+
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.UPDATE_DELIVERY_COUNT,
updateInfo,
-
+
syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -902,9 +915,9 @@
case ADD_LARGE_MESSAGE_PENDING:
{
PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
-
+
pending.decode(buff);
-
+
if (pendingLargeMessages != null)
{
// it could be null on tests, and we don't need anything on that case
@@ -974,12 +987,14 @@
if (queueMessages == null)
{
- log.error("Cannot find queue messages for queueID=" + encoding.queueID + " on ack for messageID=" + messageID);
+ log.error("Cannot find queue messages for queueID=" + encoding.queueID +
+ " on ack for messageID=" +
+ messageID);
}
else
{
AddMessageRecord rec = queueMessages.remove(messageID);
-
+
if (rec == null)
{
log.error("Cannot find message " + messageID);
@@ -1055,13 +1070,16 @@
if (queueMessages == null)
{
- log.error("Cannot find queue messages " + encoding.queueID + " for message " + messageID + " while processing scheduled messages");
+ log.error("Cannot find queue messages " + encoding.queueID +
+ " for message " +
+ messageID +
+ " while processing scheduled messages");
}
else
{
-
+
AddMessageRecord rec = queueMessages.get(messageID);
-
+
if (rec == null)
{
log.error("Cannot find message " + messageID);
@@ -1189,19 +1207,20 @@
continue;
}
-
- // Redistribution could install a Redistributor while we are still loading records, what will be an issue with prepared ACKs
+
+ // Redistribution could install a Redistributor while we are still loading records, what will be an issue with
+ // prepared ACKs
// We make sure te Queue is paused before we reroute values.
queue.pause();
Collection<AddMessageRecord> valueRecords = queueRecords.values();
-
+
long currentTime = System.currentTimeMillis();
for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
-
+
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime)
{
scheduledDeliveryTime = 0;
@@ -1277,7 +1296,7 @@
{
messageJournal.perfBlast(perfBlastPages);
}
-
+
for (Queue queue : queues.values())
{
queue.resume();
@@ -1418,7 +1437,7 @@
public static void describeBindingJournal(final String bindingsDir) throws Exception
{
- SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+ SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, -1, 0, bindingsFF, "hornetq-bindings", "bindings", 1);
@@ -1428,7 +1447,7 @@
public static void describeMessagesJournal(final String messagesDir) throws Exception
{
- SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir);
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null);
// Will use only default values. The load function should adapt to anything different
ConfigurationImpl defaultValues = new ConfigurationImpl();
@@ -1495,7 +1514,6 @@
return bindingsInfo;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#lineUpContext()
@@ -1505,7 +1523,6 @@
messageJournal.lineUpContex(getContext());
}
-
// HornetQComponent implementation
// ------------------------------------------------------
@@ -1535,14 +1552,19 @@
started = true;
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
+ stop(false);
+ }
+
+ public synchronized void stop(boolean ioCriticalError) throws Exception
+ {
if (!started)
{
return;
}
- if (journalLoaded && idGenerator != null)
+ if (!ioCriticalError && journalLoaded && idGenerator != null)
{
// Must call close to make sure last id is persisted
idGenerator.close();
@@ -1792,7 +1814,7 @@
}
MessageReference removed = queue.removeReferenceWithID(messageID);
-
+
if (removed == null)
{
log.warn("Failed to remove reference for " + messageID);
@@ -1914,12 +1936,12 @@
for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete)
{
byte[] data = recordDeleted.data;
-
+
if (data.length > 0)
{
HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
byte b = buff.readByte();
-
+
switch (b)
{
case ADD_LARGE_MESSAGE_PENDING:
@@ -1933,7 +1955,7 @@
log.warn("can't locate recordType=" + b + " on loadPreparedTransaction//deleteRecords");
}
}
-
+
}
for (MessageReference ack : referencesToAck)
@@ -2378,7 +2400,7 @@
{
return DataConstants.SIZE_LONG;
}
-
+
public String toString()
{
return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
@@ -2476,9 +2498,9 @@
public static class DeleteEncoding implements EncodingSupport
{
public byte recordType;
-
+
public long id;
-
+
public DeleteEncoding()
{
super();
@@ -2683,7 +2705,7 @@
// SimpleString simpleStr = new SimpleString(duplID);
// return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
-
+
return "DuplicateIDEncoding [address=" + address + ", duplID=" + Arrays.toString(duplID) + "]";
}
@@ -2986,7 +3008,7 @@
{
PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
lmEncoding.decode(buffer);
-
+
return lmEncoding;
}
case ADD_LARGE_MESSAGE:
@@ -3466,7 +3488,7 @@
journal.stop();
}
-
+
private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
{
TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
@@ -3477,14 +3499,12 @@
}
txoper.confirmedMessages.add(recordID);
}
-
-
-
+
class TXLargeMessageConfirmationOperation implements TransactionOperation
{
-
- public List<Long> confirmedMessages = new LinkedList<Long>();
+ public List<Long> confirmedMessages = new LinkedList<Long>();
+
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
*/
@@ -3548,8 +3568,7 @@
{
return null;
}
-
+
}
-
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -568,8 +568,6 @@
*/
public void lineUpContext()
{
- // TODO Auto-generated method stub
-
}
/* (non-Javadoc)
@@ -586,4 +584,11 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#stop(boolean)
+ */
+ public void stop(boolean ioCriticalError) throws Exception
+ {
+ }
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
@@ -67,6 +68,8 @@
// Attributes ----------------------------------------------------
private static final boolean trace = ReplicationEndpointImpl.log.isTraceEnabled();
+
+ private final IOCriticalErrorListener criticalErrorListener;
private static void trace(final String msg)
{
@@ -93,9 +96,10 @@
private boolean deletePages = true;
// Constructors --------------------------------------------------
- public ReplicationEndpointImpl(final HornetQServer server)
+ public ReplicationEndpointImpl(final HornetQServer server, IOCriticalErrorListener criticalErrorListener)
{
this.server = server;
+ this.criticalErrorListener = criticalErrorListener;
}
// Public --------------------------------------------------------
@@ -207,7 +211,7 @@
{
Configuration config = server.getConfiguration();
- storage = new JournalStorageManager(config, server.getExecutorFactory());
+ storage = new JournalStorageManager(config, server.getExecutorFactory(), criticalErrorListener);
storage.start();
server.getManagementService().setStorageManager(storage);
@@ -222,7 +226,7 @@
config.getJournalBufferSize_NIO(),
server.getScheduledPool(),
server.getExecutorFactory(),
- config.isJournalSyncNonTransactional()),
+ config.isJournalSyncNonTransactional(), criticalErrorListener),
storage,
server.getAddressSettingsRepository());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -117,6 +117,8 @@
Set<ServerSession> getSessions();
boolean isStarted();
+
+ boolean isStopped();
HierarchicalRepository<Set<Role>> getSecurityRepository();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -83,7 +83,15 @@
if (!file.exists())
{
- fileCreated = file.createNewFile();
+ try
+ {
+ fileCreated = file.createNewFile();
+ }
+ catch (Exception e)
+ {
+ log.warn("can't open file " + file, e);
+ throw e;
+ }
if (!fileCreated)
{
throw new IllegalStateException("Unable to create server lock file");
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-29 03:25:40 UTC (rev 11441)
@@ -56,7 +56,9 @@
import org.hornetq.core.deployers.impl.SecurityDeployer;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
@@ -235,6 +237,8 @@
private Thread backupActivationThread;
private Activation activation;
+
+ private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
// Constructors
// ---------------------------------------------------------------------------------
@@ -454,6 +458,11 @@
public void stop(boolean failoverOnServerShutdown) throws Exception
{
+ stop(failoverOnServerShutdown, false);
+ }
+
+ protected void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws Exception
+ {
synchronized (this)
{
if (!started)
@@ -520,7 +529,7 @@
pagingManager.stop();
}
- if (storageManager != null)
+ if (!criticalIOError && storageManager != null)
{
storageManager.stop();
}
@@ -745,6 +754,11 @@
{
return started;
}
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
public ClusterManager getClusterManager()
{
@@ -1189,7 +1203,8 @@
(long)configuration.getJournalBufferSize_NIO(),
scheduledPool,
executorFactory,
- configuration.isJournalSyncNonTransactional()),
+ configuration.isJournalSyncNonTransactional(),
+ shutdownOnCriticalIO),
storageManager,
addressSettingsRepository);
}
@@ -1201,7 +1216,7 @@
{
if (configuration.isPersistenceEnabled())
{
- return new JournalStorageManager(configuration, executorFactory, replicationManager);
+ return new JournalStorageManager(configuration, executorFactory, replicationManager, shutdownOnCriticalIO);
}
else
{
@@ -2000,6 +2015,36 @@
}
}
}
+
+ private class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
+ {
+ boolean failedAlready = false;
+
+ public synchronized void onIOException(int code, String message, SequentialFile file)
+ {
+ if (!failedAlready)
+ {
+ failedAlready = true;
+
+ log.warn("Critical IO Error, shutting down the server. code=" + code + ", message=" + message);
+
+ new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ HornetQServerImpl.this.stop(true, true);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }.run();
+ }
+ }
+ }
private interface Activation extends Runnable
{
13 years, 2 months