[hornetq-commits] JBoss hornetq SVN: r11441 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: asyncio/impl and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 28 23:25:42 EDT 2011


Author: clebert.suconic
Date: 2011-09-28 23:25:40 -0400 (Wed, 28 Sep 2011)
New Revision: 11441

Added:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
JBPAPP-7205 - disconnected journal

Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/IOExceptionListener.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.asyncio;
+
+/**
+ * An IOExceptionListener is notified when an asynchronous file operation fails with an IO error.
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface IOExceptionListener
+{
+   void onIOException(int code, String message);
+}

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -29,6 +29,7 @@
 import org.hornetq.core.asyncio.AIOCallback;
 import org.hornetq.core.asyncio.AsynchronousFile;
 import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.asyncio.IOExceptionListener;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.ReusableLatch;
 
@@ -160,6 +161,9 @@
    private Semaphore maxIOSemaphore;
 
    private BufferCallback bufferCallback;
+   
+   /** A callback for IO errors when they happen */
+   private final IOExceptionListener ioExceptionListener;
 
    /**
     *  Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -180,12 +184,18 @@
     * @param writeExecutor It needs to be a single Thread executor. If null it will use the user thread to execute write operations
     * @param pollerExecutor The thread pool that will initialize poller handlers
     */
-   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
+   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor, final IOExceptionListener ioExceptionListener )
    {
       this.writeExecutor = writeExecutor;
       this.pollerExecutor = pollerExecutor;
+      this.ioExceptionListener = ioExceptionListener;
    }
 
+   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
+   {
+      this(writeExecutor, pollerExecutor, null);
+   }
+
    public void open(final String fileName, final int maxIO) throws HornetQException
    {
       writeLock.lock();
@@ -276,7 +286,15 @@
    
    public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException
    {
-      writeInternal(handler, positionToWrite, size, bytes);
+      try
+      {
+         writeInternal(handler, positionToWrite, size, bytes);
+      }
+      catch (HornetQException e)
+      {
+         fireExceptionListener(e.getCode(), e.getMessage());
+         throw e;
+      }
       if (bufferCallback != null)
       {
          bufferCallback.bufferDone(bytes);
@@ -522,6 +540,8 @@
                               final String errorMessage)
    {
       AsynchronousFileImpl.log.warn("CallbackError: " + errorMessage);
+      
+      fireExceptionListener(errorCode, errorMessage);
 
       maxIOSemaphore.release();
 
@@ -561,6 +581,18 @@
       }
    }
 
+   /**
+    * @param errorCode
+    * @param errorMessage
+    */
+   private void fireExceptionListener(final int errorCode, final String errorMessage)
+   {
+      if (ioExceptionListener != null)
+      {
+         ioExceptionListener.onIOException(errorCode, errorMessage);
+      }
+   }
+
    private void pollEvents()
    {
       if (!opened)

Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/IOCriticalErrorListener.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+/**
+ * An IOCriticalErrorListener is notified by a SequentialFileFactory when one of its files reports an IO error.
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface IOCriticalErrorListener
+{
+   void onIOException(int code, String message, SequentialFile file);
+}

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -31,6 +31,9 @@
    List<String> listFiles(String extension) throws Exception;
 
    boolean isSupportsCallbacks();
+   
+   /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */
+   void onIOError(int errorCode, String message, SequentialFile file);
 
    /**
     * Note: You need to release the buffer if is used for reading operations.

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -17,8 +17,10 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executor;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.asyncio.AsynchronousFile;
 import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.asyncio.IOExceptionListener;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
@@ -32,7 +34,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class AIOSequentialFile extends AbstractSequentialFile
+public class AIOSequentialFile extends AbstractSequentialFile implements IOExceptionListener
 {
    private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
 
@@ -185,9 +187,17 @@
    {
       opened = true;
 
-      aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor);
+      aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor, this);
 
-      aioFile.open(getFile().getAbsolutePath(), maxIO);
+      try
+      {
+         aioFile.open(getFile().getAbsolutePath(), maxIO);
+      }
+      catch (HornetQException e)
+      {
+         factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+         throw e;
+      }
 
       position.set(0);
 
@@ -251,6 +261,15 @@
    // Public methods
    // -----------------------------------------------------------------------------------------------------
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.asyncio.IOExceptionListener#onIOException(int, java.lang.String)
+    */
+   public void onIOException(int code, String message)
+   {
+      factory.onIOError(code, message, this);
+   }
+   
+
    public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
    {
       if (sync)
@@ -313,4 +332,5 @@
          throw new IllegalStateException("File not opened");
       }
    }
+   
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -25,6 +25,7 @@
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.HornetQThreadFactory;
@@ -60,17 +61,36 @@
       this(journalDir,
            ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
            ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
-           false);
+           false,
+           null);
    }
 
+   public AIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener)
+   {
+      this(journalDir,
+           ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+           ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
+           false,
+           listener);
+   }
+
    public AIOSequentialFileFactory(final String journalDir,
                                    final int bufferSize,
                                    final int bufferTimeout,
                                    final boolean logRates)
    {
-      super(journalDir, true, bufferSize, bufferTimeout, logRates);
+      this(journalDir, bufferSize, bufferTimeout, logRates, null);
    }
 
+   public AIOSequentialFileFactory(final String journalDir,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final boolean logRates,
+                                   final IOCriticalErrorListener listener)
+   {
+      super(journalDir, true, bufferSize, bufferTimeout, logRates, listener);
+   }
+
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
       return new AIOSequentialFile(this,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -26,6 +26,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
@@ -52,8 +53,10 @@
    protected final TimedBuffer timedBuffer;
 
    protected final int bufferSize;
-
+   
    protected final long bufferTimeout;
+   
+   private final IOCriticalErrorListener critialErrorListener;  
 
    /** 
     * Asynchronous writes need to be done at another executor.
@@ -66,7 +69,8 @@
                                         final boolean buffered,
                                         final int bufferSize,
                                         final int bufferTimeout,
-                                        final boolean logRates)
+                                        final boolean logRates,
+                                        final IOCriticalErrorListener criticalErrorListener)
    {
       this.journalDir = journalDir;
 
@@ -80,6 +84,7 @@
       }
       this.bufferSize = bufferSize;
       this.bufferTimeout = bufferTimeout;
+      this.critialErrorListener = criticalErrorListener;
    }
 
    public void stop()
@@ -124,6 +129,19 @@
    }
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.journal.SequentialFileFactory#onIOError(int, java.lang.String, org.hornetq.core.journal.SequentialFile)
+    */
+   public void onIOError(int errorCode, String message, SequentialFile file)
+   {
+      if (critialErrorListener != null)
+      {
+         critialErrorListener.onIOException(errorCode, message, file);
+      }
+      // When no listener was registered the error is not propagated any further from here
+      
+   }
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
     */
    public void activateBuffer(final SequentialFile file)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/CompactJournal.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.journal.impl;
 
+import org.hornetq.core.journal.IOCriticalErrorListener;
+
 /**
  * This is an undocumented class, that will open a journal and force compacting on it.
  * It may be used under special cases, but it shouldn't be needed under regular circumstances as the system should detect 
@@ -37,7 +39,7 @@
 
       try
       {
-         CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+         CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), null);
       }
       catch (Exception e)
       {
@@ -50,9 +52,10 @@
                                      final String journalPrefix,
                                      final String journalSuffix,
                                      final int minFiles,
-                                     final int fileSize) throws Exception
+                                     final int fileSize,
+                                     final IOCriticalErrorListener listener) throws Exception
    {
-      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -92,7 +92,7 @@
                                     final int fileSize,
                                     final PrintStream out) throws Exception
    {
-      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/ImportJournal.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -105,7 +105,7 @@
 
       journalDir.mkdirs();
 
-      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -94,12 +94,20 @@
 
    public void open(final int maxIO, final boolean useExecutor) throws Exception
    {
-      rfile = new RandomAccessFile(getFile(), "rw");
+      try
+      {
+         rfile = new RandomAccessFile(getFile(), "rw");
+   
+         channel = rfile.getChannel();
+   
+         fileSize = channel.size();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+         throw e;
+      }
 
-      channel = rfile.getChannel();
-
-      fileSize = channel.size();
-
       if (writerExecutor != null && useExecutor)
       {
          maxIOSemaphore = new Semaphore(maxIO);
@@ -193,15 +201,21 @@
 
          return bytesRead;
       }
-      catch (Exception e)
+      catch (IOException e)
       {
          if (callback != null)
          {
             callback.onError(HornetQException.IO_ERROR, e.getLocalizedMessage());
          }
+         
+         factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
 
          throw e;
       }
+      catch (Exception e)
+      {
+         throw e;
+      }
 
    }
 
@@ -297,9 +311,17 @@
 
       position.addAndGet(bytes.limit());
 
-      if (maxIOSemaphore == null)
+      if (maxIOSemaphore == null || callback == null)
       {
-         doInternalWrite(bytes, sync, callback);
+         // if maxIOSemaphore == null we are not using executors; that case, and a null callback, are both written synchronously on the calling thread
+         try
+         {
+            doInternalWrite(bytes, sync, callback);
+         }
+         catch (IOException e)
+         {
+            factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), this);
+         }
       }
       else
       {
@@ -316,6 +338,12 @@
                   {
                      doInternalWrite(bytes, sync, callback);
                   }
+                  catch (IOException e)
+                  {
+                     NIOSequentialFile.log.warn("Exception on submitting write", e);
+                     factory.onIOError(HornetQException.IO_ERROR, e.getMessage(), NIOSequentialFile.this);
+                     callback.onError(HornetQException.IO_ERROR, e.getMessage());
+                  }
                   catch (Throwable e)
                   {
                      NIOSequentialFile.log.warn("Exception on submitting write", e);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -16,6 +16,7 @@
 import java.nio.ByteBuffer;
 
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
@@ -34,20 +35,34 @@
 
    public NIOSequentialFileFactory(final String journalDir)
    {
+      this(journalDir, null);
+   }
+
+   public NIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener)
+   {
       this(journalDir,
            false,
            ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
            ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
-           false);
+           false,
+           listener);
    }
 
    public NIOSequentialFileFactory(final String journalDir, final boolean buffered)
    {
+      this(journalDir, buffered, null);
+   }
+
+   public NIOSequentialFileFactory(final String journalDir,
+                                   final boolean buffered,
+                                   final IOCriticalErrorListener listener)
+   {
       this(journalDir,
            buffered,
            ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
            ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
-           false);
+           false,
+           listener);
    }
 
    public NIOSequentialFileFactory(final String journalDir,
@@ -56,9 +71,19 @@
                                    final int bufferTimeout,
                                    final boolean logRates)
    {
-      super(journalDir, buffered, bufferSize, bufferTimeout, logRates);
+      this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null);
    }
 
+   public NIOSequentialFileFactory(final String journalDir,
+                                   final boolean buffered,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final boolean logRates,
+                                   final IOCriticalErrorListener listener)
+   {
+      super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener);
+   }
+
    public SequentialFile createSequentialFile(final String fileName, int maxIO)
    {
       if (maxIO < 1)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -60,11 +60,11 @@
    {
       if (AIO)
       {
-         fileFactory = new AIOSequentialFileFactory(".", 0, 0, false);
+         fileFactory = new AIOSequentialFileFactory(".", 0, 0, false, null);
       }
       else
       {
-         fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false);
+         fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false, null);
       }
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -93,7 +93,7 @@
                return executor;
             }
          };
-         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false);
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false, null);
          HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
          addressSettingsRepository.setDefault(new AddressSettings());
          StorageManager sm = new NullStorageManager();
@@ -176,7 +176,7 @@
     */
    protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(final String journalLocation) throws Exception
    {
-      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation, null);
 
       // Will use only default values. The load function should adapt to anything different
       ConfigurationImpl defaultValues = new ConfigurationImpl();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -26,6 +26,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
@@ -62,15 +63,17 @@
    protected final boolean syncNonTransactional;
 
    private PagingManager pagingManager;
-   
+
    private final ScheduledExecutorService scheduledExecutor;
-   
+
    private final long syncTimeout;
 
    private StorageManager storageManager;
 
    private PostOffice postOffice;
 
+   private final IOCriticalErrorListener critialErrorListener;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -81,15 +84,27 @@
                                 final ExecutorFactory executorFactory,
                                 final boolean syncNonTransactional)
    {
+      this(directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, null);
+   }
+
+   public PagingStoreFactoryNIO(final String directory,
+                                final long syncTimeout,
+                                final ScheduledExecutorService scheduledExecutor,
+                                final ExecutorFactory executorFactory,
+                                final boolean syncNonTransactional,
+                                final IOCriticalErrorListener critialErrorListener)
+   {
       this.directory = directory;
 
       this.executorFactory = executorFactory;
 
       this.syncNonTransactional = syncNonTransactional;
-      
+
       this.scheduledExecutor = scheduledExecutor;
-      
+
       this.syncTimeout = syncTimeout;
+
+      this.critialErrorListener = critialErrorListener;
    }
 
    // Public --------------------------------------------------------
@@ -231,24 +246,24 @@
 
    protected SequentialFileFactory newFileFactory(final String directoryName)
    {
-      return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
+      return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false, critialErrorListener);
    }
-   
+
    protected PagingManager getPagingManager()
    {
       return pagingManager;
    }
-   
+
    protected StorageManager getStorageManager()
    {
       return storageManager;
    }
-   
+
    protected PostOffice getPostOffice()
    {
       return postOffice;
    }
-   
+
    protected ExecutorFactory getExecutorFactory()
    {
       return executorFactory;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -66,6 +66,12 @@
 
    /** Set the context back to the thread */
    void setContext(OperationContext context);
+   
+   /**
+    * 
+    * @param ioCriticalError true if the server is being stopped due to a critical IO error
+    */
+   void stop(boolean ioCriticalError) throws Exception;
 
    // Message related operations
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -45,6 +45,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -128,7 +129,7 @@
    public static final byte SECURITY_RECORD = 26;
 
    // Message journal record types
-   
+
    // This is used when a large message is created but not yet stored on the system.
    // We use this to avoid temporary files missing
    public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
@@ -206,8 +207,16 @@
 
    public JournalStorageManager(final Configuration config,
                                 final ExecutorFactory executorFactory,
-                                final ReplicationManager replicator)
+                                final IOCriticalErrorListener criticalErrorListener)
    {
+      this(config, executorFactory, null, criticalErrorListener);
+   }
+
+   public JournalStorageManager(final Configuration config,
+                                final ExecutorFactory executorFactory,
+                                final ReplicationManager replicator,
+                                final IOCriticalErrorListener criticalErrorListener)
+   {
       this.executorFactory = executorFactory;
 
       executor = executorFactory.getExecutor();
@@ -230,7 +239,7 @@
 
       journalDir = config.getJournalDirectory();
 
-      SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+      SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, criticalErrorListener);
 
       Journal localBindings = new JournalImpl(1024 * 1024,
                                               2,
@@ -270,7 +279,8 @@
          journalFF = new AIOSequentialFileFactory(journalDir,
                                                   config.getJournalBufferSize_AIO(),
                                                   config.getJournalBufferTimeout_AIO(),
-                                                  config.isLogJournalWriteRate());
+                                                  config.isLogJournalWriteRate(),
+                                                  criticalErrorListener);
       }
       else if (config.getJournalType() == JournalType.NIO)
       {
@@ -279,7 +289,8 @@
                                                   true,
                                                   config.getJournalBufferSize_NIO(),
                                                   config.getJournalBufferTimeout_NIO(),
-                                                  config.isLogJournalWriteRate());
+                                                  config.isLogJournalWriteRate(),
+                                                  criticalErrorListener);
       }
       else
       {
@@ -315,7 +326,7 @@
 
       largeMessagesDirectory = config.getLargeMessagesDirectory();
 
-      largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false);
+      largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
 
       perfBlastPages = config.getJournalPerfBlastPages();
    }
@@ -470,7 +481,7 @@
       {
          // We store a marker on the journal that the large file is pending
          long pendingRecordID = storePendingLargeMessage(id);
-         
+
          largeMessage.setPendingRecordID(pendingRecordID);
       }
 
@@ -478,33 +489,34 @@
    }
 
    // Non transactional operations
-   
+
    public long storePendingLargeMessage(final long messageID) throws Exception
    {
       long recordID = generateUniqueID();
-       
+
       messageJournal.appendAddRecord(recordID,
                                      ADD_LARGE_MESSAGE_PENDING,
                                      new PendingLargeMessageEncoding(messageID),
                                      true,
                                      getContext(true));
-      
+
       return recordID;
    }
-   
+
    public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
    {
       installLargeMessageConfirmationOnTX(tx, recordID);
-      messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+      messageJournal.appendDeleteRecordTransactional(tx.getID(),
+                                                     recordID,
+                                                     new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
    }
-   
-   
+
    /** We don't need messageID now but we are likely to need it we ever decide to support a database */
    public void confirmPendingLargeMessage(long recordID) throws Exception
    {
       messageJournal.appendDeleteRecord(recordID, true, getContext());
    }
-   
+
    public void storeMessage(final ServerMessage message) throws Exception
    {
       if (message.getMessageID() <= 0)
@@ -739,7 +751,8 @@
       messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
       if (!lineUpContext && !syncTransactional)
       {
-         // if lineUpContext == false, we have previously lined up a context, hence we need to mark it as done even if syncTransactional = false
+         // if lineUpContext == false, we have previously lined up a context, hence we need to mark it as done even if
+         // syncTransactional = false
          getContext(true).done();
       }
    }
@@ -785,11 +798,11 @@
          ref.setPersistedCount(ref.getDeliveryCount());
          DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
                                                                                   ref.getDeliveryCount());
-   
+
          messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                            JournalStorageManager.UPDATE_DELIVERY_COUNT,
                                            updateInfo,
-   
+
                                            syncNonTransactional,
                                            getContext(syncNonTransactional));
       }
@@ -902,9 +915,9 @@
             case ADD_LARGE_MESSAGE_PENDING:
             {
                PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
-               
+
                pending.decode(buff);
-               
+
                if (pendingLargeMessages != null)
                {
                   // it could be null on tests, and we don't need anything on that case
@@ -974,12 +987,14 @@
 
                if (queueMessages == null)
                {
-                  log.error("Cannot find queue messages for queueID=" + encoding.queueID + " on ack for messageID=" + messageID);
+                  log.error("Cannot find queue messages for queueID=" + encoding.queueID +
+                            " on ack for messageID=" +
+                            messageID);
                }
                else
                {
                   AddMessageRecord rec = queueMessages.remove(messageID);
-   
+
                   if (rec == null)
                   {
                      log.error("Cannot find message " + messageID);
@@ -1055,13 +1070,16 @@
 
                if (queueMessages == null)
                {
-                  log.error("Cannot find queue messages " + encoding.queueID + " for message " + messageID + " while processing scheduled messages");
+                  log.error("Cannot find queue messages " + encoding.queueID +
+                            " for message " +
+                            messageID +
+                            " while processing scheduled messages");
                }
                else
                {
-   
+
                   AddMessageRecord rec = queueMessages.get(messageID);
-   
+
                   if (rec == null)
                   {
                      log.error("Cannot find message " + messageID);
@@ -1189,19 +1207,20 @@
 
             continue;
          }
-         
-         // Redistribution could install a Redistributor while we are still loading records, what will be an issue with prepared ACKs
+
+         // Redistribution could install a Redistributor while we are still loading records, what will be an issue with
+         // prepared ACKs
          // We make sure te Queue is paused before we reroute values.
          queue.pause();
 
          Collection<AddMessageRecord> valueRecords = queueRecords.values();
-         
+
          long currentTime = System.currentTimeMillis();
 
          for (AddMessageRecord record : valueRecords)
          {
             long scheduledDeliveryTime = record.scheduledDeliveryTime;
-            
+
             if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime)
             {
                scheduledDeliveryTime = 0;
@@ -1277,7 +1296,7 @@
       {
          messageJournal.perfBlast(perfBlastPages);
       }
-      
+
       for (Queue queue : queues.values())
       {
          queue.resume();
@@ -1418,7 +1437,7 @@
    public static void describeBindingJournal(final String bindingsDir) throws Exception
    {
 
-      SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+      SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null);
 
       JournalImpl bindings = new JournalImpl(1024 * 1024, 2, -1, 0, bindingsFF, "hornetq-bindings", "bindings", 1);
 
@@ -1428,7 +1447,7 @@
    public static void describeMessagesJournal(final String messagesDir) throws Exception
    {
 
-      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir);
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null);
 
       // Will use only default values. The load function should adapt to anything different
       ConfigurationImpl defaultValues = new ConfigurationImpl();
@@ -1495,7 +1514,6 @@
 
       return bindingsInfo;
    }
-   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
@@ -1505,7 +1523,6 @@
       messageJournal.lineUpContex(getContext());
    }
 
-
    // HornetQComponent implementation
    // ------------------------------------------------------
 
@@ -1535,14 +1552,19 @@
       started = true;
    }
 
-   public synchronized void stop() throws Exception
+   public  void stop() throws Exception
    {
+      stop(false);
+   }
+
+   public synchronized void stop(boolean ioCriticalError) throws Exception
+   {
       if (!started)
       {
          return;
       }
 
-      if (journalLoaded && idGenerator != null)
+      if (!ioCriticalError && journalLoaded && idGenerator != null)
       {
          // Must call close to make sure last id is persisted
          idGenerator.close();
@@ -1792,7 +1814,7 @@
                   }
 
                   MessageReference removed = queue.removeReferenceWithID(messageID);
-                  
+
                   if (removed == null)
                   {
                      log.warn("Failed to remove reference for " + messageID);
@@ -1914,12 +1936,12 @@
          for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete)
          {
             byte[] data = recordDeleted.data;
-            
+
             if (data.length > 0)
             {
                HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
                byte b = buff.readByte();
-               
+
                switch (b)
                {
                   case ADD_LARGE_MESSAGE_PENDING:
@@ -1933,7 +1955,7 @@
                      log.warn("can't locate recordType=" + b + " on loadPreparedTransaction//deleteRecords");
                }
             }
-            
+
          }
 
          for (MessageReference ack : referencesToAck)
@@ -2378,7 +2400,7 @@
       {
          return DataConstants.SIZE_LONG;
       }
-      
+
       public String toString()
       {
          return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
@@ -2476,9 +2498,9 @@
    public static class DeleteEncoding implements EncodingSupport
    {
       public byte recordType;
-      
+
       public long id;
-      
+
       public DeleteEncoding()
       {
          super();
@@ -2683,7 +2705,7 @@
 
          // SimpleString simpleStr = new SimpleString(duplID);
          // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
-         
+
          return "DuplicateIDEncoding [address=" + address + ", duplID=" + Arrays.toString(duplID) + "]";
       }
 
@@ -2986,7 +3008,7 @@
          {
             PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
             lmEncoding.decode(buffer);
-            
+
             return lmEncoding;
          }
          case ADD_LARGE_MESSAGE:
@@ -3466,7 +3488,7 @@
 
       journal.stop();
    }
-   
+
    private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
    {
       TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
@@ -3477,14 +3499,12 @@
       }
       txoper.confirmedMessages.add(recordID);
    }
-   
-   
-   
+
    class TXLargeMessageConfirmationOperation implements TransactionOperation
    {
-      
-      public List<Long> confirmedMessages = new LinkedList<Long>(); 
 
+      public List<Long> confirmedMessages = new LinkedList<Long>();
+
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
        */
@@ -3548,8 +3568,7 @@
       {
          return null;
       }
-      
+
    }
 
-
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -568,8 +568,6 @@
     */
    public void lineUpContext()
    {
-      // TODO Auto-generated method stub
-      
    }
 
    /* (non-Javadoc)
@@ -586,4 +584,11 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#stop(boolean)
+    */
+   public void stop(boolean ioCriticalError) throws Exception
+   {
+   }
+
  }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -19,6 +19,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.logging.Logger;
@@ -67,6 +68,8 @@
    // Attributes ----------------------------------------------------
 
    private static final boolean trace = ReplicationEndpointImpl.log.isTraceEnabled();
+   
+   private final IOCriticalErrorListener criticalErrorListener;
 
    private static void trace(final String msg)
    {
@@ -93,9 +96,10 @@
    private boolean deletePages = true;
 
    // Constructors --------------------------------------------------
-   public ReplicationEndpointImpl(final HornetQServer server)
+   public ReplicationEndpointImpl(final HornetQServer server, IOCriticalErrorListener criticalErrorListener)
    {
       this.server = server;
+      this.criticalErrorListener = criticalErrorListener;
    }
 
    // Public --------------------------------------------------------
@@ -207,7 +211,7 @@
    {
       Configuration config = server.getConfiguration();
 
-      storage = new JournalStorageManager(config, server.getExecutorFactory());
+      storage = new JournalStorageManager(config, server.getExecutorFactory(), criticalErrorListener);
       storage.start();
 
       server.getManagementService().setStorageManager(storage);
@@ -222,7 +226,7 @@
                                                                     config.getJournalBufferSize_NIO(),
                                                                     server.getScheduledPool(),
                                                                     server.getExecutorFactory(),
-                                                                    config.isJournalSyncNonTransactional()),
+                                                                    config.isJournalSyncNonTransactional(), criticalErrorListener),
                                           storage,
                                           server.getAddressSettingsRepository());
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -117,6 +117,8 @@
    Set<ServerSession> getSessions();
 
    boolean isStarted();
+   
+   boolean isStopped();
 
    HierarchicalRepository<Set<Role>> getSecurityRepository();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -83,7 +83,15 @@
 
       if (!file.exists())
       {
-         fileCreated = file.createNewFile();
+         try
+         {
+            fileCreated = file.createNewFile();
+         }
+         catch (Exception e)
+         {
+            log.warn("can't open file " + file, e);
+            throw e;
+         }
          if (!fileCreated)
          {
             throw new IllegalStateException("Unable to create server lock file");

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-28 16:32:50 UTC (rev 11440)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-29 03:25:40 UTC (rev 11441)
@@ -56,7 +56,9 @@
 import org.hornetq.core.deployers.impl.SecurityDeployer;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.impl.SyncSpeedTest;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
@@ -235,6 +237,8 @@
    private Thread backupActivationThread;
 
    private Activation activation;
+   
+   private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -454,6 +458,11 @@
 
    public void stop(boolean failoverOnServerShutdown) throws Exception
    {
+      stop(failoverOnServerShutdown, false);
+   }
+   
+   protected void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws Exception
+   {
       synchronized (this)
       {
          if (!started)
@@ -520,7 +529,7 @@
             pagingManager.stop();
          }
 
-         if (storageManager != null)
+         if (!criticalIOError && storageManager != null)
          {
             storageManager.stop();
          }
@@ -745,6 +754,11 @@
    {
       return started;
    }
+   
+   public boolean isStopped()
+   {
+      return stopped;
+   }
 
    public ClusterManager getClusterManager()
    {
@@ -1189,7 +1203,8 @@
                                                              (long)configuration.getJournalBufferSize_NIO(),
                                                              scheduledPool,
                                                              executorFactory,
-                                                             configuration.isJournalSyncNonTransactional()),
+                                                             configuration.isJournalSyncNonTransactional(),
+                                                             shutdownOnCriticalIO),
                                    storageManager,
                                    addressSettingsRepository);
    }
@@ -1201,7 +1216,7 @@
    {
       if (configuration.isPersistenceEnabled())
       {
-         return new JournalStorageManager(configuration, executorFactory, replicationManager);
+         return new JournalStorageManager(configuration, executorFactory, replicationManager, shutdownOnCriticalIO);
       }
       else
       {
@@ -2000,6 +2015,36 @@
          }
       }
    }
+   
+   /** Stops the server, at most once, when a critical IO error is reported to this listener. */
+   private class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
+   {
+      boolean failedAlready = false;
+
+      public synchronized void onIOException(int code, String message, SequentialFile file)
+      {
+         if (!failedAlready)
+         {
+            failedAlready = true;
+            log.warn("Critical IO Error, shutting down the server. code=" + code + ", message=" + message);
+            // start(), not run(): the stop must execute on the new thread, not on the reporting thread
+            new Thread()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     HornetQServerImpl.this.stop(true, true);
+                  }
+                  catch (Exception e)
+                  {
+                     log.warn(e.getMessage(), e);
+                  }
+               }
+            }.start();
+         }
+      }
+   }
 
    private interface Activation extends Runnable
    {



More information about the hornetq-commits mailing list