[jboss-cvs] JBoss Messaging SVN: r2555 - in trunk: src/main/org/jboss/messaging/core and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 16 14:25:10 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-03-16 14:25:10 -0400 (Fri, 16 Mar 2007)
New Revision: 2555

Added:
   trunk/src/main/org/jboss/messaging/core/filepersist/
   trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
   trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java
   trunk/tests/src/org/jboss/test/messaging/core/filepersist/
   trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
Modified:
   trunk/docs/examples/ejb3mdb/do-not-distribute.properties
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-543 - this is just an experiment with file-based persistence.
This will probably change a lot; I am adding an initial version based on my current experiments.

Modified: trunk/docs/examples/ejb3mdb/do-not-distribute.properties
===================================================================
--- trunk/docs/examples/ejb3mdb/do-not-distribute.properties	2007-03-16 16:20:30 UTC (rev 2554)
+++ trunk/docs/examples/ejb3mdb/do-not-distribute.properties	2007-03-16 18:25:10 UTC (rev 2555)
@@ -5,4 +5,4 @@
 messaging.client.jar.path=../../../output/lib
 messaging.client.jar.name=jboss-messaging-client.jar
 jboss.configuration=messaging
-jboss.home=C:\\work\\src\\jboss-4.0.5.GA-src\\build\\output\\jboss-4.0.5.GA-ejb3
+jboss.home=/extra/clebert/jboss/jboss-install

Added: trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java	2007-03-16 18:25:10 UTC (rev 2555)
@@ -0,0 +1,168 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.messaging.core.filepersist;
+
+import java.nio.ByteBuffer;
+
+/**
+ * One fixed-size record of the block index: the block's id, the size and file position recorded
+ * for it, a confirmed flag, and the ids of the next and previous blocks.
+ *
+ * Both links start at -1. {@link #writeToBuffer(ByteBuffer)} and
+ * {@link #readFromBuffer(ByteBuffer)} convert between an instance and its
+ * {@link #REGISTER_SIZE}-byte serialized form.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class BlockIndex implements Cloneable
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   /**
+    * Serialized size of a record in bytes, listed in the order the fields are written:
+    * next block (4) + previous block (4) + confirmed (1) + file position (8) +
+    * block size (4) + block id (4).
+    */
+   public static final int REGISTER_SIZE = 4 + 4 + 1 + 8 + 4 + 4;
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   /** Id of this block. There is no setter; only readFromBuffer overwrites it. */
+   private int blockId;
+
+   /** Size recorded for this block. There is no setter; only readFromBuffer overwrites it. */
+   private int blockSize;
+
+   /** File position recorded for this block. There is no setter; only readFromBuffer overwrites it. */
+   private long filePosition;
+
+   /** Confirmed flag of this block. */
+   private boolean confirmed;
+
+   /** Id of the block that follows this one, -1 until one is set. */
+   private int nextBlock = -1;
+
+   /** Id of the block that precedes this one, -1 until one is set. */
+   private int previousBlock = -1;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   /** Creates an empty record, meant to be filled by {@link #readFromBuffer(ByteBuffer)}. */
+   BlockIndex()
+   {
+   }
+
+   /** Creates an unconfirmed record whose next and previous links are both -1. */
+   public BlockIndex(int blockId, int blockSize, long filePosition)
+   {
+      this(blockId, blockSize, filePosition, false, -1, -1);
+   }
+
+   /** Creates a record with every field given explicitly. */
+   public BlockIndex(int blockId, int blockSize, long filePosition,
+                     boolean confirmed, int nextBlock, int previousBlock)
+   {
+      this.blockId = blockId;
+      this.blockSize = blockSize;
+      this.filePosition = filePosition;
+      this.confirmed = confirmed;
+      this.nextBlock = nextBlock;
+      this.previousBlock = previousBlock;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public int getBlockId()
+   {
+      return this.blockId;
+   }
+
+   public int getBlockSize()
+   {
+      return this.blockSize;
+   }
+
+   public long getFilePosition()
+   {
+      return this.filePosition;
+   }
+
+   public boolean isConfirmed()
+   {
+      return this.confirmed;
+   }
+
+   public void setConfirmed(boolean confirmed)
+   {
+      this.confirmed = confirmed;
+   }
+
+   public int getNextBlock()
+   {
+      return this.nextBlock;
+   }
+
+   public void setNextBlock(int nextBlock)
+   {
+      this.nextBlock = nextBlock;
+   }
+
+   public int getPreviousBlock()
+   {
+      return this.previousBlock;
+   }
+
+   public void setPreviousBlock(int previousBlock)
+   {
+      this.previousBlock = previousBlock;
+   }
+
+   /**
+    * Appends this record to the buffer at its current position, advancing it by
+    * {@link #REGISTER_SIZE} bytes. The confirmed flag is stored as a single byte, 1 or 0.
+    */
+   public void writeToBuffer(ByteBuffer buffer)
+   {
+      byte confirmedFlag = 0;
+      if (confirmed)
+      {
+         confirmedFlag = 1;
+      }
+
+      buffer.putInt(nextBlock)
+            .putInt(previousBlock)
+            .put(confirmedFlag)
+            .putLong(filePosition)
+            .putInt(blockSize)
+            .putInt(blockId);
+   }
+
+   /**
+    * Overwrites every field of this record with the values read from the buffer's current
+    * position, in the order {@link #writeToBuffer(ByteBuffer)} wrote them. Only a flag byte of 1
+    * is read back as confirmed.
+    */
+   public void readFromBuffer(ByteBuffer buffer)
+   {
+      this.nextBlock = buffer.getInt();
+      this.previousBlock = buffer.getInt();
+      this.confirmed = buffer.get() == 1;
+      this.filePosition = buffer.getLong();
+      this.blockSize = buffer.getInt();
+      this.blockId = buffer.getInt();
+   }
+
+   /** Returns a new BlockIndex carrying the same six field values as this one. */
+   public Object clone()
+   {
+      BlockIndex copy = new BlockIndex(blockId, blockSize, filePosition);
+      copy.confirmed = confirmed;
+      copy.nextBlock = nextBlock;
+      copy.previousBlock = previousBlock;
+      return copy;
+   }
+
+   /** Identifies the record by its block id only. */
+   public String toString()
+   {
+      return "BlockIndex[" + blockId + "]";
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java	2007-03-16 18:25:10 UTC (rev 2555)
@@ -0,0 +1,270 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.messaging.core.filepersist;
+
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.io.File;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * Stores data blocks in one file and keeps a {@link BlockIndex} record for each of them in a
+ * separate index file.
+ *
+ * A block is first added and later confirmed. Confirming a block marks its record as confirmed and
+ * links it (together with any blocks chained after it) to the end of the list that starts at the
+ * root record; that list is what {@link #iterateIndexes()} walks.
+ *
+ * This is just an experiment and there is a lot of work left to do here. For example, we need to
+ * support multiple files and multiple messages in a single block.
+ *
+ * I think we should support writing messages in blocks and confirming a single block.
+ *
+ * At this point confirming a block is a slow operation, but we can improve this, maybe by
+ * avoiding the doubly linked list, which forces two index updates on each insert
+ * (that would be easy to support at this point).
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class DataFile
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Lock held while a block is allocated and while the index is read or updated
+   Object lockIndex = new Object();
+
+   // The same ByteBuffer instance is reused for every index record, since access is synchronized
+   ByteBuffer indexBuffer = ByteBuffer.allocate(BlockIndex.REGISTER_SIZE);
+
+   // Id that will be given to the next block added
+   int numberOfBlocks;
+
+   FileChannel indexChannel;
+   FileChannel dataChannel;
+
+   RandomAccessFile index;
+   RandomAccessFile data;
+
+   // Head of the confirmed list (block id -1, stored in the first slot of the index file)
+   BlockIndex root;
+   // Tail of the confirmed list; the root itself while the list is empty
+   BlockIndex last;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   /**
+    * Opens the two files. When both already exist their content is recovered, otherwise a fresh
+    * index is initialized.
+    *
+    * @throws IOException if a file cannot be opened, or the existing index cannot be read
+    */
+   public DataFile(File indexFile, File dataFile) throws IOException
+   {
+      if (indexFile.exists() && dataFile.exists())
+      {
+         recover(indexFile, dataFile);
+      }
+      else
+      {
+         init(indexFile, dataFile);
+      }
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /** Adds a block that is not chained to any other block; it has to be confirmed later. */
+   public BlockIndex addBlock(ByteBuffer dataBuffer) throws IOException
+   {
+      return addBlock(dataBuffer, null);
+   }
+
+   /**
+    * Appends the buffer's content (from position 0 up to its limit) to the data file and writes
+    * an index record for it.
+    *
+    * Use precedent only when adding multiple blocks as part of the same confirmation.
+    * Say you are adding 10 messages: the first message has no precedent and stays out of the
+    * confirmed list until you confirm it. You don't need to confirm the subsequent messages.
+    *
+    * @param dataBuffer the data to store; it is rewound before being written
+    * @param precedent the block this one is chained after, or null for the first block of a chain
+    * @return the index record of the new block
+    * @throws IOException if the index or the data file cannot be written
+    */
+   public BlockIndex addBlock(ByteBuffer dataBuffer, BlockIndex precedent) throws IOException
+   {
+      // The data is written while the lock is still held: the block's position is the current
+      // size of the data file, so a concurrent caller must not read that size before this
+      // block's bytes are in place, or both blocks would land on the same position.
+      synchronized (lockIndex)
+      {
+         dataBuffer.rewind();
+
+         // remaining() after the rewind is exactly the number of bytes written below
+         BlockIndex block =
+            new BlockIndex(numberOfBlocks++, dataBuffer.remaining(), dataChannel.size());
+
+         if (precedent != null)
+         {
+            precedent.setNextBlock(block.getBlockId());
+            block.setPreviousBlock(precedent.getBlockId());
+            block.setConfirmed(true);
+            updateIndex(precedent);
+         }
+         updateIndex(block);
+
+         writeFully(dataChannel, dataBuffer, block.getFilePosition());
+         dataChannel.force(false);
+
+         return block;
+      }
+   }
+
+   /**
+    * Marks the block as confirmed and appends it, and every block chained after it, to the end
+    * of the confirmed list.
+    *
+    * @throws IOException if the block is already confirmed, or the index cannot be read or written
+    */
+   public void confirmBlock(BlockIndex block) throws IOException
+   {
+      synchronized (lockIndex)
+      {
+         if (block.isConfirmed())
+         {
+            throw new IOException("Block already confirmed!");
+         }
+
+         // Flag the stored record before linking it, so that the copy read back below (which may
+         // become the new tail and be rewritten later) already carries the confirmed status.
+         BlockIndex stored = readBlock(block.getBlockId());
+         stored.setConfirmed(true);
+         updateIndex(stored);
+
+         last.setNextBlock(block.getBlockId());
+
+         // Walk the chain hanging off the block to find the new tail
+         BlockIndex currentBlock = null;
+         for (Iterator iter = new IteratorImpl(last); iter.hasNext();)
+         {
+            currentBlock = (BlockIndex) iter.next();
+         }
+         if (currentBlock == null)
+         {
+            throw new IOException("Couldn't find last element on index");
+         }
+
+         updateIndex(last);
+         last = (BlockIndex) currentBlock.clone();
+
+         // Only flag the caller's object once everything succeeded, so a failed call can be retried
+         block.setConfirmed(true);
+      }
+   }
+
+   /** Reads the index record of the given block into a new BlockIndex. */
+   public BlockIndex readBlock (int blockId) throws IOException
+   {
+      return readBlock(new BlockIndex(), blockId);
+   }
+
+   /**
+    * Reads the index record of the given block into blockToRead.
+    *
+    * @return blockToRead
+    * @throws IOException if the record cannot be read completely, e.g. the index file ends
+    *                     before or inside it
+    */
+   public BlockIndex readBlock(BlockIndex blockToRead, int blockId) throws IOException
+   {
+      synchronized (lockIndex)
+      {
+         long position = getPosition(blockId);
+
+         indexBuffer.clear();
+         // A positional read may return fewer bytes than asked for; decoding a partly filled
+         // buffer would silently mix in bytes of the previously handled record.
+         while (indexBuffer.hasRemaining())
+         {
+            int read = indexChannel.read(indexBuffer, position + indexBuffer.position());
+            if (read < 0)
+            {
+               throw new IOException("Index record of block " + blockId + " is missing or truncated");
+            }
+         }
+         indexBuffer.rewind();
+
+         blockToRead.readFromBuffer(indexBuffer);
+         return blockToRead;
+      }
+   }
+
+   /** Closes both files; the data file is closed even when closing the index file fails. */
+   public void close() throws IOException
+   {
+      try
+      {
+         index.close();
+      }
+      finally
+      {
+         data.close();
+      }
+   }
+
+   /**
+    * Returns the offset of a block's record in the index file. The root record (id -1) takes
+    * the first slot, so block 0 starts at REGISTER_SIZE.
+    */
+   public long getPosition(int blockId)
+   {
+      // 1L forces the arithmetic into long; in int it would overflow for large block ids
+      return (blockId + 1L) * BlockIndex.REGISTER_SIZE;
+   }
+
+   /** Returns an iterator over the records of the confirmed list, starting after the root. */
+   public Iterator iterateIndexes()
+   {
+      return new IteratorImpl();
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   /**
+    * Opens existing files and rebuilds the in-memory state from the index file: the root record,
+    * the id for the next block, and the tail of the confirmed list.
+    *
+    * @throws IOException if a file cannot be opened or the root record cannot be read
+    */
+   protected void recover(File indexFile, File dataFile) throws IOException
+   {
+      openFiles(indexFile, dataFile);
+
+      boolean recovered = false;
+      try
+      {
+         root = readBlock(-1);
+
+         // The first slot holds the root record; every other complete slot is a block
+         numberOfBlocks = (int) (indexChannel.size() / BlockIndex.REGISTER_SIZE) - 1;
+
+         // The tail stays the root object itself while the list is empty, exactly as after
+         // init(), so that the first confirmBlock() updates the root seen by iterateIndexes()
+         BlockIndex tail = root;
+         for (Iterator iter = new IteratorImpl(root); iter.hasNext();)
+         {
+            tail = (BlockIndex) iter.next();
+         }
+         last = tail;
+
+         recovered = true;
+      }
+      finally
+      {
+         if (!recovered)
+         {
+            closeQuietly();
+         }
+      }
+   }
+
+   /**
+    * Opens (creating them if needed) the two files and writes an empty root record.
+    *
+    * @throws IOException if a file cannot be opened or the root record cannot be written
+    */
+   protected void init(File indexFile, File dataFile) throws IOException
+   {
+      openFiles(indexFile, dataFile);
+
+      boolean initialized = false;
+      try
+      {
+         root = new BlockIndex(-1, -1, -1L);
+         last = root;
+         updateIndex(root);
+
+         initialized = true;
+      }
+      finally
+      {
+         if (!initialized)
+         {
+            closeQuietly();
+         }
+      }
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   /** Opens both files in "rw" mode and takes their channels; leaves nothing open on failure. */
+   private void openFiles(File indexFile, File dataFile) throws IOException
+   {
+      index = new RandomAccessFile(indexFile, "rw");
+      try
+      {
+         data = new RandomAccessFile(dataFile, "rw");
+      }
+      catch (IOException e)
+      {
+         index.close();
+         throw e;
+      }
+      indexChannel = index.getChannel();
+      dataChannel = data.getChannel();
+   }
+
+   /** Closes the files after a failed start, without hiding the failure being reported. */
+   private void closeQuietly()
+   {
+      try
+      {
+         close();
+      }
+      catch (IOException ignored)
+      {
+         // the exception that made the start fail is the one worth propagating
+      }
+   }
+
+   /** This method should be called within a synchronized(lockIndex)*/
+   private void updateIndex(BlockIndex block) throws IOException
+   {
+      indexBuffer.clear();
+
+      block.writeToBuffer(indexBuffer);
+
+      indexBuffer.rewind();
+
+      writeFully(indexChannel, indexBuffer, getPosition(block.getBlockId()));
+
+      indexChannel.force(false);
+   }
+
+   /** Writes all remaining bytes of the buffer at the given file position. */
+   private static void writeFully(FileChannel channel, ByteBuffer buffer, long position)
+      throws IOException
+   {
+      long offset = position;
+      // a single positional write is allowed to write only part of the buffer
+      while (buffer.hasRemaining())
+      {
+         offset += channel.write(buffer, offset);
+      }
+   }
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   /**
+    * Follows the next-block links of the index, reading each record from the index file.
+    * Every call to next() returns a newly read BlockIndex.
+    */
+   class IteratorImpl implements Iterator
+   {
+
+      BlockIndex currentBlock;
+
+      /** Starts at (a copy of) the root, i.e. iterates the confirmed list. */
+      IteratorImpl()
+      {
+         currentBlock = (BlockIndex) DataFile.this.root.clone();
+      }
+
+      /** Starts at the given record; the first element returned is the one it links to. */
+      IteratorImpl(BlockIndex startAt)
+      {
+         currentBlock = startAt;
+      }
+
+      public boolean hasNext()
+      {
+         return currentBlock.getNextBlock()>=0;
+      }
+
+      /**
+       * @throws IllegalStateException if there is no next block
+       * @throws RuntimeException wrapping the IOException if the record cannot be read
+       */
+      public Object next()
+      {
+         if (!hasNext())
+         {
+            throw new IllegalStateException("Already reached end of blocks");
+         }
+         try
+         {
+            currentBlock = DataFile.this.readBlock(currentBlock.getNextBlock());
+            return currentBlock;
+         }
+         catch (IOException e)
+         {
+            throw new RuntimeException(e);
+         }
+      }
+
+      /** Removal is not supported, as the Iterator contract allows. */
+      public void remove()
+      {
+         throw new UnsupportedOperationException("Not supported!");
+      }
+   }
+
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java	2007-03-16 18:25:10 UTC (rev 2555)
@@ -0,0 +1,278 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.core.filepersist;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.messaging.core.filepersist.DataFile;
+import org.jboss.messaging.core.filepersist.BlockIndex;
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * Tests adding, confirming and iterating blocks of a {@link DataFile}, from a single thread and
+ * from several threads at once.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class DataFileTest extends MessagingTestCase
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Both files live in the platform's temporary directory rather than a hard-coded /tmp
+   File fileIndex = new File(System.getProperty("java.io.tmpdir"), "file-index.bin");
+   File fileData = new File(System.getProperty("java.io.tmpdir"), "file-data.bin");
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public DataFileTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /**
+    * A chain of blocks becomes visible to the iterator only once its first block is confirmed,
+    * and a block added afterwards stays invisible until it is confirmed too.
+    */
+   public void testAdd() throws Exception
+   {
+      DataFile transactioned = new DataFile(fileIndex, fileData);
+      try
+      {
+         assertEquals(BlockIndex.REGISTER_SIZE, transactioned.getPosition(0));
+
+         ByteBuffer buffer = ByteBuffer.allocateDirect(4 * 5);
+         for (int i = 0; i < 5; i++)
+         {
+            buffer.putInt(i);
+         }
+
+         BlockIndex firstBlock = transactioned.addBlock(buffer);
+         BlockIndex currentBlock = firstBlock;
+         for (int i = 0; i < 1000; i++)
+         {
+            currentBlock = transactioned.addBlock(buffer, currentBlock);
+         }
+
+         // nothing was confirmed yet
+         assertFalse(transactioned.iterateIndexes().hasNext());
+
+         transactioned.confirmBlock(firstBlock);
+         assertEquals(1001, countIndexes(transactioned));
+
+         // an unconfirmed block does not show up
+         currentBlock = transactioned.addBlock(buffer);
+         assertEquals(1001, countIndexes(transactioned));
+
+         transactioned.confirmBlock(currentBlock);
+         assertEquals(1002, countIndexes(transactioned));
+      }
+      finally
+      {
+         // close even when an assertion fails, so the files can be deleted by the next setUp
+         transactioned.close();
+      }
+   }
+
+   /**
+    * 20 threads each add a chain of 100 blocks and confirm it; afterwards every block id from
+    * 0 to 1999 must be reachable through the iterator exactly once.
+    */
+   public void testAddMultiThread() throws Exception
+   {
+      final DataFile transactioned = new DataFile(fileIndex, fileData);
+      try
+      {
+         final Object semaphore = new Object();
+         // Both guarded by semaphore: how many workers reached the gate, and whether it is open
+         final int[] ready = new int[1];
+         final boolean[] started = new boolean[1];
+         final ArrayList failures = new ArrayList();
+
+         Thread threads[] = new Thread[20];
+
+         for (int i = 0; i < threads.length; i++)
+         {
+            threads[i] = new Thread()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     ByteBuffer buffer = ByteBuffer.allocate(100);
+                     for (byte value = 0; value < 100; value++)
+                     {
+                        buffer.put(value);
+                     }
+
+                     synchronized (semaphore)
+                     {
+                        ready[0]++;
+                        semaphore.notifyAll();
+                        // loop on the flag: wait() may wake up spuriously, and a worker that
+                        // arrives after the gate was opened must not wait at all
+                        while (!started[0])
+                        {
+                           semaphore.wait();
+                        }
+                     }
+
+                     BlockIndex first = transactioned.addBlock(buffer);
+                     BlockIndex current = first;
+
+                     for (int added = 0; added < 99; added++)
+                     {
+                        current = transactioned.addBlock(buffer, current);
+                     }
+
+                     transactioned.confirmBlock(first);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error(e);
+                     // ArrayList is not thread-safe and several workers may fail at once
+                     synchronized (failures)
+                     {
+                        failures.add(e);
+                     }
+                  }
+               }
+            };
+         }
+
+         for (int counter = 0; counter < threads.length; counter++)
+         {
+            threads[counter].start();
+         }
+
+         // Release all workers together once every one of them is parked at the gate
+         synchronized (semaphore)
+         {
+            while (ready[0] < threads.length)
+            {
+               semaphore.wait();
+            }
+            started[0] = true;
+            semaphore.notifyAll();
+         }
+
+         for (int counter = 0; counter < threads.length; counter++)
+         {
+            threads[counter].join();
+         }
+
+         if (failures.size() > 0)
+         {
+            throw (Exception) failures.get(0);
+         }
+
+         int elements = 0;
+
+         HashSet set = new HashSet();
+
+         for (Iterator iter = transactioned.iterateIndexes(); iter.hasNext();)
+         {
+            BlockIndex index = (BlockIndex) iter.next();
+            set.add(new Integer(index.getBlockId()));
+            elements++;
+         }
+
+         assertEquals(threads.length * 100, elements);
+
+         for (int counter = 0; counter < threads.length * 100; counter++)
+         {
+            Integer intKey = new Integer(counter);
+            assertTrue("Could not find intKey=" + intKey, set.contains(intKey));
+         }
+      }
+      finally
+      {
+         transactioned.close();
+      }
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   /** Removes the files of a previous run, so that every test starts from a fresh index. */
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      deleteFile(fileIndex);
+      deleteFile(fileData);
+   }
+
+   /** Deletes the file if possible; a failure to delete is deliberately not reported. */
+   protected void deleteFile(File file)
+   {
+      try
+      {
+         file.delete();
+      }
+      catch (Exception ignored)
+      {
+         // best effort only: the file may simply not exist yet
+      }
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   /** Counts the records currently reachable through iterateIndexes(). */
+   private int countIndexes(DataFile file)
+   {
+      int elements = 0;
+      for (Iterator iter = file.iterateIndexes(); iter.hasNext();)
+      {
+         iter.next();
+         elements++;
+      }
+      return elements;
+   }
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision




More information about the jboss-cvs-commits mailing list