[jboss-cvs] JBoss Messaging SVN: r4783 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 7 18:25:44 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-07 18:25:43 -0400 (Thu, 07 Aug 2008)
New Revision: 4783
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
Removed:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Adding more methods and more tests.
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -23,19 +23,25 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.server.ServerMessage;
/**
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface Page<T extends EncodingSupport>
+public interface Page
{
- void queue(T message) throws Exception;
+ void write(ServerMessage message) throws Exception;
- T[] dequeue() throws Exception;
+ ServerMessage[] read() throws Exception;
void sync() throws Exception;
+ void open() throws Exception;
+
+ void close() throws Exception;
+
+ void delete() throws Exception;
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -22,9 +22,7 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.util.SimpleString;
/**
*
@@ -33,7 +31,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public interface PagingManager<T extends EncodingSupport> extends MessagingComponent
+public interface PagingManager extends MessagingComponent
{
- public PagingStore<T> getPaging(SimpleString address);
+ public PagingStore getPageStore(String storeName);
}
Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -1,31 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-
-public interface PagingStore<T extends EncodingSupport> extends Page<T>
-{
-
-}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ *
+ * The implementation takes care of details such as PageSize.
+ * Producers write directly to the PagingStore, which decides which
+ * Page file should be used based on the configured size.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface PagingStore
+{
+ void writeOnCurrentPage(ServerMessage message);
+
+
+ /**
+ * Removes the first page from the writing queue.
+ * The file will still exist until Page.delete is called,
+ * so if the system is reloaded, the same Page will be loaded back unless delete has been called.
+ * @return the page removed from the writing queue
+ */
+ Page getOnePage();
+}
Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -1,226 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.util.VariableLatch;
-
-/**
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public abstract class AbstractPage<T extends EncodingSupport> implements Page<T>
-{
-
- // Constants -----------------------------------------------------
-
- private static final int SIZE_INTEGER = 4;
-
- public static final byte START_BYTE= (byte)'{';
- public static final byte END_BYTE= (byte)'}';
-
- // Attributes ----------------------------------------------------
-
- private final SequentialFile file;
- private final SequentialFileFactory fileFactory;
- private final PagingCallback callback;
- private volatile long size;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public AbstractPage(final SequentialFileFactory factory, final SequentialFile file) throws Exception
- {
- this.file = file;
- this.size = file.size();
- this.fileFactory = factory;
- if (factory.isSupportsCallbacks())
- {
- callback = new PagingCallback();
- }
- else
- {
- callback = null;
- }
- }
-
-
- // Public --------------------------------------------------------
-
-
- // PagingFile implementation
-
- public T[] dequeue() throws Exception
- {
-
- ArrayList<T> messages = new ArrayList<T>();
-
- ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
- file.position(0);
- file.read(buffer);
-
- ByteBufferWrapper messageBuffer = new ByteBufferWrapper(buffer);
-
- while (buffer.hasRemaining())
- {
- final int position = buffer.position();
-
- byte byteRead = buffer.get();
-
- if (byteRead == START_BYTE)
- {
- if (buffer.position() + SIZE_INTEGER < buffer.limit())
- {
- int messageSize = buffer.getInt();
- int oldPos = buffer.position();
- if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
- {
- T msg = instantiateObject();
- msg.decode(messageBuffer);
- messages.add(msg);
- }
- else
- {
- buffer.position(position + 1);
- }
- }
- }
- else
- {
- buffer.position(position + 1);
- }
- }
-
- return messages.toArray(instantiateArray(messages.size()));
- }
-
- public void queue(final T message) throws Exception
- {
- ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6);
- buffer.put(START_BYTE);
- buffer.putInt(message.getEncodeSize());
- message.encode(new ByteBufferWrapper(buffer));
- buffer.put(END_BYTE);
- buffer.rewind();
- if (callback != null)
- {
- callback.countUp();
- expandIfNeeded(buffer.limit());
- file.write(buffer, callback);
- }
- else
- {
- file.write(buffer, false);
- }
-
- }
-
- public void sync() throws Exception
- {
- if (callback != null)
- {
- callback.waitCompletion();
- }
- else
- {
- file.sync();
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected abstract T instantiateObject();
-
- protected abstract T[] instantiateArray(int size);
-
- // Private -------------------------------------------------------
-
- private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
- {
- while (file.position() + bytesToWrite > size)
- {
- final int position = file.position();
-
- file.fill((int)size, 1024*1024, (byte)0);
-
- size = file.size();
-
- file.position(position);
- }
- }
-
- // Inner classes -------------------------------------------------
-
- private static class PagingCallback implements IOCallback
- {
- private final VariableLatch countLatch = new VariableLatch();
-
- private volatile String errorMessage = null;
-
- private volatile int errorCode = 0;
-
- public void countUp()
- {
- countLatch.up();
- }
-
- public void done()
- {
- countLatch.down();
- }
-
- public void waitCompletion() throws InterruptedException
- {
- countLatch.waitCompletion();
-
- if (errorMessage != null)
- {
- throw new IllegalStateException("Error on Callback: " + errorCode + " - " + errorMessage);
- }
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- this.errorMessage = errorMessage;
- this.errorCode = errorCode;
- countLatch.down();
- }
-
- }
-
-}
Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/MessagePageImpl.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -1,121 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging.impl;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-
-/**
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class MessagePageImpl extends AbstractPage<MessagePageImpl.MessageContainer>
-{
-
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public MessagePageImpl(final SequentialFileFactory factory, final SequentialFile file) throws Exception
- {
- super(factory, file);
- }
-
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- @Override
- protected MessagePageImpl.MessageContainer[] instantiateArray(final int size)
- {
- return new MessagePageImpl.MessageContainer[size];
- }
-
- @Override
- protected MessageContainer instantiateObject()
- {
- return new MessagePageImpl.MessageContainer();
- }
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- /**
- * ServerMessageImpl doesn't store the ID by itself (as the RecordID is used on the Journal).
- * So, when paging we need to store the ID somehow.
- */
- public static class MessageContainer implements EncodingSupport
- {
-
- final ServerMessage containerMsg;
-
- public MessageContainer(ServerMessage containerMsg)
- {
- super();
- this.containerMsg = containerMsg;
- }
-
- public MessageContainer()
- {
- super();
- this.containerMsg = new ServerMessageImpl();
- }
-
- public void decode(final MessagingBuffer buffer)
- {
- containerMsg.setMessageID(buffer.getLong());
- containerMsg.decode(buffer);
- }
-
- public void encode(MessagingBuffer buffer)
- {
- buffer.putLong(containerMsg.getMessageID());
- containerMsg.encode(buffer);
- }
-
- public int getEncodeSize()
- {
- return 8 + containerMsg.getEncodeSize();
- }
-
- public ServerMessage getMessage()
- {
- return containerMsg;
- }
- }
-
-}
Copied: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java (from rev 4782, branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/AbstractPage.java)
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -0,0 +1,253 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.util.VariableLatch;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageImpl implements Page
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final int SIZE_INTEGER = 4;
+
+ public static final byte START_BYTE= (byte)'{';
+ public static final byte END_BYTE= (byte)'}';
+
+ // Attributes ----------------------------------------------------
+
+ private final SequentialFile file;
+ private final SequentialFileFactory fileFactory;
+ private final PagingCallback callback;
+ private volatile long size = -1;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageImpl(final SequentialFileFactory factory, final SequentialFile file) throws Exception
+ {
+ this.file = file;
+ this.fileFactory = factory;
+ if (factory.isSupportsCallbacks())
+ {
+ callback = new PagingCallback();
+ }
+ else
+ {
+ callback = null;
+ }
+ }
+
+
+ // Public --------------------------------------------------------
+
+
+ // PagingFile implementation
+
+ public ServerMessage[] read() throws Exception
+ {
+
+ ArrayList<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
+ file.position(0);
+ file.read(buffer);
+
+ ByteBufferWrapper messageBuffer = new ByteBufferWrapper(buffer);
+
+ while (buffer.hasRemaining())
+ {
+ final int position = buffer.position();
+
+ byte byteRead = buffer.get();
+
+ if (byteRead == START_BYTE)
+ {
+ if (buffer.position() + SIZE_INTEGER < buffer.limit())
+ {
+ int messageSize = buffer.getInt();
+ int oldPos = buffer.position();
+ if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
+ {
+ ServerMessage msg = instantiateObject();
+ msg.setMessageID(buffer.getLong());
+ msg.decode(messageBuffer);
+ messages.add(msg);
+ }
+ else
+ {
+ buffer.position(position + 1);
+ }
+ }
+ }
+ else
+ {
+ buffer.position(position + 1);
+ }
+ }
+
+ return messages.toArray(instantiateArray(messages.size()));
+ }
+
+ public void write(final ServerMessage message) throws Exception
+ {
+ ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6 + 8);
+ buffer.put(START_BYTE);
+ buffer.putInt(message.getEncodeSize() + 8);
+ buffer.putLong(message.getMessageID());
+ message.encode(new ByteBufferWrapper(buffer));
+ buffer.put(END_BYTE);
+ buffer.rewind();
+ expandIfNeeded(buffer.limit());
+
+ if (callback != null)
+ {
+ callback.countUp();
+ file.write(buffer, callback);
+ }
+ else
+ {
+ file.write(buffer, false);
+ }
+
+ }
+
+ public void sync() throws Exception
+ {
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ else
+ {
+ file.sync();
+ }
+ }
+
+ public void open() throws Exception
+ {
+ file.open();
+ }
+
+ public void close() throws Exception
+ {
+ file.close();
+ }
+
+ public void delete() throws Exception
+ {
+ file.delete();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected ServerMessage instantiateObject()
+ {
+ return new ServerMessageImpl();
+ }
+
+ protected ServerMessage[] instantiateArray(int size)
+ {
+ return new ServerMessageImpl[size];
+ }
+
+ // Private -------------------------------------------------------
+
+ private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
+ {
+ if (size < 0)
+ {
+ size = file.size();
+ }
+
+ while (file.position() + bytesToWrite > size)
+ {
+ final int position = file.position();
+
+ file.fill((int)size, 1024*1024, (byte)0);
+
+ size = file.size();
+
+ file.position(position);
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+ private static class PagingCallback implements IOCallback
+ {
+ private final VariableLatch countLatch = new VariableLatch();
+
+ private volatile String errorMessage = null;
+
+ private volatile int errorCode = 0;
+
+ public void countUp()
+ {
+ countLatch.up();
+ }
+
+ public void done()
+ {
+ countLatch.down();
+ }
+
+ public void waitCompletion() throws InterruptedException
+ {
+ countLatch.waitCompletion();
+
+ if (errorMessage != null)
+ {
+ throw new IllegalStateException("Error on Callback: " + errorCode + " - " + errorMessage);
+ }
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ this.errorMessage = errorMessage;
+ this.errorCode = errorCode;
+ countLatch.down();
+ }
+
+ }
+
+}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -54,12 +54,12 @@
System.getProperty("os.arch"),
System.getProperty("os.version")));
}
- testAdd(new AIOSequentialFileFactory(journalDir));
+ testAdd(new AIOSequentialFileFactory(journalDir), 1000);
}
public void testPageWithNIO() throws Exception
{
- testAdd(new NIOSequentialFileFactory(journalDir));
+ testAdd(new NIOSequentialFileFactory(journalDir), 1000);
}
// Package protected ---------------------------------------------
@@ -78,7 +78,7 @@
protected void tearDown() throws Exception
{
super.tearDown();
- deleteDirectory(new File(journalDir));
+ //deleteDirectory(new File(journalDir));
}
// Private -------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -462,46 +462,6 @@
}
- private ByteBuffer compareByteBuffer(final byte expectedArray[])
- {
-
- EasyMock.reportMatcher(new IArgumentMatcher()
- {
-
- public void appendTo(StringBuffer buffer)
- {
- buffer.append("ByteArray");
- }
-
- public boolean matches(Object argument)
- {
- ByteBuffer buffer = (ByteBuffer) argument;
-
- buffer.rewind();
- byte[] compareArray = new byte[buffer.limit()];
- buffer.get(compareArray);
-
- if (compareArray.length != expectedArray.length)
- {
- return false;
- }
-
- for (int i = 0; i < expectedArray.length; i++)
- {
- if (expectedArray[i] != compareArray[i])
- {
- return false;
- }
- }
-
- return true;
- }
-
- });
-
- return null;
- }
-
// Package protected ---------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -420,7 +420,14 @@
public long size() throws Exception
{
- return data.limit();
+ if (data == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return data.limit();
+ }
}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -23,6 +23,16 @@
package org.jboss.messaging.tests.unit.core.paging.impl;
+import java.nio.ByteBuffer;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.impl.PageImpl;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
@@ -46,14 +56,101 @@
public void testPageFakeWithCallbacks() throws Exception
{
- testAdd(new FakeSequentialFileFactory(512, true));
+ testAdd(new FakeSequentialFileFactory(512, true), 10);
}
public void testPageFakeWithoutCallbacks() throws Exception
{
- testAdd(new FakeSequentialFileFactory(512, false));
+ testAdd(new FakeSequentialFileFactory(1, false), 10);
}
+
+
+ public void testEasyMockPageWithCallback() throws Exception
+ {
+ testEasyMockOnPage(true);
+ }
+
+ public void testEasyMockPageWithoutCallback() throws Exception
+ {
+ testEasyMockOnPage(false);
+ }
+
+ private void testEasyMockOnPage(boolean callback) throws Exception
+ {
+ SequentialFileFactory factory = EasyMock.createMock(SequentialFileFactory.class);
+
+ EasyMock.expect(factory.isSupportsCallbacks()).andStubReturn(callback);
+
+ SequentialFile file = EasyMock.createMock(SequentialFile.class);
+
+ EasyMock.replay(factory, file);
+
+ PageImpl impl = new PageImpl(factory, file);
+
+ EasyMock.verify(factory, file);
+
+ EasyMock.reset(factory, file);
+
+
+ EasyMock.expect(factory.newBuffer(EasyMock.anyInt())).andStubAnswer(new IAnswer<ByteBuffer>() {
+
+ public ByteBuffer answer() throws Throwable
+ {
+ int size = (Integer)EasyMock.getCurrentArguments()[0];
+ return ByteBuffer.allocate(size);
+ }});
+
+ ServerMessage msg = EasyMock.createMock(ServerMessage.class);
+ EasyMock.expect(msg.getMessageID()).andStubReturn(1l);
+ EasyMock.expect(msg.getEncodeSize()).andStubReturn(2);
+ msg.encode(EasyMock.isA(MessagingBuffer.class));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Object>(){
+
+ public Object answer() throws Throwable
+ {
+ MessagingBuffer buffer = (MessagingBuffer)EasyMock.getCurrentArguments()[0];
+ buffer.putByte((byte)5);
+ buffer.putByte((byte)6);
+ return null;
+ }});
+
+ final byte [] expectedBytes = autoEncode((byte)'{', (int)10, (long)1, (byte)5, (byte)6, (byte)'}');
+
+ EasyMock.expect(file.position()).andStubReturn(0);
+
+ EasyMock.expect(file.size()).andReturn(0l);
+
+ file.fill(0, 1024 * 1024, (byte)0);
+
+ EasyMock.expect(file.size()).andReturn(1024l * 1024l);
+
+ file.position(0);
+
+ if (callback)
+ {
+ EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
+
+ public Integer answer() throws Throwable
+ {
+ IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[1];
+ callback.done();
+ return expectedBytes.length;
+ }});
+ }
+ else
+ {
+ EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.eq(false))).andReturn(expectedBytes.length);
+ }
+
+ EasyMock.replay(factory, file, msg);
+
+ impl.write(msg);
+
+ EasyMock.verify(factory, file, msg);
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -28,7 +28,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.impl.MessagePageImpl;
+import org.jboss.messaging.core.paging.impl.PageImpl;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -54,18 +54,22 @@
// Protected -----------------------------------------------------
/** Validate if everything we add is recovered */
- protected void testAdd(SequentialFileFactory fakeFactory) throws Exception
+ protected void testAdd(SequentialFileFactory factory, int numberOfElements) throws Exception
{
- SequentialFile file = fakeFactory.createSequentialFile("testPage.page", 1);
- file.open();
- MessagePageImpl impl = new MessagePageImpl(fakeFactory, file);
+ SequentialFile file = factory.createSequentialFile("testPage.page", 1);
+
+ PageImpl impl = new PageImpl(factory, file);
+
+ impl.open();
+ assertEquals(1, factory.listFiles("page").size());
+
ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
SimpleString simpleDestination = new SimpleString("Test");
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < numberOfElements; i++)
{
ByteBuffer buffer = ByteBuffer.allocate(10);
@@ -83,30 +87,36 @@
msg.setDestination(simpleDestination);
- impl.queue(new MessagePageImpl.MessageContainer(msg));
+ impl.write(msg);
}
impl.sync();
- file.close();
+ impl.close();
- file = fakeFactory.createSequentialFile("testPage.page", 1);
+ file = factory.createSequentialFile("testPage.page", 1);
file.open();
- impl = new MessagePageImpl(fakeFactory, file);
+ impl = new PageImpl(factory, file);
- MessagePageImpl.MessageContainer msgs[] = impl.dequeue();
+ ServerMessage msgs[] = impl.read();
for (int i = 0; i < msgs.length; i++)
{
- assertEquals((long)i, msgs[i].getMessage().getMessageID());
+ assertEquals((long)i, msgs[i].getMessageID());
- assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getMessage().getBody().array());
+ assertEquals(simpleDestination, msgs[i].getDestination());
+
+ assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getBody().array());
}
- assertEquals(100, msgs.length);
+ assertEquals(numberOfElements, msgs.length);
+
+ impl.delete();
+
+ assertEquals(0, factory.listFiles(".page").size());
+
}
-
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-08-07 17:01:03 UTC (rev 4782)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-08-07 22:25:43 UTC (rev 4783)
@@ -234,7 +234,49 @@
return buffer.array();
}
+
+ protected ByteBuffer compareByteBuffer(final byte expectedArray[])
+ {
+
+ EasyMock.reportMatcher(new IArgumentMatcher()
+ {
+ public void appendTo(StringBuffer buffer)
+ {
+ buffer.append("ByteArray");
+ }
+
+ public boolean matches(Object argument)
+ {
+ ByteBuffer buffer = (ByteBuffer) argument;
+
+ buffer.rewind();
+ byte[] compareArray = new byte[buffer.limit()];
+ buffer.get(compareArray);
+
+ if (compareArray.length != expectedArray.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < expectedArray.length; i++)
+ {
+ if (expectedArray[i] != compareArray[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ });
+
+ return null;
+ }
+
+
+
protected boolean deleteDirectory(File directory)
{
if (directory.isDirectory())
More information about the jboss-cvs-commits
mailing list