Author: clebert.suconic
Date: 2011-09-28 23:25:40 -0400 (Wed, 28 Sep 2011)
New Revision: 11441
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
JBPAPP-7205 - disconnected journal
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java
(rev 0)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.asyncio;
+
+/**
+ * An IOExceptionListener: callback notified with an error code and message when an IO error happens.
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface IOExceptionListener
+{
+ void onIOException(int code, String message);
+}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -29,6 +29,7 @@
import org.hornetq.core.asyncio.AIOCallback;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.asyncio.IOExceptionListener;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.ReusableLatch;
@@ -160,6 +161,9 @@
private Semaphore maxIOSemaphore;
private BufferCallback bufferCallback;
+
+ /** Callback notified when an IO error happens; may be null, in which case no notification is sent */
+ private final IOExceptionListener ioExceptionListener;
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -180,12 +184,18 @@
* @param writeExecutor It needs to be a single Thread executor. If null it will use
the user thread to execute write operations
* @param pollerExecutor The thread pool that will initialize poller handlers
*/
- public AsynchronousFileImpl(final Executor writeExecutor, final Executor
pollerExecutor)
+ public AsynchronousFileImpl(final Executor writeExecutor, final Executor
pollerExecutor, final IOExceptionListener ioExceptionListener )
{
this.writeExecutor = writeExecutor;
this.pollerExecutor = pollerExecutor;
+ this.ioExceptionListener = ioExceptionListener;
}
+ public AsynchronousFileImpl(final Executor writeExecutor, final Executor
pollerExecutor)
+ {
+ this(writeExecutor, pollerExecutor, null);
+ }
+
public void open(final String fileName, final int maxIO) throws HornetQException
{
writeLock.lock();
@@ -276,7 +286,15 @@
public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws
HornetQException
{
- writeInternal(handler, positionToWrite, size, bytes);
+ try
+ {
+ writeInternal(handler, positionToWrite, size, bytes);
+ }
+ catch (HornetQException e)
+ {
+ fireExceptionListener(e.getCode(), e.getMessage());
+ throw e;
+ }
if (bufferCallback != null)
{
bufferCallback.bufferDone(bytes);
@@ -522,6 +540,8 @@
final String errorMessage)
{
AsynchronousFileImpl.log.warn("CallbackError: " + errorMessage);
+
+ fireExceptionListener(errorCode, errorMessage);
maxIOSemaphore.release();
@@ -561,6 +581,18 @@
}
}
+ /**
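+ * @param errorCode the error code forwarded to the IO exception listener, if one was set
+ * @param errorMessage the error message forwarded to the IO exception listener, if one was set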
+ */
+ private void fireExceptionListener(final int errorCode, final String errorMessage)
+ {
+ if (ioExceptionListener != null)
+ {
+ ioExceptionListener.onIOException(errorCode, errorMessage);
+ }
+ }
+
private void pollEvents()
{
if (!opened)
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java
(rev 0)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+/**
+ * An IOCriticalErrorListener: callback notified of an IO error together with the SequentialFile it happened on.
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface IOCriticalErrorListener
+{
+ void onIOException(int code, String message, SequentialFile file);
+}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -31,6 +31,9 @@
List<String> listFiles(String extension) throws Exception;
boolean isSupportsCallbacks();
+
+ /** The SequentialFile will call this method when a disk IO Error happens during the
live phase. */
+ void onIOError(int errorCode, String message, SequentialFile file);
/**
* Note: You need to release the buffer if is used for reading operations.
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -17,8 +17,10 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.asyncio.IOExceptionListener;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
@@ -32,7 +34,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
*/
-public class AIOSequentialFile extends AbstractSequentialFile
+public class AIOSequentialFile extends AbstractSequentialFile implements
IOExceptionListener
{
private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
@@ -185,9 +187,17 @@
{
opened = true;
- aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null,
pollerExecutor);
+ aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null,
pollerExecutor, this);
- aioFile.open(getFile().getAbsolutePath(), maxIO);
+ try
+ {
+ aioFile.open(getFile().getAbsolutePath(), maxIO);
+ }
+ catch (HornetQException e)
+ {
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+ throw e;
+ }
position.set(0);
@@ -251,6 +261,15 @@
// Public methods
//
-----------------------------------------------------------------------------------------------------
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.IOExceptionListener#onIOException(int,
java.lang.String)
+ */
+ public void onIOException(int code, String message)
+ {
+ factory.onIOError(code, message, this);
+ }
+
+
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
if (sync)
@@ -313,4 +332,5 @@
throw new IllegalStateException("File not opened");
}
}
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -25,6 +25,7 @@
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.HornetQThreadFactory;
@@ -60,17 +61,36 @@
this(journalDir,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
- false);
+ false,
+ null);
}
+ public AIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener
listener)
+ {
+ this(journalDir,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
+ false,
+ listener);
+ }
+
public AIOSequentialFileFactory(final String journalDir,
final int bufferSize,
final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, true, bufferSize, bufferTimeout, logRates);
+ this(journalDir, bufferSize, bufferTimeout, logRates, null);
}
+ public AIOSequentialFileFactory(final String journalDir,
+ final int bufferSize,
+ final int bufferTimeout,
+ final boolean logRates,
+ final IOCriticalErrorListener listener)
+ {
+ super(journalDir, true, bufferSize, bufferTimeout, logRates, listener);
+ }
+
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
return new AIOSequentialFile(this,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -52,8 +53,10 @@
protected final TimedBuffer timedBuffer;
protected final int bufferSize;
-
+
protected final long bufferTimeout;
+
+ private final IOCriticalErrorListener critialErrorListener;
/**
* Asynchronous writes need to be done at another executor.
@@ -66,7 +69,8 @@
final boolean buffered,
final int bufferSize,
final int bufferTimeout,
- final boolean logRates)
+ final boolean logRates,
+ final IOCriticalErrorListener
criticalErrorListener)
{
this.journalDir = journalDir;
@@ -80,6 +84,7 @@
}
this.bufferSize = bufferSize;
this.bufferTimeout = bufferTimeout;
+ this.critialErrorListener = criticalErrorListener;
}
public void stop()
@@ -124,6 +129,19 @@
}
/* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#onIOError(int,
java.lang.String, org.hornetq.core.journal.SequentialFile)
+ */
+ public void onIOError(int errorCode, String message, SequentialFile file)
+ {
+ if (critialErrorListener != null)
+ {
+ critialErrorListener.onIOException(errorCode, message, file);
+ }
+ // Without a registered listener this notification is a no-op
+
+ }
+
+ /* (non-Javadoc)
* @see
org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
*/
public void activateBuffer(final SequentialFile file)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -13,6 +13,8 @@
package org.hornetq.core.journal.impl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
+
/**
* This is an undocumented class, that will open a journal and force compacting on it.
* It may be used under special cases, but it shouldn't be needed under regular
circumstances as the system should detect
@@ -37,7 +39,7 @@
try
{
- CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2,
Integer.parseInt(arg[3]));
+ CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2,
Integer.parseInt(arg[3]), null);
}
catch (Exception e)
{
@@ -50,9 +52,10 @@
final String journalPrefix,
final String journalSuffix,
final int minFiles,
- final int fileSize) throws Exception
+ final int fileSize,
+ final IOCriticalErrorListener listener) throws
Exception
{
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix,
journalSuffix, 1);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -92,7 +92,7 @@
final int fileSize,
final PrintStream out) throws Exception
{
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix,
journalSuffix, 1);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -105,7 +105,7 @@
journalDir.mkdirs();
- NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix,
journalSuffix, 1);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -94,12 +94,20 @@
public void open(final int maxIO, final boolean useExecutor) throws Exception
{
- rfile = new RandomAccessFile(getFile(), "rw");
+ try
+ {
+ rfile = new RandomAccessFile(getFile(), "rw");
+
+ channel = rfile.getChannel();
+
+ fileSize = channel.size();
+ }
+ catch (IOException e)
+ {
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+ throw e;
+ }
- channel = rfile.getChannel();
-
- fileSize = channel.size();
-
if (writerExecutor != null && useExecutor)
{
maxIOSemaphore = new Semaphore(maxIO);
@@ -193,15 +201,21 @@
return bytesRead;
}
- catch (Exception e)
+ catch (IOException e)
{
if (callback != null)
{
callback.onError(HornetQException.IO_ERROR, e.getLocalizedMessage());
}
+
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
throw e;
}
+ catch (Exception e)
+ {
+ throw e;
+ }
}
@@ -297,9 +311,17 @@
position.addAndGet(bytes.limit());
- if (maxIOSemaphore == null)
+ if (maxIOSemaphore == null || callback == null)
{
- doInternalWrite(bytes, sync, callback);
+ // if maxIOSemaphore == null, that means we are not using executors and the
writes are synchronous
+ try
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ catch (IOException e)
+ {
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+ }
}
else
{
@@ -316,6 +338,12 @@
{
doInternalWrite(bytes, sync, callback);
}
+ catch (IOException e)
+ {
+ NIOSequentialFile.log.warn("Exception on submitting
write", e);
+ factory.onIOError(HornetQException.IO_ERROR, e.getMessage(),
NIOSequentialFile.this);
+ callback.onError(HornetQException.IO_ERROR, e.getMessage());
+ }
catch (Throwable e)
{
NIOSequentialFile.log.warn("Exception on submitting
write", e);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -34,20 +35,34 @@
public NIOSequentialFileFactory(final String journalDir)
{
+ this(journalDir, null);
+ }
+
+ public NIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener
listener)
+ {
this(journalDir,
false,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- false);
+ false,
+ listener);
}
public NIOSequentialFileFactory(final String journalDir, final boolean buffered)
{
+ this(journalDir, buffered, null);
+ }
+
+ public NIOSequentialFileFactory(final String journalDir,
+ final boolean buffered,
+ final IOCriticalErrorListener listener)
+ {
this(journalDir,
buffered,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- false);
+ false,
+ listener);
}
public NIOSequentialFileFactory(final String journalDir,
@@ -56,9 +71,19 @@
final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, buffered, bufferSize, bufferTimeout, logRates);
+ this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null);
}
+ public NIOSequentialFileFactory(final String journalDir,
+ final boolean buffered,
+ final int bufferSize,
+ final int bufferTimeout,
+ final boolean logRates,
+ final IOCriticalErrorListener listener)
+ {
+ super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener);
+ }
+
public SequentialFile createSequentialFile(final String fileName, int maxIO)
{
if (maxIO < 1)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -60,11 +60,11 @@
{
if (AIO)
{
- fileFactory = new AIOSequentialFileFactory(".", 0, 0, false);
+ fileFactory = new AIOSequentialFileFactory(".", 0, 0, false, null);
}
else
{
- fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false);
+ fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false,
null);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-09-28
16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -93,7 +93,7 @@
return executor;
}
};
- PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l,
scheduled, execfactory, false);
+ PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l,
scheduled, execfactory, false, null);
HierarchicalRepository<AddressSettings> addressSettingsRepository = new
HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
StorageManager sm = new NullStorageManager();
@@ -176,7 +176,7 @@
*/
protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>>
loadCursorACKs(final String journalLocation) throws Exception
{
- SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation,
null);
// Will use only default values. The load function should adapt to anything
different
ConfigurationImpl defaultValues = new ConfigurationImpl();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -62,15 +63,17 @@
protected final boolean syncNonTransactional;
private PagingManager pagingManager;
-
+
private final ScheduledExecutorService scheduledExecutor;
-
+
private final long syncTimeout;
private StorageManager storageManager;
private PostOffice postOffice;
+ private final IOCriticalErrorListener critialErrorListener;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -81,15 +84,27 @@
final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
+ this(directory, syncTimeout, scheduledExecutor, executorFactory,
syncNonTransactional, null);
+ }
+
+ public PagingStoreFactoryNIO(final String directory,
+ final long syncTimeout,
+ final ScheduledExecutorService scheduledExecutor,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional,
+ final IOCriticalErrorListener critialErrorListener)
+ {
this.directory = directory;
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
this.scheduledExecutor = scheduledExecutor;
-
+
this.syncTimeout = syncTimeout;
+
+ this.critialErrorListener = critialErrorListener;
}
// Public --------------------------------------------------------
@@ -231,24 +246,24 @@
protected SequentialFileFactory newFileFactory(final String directoryName)
{
- return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName,
false);
+ return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName,
false, critialErrorListener);
}
-
+
protected PagingManager getPagingManager()
{
return pagingManager;
}
-
+
protected StorageManager getStorageManager()
{
return storageManager;
}
-
+
protected PostOffice getPostOffice()
{
return postOffice;
}
-
+
protected ExecutorFactory getExecutorFactory()
{
return executorFactory;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -66,6 +66,12 @@
/** Set the context back to the thread */
void setContext(OperationContext context);
+
+ /**
+ *
+ * @param ioCriticalError true if the server is being stopped due to a critical IO error
+ */
+ void stop(boolean ioCriticalError) throws Exception;
// Message related operations
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -45,6 +45,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -128,7 +129,7 @@
public static final byte SECURITY_RECORD = 26;
// Message journal record types
-
+
// This is used when a large message is created but not yet stored on the system.
// We use this to avoid temporary files missing
public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
@@ -206,8 +207,16 @@
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
- final ReplicationManager replicator)
+ final IOCriticalErrorListener criticalErrorListener)
{
+ this(config, executorFactory, null, criticalErrorListener);
+ }
+
+ public JournalStorageManager(final Configuration config,
+ final ExecutorFactory executorFactory,
+ final ReplicationManager replicator,
+ final IOCriticalErrorListener criticalErrorListener)
+ {
this.executorFactory = executorFactory;
executor = executorFactory.getExecutor();
@@ -230,7 +239,7 @@
journalDir = config.getJournalDirectory();
- SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+ SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir,
criticalErrorListener);
Journal localBindings = new JournalImpl(1024 * 1024,
2,
@@ -270,7 +279,8 @@
journalFF = new AIOSequentialFileFactory(journalDir,
config.getJournalBufferSize_AIO(),
config.getJournalBufferTimeout_AIO(),
- config.isLogJournalWriteRate());
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else if (config.getJournalType() == JournalType.NIO)
{
@@ -279,7 +289,8 @@
true,
config.getJournalBufferSize_NIO(),
config.getJournalBufferTimeout_NIO(),
- config.isLogJournalWriteRate());
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else
{
@@ -315,7 +326,7 @@
largeMessagesDirectory = config.getLargeMessagesDirectory();
- largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory,
false);
+ largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false,
criticalErrorListener);
perfBlastPages = config.getJournalPerfBlastPages();
}
@@ -470,7 +481,7 @@
{
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
-
+
largeMessage.setPendingRecordID(pendingRecordID);
}
@@ -478,33 +489,34 @@
}
// Non transactional operations
-
+
public long storePendingLargeMessage(final long messageID) throws Exception
{
long recordID = generateUniqueID();
-
+
messageJournal.appendAddRecord(recordID,
ADD_LARGE_MESSAGE_PENDING,
new PendingLargeMessageEncoding(messageID),
true,
getContext(true));
-
+
return recordID;
}
-
+
public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long
recordID) throws Exception
{
installLargeMessageConfirmationOnTX(tx, recordID);
- messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new
DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ messageJournal.appendDeleteRecordTransactional(tx.getID(),
+ recordID,
+ new
DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
}
-
-
+
/** We don't need messageID now but we are likely to need it we ever decide to
support a database */
public void confirmPendingLargeMessage(long recordID) throws Exception
{
messageJournal.appendDeleteRecord(recordID, true, getContext());
}
-
+
public void storeMessage(final ServerMessage message) throws Exception
{
if (message.getMessageID() <= 0)
@@ -739,7 +751,8 @@
messageJournal.appendCommitRecord(txID, syncTransactional,
getContext(syncTransactional), lineUpContext);
if (!lineUpContext && !syncTransactional)
{
- // if lineUpContext == false, we have previously lined up a context, hence we
need to mark it as done even if syncTransactional = false
+ // if lineUpContext == false, we have previously lined up a context, hence we
need to mark it as done even if
+ // syncTransactional = false
getContext(true).done();
}
}
@@ -785,11 +798,11 @@
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new
DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
-
+
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.UPDATE_DELIVERY_COUNT,
updateInfo,
-
+
syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -902,9 +915,9 @@
case ADD_LARGE_MESSAGE_PENDING:
{
PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
-
+
pending.decode(buff);
-
+
if (pendingLargeMessages != null)
{
// it could be null on tests, and we don't need anything on that
case
@@ -974,12 +987,14 @@
if (queueMessages == null)
{
- log.error("Cannot find queue messages for queueID=" +
encoding.queueID + " on ack for messageID=" + messageID);
+ log.error("Cannot find queue messages for queueID=" +
encoding.queueID +
+ " on ack for messageID=" +
+ messageID);
}
else
{
AddMessageRecord rec = queueMessages.remove(messageID);
-
+
if (rec == null)
{
log.error("Cannot find message " + messageID);
@@ -1055,13 +1070,16 @@
if (queueMessages == null)
{
- log.error("Cannot find queue messages " + encoding.queueID +
" for message " + messageID + " while processing scheduled
messages");
+ log.error("Cannot find queue messages " + encoding.queueID +
+ " for message " +
+ messageID +
+ " while processing scheduled messages");
}
else
{
-
+
AddMessageRecord rec = queueMessages.get(messageID);
-
+
if (rec == null)
{
log.error("Cannot find message " + messageID);
@@ -1189,19 +1207,20 @@
continue;
}
-
- // Redistribution could install a Redistributor while we are still loading
records, what will be an issue with prepared ACKs
+
+ // Redistribution could install a Redistributor while we are still loading
records, which would be an issue with
+ // prepared ACKs
// We make sure the Queue is paused before we reroute values.
queue.pause();
Collection<AddMessageRecord> valueRecords = queueRecords.values();
-
+
long currentTime = System.currentTimeMillis();
for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
-
+
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <=
currentTime)
{
scheduledDeliveryTime = 0;
@@ -1277,7 +1296,7 @@
{
messageJournal.perfBlast(perfBlastPages);
}
-
+
for (Queue queue : queues.values())
{
queue.resume();
@@ -1418,7 +1437,7 @@
public static void describeBindingJournal(final String bindingsDir) throws Exception
{
- SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+ SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir,
null);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, -1, 0, bindingsFF,
"hornetq-bindings", "bindings", 1);
@@ -1428,7 +1447,7 @@
public static void describeMessagesJournal(final String messagesDir) throws Exception
{
- SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir);
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir,
null);
// Will use only default values. The load function should adapt to anything
different
ConfigurationImpl defaultValues = new ConfigurationImpl();
@@ -1495,7 +1514,6 @@
return bindingsInfo;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#lineUpContext()
@@ -1505,7 +1523,6 @@
messageJournal.lineUpContex(getContext());
}
-
// HornetQComponent implementation
// ------------------------------------------------------
@@ -1535,14 +1552,19 @@
started = true;
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
+ stop(false);
+ }
+
+ public synchronized void stop(boolean ioCriticalError) throws Exception
+ {
if (!started)
{
return;
}
- if (journalLoaded && idGenerator != null)
+ if (!ioCriticalError && journalLoaded && idGenerator != null)
{
// Must call close to make sure last id is persisted
idGenerator.close();
@@ -1792,7 +1814,7 @@
}
MessageReference removed = queue.removeReferenceWithID(messageID);
-
+
if (removed == null)
{
log.warn("Failed to remove reference for " + messageID);
@@ -1914,12 +1936,12 @@
for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete)
{
byte[] data = recordDeleted.data;
-
+
if (data.length > 0)
{
HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
byte b = buff.readByte();
-
+
switch (b)
{
case ADD_LARGE_MESSAGE_PENDING:
@@ -1933,7 +1955,7 @@
log.warn("can't locate recordType=" + b + " on
loadPreparedTransaction//deleteRecords");
}
}
-
+
}
for (MessageReference ack : referencesToAck)
@@ -2378,7 +2400,7 @@
{
return DataConstants.SIZE_LONG;
}
-
+
public String toString()
{
return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
@@ -2476,9 +2498,9 @@
public static class DeleteEncoding implements EncodingSupport
{
public byte recordType;
-
+
public long id;
-
+
public DeleteEncoding()
{
super();
@@ -2683,7 +2705,7 @@
// SimpleString simpleStr = new SimpleString(duplID);
// return "DuplicateIDEncoding [address=" + address + ",
duplID=" + simpleStr + "]";
-
+
return "DuplicateIDEncoding [address=" + address + ",
duplID=" + Arrays.toString(duplID) + "]";
}
@@ -2986,7 +3008,7 @@
{
PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
lmEncoding.decode(buffer);
-
+
return lmEncoding;
}
case ADD_LARGE_MESSAGE:
@@ -3466,7 +3488,7 @@
journal.stop();
}
-
+
private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
{
TXLargeMessageConfirmationOperation txoper =
(TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
@@ -3477,14 +3499,12 @@
}
txoper.confirmedMessages.add(recordID);
}
-
-
-
+
class TXLargeMessageConfirmationOperation implements TransactionOperation
{
-
- public List<Long> confirmedMessages = new LinkedList<Long>();
+ public List<Long> confirmedMessages = new LinkedList<Long>();
+
/* (non-Javadoc)
* @see
org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
*/
@@ -3548,8 +3568,7 @@
{
return null;
}
-
+
}
-
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -568,8 +568,6 @@
*/
public void lineUpContext()
{
- // TODO Auto-generated method stub
-
}
/* (non-Javadoc)
@@ -586,4 +584,11 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#stop(boolean)
+ */
+ public void stop(boolean ioCriticalError) throws Exception
+ {
+ }
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
@@ -67,6 +68,8 @@
// Attributes ----------------------------------------------------
private static final boolean trace = ReplicationEndpointImpl.log.isTraceEnabled();
+
+ private final IOCriticalErrorListener criticalErrorListener;
private static void trace(final String msg)
{
@@ -93,9 +96,10 @@
private boolean deletePages = true;
// Constructors --------------------------------------------------
- public ReplicationEndpointImpl(final HornetQServer server)
+ public ReplicationEndpointImpl(final HornetQServer server, IOCriticalErrorListener
criticalErrorListener)
{
this.server = server;
+ this.criticalErrorListener = criticalErrorListener;
}
// Public --------------------------------------------------------
@@ -207,7 +211,7 @@
{
Configuration config = server.getConfiguration();
- storage = new JournalStorageManager(config, server.getExecutorFactory());
+ storage = new JournalStorageManager(config, server.getExecutorFactory(),
criticalErrorListener);
storage.start();
server.getManagementService().setStorageManager(storage);
@@ -222,7 +226,7 @@
config.getJournalBufferSize_NIO(),
server.getScheduledPool(),
server.getExecutorFactory(),
-
config.isJournalSyncNonTransactional()),
+
config.isJournalSyncNonTransactional(), criticalErrorListener),
storage,
server.getAddressSettingsRepository());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-28
16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -117,6 +117,8 @@
Set<ServerSession> getSessions();
boolean isStarted();
+
+ boolean isStopped();
HierarchicalRepository<Set<Role>> getSecurityRepository();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -83,7 +83,15 @@
if (!file.exists())
{
- fileCreated = file.createNewFile();
+ try
+ {
+ fileCreated = file.createNewFile();
+ }
+ catch (Exception e)
+ {
+ log.warn("can't open file " + file, e);
+ throw e;
+ }
if (!fileCreated)
{
throw new IllegalStateException("Unable to create server lock
file");
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-28
16:32:50 UTC (rev 11440)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-29
03:25:40 UTC (rev 11441)
@@ -56,7 +56,9 @@
import org.hornetq.core.deployers.impl.SecurityDeployer;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
@@ -235,6 +237,8 @@
private Thread backupActivationThread;
private Activation activation;
+
+ private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new
ShutdownOnCriticalErrorListener();
// Constructors
// ---------------------------------------------------------------------------------
@@ -454,6 +458,11 @@
public void stop(boolean failoverOnServerShutdown) throws Exception
{
+ stop(failoverOnServerShutdown, false);
+ }
+
+ protected void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws
Exception
+ {
synchronized (this)
{
if (!started)
@@ -520,7 +529,7 @@
pagingManager.stop();
}
- if (storageManager != null)
+ if (!criticalIOError && storageManager != null)
{
storageManager.stop();
}
@@ -745,6 +754,11 @@
{
return started;
}
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
public ClusterManager getClusterManager()
{
@@ -1189,7 +1203,8 @@
(long)configuration.getJournalBufferSize_NIO(),
scheduledPool,
executorFactory,
-
configuration.isJournalSyncNonTransactional()),
+
configuration.isJournalSyncNonTransactional(),
+ shutdownOnCriticalIO),
storageManager,
addressSettingsRepository);
}
@@ -1201,7 +1216,7 @@
{
if (configuration.isPersistenceEnabled())
{
- return new JournalStorageManager(configuration, executorFactory,
replicationManager);
+ return new JournalStorageManager(configuration, executorFactory,
replicationManager, shutdownOnCriticalIO);
}
else
{
@@ -2000,6 +2015,36 @@
}
}
}
+
+ /**
+  * {@link IOCriticalErrorListener} that stops the enclosing server the first time a
+  * critical IO error is reported. Reports received after the first one are ignored.
+  */
+ private class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
+ {
+    // Set on the first reported error so the shutdown is triggered only once.
+    boolean failedAlready = false;
+
+    /**
+     * Logs the error and triggers a server stop on a separate thread.
+     *
+     * @param code    error code supplied by the reporter (only logged here)
+     * @param message error description supplied by the reporter (only logged here)
+     * @param file    file the error relates to (not used by this listener)
+     */
+    public synchronized void onIOException(int code, String message, SequentialFile file)
+    {
+       if (!failedAlready)
+       {
+          failedAlready = true;
+
+          log.warn("Critical IO Error, shutting down the server. code=" + code + ", message=" + message);
+
+          // start(), not run(): run() would execute the whole shutdown synchronously on the
+          // reporting thread while this listener's monitor is still held, and no new thread
+          // would ever be created.
+          new Thread()
+          {
+             public void run()
+             {
+                try
+                {
+                   HornetQServerImpl.this.stop(true, true);
+                }
+                catch (Exception e)
+                {
+                   log.warn(e.getMessage(), e);
+                }
+             }
+          }.start();
+       }
+    }
+ }
private interface Activation extends Runnable
{