[jboss-cvs] JBoss Messaging SVN: r2555 - in trunk: src/main/org/jboss/messaging/core and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 16 14:25:10 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-03-16 14:25:10 -0400 (Fri, 16 Mar 2007)
New Revision: 2555
Added:
trunk/src/main/org/jboss/messaging/core/filepersist/
trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java
trunk/tests/src/org/jboss/test/messaging/core/filepersist/
trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
Modified:
trunk/docs/examples/ejb3mdb/do-not-distribute.properties
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-543 - this is just an experiment with file-based persistence.
This will probably change a lot... adding an initial version based only on my current experiments.
Modified: trunk/docs/examples/ejb3mdb/do-not-distribute.properties
===================================================================
--- trunk/docs/examples/ejb3mdb/do-not-distribute.properties 2007-03-16 16:20:30 UTC (rev 2554)
+++ trunk/docs/examples/ejb3mdb/do-not-distribute.properties 2007-03-16 18:25:10 UTC (rev 2555)
@@ -5,4 +5,4 @@
messaging.client.jar.path=../../../output/lib
messaging.client.jar.name=jboss-messaging-client.jar
jboss.configuration=messaging
-jboss.home=C:\\work\\src\\jboss-4.0.5.GA-src\\build\\output\\jboss-4.0.5.GA-ejb3
+jboss.home=/extra/clebert/jboss/jboss-install
Added: trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java 2007-03-16 18:25:10 UTC (rev 2555)
@@ -0,0 +1,168 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.filepersist;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class BlockIndex implements Cloneable
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   /**
+    * Size in bytes of one serialized entry: four ints (nextBlock, previousBlock, blockSize,
+    * blockId), one long (filePosition) and one byte (the confirmed flag).
+    */
+   public static final int REGISTER_SIZE = 4 * 4 + 8 + 1;
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   /** Set at construction; has no setter and is only overwritten by {@link #readFromBuffer}. */
+   private int blockId;
+
+   /** Set at construction; has no setter and is only overwritten by {@link #readFromBuffer}. */
+   private int blockSize;
+
+   /** Set at construction; has no setter and is only overwritten by {@link #readFromBuffer}. */
+   private long filePosition;
+
+   private boolean confirmed;
+
+   /** Id of the following block, or a negative value when there is none. */
+   private int nextBlock = -1;
+
+   /** Id of the preceding block, or a negative value when there is none. */
+   private int previousBlock = -1;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   /** Creates an empty entry, meant to be populated through {@link #readFromBuffer}. */
+   BlockIndex()
+   {
+   }
+
+   /** Creates an unconfirmed entry that is not linked to any other block. */
+   public BlockIndex(int blockId, int blockSize, long filePosition)
+   {
+      this.blockId = blockId;
+      this.blockSize = blockSize;
+      this.filePosition = filePosition;
+   }
+
+   /** Creates an entry with every field given explicitly. */
+   public BlockIndex(int blockId, int blockSize, long filePosition,
+                     boolean confirmed, int nextBlock, int previousBlock)
+   {
+      this(blockId, blockSize, filePosition);
+      this.confirmed = confirmed;
+      this.nextBlock = nextBlock;
+      this.previousBlock = previousBlock;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public int getBlockId()
+   {
+      return blockId;
+   }
+
+   public int getBlockSize()
+   {
+      return blockSize;
+   }
+
+   public long getFilePosition()
+   {
+      return filePosition;
+   }
+
+   public int getNextBlock()
+   {
+      return nextBlock;
+   }
+
+   public void setNextBlock(int nextBlock)
+   {
+      this.nextBlock = nextBlock;
+   }
+
+   public boolean isConfirmed()
+   {
+      return confirmed;
+   }
+
+   public void setConfirmed(boolean confirmed)
+   {
+      this.confirmed = confirmed;
+   }
+
+   public int getPreviousBlock()
+   {
+      return previousBlock;
+   }
+
+   public void setPreviousBlock(int previousBlock)
+   {
+      this.previousBlock = previousBlock;
+   }
+
+   /**
+    * Serializes this entry into the buffer at its current position, consuming
+    * {@link #REGISTER_SIZE} bytes. The field order is nextBlock, previousBlock, confirmed,
+    * filePosition, blockSize, blockId and must stay in sync with {@link #readFromBuffer}.
+    */
+   public void writeToBuffer(ByteBuffer buffer)
+   {
+      buffer.putInt(nextBlock);
+      buffer.putInt(previousBlock);
+      buffer.put(confirmed ? (byte) 1 : (byte) 0);
+      buffer.putLong(filePosition);
+      buffer.putInt(blockSize);
+      buffer.putInt(blockId);
+   }
+
+   /**
+    * Overwrites every field of this entry with the values read from the buffer's current
+    * position, in the order used by {@link #writeToBuffer}. The confirmed flag is true only
+    * when the stored byte is exactly 1.
+    */
+   public void readFromBuffer(ByteBuffer buffer)
+   {
+      nextBlock = buffer.getInt();
+      previousBlock = buffer.getInt();
+      confirmed = (buffer.get() == (byte) 1);
+      filePosition = buffer.getLong();
+      blockSize = buffer.getInt();
+      blockId = buffer.getInt();
+   }
+
+   /** Returns an independent copy of this entry. */
+   public Object clone()
+   {
+      try
+      {
+         // Every field is a primitive, so the field-by-field copy made by Object.clone() is a
+         // complete copy; unlike calling a constructor it also keeps the runtime class of
+         // subclasses, as the clone() contract expects.
+         return super.clone();
+      }
+      catch (CloneNotSupportedException e)
+      {
+         // Cannot happen: this class implements Cloneable.
+         throw new RuntimeException(e);
+      }
+   }
+
+   public String toString()
+   {
+      return "BlockIndex[" + this.blockId + "]";
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java 2007-03-16 18:25:10 UTC (rev 2555)
@@ -0,0 +1,270 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.filepersist;
+
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.io.File;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * This class manages BlockIndex entries.
+ * A BlockIndex can be added, and later be confirmed.
+ * When a block is confirmed its status is set to confirmed, and the list is then updated.
+ *
+ * (This is just an experiment.. there is a lot of work to do here..
+ * For example, we need to support multiple files...
+ * and multiple messages in a single block)
+ *
+ * I think we should support writing messages in blocks, and confirm a single block.
+ *
+ * At this point confirming a block is a slow operation, but we can improve this... maybe by avoiding
+ * doubly linked lists, which force two updates on each insert
+ * (which is easy to support at this point).
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class DataFile
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Lock used for while the index is being updated. You can't have multiple handles
+ Object lockIndex = new Object();
+
+ // We reuse the same instance of ByteBuffer used on indexes since the access is synchronized
+ ByteBuffer indexBuffer = ByteBuffer.allocate(BlockIndex.REGISTER_SIZE);
+
+ int numberOfBlocks;
+
+ FileChannel indexChannel;
+ FileChannel dataChannel;
+
+ RandomAccessFile index;
+ RandomAccessFile data;
+
+ BlockIndex root;
+ BlockIndex last;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public DataFile(File indexFile, File dataFile) throws IOException
+ {
+ if (indexFile.exists() && dataFile.exists())
+ {
+ recover(indexFile, dataFile);
+ }
+ else
+ {
+ init(indexFile, dataFile);
+ }
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public BlockIndex addBlock(ByteBuffer dataBuffer) throws IOException
+ {
+ return addBlock(dataBuffer, null);
+ }
+
+ /**
+ * Use precedent only when adding multiple blocks as part of the same confirmation.
+ * Say if you are adding 10 messages, the first message will have no precedence until you
+ * confirm it. You don't need to confirm subsequent messages.
+ * */
+ public BlockIndex addBlock(ByteBuffer dataBuffer, BlockIndex precedent) throws IOException
+ {
+ BlockIndex block = null;
+
+ synchronized (lockIndex)
+ {
+ block = new BlockIndex(numberOfBlocks++, dataBuffer.capacity(), dataChannel.size());
+ if (precedent!=null)
+ {
+ precedent.setNextBlock(block.getBlockId());
+ block.setPreviousBlock(precedent.getBlockId());
+ block.setConfirmed(true);
+ updateIndex(precedent);
+ }
+ updateIndex(block);
+ }
+
+ dataBuffer.rewind();
+ dataChannel.write(dataBuffer, block.getFilePosition());
+ dataChannel.force(false);
+
+ return block;
+ }
+
+ public void confirmBlock(BlockIndex block) throws IOException
+ {
+ if (block.isConfirmed())
+ {
+ throw new IOException("Block already confirmed!");
+ }
+
+ synchronized (lockIndex)
+ {
+ last.setNextBlock(block.getBlockId());
+ BlockIndex currentBlock = null;
+ for (Iterator iter = new IteratorImpl(last);iter.hasNext();)
+ {
+ currentBlock = (BlockIndex)iter.next();
+ }
+ if (currentBlock==null)
+ {
+ throw new IOException("Couldn't find last element on index");
+ }
+ updateIndex(last);
+ last = (BlockIndex)currentBlock.clone();
+ }
+ }
+
+ public BlockIndex readBlock (int blockId) throws IOException
+ {
+ return readBlock(new BlockIndex(), blockId);
+ }
+
+ public BlockIndex readBlock(BlockIndex blockToRead, int blockId) throws IOException
+ {
+ synchronized (lockIndex)
+ {
+ indexBuffer.rewind();
+ indexChannel.read(indexBuffer,getPosition(blockId));
+ indexBuffer.rewind();
+ blockToRead.readFromBuffer(indexBuffer);
+ return blockToRead;
+ }
+ }
+
+ public void close() throws IOException
+ {
+ index.close();
+ data.close();
+ }
+
+ public long getPosition(int blockId)
+ {
+ return (blockId+1) * BlockIndex.REGISTER_SIZE;
+ }
+
+ public Iterator iterateIndexes()
+ {
+ return new IteratorImpl();
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void recover(File indexFile, File dataFile) throws IOException
+ {
+ index = new RandomAccessFile(indexFile,"rw");
+ data = new RandomAccessFile(dataFile,"rw");
+ indexChannel = index.getChannel();
+ dataChannel = data.getChannel();
+
+ root = readBlock(-1);
+ }
+
+ protected void init(File indexFile, File dataFile) throws IOException
+ {
+ index = new RandomAccessFile(indexFile,"rw");
+ data = new RandomAccessFile(dataFile,"rw");
+ indexChannel = index.getChannel();
+ dataChannel = data.getChannel();
+ root = new BlockIndex(-1,-1,-1l);
+ last = root;
+ updateIndex(root);
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ /** This method should be called within a synchronized(lockIndex)*/
+ private void updateIndex(BlockIndex block) throws IOException
+ {
+ indexBuffer.rewind();
+
+ block.writeToBuffer(indexBuffer);
+
+ indexBuffer.rewind();
+
+ indexChannel.write(indexBuffer, getPosition(block.getBlockId()));
+
+ indexChannel.force(false);
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ class IteratorImpl implements Iterator
+ {
+
+ BlockIndex currentBlock;
+
+ IteratorImpl()
+ {
+ currentBlock = (BlockIndex) DataFile.this.root.clone();
+ }
+
+ IteratorImpl(BlockIndex startAt)
+ {
+ currentBlock = startAt;
+ }
+
+ public boolean hasNext()
+ {
+ return currentBlock.getNextBlock()>=0;
+ }
+
+ public Object next()
+ {
+ try
+ {
+ if (!hasNext())
+ {
+ throw new IllegalStateException("Already reached end of blocks");
+ }
+ currentBlock = DataFile.this.readBlock(currentBlock.getNextBlock());
+ return currentBlock;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new RuntimeException("Not supported!");
+ }
+ }
+
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java 2007-03-16 18:25:10 UTC (rev 2555)
@@ -0,0 +1,278 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.core.filepersist;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.messaging.core.filepersist.DataFile;
+import org.jboss.messaging.core.filepersist.BlockIndex;
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class DataFileTest extends MessagingTestCase
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // The platform's temporary directory is used instead of a hard-coded /tmp, so the test
+   // also runs on systems without that directory.
+   File fileIndex = new File(System.getProperty("java.io.tmpdir"), "file-index.bin");
+   File fileData = new File(System.getProperty("java.io.tmpdir"), "file-data.bin");
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public DataFileTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /**
+    * Adds a chain of blocks and a standalone block, and checks that blocks only become
+    * visible to the iterator once the head of their chain is confirmed.
+    */
+   public void testAdd() throws Exception
+   {
+      DataFile transactioned = new DataFile(fileIndex, fileData);
+
+      try
+      {
+         assertEquals(BlockIndex.REGISTER_SIZE, transactioned.getPosition(0));
+
+         ByteBuffer buffer = ByteBuffer.allocateDirect(4 * 5);
+         for (int i = 0; i < 5; i++)
+         {
+            buffer.putInt(i);
+         }
+         buffer.mark();
+
+         BlockIndex firstBlock = transactioned.addBlock(buffer);
+         BlockIndex currentBlock = firstBlock;
+         for (int i = 0; i < 1000; i++)
+         {
+            currentBlock = transactioned.addBlock(buffer, currentBlock);
+         }
+
+         // Nothing is confirmed yet, so nothing is visible.
+         assertFalse(transactioned.iterateIndexes().hasNext());
+
+         transactioned.confirmBlock(firstBlock);
+
+         assertEquals(1001, countElements(transactioned));
+
+         currentBlock = transactioned.addBlock(buffer);
+
+         // The new block is still unconfirmed.
+         assertEquals(1001, countElements(transactioned));
+
+         transactioned.confirmBlock(currentBlock);
+
+         assertEquals(1002, countElements(transactioned));
+      }
+      finally
+      {
+         // Close even on failure, so the files can be deleted by the next setUp.
+         transactioned.close();
+      }
+   }
+
+   /**
+    * Runs 20 threads that each add and confirm a chain of 100 blocks at the same time, and
+    * checks that every block id ends up in the confirmed list.
+    */
+   public void testAddMultiThread() throws Exception
+   {
+      final DataFile transactioned = new DataFile(fileIndex, fileData);
+
+      try
+      {
+         final Object semaphore = new Object();
+         final ArrayList failures = new ArrayList();
+
+         // Both guarded by semaphore. Waiting on explicit conditions (instead of a bare
+         // wait() followed by a sleep) cannot lose the wake-up and is immune to spurious ones.
+         final int[] ready = new int[] { 0 };
+         final boolean[] started = new boolean[] { false };
+
+         Thread threads[] = new Thread[20];
+
+         for (int i = 0; i < threads.length; i++)
+         {
+            threads[i] = new Thread()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     ByteBuffer buffer = ByteBuffer.allocate(100);
+                     for (byte value = 0; value < 100; value++)
+                     {
+                        buffer.put(value);
+                     }
+
+                     synchronized (semaphore)
+                     {
+                        ready[0]++;
+                        semaphore.notifyAll();
+                        while (!started[0])
+                        {
+                           semaphore.wait();
+                        }
+                     }
+
+                     BlockIndex first = transactioned.addBlock(buffer);
+                     BlockIndex current = first;
+
+                     for (int added = 0; added < 99; added++)
+                     {
+                        current = transactioned.addBlock(buffer, current);
+                     }
+
+                     transactioned.confirmBlock(first);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error(e);
+                     // ArrayList is not thread-safe and several workers may fail at once.
+                     synchronized (failures)
+                     {
+                        failures.add(e);
+                     }
+                  }
+               }
+            };
+         }
+
+         for (int counter = 0; counter < threads.length; counter++)
+         {
+            threads[counter].start();
+         }
+
+         // Release all workers together once every one of them is parked at the gate.
+         synchronized (semaphore)
+         {
+            while (ready[0] < threads.length)
+            {
+               semaphore.wait();
+            }
+            started[0] = true;
+            semaphore.notifyAll();
+         }
+
+         for (int counter = 0; counter < threads.length; counter++)
+         {
+            threads[counter].join();
+         }
+
+         if (failures.size() > 0)
+         {
+            throw (Exception) failures.get(0);
+         }
+
+         int elements = 0;
+
+         HashSet set = new HashSet();
+
+         for (Iterator iter = transactioned.iterateIndexes(); iter.hasNext();)
+         {
+            BlockIndex index = (BlockIndex) iter.next();
+            set.add(new Integer(index.getBlockId()));
+            elements++;
+         }
+
+         assertEquals(threads.length * 100, elements);
+
+         for (int counter = 0; counter < threads.length * 100; counter++)
+         {
+            Integer intKey = new Integer(counter);
+            assertTrue("Could not find intKey=" + intKey, set.contains(intKey));
+         }
+      }
+      finally
+      {
+         transactioned.close();
+      }
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      deleteFile(fileIndex);
+      deleteFile(fileData);
+   }
+
+   /**
+    * Deletes the file if it exists and fails the test when that is not possible: a file left
+    * over from a previous run would silently send DataFile down its recovery path.
+    */
+   protected void deleteFile(File file)
+   {
+      if (file.exists() && !file.delete())
+      {
+         fail("Could not delete " + file.getAbsolutePath());
+      }
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   /** Counts the blocks currently reachable through the index iterator. */
+   private int countElements(DataFile dataFile)
+   {
+      int elements = 0;
+      for (Iterator iter = dataFile.iterateIndexes(); iter.hasNext();)
+      {
+         iter.next();
+         elements++;
+      }
+      return elements;
+   }
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
More information about the jboss-cvs-commits
mailing list