[jboss-cvs] JBoss Messaging SVN: r4777 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 5 21:03:50 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-05 21:03:49 -0400 (Tue, 05 Aug 2008)
New Revision: 4777

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Backup point
(Tests are not working yet)

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -70,4 +70,8 @@
    
    void close() throws Exception;
    
+   void sync() throws Exception;
+   
+   long size() throws Exception;
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -245,6 +245,16 @@
       }		
    }
    
+   public void sync() throws Exception // the AIO file offers no explicit sync: every call is rejected
+   {
+      throw new IllegalArgumentException("This method is not supported on AIO"); // NOTE(review): UnsupportedOperationException would describe this better
+   }
+   /** Reports the size of this file by delegating to {@code aioFile.size()}. */
+   public long size() throws Exception
+   {
+      return aioFile.size();
+   }
+
    public String toString()
    {
       return "AIOSequentialFile:" + this.journalDir + "/" + this.fileName;

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -164,7 +164,7 @@
       
       if (sync)
       {
-         channel.force(false);
+         sync();
       }
       
       return bytesRead;
@@ -190,6 +190,17 @@
       }
    }
    
+   public void sync() throws Exception // forces the channel; write(..., sync=true) now routes through here
+   {
+      channel.force(false); // presumably FileChannel semantics (false = content only, no metadata) -- confirm channel type
+   }
+   /** Returns whatever {@code channel.size()} reports for the underlying channel. */
+   public long size() throws Exception
+   {
+      return channel.size();
+   }
+
+   
    public void position(final int pos) throws Exception
    {
       channel.position(pos);

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ * A container of {@link ServerMessage}s that can be appended to, read back in bulk, and synced.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface Page
+{
+   void queueMessage(ServerMessage message) throws Exception; // adds one message (PageImpl writes it to its file)
+   /** Returns the messages found in this page as an array (PageImpl decodes them from a buffer sized to its file). */
+   ServerMessage[] dequeueMessages() throws Exception;
+   /** Completes outstanding writes; PageImpl either waits on its write callback or calls {@code file.sync()}. */
+   void sync() throws Exception;
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -22,9 +22,8 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -35,7 +34,5 @@
  */
 public interface PagingManager extends MessagingComponent
 {
-   void pageReference(Queue queue, MessageReference ref);
-   
-   MessageReference depageReference(Queue queue);
+   public PagingStore getPaging(SimpleString address);
 }

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,29 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+public interface PagingStore extends Page
+{
+   // Declares nothing of its own in this revision; every operation is inherited from Page.
+}

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,199 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.util.VariableLatch;
+
+/**
+ * {@link Page} over a {@link SequentialFile}; each message is framed as START_BYTE, 4-byte size, encoded body, END_BYTE.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageImpl implements Page
+{
+   
+   // Constants -----------------------------------------------------
+   // Width in bytes of the size field that follows START_BYTE in every frame.
+   private static final int SIZE_INTEGER = 4;
+   // Frame delimiters written before and after every encoded message.
+   public static final byte START_BYTE= (byte)'{';
+   public static final byte END_BYTE= (byte)'}';
+   
+   // Attributes ----------------------------------------------------
+   // 'callback' is null when the factory reports no callback support; writes then go through write(buffer, false).
+   private final SequentialFile file;
+   private final SequentialFileFactory fileFactory;
+   private final PagingCallback callback;
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   /** Binds the page to {@code file}; a PagingCallback is created only if {@code factory.isSupportsCallbacks()} is true. */
+   public PageImpl(SequentialFileFactory factory, SequentialFile file)
+   {
+      this.file = file;
+      this.fileFactory = factory;
+      if (factory.isSupportsCallbacks())
+      {
+         callback = new PagingCallback();
+      }
+      else
+      {
+         callback = null;
+      }
+   }
+   
+   
+   // Public --------------------------------------------------------
+
+   
+   // PagingFile implementation
+   /** Scans a buffer sized to the file, decoding every well-formed frame and skipping anything else one byte at a time. */
+   public ServerMessage[] dequeueMessages() throws Exception
+   {
+      
+      ArrayList<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+      ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
+      // NOTE(review): nothing visible here copies the file content into 'buffer' before the scan -- confirm what newBuffer() returns
+      while (buffer.hasRemaining())
+      {
+         final int position = buffer.position();
+         // presumably the wrapper shares 'buffer' (and its position), so decode() starts right after the size field -- TODO confirm
+         ByteBufferWrapper messageBuffer = new ByteBufferWrapper(buffer);
+         // a frame is accepted only if START_BYTE, a size, and END_BYTE at (payload start + size) all line up within the limit
+         if (buffer.get() == START_BYTE)
+         {
+            if (buffer.position() + SIZE_INTEGER < buffer.limit())
+            {
+               int messageSize = buffer.getInt();
+               int oldPos = buffer.position();
+               if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
+               {
+                  ServerMessage msg = new ServerMessageImpl();
+                  msg.decode(messageBuffer);
+                  messages.add(msg);
+               }
+               else
+               {
+                  buffer.position(position + 1); 
+               }
+            }
+         }
+         else
+         {
+            buffer.position(position + 1); 
+         }
+      }
+      
+      return messages.toArray(new ServerMessage[messages.size()]);
+   }
+   /** Frames the message and writes it to the file, counting the write on the callback when one exists. */
+   public void queueMessage(ServerMessage message) throws Exception
+   {
+      ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6); // 6 = START_BYTE (1) + size int (4) + END_BYTE (1)
+      buffer.put(START_BYTE);
+      buffer.putInt(message.getEncodeSize());
+      message.encode(new ByteBufferWrapper(buffer));
+      buffer.put(END_BYTE);
+      if (callback != null)
+      {
+         callback.countUp();
+         file.write(buffer, callback);
+      }
+      else
+      {
+         file.write(buffer, false);
+      }
+      // NOTE(review): 'buffer' is passed to write() without flip()/rewind() -- confirm write() does not rely on the buffer position
+   }
+   /** With a callback, waits for the counted writes (an error reported to it is rethrown); otherwise calls {@code file.sync()}. */
+   public void sync() throws Exception
+   {
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
+      else
+      {
+         file.sync();
+      }
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   /** IOCallback that counts writes on a VariableLatch and keeps the last error handed to onError. */
+   private static class PagingCallback implements IOCallback
+   {      
+      private final VariableLatch countLatch = new VariableLatch();
+      // Error state: written by onError, checked by waitCompletion.
+      private volatile String errorMessage = null;
+      // NOTE(review): never reset, so once an error is recorded every later waitCompletion() throws
+      private volatile int errorCode = 0;
+      /** Counts one more write on the latch; queueMessage calls this just before file.write. */
+      public void countUp()
+      {
+         countLatch.up();
+      }
+      /** Counts one write down on the latch. */
+      public void done()
+      {
+         countLatch.down();
+      }
+      /** Waits on the latch, then throws IllegalStateException if onError has recorded a message. */
+      public void waitCompletion() throws InterruptedException
+      {
+         countLatch.waitCompletion();
+         
+         if (errorMessage != null)
+         {
+            throw new IllegalStateException("Error on Callback: " + errorCode + " - " + errorMessage);
+         }
+      }
+      /** Stores the error details, then counts the latch down exactly as done() does. */
+      public void onError(final int errorCode, final String errorMessage)
+      {
+         this.errorMessage = errorMessage;
+         this.errorCode = errorCode;
+         countLatch.down();
+      }
+      
+   }
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-05 23:22:42 UTC (rev 4776)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -409,6 +409,20 @@
          
       }
       
+      public void sync() throws Exception // does nothing, except reject the call when supportsCallback is set
+      {
+         if (supportsCallback)
+         {
+            throw new IllegalStateException("sync is not supported when supportsCallback=true");
+         }
+      }
+      /** Reports {@code data.limit()} as the size of this fake file. */
+      public long size() throws Exception
+      {
+         return data.limit();
+      }
+      
+      
       public int write(final ByteBuffer bytes, final boolean sync) throws Exception
       {
          return write(bytes, null);
@@ -459,6 +473,7 @@
       }
 
 
+
    }
 
 }

Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-06 01:03:49 UTC (rev 4777)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.paging.impl.PageImpl;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Unit test for {@link PageImpl} running against a {@link FakeSequentialFileFactory}.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageImplTest extends UnitTestCase
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   
+   /** Queues 100 messages with random 10-byte bodies; intended to validate that everything added is recovered. */
+   public void testSimpleAdd() throws Exception
+   {
+      FakeSequentialFileFactory fakeFactory = new FakeSequentialFileFactory(512, true);
+      // NOTE(review): meaning of (512, true) is not visible here -- presumably alignment and callback support; confirm
+      SequentialFile file = fakeFactory.createSequentialFile("testPage.page", 1);
+      file.open();
+      PageImpl impl = new PageImpl(fakeFactory, file);
+
+      ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      
+      SimpleString simpleDestination = new SimpleString("Test");
+      
+      for (int i = 0; i < 100; i++)
+      {
+         ByteBuffer buffer = ByteBuffer.allocate(10);
+         // fill the body with random bytes; the buffer position is left at its limit afterwards
+         for (int j = 0; j < buffer.limit(); j++)
+         {
+            buffer.put(RandomUtil.randomByte());
+         }
+         // collected for a later comparison; nothing reads 'buffers' yet
+         buffers.add(buffer);
+
+         ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
+               System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+         
+         msg.setMessageID((long)i);
+         
+         msg.setDestination(simpleDestination);
+         
+         impl.queueMessage(msg);
+      }
+      
+      // NOTE(review): no sync(), dequeueMessages() or assertion follows, so this test cannot fail on wrong page content
+
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}




More information about the jboss-cvs-commits mailing list