[jboss-cvs] JBoss Messaging SVN: r4777 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 5 21:03:50 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-05 21:03:49 -0400 (Tue, 05 Aug 2008)
New Revision: 4777
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Backup point
(Tests are not working yet)
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -70,4 +70,8 @@
void close() throws Exception;
+ void sync() throws Exception;
+
+ long size() throws Exception;
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -245,6 +245,16 @@
}
}
+ public void sync() throws Exception
+ {
+ throw new IllegalArgumentException("This method is not supported on AIO");
+ }
+
+ public long size() throws Exception
+ {
+ return aioFile.size();
+ }
+
public String toString()
{
return "AIOSequentialFile:" + this.journalDir + "/" + this.fileName;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -164,7 +164,7 @@
if (sync)
{
- channel.force(false);
+ sync();
}
return bytesRead;
@@ -190,6 +190,17 @@
}
}
+ public void sync() throws Exception
+ {
+ channel.force(false);
+ }
+
+ public long size() throws Exception
+ {
+ return channel.size();
+ }
+
+
public void position(final int pos) throws Exception
{
channel.position(pos);
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface Page
+{
+ void queueMessage(ServerMessage message) throws Exception;
+
+ ServerMessage[] dequeueMessages() throws Exception;
+
+ void sync() throws Exception;
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -22,9 +22,8 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -35,7 +34,5 @@
*/
public interface PagingManager extends MessagingComponent
{
- void pageReference(Queue queue, MessageReference ref);
-
- MessageReference depageReference(Queue queue);
+ public PagingStore getPaging(SimpleString address);
}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,29 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+public interface PagingStore extends Page
+{
+
+}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,199 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.util.VariableLatch;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageImpl implements Page
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final int SIZE_INTEGER = 4;
+
+ public static final byte START_BYTE= (byte)'{';
+ public static final byte END_BYTE= (byte)'}';
+
+ // Attributes ----------------------------------------------------
+
+ private final SequentialFile file;
+ private final SequentialFileFactory fileFactory;
+ private final PagingCallback callback;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageImpl(SequentialFileFactory factory, SequentialFile file)
+ {
+ this.file = file;
+ this.fileFactory = factory;
+ if (factory.isSupportsCallbacks())
+ {
+ callback = new PagingCallback();
+ }
+ else
+ {
+ callback = null;
+ }
+ }
+
+
+ // Public --------------------------------------------------------
+
+
+ // PagingFile implementation
+
+ public ServerMessage[] dequeueMessages() throws Exception
+ {
+
+ ArrayList<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
+
+ while (buffer.hasRemaining())
+ {
+ final int position = buffer.position();
+
+ ByteBufferWrapper messageBuffer = new ByteBufferWrapper(buffer);
+
+ if (buffer.get() == START_BYTE)
+ {
+ if (buffer.position() + SIZE_INTEGER < buffer.limit())
+ {
+ int messageSize = buffer.getInt();
+ int oldPos = buffer.position();
+ if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
+ {
+ ServerMessage msg = new ServerMessageImpl();
+ msg.decode(messageBuffer);
+ messages.add(msg);
+ }
+ else
+ {
+ buffer.position(position + 1);
+ }
+ }
+ }
+ else
+ {
+ buffer.position(position + 1);
+ }
+ }
+
+ return messages.toArray(new ServerMessage[messages.size()]);
+ }
+
+ public void queueMessage(ServerMessage message) throws Exception
+ {
+ ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6);
+ buffer.put(START_BYTE);
+ buffer.putInt(message.getEncodeSize());
+ message.encode(new ByteBufferWrapper(buffer));
+ buffer.put(END_BYTE);
+ if (callback != null)
+ {
+ callback.countUp();
+ file.write(buffer, callback);
+ }
+ else
+ {
+ file.write(buffer, false);
+ }
+
+ }
+
+ public void sync() throws Exception
+ {
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ else
+ {
+ file.sync();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private static class PagingCallback implements IOCallback
+ {
+ private final VariableLatch countLatch = new VariableLatch();
+
+ private volatile String errorMessage = null;
+
+ private volatile int errorCode = 0;
+
+ public void countUp()
+ {
+ countLatch.up();
+ }
+
+ public void done()
+ {
+ countLatch.down();
+ }
+
+ public void waitCompletion() throws InterruptedException
+ {
+ countLatch.waitCompletion();
+
+ if (errorMessage != null)
+ {
+ throw new IllegalStateException("Error on Callback: " + errorCode + " - " + errorMessage);
+ }
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ this.errorMessage = errorMessage;
+ this.errorCode = errorCode;
+ countLatch.down();
+ }
+
+ }
+
+}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -409,6 +409,20 @@
}
+ public void sync() throws Exception
+ {
+ if (supportsCallback)
+ {
+ throw new IllegalStateException("sync is not supported when supportsCallback=true");
+ }
+ }
+
+ public long size() throws Exception
+ {
+ return data.limit();
+ }
+
+
public int write(final ByteBuffer bytes, final boolean sync) throws Exception
{
return write(bytes, null);
@@ -459,6 +473,7 @@
}
+
}
}
Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.paging.impl.PageImpl;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageImplTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ /** Validate if everything we add is recovered */
+ public void testSimpleAdd() throws Exception
+ {
+ FakeSequentialFileFactory fakeFactory = new FakeSequentialFileFactory(512, true);
+
+ SequentialFile file = fakeFactory.createSequentialFile("testPage.page", 1);
+ file.open();
+ PageImpl impl = new PageImpl(fakeFactory, file);
+
+ ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+ SimpleString simpleDestination = new SimpleString("Test");
+
+ for (int i = 0; i < 100; i++)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+
+ for (int j = 0; j < buffer.limit(); j++)
+ {
+ buffer.put(RandomUtil.randomByte());
+ }
+
+ buffers.add(buffer);
+
+ ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
+ System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+
+ msg.setMessageID((long)i);
+
+ msg.setDestination(simpleDestination);
+
+ impl.queueMessage(msg);
+ }
+
+
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list