[jboss-cvs] JBoss Messaging SVN: r4677 - in trunk: src/main/org/jboss/messaging/core/journal and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 11 21:14:16 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-11 21:14:15 -0400 (Fri, 11 Jul 2008)
New Revision: 4677

Added:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
Removed:
   trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java
Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/jboss/messaging/tests/
   trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
Log:
Some journal work (tests, fixes and improvements)

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/config/jbm-configuration.xml	2008-07-12 01:14:15 UTC (rev 4677)
@@ -101,8 +101,9 @@
            - closing Asynchronous files
            - Transaction awaits
            - Awaits on non transactional writes
+           This shouldn't be higher than the remoting timeout
       -->
-      <journal-aio-timeout>60000</journal-aio-timeout>
+      <journal-aio-timeout>4000</journal-aio-timeout>
 
       <journal-task-period>5000</journal-task-period>
       

Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -42,8 +42,10 @@
 
 	void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
 	
-	void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
-	
+   void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
+   
+   void appendUpdateRecord(long id, byte recordType, EncodingSupport record) throws Exception;
+   
 	void appendDeleteRecord(long id) throws Exception;
 	
 	// Transactional operations
@@ -54,8 +56,10 @@
    
 	void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
 	
-	void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
-	
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+   
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+   
 	void appendDeleteRecordTransactional(long txID, long id) throws Exception;
 	
 	void appendCommitRecord(long txID) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -45,5 +45,7 @@
 
    // Avoid using this method in production as it creates an unecessary copy 
    ByteBuffer wrapBuffer(byte[] bytes);
+   
+   int getAlignment();
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -62,5 +62,7 @@
    
    int getMaxAIO();
    
-   long getAIOTimeout();   	
+   long getAIOTimeout();
+   
+   void forceMoveNextFile() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -238,6 +238,10 @@
 	   }		
 	}
 
+	public String toString()
+	{
+	   return "AIOSequentialFile:" + this.journalDir + "/" + this.fileName;
+	}
 	
 	// Private methods
 	// -----------------------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -65,11 +65,16 @@
       return ByteBuffer.allocateDirect(size);
    }
    
+   public int getAlignment()
+   {
+      return 512;
+   }
+   
    // For tests only
    public ByteBuffer wrapBuffer(final byte[] bytes)
    {
       ByteBuffer newbuffer = newBuffer(bytes.length);
       newbuffer.put(bytes);
       return newbuffer;
-   };
    }
+}

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -173,13 +173,13 @@
 
 	private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
 	
-   private final ExecutorService closingExecutor = Executors.newSingleThreadExecutor();
+   private ExecutorService closingExecutor = null;
    
    /** 
     * We have a separated executor for open, as if we used the same executor this would still represent
     * a point of wait between the closing and open.
     * */
-   private final ExecutorService openExecutor = Executors.newSingleThreadExecutor();
+   private ExecutorService openExecutor = null;
    
 	/*
     * We use a semaphore rather than synchronized since it performs better when
@@ -187,6 +187,7 @@
     */
 	
 	//TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
+   // this locks access to currentFile
 	private final Semaphore lock = new Semaphore(1, true);
 	
 	private volatile JournalFile currentFile ;
@@ -314,37 +315,68 @@
 		posFilesMap.put(id, new PosFiles(usedFile));
 	}
 	
-	public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}
-		
-		PosFiles posFiles = posFilesMap.get(id);
-		
-		if (posFiles == null)
-		{
-			throw new IllegalStateException("Cannot find add info " + id);
-		}
-		
-		int size = SIZE_UPDATE_RECORD + record.length;
-		
-		ByteBuffer bb = fileFactory.newBuffer(size); 
-		
-		bb.put(UPDATE_RECORD);     
-		bb.putLong(id);      
-		bb.put(recordType);
-		bb.putInt(record.length);     
-		bb.put(record);      
-		bb.put(DONE);     
-		bb.rewind();
-		   
+   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      PosFiles posFiles = posFilesMap.get(id);
+      
+      if (posFiles == null)
+      {
+         throw new IllegalStateException("Cannot find add info " + id);
+      }
+      
+      int size = SIZE_UPDATE_RECORD + record.length;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(UPDATE_RECORD);     
+      bb.putLong(id);      
+      bb.put(recordType);
+      bb.putInt(record.length);     
+      bb.put(record);      
+      bb.put(DONE);     
+      bb.rewind();
+         
       JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-      		
-		posFiles.addUpdateFile(usedFile);
-	}
-	
+            
+      posFiles.addUpdateFile(usedFile);
+   }
+   
+   public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      PosFiles posFiles = posFilesMap.get(id);
+      
+      if (posFiles == null)
+      {
+         throw new IllegalStateException("Cannot find add info " + id);
+      }
+      
+      int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+      
+      ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+      
+      bb.putByte(UPDATE_RECORD);     
+      bb.putLong(id);      
+      bb.putByte(recordType);
+      bb.putInt(record.getEncodeSize());
+      record.encode(bb);
+      bb.putByte(DONE);     
+      bb.rewind();
+         
+      JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
+            
+      posFiles.addUpdateFile(usedFile);
+   }
+   
 	public void appendDeleteRecord(long id) throws Exception
 	{
 		if (state != STATE_LOADED)
@@ -359,8 +391,6 @@
 			throw new IllegalStateException("Cannot find add info " + id);
 		}
 		
-		posFiles.addDelete(currentFile);
-		
 		int size = SIZE_DELETE_RECORD;
 		
 		ByteBuffer bb = fileFactory.newBuffer(size); 
@@ -370,7 +400,8 @@
 		bb.put(DONE);     
 		bb.rewind();
 		
-      appendRecord(bb, syncNonTransactional, null);      
+      JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+      posFiles.addDelete(usedFile);
 	}     
 	
 	public long getTransactionID()
@@ -439,35 +470,65 @@
 	   tx.addPos(usedFile, id);
    }
 	
-	public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}
-		
-		int size = SIZE_UPDATE_RECORD_TX + record.length; 
-		
-		ByteBuffer bb = fileFactory.newBuffer(size); 
-		
-		bb.put(UPDATE_RECORD_TX);     
-		bb.putLong(txID);
-		bb.put(recordType);
-		bb.putLong(id);      
-		bb.putInt(record.length);     
-		bb.put(record);
-		bb.put(DONE);     
-		bb.rewind();
-		
+   public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      int size = SIZE_UPDATE_RECORD_TX + record.length; 
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(UPDATE_RECORD_TX);     
+      bb.putLong(txID);
+      bb.put(recordType);
+      bb.putLong(id);      
+      bb.putInt(record.length);     
+      bb.put(record);
+      bb.put(DONE);     
+      bb.rewind();
+      
       JournalFile usedFile;
       
       usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-		
+      
       TransactionNegPos tx = getTransactionInfo(txID);
       
-		tx.addPos(usedFile, id);
-	}
-	
+      tx.addPos(usedFile, id);
+   }
+   
+   public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize(); 
+      
+      ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size)); 
+      
+      
+      bb.putByte(UPDATE_RECORD_TX);     
+      bb.putLong(txID);
+      bb.putByte(recordType);
+      bb.putLong(id);      
+      bb.putInt(record.getEncodeSize());
+      record.encode(bb);
+      bb.putByte(DONE);     
+      bb.rewind();
+      
+      JournalFile usedFile;
+      
+      usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+      
+      TransactionNegPos tx = getTransactionInfo(txID);
+      
+      tx.addPos(usedFile, id);
+   }
+   
 	public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
 	{
 		if (state != STATE_LOADED)
@@ -647,7 +708,7 @@
       
       for (JournalFile file: orderedFiles)
       {  
-         file.getFile().open();//aki
+         file.getFile().open();
             
          ByteBuffer bb = fileFactory.newBuffer(fileSize);
          
@@ -1112,6 +1173,9 @@
             
             //Reverse the refs
             transactionInfo.forget();
+            
+            // Remove the transactionInfo
+            transactionInfos.remove(transaction.transactionID);
          }
          else
          {
@@ -1132,7 +1196,7 @@
 
 	public int getAlignment() throws Exception
 	{
-		return this.currentFile.getFile().getAlignment();
+		return this.fileFactory.getAlignment();
 	}
 	
 	public synchronized void checkReclaimStatus() throws Exception
@@ -1158,12 +1222,25 @@
          }
       }
       
+      for (JournalFile file: freeFiles)
+      {
+         builder.append("FreeFile:" + file + "\n");
+      }
+      
       builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
-      builder.append(((JournalFileImpl)currentFile).debug());
+      
+      if (currentFile instanceof JournalFileImpl)
+      {
+         builder.append(((JournalFileImpl)currentFile).debug());
+      }
+      
+      builder.append("#Opened Files:" + this.openedFiles.size());
             
       return builder.toString();
    }
    
+   // TestableJournal implementation --------------------------------------------------------------
+	
    /** Method for use on testcases.
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
@@ -1173,7 +1250,7 @@
          callback.waitCompletion(aioTimeout);
       }
       
-      if (!closingExecutor.isShutdown())
+      if (closingExecutor != null && !closingExecutor.isShutdown())
       {
          // Send something to the closingExecutor, just to make sure we went until its end
          final CountDownLatch latch = new CountDownLatch(1);
@@ -1189,7 +1266,7 @@
          latch.await();
       }
 
-      if (!openExecutor.isShutdown())
+      if (openExecutor != null && !openExecutor.isShutdown())
       {
          // Send something to the closingExecutor, just to make sure we went until its end
          final CountDownLatch latch = new CountDownLatch(1);
@@ -1207,8 +1284,6 @@
    
    }
 
-   // TestableJournal implementation --------------------------------------------------------------
-	
 	public synchronized void checkAndReclaimFiles() throws Exception
 	{
 		checkReclaimStatus();
@@ -1219,20 +1294,23 @@
 			{
 				//File can be reclaimed or deleted
 				
-				if (trace) log.trace("Reclaiming file " + file);
+            if (trace) log.trace("Reclaiming file " + file);
+            log.info("Reclaiming file " + file); // remove this
 				
 				dataFiles.remove(file);
 				
 				//FIXME - size() involves a scan!!!
-				if (freeFiles.size() + dataFiles.size() + 1 < minFiles)
-				{              
+				if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+				{
 					//Re-initialise it
 					
 					long newOrderingID = generateOrderingID();
 					
 					SequentialFile sf = file.getFile();
 					
-					sf.open();
+               log.info("Adding " + sf + "to freeFiles");  // remove this
+
+               sf.open();
 					
 					ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG); 
 					
@@ -1258,6 +1336,8 @@
 				}
 				else
 				{
+               log.info("Deleting " + file.getFile());  // remove this
+
 					file.getFile().open();
 					
 					file.getFile().delete();
@@ -1330,7 +1410,24 @@
    {
       return aioTimeout;
    }
-	
+   
+   // In some tests we need to force the journal to move to a next file
+   public void forceMoveNextFile() throws Exception
+   {
+      lock.acquire();
+      
+      try
+      {
+         moveNextFile();
+      }
+      finally
+      {
+         lock.release();
+      }
+      
+      debugWait();
+   }
+
 	// MessagingComponent implementation ---------------------------------------------------
 	
 	public synchronized boolean isStarted()
@@ -1345,6 +1442,9 @@
 			throw new IllegalStateException("Journal is not stopped");
 		}
 		
+		this.openExecutor =  Executors.newSingleThreadExecutor();
+		this.closingExecutor = Executors.newSingleThreadExecutor();
+		
 		state = STATE_STARTED;
 	}
 	
@@ -1358,7 +1458,7 @@
 		stopReclaimer();
 		
 		closingExecutor.shutdown();
-		if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.SECONDS))
+		if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
 		{
 		   throw new IllegalStateException("Time out waiting for closing executor to finish");
 		}
@@ -1369,12 +1469,11 @@
 		}
 
 		openExecutor.shutdown();
-      if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.SECONDS))
+      if (!openExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
       {
          throw new IllegalStateException("Time out waiting for open executor to finish");
       }
       
-
 		for (JournalFile file: openedFiles)
 		{
 			file.getFile().close();
@@ -1522,6 +1621,7 @@
 		return orderingID;
 	}
 
+	// You need to guarantee lock.acquire() over currentFile before calling this method
 	private void checkFile(final int size) throws Exception
 	{		
 		if (size % currentFile.getFile().getAlignment() != 0)
@@ -1537,21 +1637,21 @@
 		
 		if (currentFile == null || fileSize - currentFile.getOffset() < size)
 		{
-		   closeFile(currentFile);
+		   moveNextFile();
 
-		   enqueueOpenFile();
-		   
-		   currentFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
-		   
-		   if (currentFile == null)
-		   {
-		      throw new IllegalStateException("Timed out waiting for an opened file");
-		   }
-
 		}     
 	}
 	
-	private void enqueueOpenFile()
+	// You need to guarantee lock.acquire() before calling this method
+   private void moveNextFile() throws InterruptedException
+   {
+      closeFile(currentFile);
+
+      currentFile = enqueueOpenFile();
+   }
+	
+   // You need to guarantee lock.acquire() before calling this method
+	private JournalFile enqueueOpenFile() throws InterruptedException
 	{
 	   if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
 	   openExecutor.execute(new Runnable()
@@ -1568,12 +1668,21 @@
             }
          }
       });
+	   
+	   JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
+	   
+	   if (nextFile == null)
+	   {
+         throw new IllegalStateException("Timed out waiting for an opened file");
+	   }
+	   
+	   return nextFile;
 	}
 	
 	
    /** 
     * 
-    * Open a file an place it into the openedFiles queue
+    * Open a file and place it into the openedFiles queue
     * */
    private void pushOpenedFile() throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -63,5 +63,10 @@
       return ByteBuffer.wrap(bytes);
    }
    
+   public int getAlignment()
+   {
+      return 1;
+   }
+   
 	
 }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.Journal;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
 import org.jboss.messaging.core.journal.RecordInfo;
@@ -199,7 +200,7 @@
 
 	public void storeAcknowledge(final long queueID, final long messageID) throws Exception
 	{		
-		byte[] record = ackBytes(queueID, messageID);
+	   EncodingSupport record = ackBytes(queueID, messageID);
 		
 		messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, record);					
 	}
@@ -218,7 +219,7 @@
    
    public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
    {
-   	byte[] record = ackBytes(queueID, messageID);
+   	EncodingSupport record = ackBytes(queueID, messageID);
 		
 		messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, record);	
    }
@@ -601,17 +602,9 @@
 	
 	// Private ----------------------------------------------------------------------------------
 	
-	private byte[] ackBytes(final long queueID, final long messageID)
+	private EncodingSupport ackBytes(final long queueID, final long messageID)
    {
-      byte[] record = new byte[SIZE_LONG + SIZE_LONG];
-      
-      ByteBuffer bb = ByteBuffer.wrap(record);
-      
-      bb.putLong(queueID);
-      
-      bb.putLong(messageID);
-      
-      return record;
+      return new ACKRecord(queueID, messageID);
    }
 	
 	private void checkAndCreateDir(String dir, boolean create)
@@ -643,5 +636,40 @@
 			log.info("Directory " + dir + " already exists");
 		}
 	}
+	
+   // Inner Classes ----------------------------------------------------------------------------
 
+	class ACKRecord implements EncodingSupport
+   {
+      private long queueID;
+      private long messageID;
+      
+      
+
+      public ACKRecord(long queueID, long messageID)
+      {
+         super();
+         this.queueID = queueID;
+         this.messageID = messageID;
+      }
+
+      public void decode(MessagingBuffer buffer)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putLong(queueID);
+         buffer.putLong(messageID);
+      }
+
+      public int getEncodeSize()
+      {
+         return SIZE_LONG * 2;
+      }
+      
+   }
+   
+
 }

Deleted: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -1,119 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.tools.ant;
-
-import java.io.OutputStream;
-
-import junit.framework.AssertionFailedError;
-import junit.framework.Test;
-
-import org.apache.tools.ant.BuildException;
-import org.apache.tools.ant.taskdefs.optional.junit.JUnitResultFormatter;
-import org.apache.tools.ant.taskdefs.optional.junit.JUnitTest;
-
-/**
- * This class is a hack.
- *
- * I needed a way to intercept the end of a forked ant JUnit test run, in order to perform some
- * clean-up, and this is it: register this class as a JUnit batchtest formatter, and it will get
- * notified on a endTestSuite() event. Very important, it is run in the same address space as the
- * tests themselves.
- *
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class JUnitTestSuiteListener implements JUnitResultFormatter
-{
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // JUnitResultFormatter implementation ---------------------------
-
-   public void endTestSuite(JUnitTest suite) throws BuildException
-   {
-//      try
-//      {
-//         List destroyed = ServerManagement.destroySpawnedServers();
-//         if (destroyed.size() > 0)
-//         {
-//            StringBuffer sb = new StringBuffer("Destroyed spawned test servers ");
-//            for(Iterator i = destroyed.iterator(); i.hasNext();)
-//            {
-//               sb.append(i.next());
-//               if (i.hasNext())
-//               {
-//                  sb.append(',');
-//               }
-//            }
-//            System.out.println(sb);
-//         }
-//      }
-//      catch(Throwable t)
-//      {
-//         t.printStackTrace();
-//      }
-   }
-
-   public void startTestSuite(JUnitTest suite) throws BuildException
-   {
-      // noop
-   }
-
-   public void setOutput(OutputStream out)
-   {
-      // noop
-   }
-
-   public void setSystemOutput(String out)
-   {
-      // noop
-   }
-
-   public void setSystemError(String err)
-   {
-      // noop
-   }
-
-   // TestListener implementation -----------------------------------
-
-   public void addError(Test test, Throwable t)
-   {
-      // noop
-   }
-
-   public void addFailure(Test test, AssertionFailedError t)
-   {
-      // noop
-   }
-
-   public void endTest(Test test)
-   {
-      // noop
-   }
-
-   public void startTest(Test test)
-   {
-      // noop
-   }
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}


Property changes on: trunk/tests/src/org/jboss/messaging/tests
___________________________________________________________________
Name: svn:ignore
   + local


Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -174,7 +174,7 @@
       
       double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
       
-      log.debug("Rate of " + rate + " adds/removes per sec");
+      log.info("Rate of " + rate + " adds/removes per sec");
       
       log.debug("Reclaim status = " + debugJournal());
                
@@ -238,13 +238,13 @@
          
          for (double rate: rates)
          {
-            log.debug("Transaction Rate = " + rate + " records/sec");
+            log.info("Transaction Rate = " + rate + " records/sec");
             
          }
          
          double rate = 1000 * (double)numMessages / (end - start);
          
-         log.debug("Rate " + rate + " records/sec");
+         log.info("Rate " + rate + " records/sec");
       }
       finally
       {
@@ -285,7 +285,7 @@
       
       double rate = 1000 * (double)numMessages / (end - start);
       
-      log.debug("Rate " + rate + " records/sec");
+      log.info("Rate " + rate + " records/sec");
 
       journal.stop();
       

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -0,0 +1,462 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class AlignedJournalImplTest extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   private int alignment = 0;
+   
+   private FakeSequentialFileFactory factory;
+
+   JournalImpl journalImpl = null;
+   
+   private ArrayList<RecordInfo> records = null;
+
+   private ArrayList<PreparedTransactionInfo> transactions = null;
+   
+   
+   // Static --------------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(AlignedJournalImplTest.class);
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   // This test just validates basic alignment on the FakeSequentialFile itself
+   public void testBasicAlignment() throws Exception
+   {
+      // Files created by this factory use an alignment of 200 bytes
+      FakeSequentialFileFactory alignedFactory = new FakeSequentialFileFactory(200,
+            true, false);
+
+      SequentialFile file = alignedFactory.createSequentialFile("test1", 100, 10000);
+
+      file.open();
+
+      // 57 is not a multiple of the alignment, so this write is expected to be
+      // rejected with an exception
+      try
+      {
+         ByteBuffer unaligned = ByteBuffer.allocateDirect(57);
+         file.write(unaligned, true);
+         fail("Exception expected");
+      }
+      catch (Exception expected)
+      {
+         // expected: unaligned writes must not be accepted
+      }
+
+      // Aligned writes must succeed. They are deliberately not wrapped in a
+      // try/catch: an unexpected exception has to fail the test instead of
+      // being swallowed and reported as a pass.
+      ByteBuffer buffer = ByteBuffer.allocateDirect(200);
+      for (int i = 0; i < 200; i++)
+      {
+         buffer.put(i, (byte) 1);
+      }
+
+      file.write(buffer, true);
+
+      buffer = ByteBuffer.allocate(400);
+      for (int i = 0; i < 400; i++)
+      {
+         buffer.put(i, (byte) 2);
+      }
+
+      file.write(buffer, true);
+
+      // Read everything back from the start of the file and check both regions
+      buffer = ByteBuffer.allocate(600);
+
+      file.position(0);
+
+      file.read(buffer);
+
+      for (int i = 0; i < 200; i++)
+      {
+         assertEquals("Position " + i, (byte)1, buffer.get(i));
+      }
+
+      // The second region starts at index 200; starting at 201 would leave
+      // the first byte of the second write unchecked
+      for (int i = 200; i < 600; i++)
+      {
+         assertEquals("Position " + i, (byte)2, buffer.get(i));
+      }
+   }
+   
+   public void testAppendAndUpdateRecords() throws Exception
+   {
+
+      final int JOURNAL_SIZE = 51 * 1024;
+
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      // Records 0..24 are appended through the byte[] overload
+      for (int i = 0; i < 25; i++)
+      {
+         byte[] bytes = new byte[100];
+         for (int j=0; j<bytes.length; j++)
+         {
+            bytes[j] = (byte)i;
+         }
+         journalImpl.appendAddRecord(i * 100L, (byte)i, bytes);
+      }
+      // Records 25..49 are appended through the EncodingSupport overload
+      for (int i = 25; i < 50; i++)
+      {
+         EncodingSupport support = new SimpleEncoding(100, (byte) i);
+         journalImpl.appendAddRecord(i * 100L, (byte)i, support);
+      }
+      // Reload the journal: all 50 adds must come back, in append order
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(50, records.size());
+
+      int i=0;
+      for (RecordInfo recordItem: records)
+      {
+         assertEquals(i * 100L, recordItem.id);
+         assertEquals(i, recordItem.getUserRecordType());
+         assertEquals(100, recordItem.data.length);
+         for (int j=0;j<100;j++)
+         {
+            assertEquals((byte)i, recordItem.data[j]);
+         }
+
+         i++;
+      }
+      // Append a 10-byte update for each of the records 40..49
+      for (i = 40; i < 50; i++)
+      {
+         byte[] bytes = new byte[10];
+         for (int j = 0; j < 10; j++)
+         {
+            bytes[j] = (byte)'x';
+         }
+
+         journalImpl.appendUpdateRecord(i * 100L, (byte)i, bytes);
+      }
+      // Reload again: the 50 adds are expected first, followed by the 10 updates
+      setupJournal(JOURNAL_SIZE, 1024);
+      // Without this check the update branch below could be skipped entirely
+      assertEquals(60, records.size());
+      i=0;
+      for (RecordInfo recordItem: records)
+      {
+         if (i < 50)
+         {
+            assertEquals(i * 100L, recordItem.id);
+            assertEquals(i, recordItem.getUserRecordType());
+            assertEquals(100, recordItem.data.length);
+            for (int j=0;j<100;j++)
+            {
+               assertEquals((byte)i, recordItem.data[j]);
+            }
+         }
+         else
+         {
+            assertEquals((i - 10) * 100L, recordItem.id);
+            assertEquals(i - 10, recordItem.getUserRecordType());
+            assertTrue(recordItem.isUpdate);
+            assertEquals(10, recordItem.data.length);
+            for (int j=0;j<10;j++)
+            {
+               assertEquals((byte)'x', recordItem.data[j]);
+            }
+         }
+
+         i++;
+      }
+
+      journalImpl.stop();
+
+   }
+   
+   public void testAddAndDeleteReclaimWithoutTransactions() throws Exception
+   {
+      final int JOURNAL_SIZE = 51 * 1024;
+      // Start an empty journal whose file factory uses a 1024-byte alignment
+      setupJournal(JOURNAL_SIZE, 1024);
+      
+      journalImpl.checkAndReclaimFiles();
+      
+      journalImpl.debugWait();
+      // Baseline: two files with the "tt" extension exist before anything is appended
+      assertEquals(2, factory.listFiles("tt").size());
+      
+      log.debug("Initial:--> " + journalImpl.debug());
+      
+      log.debug("_______________________________");
+      // Add 50 records of 200 bytes each (ids 0..49)
+      for (int i = 0; i < 50; i++)
+      {
+         journalImpl.appendAddRecord((long)i, (byte)1, new SimpleEncoding(200, (byte) 'x'));
+      }
+   
+      // as the request to a new file is asynchronous, we need to make sure the async requests are done
+      journalImpl.debugWait();
+      
+      assertEquals(2, factory.listFiles("tt").size());
+      // Delete every record that was just added
+      for (int i = 0; i < 50; i++)
+      {
+         journalImpl.appendDeleteRecord((long)i);
+      }
+      // One more add; after it the test expects the file count to have grown to four
+      journalImpl.appendAddRecord((long)1000, (byte)1, new SimpleEncoding(200, (byte) 'x'));
+      
+      journalImpl.debugWait();
+      
+      assertEquals(4, factory.listFiles("tt").size());
+
+      journalImpl.checkReclaimStatus();
+
+      log.debug(journalImpl.debug());
+      
+      journalImpl.checkAndReclaimFiles();
+      
+      log.debug(journalImpl.debug());
+      
+      journalImpl.debugWait();
+      
+      log.debug("Final:--> " + journalImpl.debug());
+      
+      log.debug("_______________________________");
+
+      log.debug("Files size:" + factory.listFiles("tt").size());
+      // After reclaiming, the file count is expected to be back at the initial two
+      assertEquals(2, factory.listFiles("tt").size());
+
+   }
+
+   public void testReloadWithTransaction() throws Exception
+   {
+      final int JOURNAL_SIZE = 51 * 1024;
+
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      // Append a transactional add, then reload without ever committing it
+      journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(1,(byte) 1));
+      // After the reload the unfinished transaction must not show up anywhere
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+
+      try
+      {
+         journalImpl.appendCommitRecord(1L);
+         // This was supposed to throw an exception, as the transaction was forgotten (interrupted by a reload).
+         fail("Supposed to throw exception");
+      }
+      catch (Exception e)
+      {
+         log.debug("Expected exception " + e, e);
+      }
+      // A further reload must still find an empty journal
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+
+   }
+   
+   public void testReclaimWithInterruptedTransaction() throws Exception
+   {
+      final int JOURNAL_SIZE = 51 * 1024;
+
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      // Ten transactional adds that are never committed, each followed by a
+      // forced move to the next file
+      for (int i = 0; i < 10; i++)
+      {
+         journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(50,(byte) 1));
+         journalImpl.forceMoveNextFile();
+      }
+
+      journalImpl.debugWait();
+
+      assertEquals(12, factory.listFiles("tt").size());
+
+      journalImpl.appendAddRecordTransactional(2, 1, (byte) 1, new SimpleEncoding(200,(byte) 1));
+
+      assertEquals(12, factory.listFiles("tt").size());
+      // Reload: the uncommitted adds must not come back as records
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+
+      try
+      {
+         journalImpl.appendCommitRecord(1L);
+         // This was supposed to throw an exception, as the transaction was forgotten (interrupted by a reload).
+         fail("Supposed to throw exception");
+      }
+      catch (Exception e)
+      {
+         log.debug("Expected exception " + e, e);
+      }
+
+      setupJournal(JOURNAL_SIZE, 1024);
+
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+
+      assertEquals(12, factory.listFiles("tt").size());
+
+      journalImpl.checkAndReclaimFiles();
+      // Let pending asynchronous journal work finish before counting files,
+      // as the other reclaim tests do after checkAndReclaimFiles()
+      journalImpl.debugWait();
+
+      assertEquals(2, factory.listFiles("tt").size());
+
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      // No journal or file factory yet: setupJournal() creates them on first use
+      journalImpl = null;
+      factory = null;
+
+      // Fresh lists that setupJournal() hands to journalImpl.load()
+      transactions = new ArrayList<PreparedTransactionInfo>();
+      records = new ArrayList<RecordInfo>();
+
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown(); // NOTE(review): journalImpl is not stopped here; confirm that tests which never call stop() may leave it running
+   }
+   
+  
+   // Private -------------------------------------------------------
+
+   private void setupJournal(final int journalSize, final int alignment) throws Exception
+   {
+      // A journal left over from an earlier call is stopped before it is replaced
+      if (journalImpl != null)
+      {
+         journalImpl.stop();
+      }
+
+      // The factory is created only once per test, so later calls reopen the same files
+      if (factory == null)
+      {
+         factory = new FakeSequentialFileFactory(alignment, true, false);
+
+         this.alignment = alignment;
+      }
+
+      journalImpl = new JournalImpl(journalSize, 2, true, true,
+            factory, 1000, "tt", "tt", 1000, 1000);
+
+      journalImpl.start();
+
+      // Discard the results of any previous load before loading again
+      transactions.clear();
+      records.clear();
+
+      journalImpl.load(records, transactions);
+   }
+   
+
+   // Inner classes -------------------------------------------------
+   
+   private static class SimpleEncoding implements EncodingSupport
+   {
+      // Test payload made of 'size' copies of 'bytetosend'; static because it needs no outer instance
+      private final int size;
+      private final byte bytetosend;
+
+      public SimpleEncoding(int size, byte bytetosend)
+      {
+         this.size = size;
+         this.bytetosend = bytetosend;
+      }
+      // Decoding is not supported by this encoding
+      public void decode(MessagingBuffer buffer)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+      // Writes 'size' copies of 'bytetosend' into the buffer
+      public void encode(MessagingBuffer buffer)
+      {
+         for (int i = 0; i < size; i++)
+         {
+            buffer.putByte(bytetosend);
+         }
+      }
+      // Same value encode() uses as its byte count
+      public int getEncodeSize()
+      {
+         return size;
+      }
+
+   }
+   
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -115,6 +115,7 @@
 	{
 	   journal.debugWait();
 	   journal.checkAndReclaimFiles();
+      journal.debugWait();
 	}
 	
 	protected abstract SequentialFileFactory getFileFactory() throws Exception;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -99,6 +99,28 @@
 		stopJournal();    
 	}
 	
+	public void testRestartJournal() throws Exception
+	{
+      // First run: create, start and load the journal, then stop it
+      setup(10, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      stopJournal();
+      // Second run: start and load the journal again after the stop
+      startJournal();
+      load();
+      // Appending records after restart should be valid (not throwing any exceptions)
+      // NOTE: a 1000-byte 'record' array used to be filled at this point but was
+      // never passed to the journal; that dead code has been removed. Each append
+      // below writes the same two-byte payload under id 1.
+      for (int i = 0; i < 100; i++)
+      {
+         journal.appendAddRecord(1, (byte)1, new byte[] {(byte)'a', (byte)'a'});
+      }
+      stopJournal();
+	}
+	
 	public void testParams() throws Exception
 	{
 		try
@@ -609,12 +631,12 @@
 		//10
 		
 		assertEquals(journal.getAlignment()==1?6:7, journal.getDataFilesCount());
-		assertEquals(journal.getAlignment()==1?3:2, journal.getFreeFilesCount());
+		assertEquals(journal.getAlignment()==1?2:1, journal.getFreeFilesCount());
 		assertEquals(initialNumberOfAddRecords /2 + 10, journal.getIDMapSize());
 		
 		List<String> files5 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(11, files5.size());
+		assertEquals(10, files5.size());
 		assertEquals(1, journal.getOpenedFilesCount());
 		
 		//Now delete the rest
@@ -634,13 +656,13 @@
 		
 		checkAndReclaimFiles();
 		
-		assertEquals(0, journal.getDataFilesCount());
-		assertEquals(9, journal.getFreeFilesCount());
+		assertEquals(journal.getAlignment()==1?0:1, journal.getDataFilesCount());
+		assertEquals(journal.getAlignment()==1?8:7, journal.getFreeFilesCount());
 		assertEquals(0, journal.getIDMapSize());
 		
 		List<String> files6 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(11, files6.size());
+		assertEquals(10, files6.size());
 		assertEquals(1, journal.getOpenedFilesCount());
 		
 		stopJournal(); 
@@ -671,13 +693,13 @@
 		List<String> files2 = fileFactory.listFiles(fileExtension);
 		
 		// 1 file for nextOpenedFile
-		assertEquals(3, files2.size());
+		assertEquals(4, files2.size());
 		assertEquals(1, journal.getOpenedFilesCount());
 		
 		//1 gets deleted and 1 gets reclaimed
 		
-		assertEquals(0, journal.getDataFilesCount());
-		assertEquals(1, journal.getFreeFilesCount());
+		assertEquals(2, journal.getDataFilesCount());
+		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(0, journal.getIDMapSize());
 		
 		stopJournal();
@@ -911,13 +933,13 @@
 		//Most Should now be reclaimed - leaving 10 left in total
 		
 		assertEquals(journal.getAlignment()==1?1:2, journal.getDataFilesCount());
-		assertEquals(journal.getAlignment()==1?8:7, journal.getFreeFilesCount());
+		assertEquals(journal.getAlignment()==1?7:6, journal.getFreeFilesCount());
 		assertEquals(10, journal.getIDMapSize());
 		
 		List<String> files10 = fileFactory.listFiles(fileExtension);
 		
 		// The journal will aways keep one file opened (even if there are no more files on freeFiles)
-		assertEquals(11, files10.size());   
+		assertEquals(10, files10.size());   
 		assertEquals(1, journal.getOpenedFilesCount());
 	}
 	
@@ -1531,14 +1553,15 @@
 		
 		checkAndReclaimFiles();
 		
+		
 		List<String> files6 = fileFactory.listFiles(fileExtension);
 		
 		// files 0 and 1 should be deleted
 		
-		assertEquals(3, files6.size());
+		assertEquals(journal.getAlignment()==1?2:3, files6.size());
 		
-		assertEquals(0, journal.getDataFilesCount());
-		assertEquals(1, journal.getFreeFilesCount());
+		assertEquals(journal.getAlignment()==1?0:1, journal.getDataFilesCount());
+		assertEquals(0, journal.getFreeFilesCount());
       assertEquals(1, journal.getOpenedFilesCount());
 		assertEquals(1, journal.getIDMapSize());  
 		
@@ -1551,10 +1574,10 @@
 		
 		List<String> files7 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(3, files7.size());
+		assertEquals(journal.getAlignment()==1?2:3, files7.size());
 		
-		assertEquals(0, journal.getDataFilesCount());
-		assertEquals(1, journal.getFreeFilesCount());
+		assertEquals(journal.getAlignment()==1?0:1, journal.getDataFilesCount());
+		assertEquals(0, journal.getFreeFilesCount());
       assertEquals(1, journal.getOpenedFilesCount());
 		assertEquals(1, journal.getIDMapSize());
 	}
@@ -1688,7 +1711,7 @@
 	
 	public void testPrepareReclaim() throws Exception
 	{
-		setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true);
+		setup(2, 100 * 1024, true);
 		createJournal();
 		startJournal();
 		load();
@@ -1714,17 +1737,21 @@
 		assertEquals(0, journal.getIDMapSize());
 		
 		//Make sure we move on to the next file
-		
+
+      journal.forceMoveNextFile();
+      
+      journal.debugWait();
+      
 		addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2);               // in file 1
 		
 		List<String> files2 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength) , journal.getDataFilesCount());
+		assertEquals(3 , files2.size());
 		
-		assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
+		assertEquals(1, journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(1, journal.getIDMapSize());
-		
+      
 		prepare(1);          // in file 1
 		
 		List<String> files3 = fileFactory.listFiles(fileExtension);
@@ -1740,23 +1767,19 @@
 		
 		List<String> files4 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 
-				2, recordLength,
-				1, JournalImpl.SIZE_PREPARE_RECORD,
-				1, JournalImpl.SIZE_DELETE_RECORD) + 2 , files4.size());
+		assertEquals(3, files4.size());
 		
 		assertEquals(1, journal.getOpenedFilesCount());
 		
-		assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 
-				2, recordLength,
-				1, JournalImpl.SIZE_PREPARE_RECORD,
-				1, JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+		assertEquals(1, journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(0, journal.getIDMapSize());
 		
 		//Move on to another file
 		
-		addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,3);                // in file 2
+      journal.forceMoveNextFile();
+
+      addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,3);                // in file 2
 		
 		checkAndReclaimFiles();
 		
@@ -1779,6 +1802,8 @@
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(1, journal.getIDMapSize());
       assertEquals(1, journal.getOpenedFilesCount());
+      
+      journal.forceMoveNextFile();
 		
 		addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,4);    // in file 3
 		
@@ -1806,9 +1831,9 @@
 		
 		List<String> files9 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(journal.getAlignment()==1?5:6, files9.size());
+		assertEquals(5, files9.size());
 		
-		assertEquals(journal.getAlignment()==1?3:4, journal.getDataFilesCount());
+		assertEquals(3, journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
       assertEquals(1, journal.getOpenedFilesCount());
 		assertEquals(2, journal.getIDMapSize());
@@ -1817,19 +1842,21 @@
 		
 		List<String> files10 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(journal.getAlignment()==1?5:4, files10.size());
+		assertEquals(journal.getAlignment()==1?5:5, files10.size());
 		
-		assertEquals(journal.getAlignment()==1?3:2, journal.getDataFilesCount());
+		assertEquals(journal.getAlignment()==1?3:3, journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(2, journal.getIDMapSize());
 		
+      journal.forceMoveNextFile();
+      
 		addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,5);       // in file 4
 		
 		List<String> files11 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(journal.getAlignment()==1?6:4, files11.size());
+		assertEquals(6, files11.size());
 		
-		assertEquals(journal.getAlignment()==1?4:2, journal.getDataFilesCount());
+		assertEquals(4, journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
       assertEquals(1, journal.getOpenedFilesCount());
 		assertEquals(3, journal.getIDMapSize());
@@ -1859,11 +1886,11 @@
 		
 		List<String> files13 = fileFactory.listFiles(fileExtension);
 		
-		assertEquals(journal.getAlignment()==1?4:5, files13.size());
+		assertEquals(4, files13.size());
 		
 		assertEquals(1, journal.getOpenedFilesCount());
 		
-		assertEquals(journal.getAlignment()==1?2:3, journal.getDataFilesCount());
+		assertEquals(2, journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(2, journal.getIDMapSize());
 		
@@ -1873,10 +1900,10 @@
 		
 		log.debug("Debug journal on testPrepareReclaim ->\n" + debugJournal());
 		
-		assertEquals(5, files14.size());
+		assertEquals(4, files14.size());
 		
 		assertEquals(1, journal.getOpenedFilesCount());
-		assertEquals(3, journal.getDataFilesCount());
+		assertEquals(2, journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(3, journal.getIDMapSize());
 		

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -38,34 +38,75 @@
  * A FakeSequentialFileFactory
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public class FakeSequentialFileFactory implements SequentialFileFactory
 {
    private static final Logger log = Logger.getLogger(FakeSequentialFileFactory.class);
       
-   private Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
+   // Constants -----------------------------------------------------
    
+   // Attributes ----------------------------------------------------
+
+   private final Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
+   
+   private final int alignment;
+   
+   private final boolean supportsCallback; 
+   
+   private final boolean holdCallbacks;
+   
+   private final List<Runnable> callbacksInHold;
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   public FakeSequentialFileFactory(final int alignment, final boolean supportsCallback, final boolean holdCallback)
+   {
+      this.alignment = alignment;
+      this.supportsCallback = supportsCallback;
+      this.holdCallbacks = holdCallback;
+      // The hold list is always allocated, even when callbacks are not held.
+      // It then simply stays empty, so flushAllCallbacks() and
+      // getNumberOfCallbacks() are safe to call on any factory instead of
+      // failing with a NullPointerException when holdCallback is false.
+      //
+      // alignment: block size that file positions and write sizes must be a
+      // multiple of; supportsCallback: value reported by isSupportsCallbacks().
+      callbacksInHold = new ArrayList<Runnable>();
+   }
+
+   public FakeSequentialFileFactory()
+   {
+      this(1, false, false); // alignment of 1, no callback support, callbacks are not held
+   }
+
+   
+   
+   // Public --------------------------------------------------------
+   
    public SequentialFile createSequentialFile(final String fileName, final int maxAIO, final long timeout) throws Exception
    {
       FakeSequentialFile sf = fileMap.get(fileName);
       
       if (sf == null)
       {                 
-         sf = new FakeSequentialFile(fileName);
+         sf = newSequentialFile(fileName);
          
          fileMap.put(fileName, sf);
       }
       else
-      {     
-         sf.data.position(0);
+      { 
+          sf.getData().position(0);
          
          //log.debug("positioning data to 0");
       }
                   
       return sf;
    }
-   
+
    public List<String> listFiles(final String extension)
    {
       List<String> files = new ArrayList<String>();
@@ -93,26 +134,72 @@
    
    public boolean isSupportsCallbacks()
    {
-      return false;
+      return supportsCallback;
    }
    
    public ByteBuffer newBuffer(int size)
    {
-      return ByteBuffer.allocate(size);
+      if (size % alignment != 0)
+      {
+         size = (size / alignment + 1) * alignment;
+      }
+      return ByteBuffer.allocateDirect(size);
    }
-
+   
    public ByteBuffer wrapBuffer(byte[] bytes)
    {
       return ByteBuffer.wrap(bytes);
    }
+   
+   public void flushAllCallbacks()
+   {
+      for (Runnable action : callbacksInHold) // runs every held action, in list order
+      {
+         action.run();
+      }
+      // every held action has run; forget them
+      callbacksInHold.clear();
+   }
 
+   public void flushCallback(int position)
+   {
+      Runnable run = callbacksInHold.get(position); // held action at the given index of the hold list
+      run.run();
+      callbacksInHold.remove(run); // removed only after it has run
+   }
+   
+   public int getNumberOfCallbacks()
+   {
+      return callbacksInHold.size(); // number of actions currently in the hold list
+   }
+   
+   public int getAlignment()
+   {
+      return alignment; // the alignment this factory was constructed with
+   }
+   
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+
+   protected FakeSequentialFile newSequentialFile(final String fileName)
+   {
+      return new FakeSequentialFile(fileName); // creation hook called by createSequentialFile(); protected, so a subclass can override it
+   }
+   
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
    public class FakeSequentialFile implements SequentialFile
    {
       private volatile boolean open;
       
       private final String fileName;
       
-      private volatile ByteBuffer data;
+      private ByteBuffer data;
       
       public ByteBuffer getData()
       {
@@ -161,7 +248,7 @@
          open = true;
       }
 
-      public void fill(int pos, int size, byte fillCharacter) throws Exception
+      public void fill(final int pos, final int size, final byte fillCharacter) throws Exception
       {     
          if (!open)
          {
@@ -180,24 +267,20 @@
          }                 
       }
       
-      public int read(ByteBuffer bytes) throws Exception
+      public int read(final ByteBuffer bytes) throws Exception
       {
          return read(bytes, null);
       }
       
-      public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+      public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
       {
          if (!open)
          {
             throw new IllegalStateException("Is closed");
          }
          
-         //log.debug("read called " + bytes.array().length);
+         byte[] bytesRead = new byte[bytes.limit()];
          
-         byte[] bytesRead = new byte[bytes.array().length];
-         
-         //log.debug("reading, data pos is " + data.position() + " data size is " + data.array().length);
-         
          data.get(bytesRead);
          
          bytes.put(bytesRead);
@@ -209,45 +292,65 @@
          return bytesRead.length;
       }
 
-      public void position(int pos) throws Exception
+      public void position(final int pos) throws Exception
       {
          if (!open)
          {
             throw new IllegalStateException("Is closed");
          }
          
-         //log.debug("reset called");
+         checkAlignment(pos);
          
          data.position(pos);
       }
 
-      public int write(ByteBuffer bytes, IOCallback callback) throws Exception
+      public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
       {
          if (!open)
          {
             throw new IllegalStateException("Is closed");
          }
          
-         int position = data == null ? 0 : data.position();
+         final int position = data == null ? 0 : data.position();
          
+         checkAlignment(position);
+         
+         checkAlignment(bytes.limit());
+         
          checkAndResize(bytes.capacity() + position);
          
-         //log.debug("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+         Runnable action = new Runnable()
+         {
+
+            public void run()
+            {
+               
+               data.put(bytes);
+               
+               if (callback!=null) callback.done();
+            }
+            
+         };
          
-         data.put(bytes);
+         if (holdCallbacks && callback != null)
+         {
+            FakeSequentialFileFactory.this.callbacksInHold.add(action);
+         }
+         else
+         {
+            action.run();
+         }
          
-         if (callback!=null) callback.done();
+         return bytes.limit();
          
-         return bytes.array().length;
-         
       }
       
-      public int write(ByteBuffer bytes, boolean sync) throws Exception
+      public int write(final ByteBuffer bytes, final boolean sync) throws Exception
       {
          return write(bytes, null);
       }
       
-      private void checkAndResize(int size)
+      private void checkAndResize(final int size)
       {
          int oldpos = data == null ? 0 : data.position();
          
@@ -268,18 +371,28 @@
 
       public int getAlignment() throws Exception
       {
-         return 1;
+         return alignment;
       }
 
-      public int calculateBlockStart(int position) throws Exception
+      public int calculateBlockStart(final int position) throws Exception
       {
-         return position;
+         int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+         
+         return pos;
       }
       
       public String toString()
       {
          return "FakeSequentialFile:" + this.fileName;
       }
+      
+      private void checkAlignment (final int position)
+      {
+         if ( position % alignment != 0) // value is not an exact multiple of the factory's alignment
+         {
+            throw new IllegalStateException("Position is not aligned to " + alignment);
+         }
+      }
 
 
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-07-12 01:14:15 UTC (rev 4677)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.Journal;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
 import org.jboss.messaging.core.journal.RecordInfo;
@@ -96,7 +97,7 @@
       bb.putLong(queueID);      
       bb.putLong(messageID);
       
-      messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), EasyMock.aryEq(record));  
+      messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));  
       EasyMock.replay(messageJournal, bindingsJournal);      
       jsm.storeAcknowledge(queueID, messageID);     
       EasyMock.verify(messageJournal, bindingsJournal);
@@ -147,7 +148,7 @@
       bb.putLong(messageID);
       
       final long txID = 12091921;
-      messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), EasyMock.aryEq(record));  
+      messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));  
       EasyMock.replay(messageJournal, bindingsJournal);      
       jsm.storeAcknowledgeTransactional(txID, queueID, messageID);     
       EasyMock.verify(messageJournal, bindingsJournal);
@@ -820,6 +821,53 @@
       assertEquals(1, bindingsJournal.getAIOTimeout());
    }
    
+   /**
+    * Registers an EasyMock matcher accepting an EncodingSupport that encodes to expectedRecord.
+    */
+   private EncodingSupport encodingMatch(final byte expectedRecord[])
+   {
+      EasyMock.reportMatcher(new IArgumentMatcher()
+      {
+         public void appendTo(StringBuffer buffer)
+         {
+            // Without a description, a failed expectation prints nothing for this argument.
+            buffer.append("encodingMatch(" + expectedRecord.length + " bytes)");
+         }
+
+         public boolean matches(Object argument)
+         {
+            // A null or foreign argument is a mismatch, not a ClassCastException/NPE.
+            if (!(argument instanceof EncodingSupport))
+            {
+               return false;
+            }
+
+            EncodingSupport support = (EncodingSupport)argument;
+
+            if (support.getEncodeSize() != expectedRecord.length)
+            {
+               return false;
+            }
+
+            // Encode into a scratch buffer of the expected size.
+            byte newByte[] = new byte[expectedRecord.length];
+
+            ByteBufferWrapper wrapper = new ByteBufferWrapper(ByteBuffer.wrap(newByte));
+
+            support.encode(wrapper);
+
+            // Length-checked comparison; cannot index past either array.
+            return java.util.Arrays.equals(wrapper.array(), expectedRecord);
+         }
+
+      });
+
+      // The matcher is registered via reportMatcher above; the return value is only a placeholder.
+      return null;
+   }
+   
+
+   
    public static ServerMessage eqServerMessage(ServerMessage serverMessage)
    {
       EasyMock.reportMatcher(new ServerMessageMatcher(serverMessage));




More information about the jboss-cvs-commits mailing list