[jboss-cvs] JBoss Messaging SVN: r4783 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 7 18:25:44 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-07 18:25:43 -0400 (Thu, 07 Aug 2008)
New Revision: 4783

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
Removed:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Adding more methods.. more tests

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -23,19 +23,25 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.server.ServerMessage;
 
 /**
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface Page<T extends EncodingSupport>
+public interface Page
 {
-   void queue(T message) throws Exception;
+   void write(ServerMessage message) throws Exception;
    
-   T[] dequeue() throws Exception;
+   ServerMessage[] read() throws Exception;
    
    void sync() throws Exception;
    
+   void open() throws Exception;
+   
+   void close() throws Exception;
+   
+   void delete() throws Exception;
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -22,9 +22,7 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -33,7 +31,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public interface PagingManager<T extends EncodingSupport> extends MessagingComponent
+public interface PagingManager extends MessagingComponent
 {
-   public PagingStore<T> getPaging(SimpleString address);
+   public PagingStore getPageStore(String storeName);
 }

Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -1,31 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-
-public interface PagingStore<T extends EncodingSupport> extends Page<T>
-{
-   
-}

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ * 
+ * The implementation will take care of details such as PageSize.
+ * The producers will write directly to PagingStore and that will decide what
+ * Page file should be used based on configured size
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface PagingStore
+{
+   void writeOnCurrentPage(ServerMessage message);
+   
+   
+   /** 
+    * Remove the first page from the Writing Queue.
+    * The file will still exist until Page.delete is called, 
+    * So, case the system is reloaded the same Page will be loaded back if delete is not called.
+    * @return
+    */
+   Page getOnePage();
+}

Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -1,226 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.util.VariableLatch;
-
-/**
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public abstract class AbstractPage<T extends EncodingSupport> implements Page<T>
-{
-   
-   // Constants -----------------------------------------------------
-   
-   private static final int SIZE_INTEGER = 4;
-   
-   public static final byte START_BYTE= (byte)'{';
-   public static final byte END_BYTE= (byte)'}';
-   
-   // Attributes ----------------------------------------------------
-   
-   private final SequentialFile file;
-   private final SequentialFileFactory fileFactory;
-   private final PagingCallback callback;
-   private volatile long size;
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-   
-   public AbstractPage(final SequentialFileFactory factory, final SequentialFile file) throws Exception
-   {
-      this.file = file;
-      this.size = file.size();
-      this.fileFactory = factory;
-      if (factory.isSupportsCallbacks())
-      {
-         callback = new PagingCallback();
-      }
-      else
-      {
-         callback = null;
-      }
-   }
-   
-   
-   // Public --------------------------------------------------------
-
-   
-   // PagingFile implementation
-   
-   public T[] dequeue() throws Exception
-   {
-      
-      ArrayList<T> messages = new ArrayList<T>();
-
-      ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
-      file.position(0);
-      file.read(buffer);
-      
-      ByteBufferWrapper messageBuffer = new ByteBufferWrapper(buffer);
-      
-      while (buffer.hasRemaining())
-      {
-         final int position = buffer.position();
-         
-         byte byteRead = buffer.get();
-         
-         if (byteRead == START_BYTE)
-         {
-            if (buffer.position() + SIZE_INTEGER < buffer.limit())
-            {
-               int messageSize = buffer.getInt();
-               int oldPos = buffer.position();
-               if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
-               {
-                  T msg = instantiateObject();
-                  msg.decode(messageBuffer);
-                  messages.add(msg);
-               }
-               else
-               {
-                  buffer.position(position + 1); 
-               }
-            }
-         }
-         else
-         {
-            buffer.position(position + 1); 
-         }
-      }
-      
-      return messages.toArray(instantiateArray(messages.size()));
-   }
-   
-   public void queue(final T message) throws Exception
-   {
-      ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6);
-      buffer.put(START_BYTE);
-      buffer.putInt(message.getEncodeSize());
-      message.encode(new ByteBufferWrapper(buffer));
-      buffer.put(END_BYTE);
-      buffer.rewind();
-      if (callback != null)
-      {
-         callback.countUp();
-         expandIfNeeded(buffer.limit());
-         file.write(buffer, callback);
-      }
-      else
-      {
-         file.write(buffer, false);
-      }
-      
-   }
-   
-   public void sync() throws Exception
-   {
-      if (callback != null)
-      {
-         callback.waitCompletion();
-      }
-      else
-      {
-         file.sync();
-      }
-   }
-   
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-   
-   protected abstract T instantiateObject();
-   
-   protected abstract T[] instantiateArray(int size);
-   
-   // Private -------------------------------------------------------
-   
-   private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
-   {
-      while (file.position() + bytesToWrite > size)
-      {
-         final int position = file.position();
-         
-         file.fill((int)size, 1024*1024, (byte)0);
-         
-         size = file.size();
-         
-         file.position(position);
-      }
-   }
-   
-   // Inner classes -------------------------------------------------
-
-   private static class PagingCallback implements IOCallback
-   {      
-      private final VariableLatch countLatch = new VariableLatch();
-      
-      private volatile String errorMessage = null;
-      
-      private volatile int errorCode = 0;
-      
-      public void countUp()
-      {
-         countLatch.up();
-      }
-      
-      public void done()
-      {
-         countLatch.down();
-      }
-      
-      public void waitCompletion() throws InterruptedException
-      {
-         countLatch.waitCompletion();
-         
-         if (errorMessage != null)
-         {
-            throw new IllegalStateException("Error on Callback: " + errorCode + " - " + errorMessage);
-         }
-      }
-      
-      public void onError(final int errorCode, final String errorMessage)
-      {
-         this.errorMessage = errorMessage;
-         this.errorCode = errorCode;
-         countLatch.down();
-      }
-      
-   }
-   
-}

Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -1,121 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging.impl;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-
-/**
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class MessagePageImpl extends AbstractPage<MessagePageImpl.MessageContainer>
-{
-
-   
-   // Constants -----------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-
-   public MessagePageImpl(final SequentialFileFactory factory, final SequentialFile file) throws Exception
-   {
-      super(factory, file);
-   }
-
-   
-   // Public --------------------------------------------------------
-   
-   // Package protected ---------------------------------------------
-   
-   @Override
-   protected MessagePageImpl.MessageContainer[] instantiateArray(final int size)
-   {
-      return new MessagePageImpl.MessageContainer[size];
-   }
-
-   @Override
-   protected MessageContainer instantiateObject()
-   {
-      return new MessagePageImpl.MessageContainer();
-   }
-   // Protected -----------------------------------------------------
-   
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
-   
-   /**
-    * ServerMessageImpl doesn't store the ID by itself (as the RecordID is used on the Journal).
-    * So, when paging we need to store the ID somehow.
-    */
-   public static class MessageContainer implements EncodingSupport
-   {
-
-      final ServerMessage containerMsg;
-      
-      public MessageContainer(ServerMessage containerMsg)
-      {
-         super();
-         this.containerMsg = containerMsg;
-      }
-
-      public MessageContainer()
-      {
-         super();
-         this.containerMsg = new ServerMessageImpl();
-      }
-
-      public void decode(final MessagingBuffer buffer)
-      {
-         containerMsg.setMessageID(buffer.getLong());
-         containerMsg.decode(buffer);
-      }
-
-      public void encode(MessagingBuffer buffer)
-      {
-         buffer.putLong(containerMsg.getMessageID());
-         containerMsg.encode(buffer);
-      }
-
-      public int getEncodeSize()
-      {
-         return 8 + containerMsg.getEncodeSize();
-      }
-      
-      public ServerMessage getMessage()
-      {
-         return containerMsg;
-      }
-   }
-   
-}

Copied: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java (from rev 4782, branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java)
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -0,0 +1,253 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.util.VariableLatch;
+
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageImpl implements Page
+{
+   
+   // Constants -----------------------------------------------------
+   
+   private static final int SIZE_INTEGER = 4;
+   
+   public static final byte START_BYTE= (byte)'{';
+   public static final byte END_BYTE= (byte)'}';
+   
+   // Attributes ----------------------------------------------------
+   
+   private final SequentialFile file;
+   private final SequentialFileFactory fileFactory;
+   private final PagingCallback callback;
+   private volatile long size = -1;
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   public PageImpl(final SequentialFileFactory factory, final SequentialFile file) throws Exception
+   {
+      this.file = file;
+      this.fileFactory = factory;
+      if (factory.isSupportsCallbacks())
+      {
+         callback = new PagingCallback();
+      }
+      else
+      {
+         callback = null;
+      }
+   }
+   
+   
+   // Public --------------------------------------------------------
+
+   
+   // PagingFile implementation
+   
+   public ServerMessage[] read() throws Exception
+   {
+      
+      ArrayList<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+      ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
+      file.position(0);
+      file.read(buffer);
+      
+      ByteBufferWrapper messageBuffer = new ByteBufferWrapper(buffer);
+      
+      while (buffer.hasRemaining())
+      {
+         final int position = buffer.position();
+         
+         byte byteRead = buffer.get();
+         
+         if (byteRead == START_BYTE)
+         {
+            if (buffer.position() + SIZE_INTEGER < buffer.limit())
+            {
+               int messageSize = buffer.getInt();
+               int oldPos = buffer.position();
+               if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
+               {
+                  ServerMessage msg = instantiateObject();
+                  msg.setMessageID(buffer.getLong());
+                  msg.decode(messageBuffer);
+                  messages.add(msg);
+               }
+               else
+               {
+                  buffer.position(position + 1); 
+               }
+            }
+         }
+         else
+         {
+            buffer.position(position + 1); 
+         }
+      }
+      
+      return messages.toArray(instantiateArray(messages.size()));
+   }
+   
+   public void write(final ServerMessage message) throws Exception
+   {
+      ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6 + 8);
+      buffer.put(START_BYTE);
+      buffer.putInt(message.getEncodeSize() + 8);
+      buffer.putLong(message.getMessageID());
+      message.encode(new ByteBufferWrapper(buffer));
+      buffer.put(END_BYTE);
+      buffer.rewind();
+      expandIfNeeded(buffer.limit());
+
+      if (callback != null)
+      {
+         callback.countUp();
+         file.write(buffer, callback);
+      }
+      else
+      {
+         file.write(buffer, false);
+      }
+      
+   }
+   
+   public void sync() throws Exception
+   {
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
+      else
+      {
+         file.sync();
+      }
+   }
+   
+   public void open() throws Exception
+   {
+      file.open();
+   }
+   
+   public void close() throws Exception
+   {
+      file.close();
+   }
+   
+   public void delete() throws Exception
+   {
+      file.delete();
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   protected ServerMessage instantiateObject()
+   {
+      return new ServerMessageImpl();
+   }
+   
+   protected ServerMessage[] instantiateArray(int size)
+   {
+      return new ServerMessageImpl[size];
+   }
+   
+   // Private -------------------------------------------------------
+   
+   private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
+   {
+      if (size < 0)
+      {
+         size = file.size();
+      }
+      
+      while (file.position() + bytesToWrite > size)
+      {
+         final int position = file.position();
+         
+         file.fill((int)size, 1024*1024, (byte)0);
+         
+         size = file.size();
+         
+         file.position(position);
+      }
+   }
+   
+   // Inner classes -------------------------------------------------
+
+   private static class PagingCallback implements IOCallback
+   {      
+      private final VariableLatch countLatch = new VariableLatch();
+      
+      private volatile String errorMessage = null;
+      
+      private volatile int errorCode = 0;
+      
+      public void countUp()
+      {
+         countLatch.up();
+      }
+      
+      public void done()
+      {
+         countLatch.down();
+      }
+      
+      public void waitCompletion() throws InterruptedException
+      {
+         countLatch.waitCompletion();
+         
+         if (errorMessage != null)
+         {
+            throw new IllegalStateException("Error on Callback: " + errorCode + " - " + errorMessage);
+         }
+      }
+      
+      public void onError(final int errorCode, final String errorMessage)
+      {
+         this.errorMessage = errorMessage;
+         this.errorCode = errorCode;
+         countLatch.down();
+      }
+      
+   }
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -54,12 +54,12 @@
                System.getProperty("os.arch"), 
                System.getProperty("os.version")));
       }
-      testAdd(new AIOSequentialFileFactory(journalDir));
+      testAdd(new AIOSequentialFileFactory(journalDir), 1000);
    }
    
    public void testPageWithNIO() throws Exception
    {
-      testAdd(new NIOSequentialFileFactory(journalDir));
+      testAdd(new NIOSequentialFileFactory(journalDir), 1000);
    }
    
    // Package protected ---------------------------------------------
@@ -78,7 +78,7 @@
    protected void tearDown() throws Exception
    {
       super.tearDown();
-      deleteDirectory(new File(journalDir));
+      //deleteDirectory(new File(journalDir));
    }
 
    // Private -------------------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -462,46 +462,6 @@
    }
    
    
-   private ByteBuffer compareByteBuffer(final byte expectedArray[])
-   {
-      
-      EasyMock.reportMatcher(new IArgumentMatcher()
-      {
-
-         public void appendTo(StringBuffer buffer)
-         {
-            buffer.append("ByteArray");
-         }
-
-         public boolean matches(Object argument)
-         {
-            ByteBuffer buffer = (ByteBuffer) argument;
-            
-            buffer.rewind();
-            byte[] compareArray = new byte[buffer.limit()];
-            buffer.get(compareArray);
-            
-            if (compareArray.length != expectedArray.length)
-            {
-               return false;
-            }
-            
-            for (int i = 0; i < expectedArray.length; i++)
-            {
-               if (expectedArray[i] != compareArray[i])
-               {
-                  return false;
-               }
-            }
-            
-            return true;
-         }
-         
-      });
-      
-      return null;
-   }
-
    // Package protected ---------------------------------------------
    
    // Inner classes -------------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -420,7 +420,14 @@
       
       public long size() throws Exception
       {
-         return data.limit();
+         if (data == null)
+         {
+            return 0;
+         }
+         else
+         {
+            return data.limit();
+         }
       }
       
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -23,6 +23,16 @@
 
 package org.jboss.messaging.tests.unit.core.paging.impl;
 
+import java.nio.ByteBuffer;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.impl.PageImpl;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 
 
@@ -46,14 +56,101 @@
    
    public void testPageFakeWithCallbacks() throws Exception
    {
-      testAdd(new FakeSequentialFileFactory(512, true));
+      testAdd(new FakeSequentialFileFactory(512, true), 10);
    }
    
    public void testPageFakeWithoutCallbacks() throws Exception
    {
-      testAdd(new FakeSequentialFileFactory(512, false));
+      testAdd(new FakeSequentialFileFactory(1, false), 10);
    }
    
+   
+   
+   public void testEasyMockPageWithCallback() throws Exception
+   {
+      testEasyMockOnPage(true);
+   }
+   
+   public void testEasyMockPageWithoutCallback() throws Exception
+   {
+      testEasyMockOnPage(false);
+   }
+   
+   private void testEasyMockOnPage(boolean callback) throws Exception
+   {
+      SequentialFileFactory factory = EasyMock.createMock(SequentialFileFactory.class);
+      
+      EasyMock.expect(factory.isSupportsCallbacks()).andStubReturn(callback);
+      
+      SequentialFile file = EasyMock.createMock(SequentialFile.class);
+      
+      EasyMock.replay(factory, file);
+      
+      PageImpl impl = new PageImpl(factory, file);
+      
+      EasyMock.verify(factory, file);
+      
+      EasyMock.reset(factory, file);
+
+      
+      EasyMock.expect(factory.newBuffer(EasyMock.anyInt())).andStubAnswer(new IAnswer<ByteBuffer>() {
+
+         public ByteBuffer answer() throws Throwable
+         {
+            int size = (Integer)EasyMock.getCurrentArguments()[0];
+            return ByteBuffer.allocate(size);
+         }});
+      
+      ServerMessage msg = EasyMock.createMock(ServerMessage.class);
+      EasyMock.expect(msg.getMessageID()).andStubReturn(1l);
+      EasyMock.expect(msg.getEncodeSize()).andStubReturn(2);
+      msg.encode(EasyMock.isA(MessagingBuffer.class));
+      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>(){
+
+         public Object answer() throws Throwable
+         {
+            MessagingBuffer buffer = (MessagingBuffer)EasyMock.getCurrentArguments()[0];
+            buffer.putByte((byte)5);            
+            buffer.putByte((byte)6);            
+            return null;
+         }});
+      
+      final byte [] expectedBytes = autoEncode((byte)'{', (int)10, (long)1, (byte)5, (byte)6, (byte)'}');
+      
+      EasyMock.expect(file.position()).andStubReturn(0);
+
+      EasyMock.expect(file.size()).andReturn(0l);
+      
+      file.fill(0, 1024 * 1024, (byte)0);
+      
+      EasyMock.expect(file.size()).andReturn(1024l * 1024l);
+
+      file.position(0);
+      
+      if (callback)
+      {
+         EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
+
+            public Integer answer() throws Throwable
+            {
+               IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[1];
+               callback.done();
+               return expectedBytes.length;
+            }});
+      }
+      else
+      {
+         EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.eq(false))).andReturn(expectedBytes.length);
+      }
+      
+      EasyMock.replay(factory, file, msg);
+      
+      impl.write(msg);
+      
+      EasyMock.verify(factory, file, msg);
+      
+   }
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -28,7 +28,7 @@
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.impl.MessagePageImpl;
+import org.jboss.messaging.core.paging.impl.PageImpl;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -54,18 +54,22 @@
    // Protected -----------------------------------------------------
    
    /** Validate if everything we add is recovered */
-   protected void testAdd(SequentialFileFactory fakeFactory) throws Exception
+   protected void testAdd(SequentialFileFactory factory, int numberOfElements) throws Exception
    {
       
-      SequentialFile file = fakeFactory.createSequentialFile("testPage.page", 1);
-      file.open();
-      MessagePageImpl impl = new MessagePageImpl(fakeFactory, file);
+      SequentialFile file = factory.createSequentialFile("testPage.page", 1);
+      
+      PageImpl impl = new PageImpl(factory, file);
+      
+      impl.open();
 
+      assertEquals(1, factory.listFiles("page").size());
+
       ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
       
       SimpleString simpleDestination = new SimpleString("Test");
       
-      for (int i = 0; i < 100; i++)
+      for (int i = 0; i < numberOfElements; i++)
       {
          ByteBuffer buffer = ByteBuffer.allocate(10);
          
@@ -83,30 +87,36 @@
          
          msg.setDestination(simpleDestination);
          
-         impl.queue(new MessagePageImpl.MessageContainer(msg));
+         impl.write(msg);
       }
       
       impl.sync();
-      file.close();
+      impl.close();
       
-      file = fakeFactory.createSequentialFile("testPage.page", 1);
+      file = factory.createSequentialFile("testPage.page", 1);
       file.open();
-      impl = new MessagePageImpl(fakeFactory, file);
+      impl = new PageImpl(factory, file);
       
-      MessagePageImpl.MessageContainer msgs[] = impl.dequeue();
+      ServerMessage msgs[] = impl.read();
       
       for (int i = 0; i < msgs.length; i++)
       {
-         assertEquals((long)i, msgs[i].getMessage().getMessageID());
+         assertEquals((long)i, msgs[i].getMessageID());
          
-         assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getMessage().getBody().array());
+         assertEquals(simpleDestination, msgs[i].getDestination());
+         
+         assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getBody().array());
       }
       
-      assertEquals(100, msgs.length);
+      assertEquals(numberOfElements, msgs.length);
+
+      impl.delete();
       
+      
+      assertEquals(0, factory.listFiles(".page").size());
+      
    }
-
- 
+   
    // Private -------------------------------------------------------
    
    // Inner classes -------------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-08-07 22:25:43 UTC (rev 4783)
@@ -234,7 +234,49 @@
       return buffer.array();
    }
    
+   
+   protected ByteBuffer compareByteBuffer(final byte expectedArray[])
+   {
+      
+      EasyMock.reportMatcher(new IArgumentMatcher()
+      {
 
+         public void appendTo(StringBuffer buffer)
+         {
+            buffer.append("ByteArray");
+         }
+
+         public boolean matches(Object argument)
+         {
+            ByteBuffer buffer = (ByteBuffer) argument;
+            
+            buffer.rewind();
+            byte[] compareArray = new byte[buffer.limit()];
+            buffer.get(compareArray);
+            
+            if (compareArray.length != expectedArray.length)
+            {
+               return false;
+            }
+            
+            for (int i = 0; i < expectedArray.length; i++)
+            {
+               if (expectedArray[i] != compareArray[i])
+               {
+                  return false;
+               }
+            }
+            
+            return true;
+         }
+         
+      });
+      
+      return null;
+   }
+
+   
+
    protected boolean deleteDirectory(File directory)
    {
       if (directory.isDirectory())




More information about the jboss-cvs-commits mailing list