[jboss-cvs] JBoss Messaging SVN: r5125 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/client/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 16 17:42:43 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-16 17:42:42 -0400 (Thu, 16 Oct 2008)
New Revision: 5125

Removed:
   branches/Branch_Chunk_Clebert/jbm-large-messages/
Modified:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
Log:
Bug fixes and few tests

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -208,7 +208,7 @@
          
          FileClientMessageImpl message = new FileClientMessageImpl();
          message.decodeProperties(propertiesBuffer);
-         message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.getID() + ".jbm"));
+         message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
          return message;
       }
       else

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -401,9 +401,16 @@
                                                                   headerBuffer.array(),
                                                                   bodyBuffer.array(),
                                                                   bodyLength < bodySize,
-                                                                  true);
+                                                                  sendBlocking);
 
-      channel.sendBlocking(chunk);
+      if (sendBlocking)
+      {
+         channel.sendBlocking(chunk);
+      }
+      else
+      {
+         channel.send(chunk);
+      }
 
       for (int pos = bodyLength; pos < bodySize;)
       {
@@ -412,7 +419,7 @@
 
          msg.encodeBody(bodyBuffer, pos, bodyLength);
 
-         chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
+         chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, sendBlocking);
 
          if (sendBlocking)
          {

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -715,7 +715,12 @@
    public ClientMessage createLargeMessage(final long consumerID, final MessagingBuffer header) throws Exception
    {
       ClientConsumerInternal consumer = consumers.get(consumerID);
-
+      
+      if (consumer == null)
+      {
+         throw new IllegalStateException("No Consumer with ID = " + consumerID);
+      }
+      
       return consumer.createLargeMessage(header);
    }
    

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -86,7 +86,6 @@
             }
             case SESS_CHUNK_SEND:
             {
-               System.out.println("received a chunk");
                SessionSendChunkMessage chunk = (SessionSendChunkMessage)packet;
 
                ClientMessage currentChunkMessage = null;
@@ -115,7 +114,7 @@
                {
                   ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
 
-                  currentChunkMessage = currentChunk.get(chunk.getMessageID());
+                  currentChunkMessage = currentChunk.get(chunk.getTargetID());
 
                   if (currentChunkMessage instanceof FileClientMessage)
                   {

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -155,7 +155,7 @@
    @Override
    public void decodeBody(final MessagingBuffer buffer)
    {
-      buffer.putLong(targetID);
+      targetID = buffer.getLong();
 
       final int headerLength = buffer.getInt();
 
@@ -164,6 +164,10 @@
          header = new byte[headerLength];
          buffer.getBytes(header);
       }
+      else
+      {
+         header = null;
+      }
 
       final int bodyLength = buffer.getInt();
 

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -356,6 +356,15 @@
             iterator.remove();
 
             removed = ref;
+            
+            try
+            {
+               referenceRemoved(removed);
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+            }
 
             break;
          }
@@ -423,16 +432,8 @@
 
    public void referenceAcknowledged(MessageReference ref) throws Exception
    {
-      deliveringCount.decrementAndGet();
+      referenceRemoved(ref);
 
-      sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
-      
-      
-      if (ref.getMessage().decrementRefCount() == 0)
-      {
-         pagingManager.messageDone(ref.getMessage());
-      }
-
       // if (flowController != null)
       // {
       // flowController.messageAcknowledged();
@@ -840,6 +841,24 @@
       }
    }
 
+   /**
+    * To be called when a reference is removed from the queue.
+    * @param ref
+    * @throws Exception
+    */
+   private void referenceRemoved(MessageReference ref) throws Exception
+   {
+      deliveringCount.decrementAndGet();
+
+      sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
+      
+      
+      if (ref.getMessage().decrementRefCount() == 0)
+      {
+         pagingManager.messageDone(ref.getMessage());
+      }
+   }
+
    // Inner classes
    // --------------------------------------------------------------------------
 

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -23,6 +23,8 @@
 package org.jboss.messaging.tests.integration.chunkmessage;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -35,6 +37,7 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -111,17 +114,152 @@
    {
       testInternal(true, false, 4, false);
    }
-   
+
    public void testTwoBindingsOneAckAndrestart() throws Exception
    {
-      // TODO: Write a test where there are two bindings.. one is ACKed, the other is not, the server is restarted
-      //       The other binding is acked... and we must delete the file.
+      // there are two bindings.. one is ACKed, the other is not, the server is restarted
+      // The other binding is acked... The file must be deleted
+
+      clearData();
       
-      ///      Play with the scenario over XA also.
+      try
+      {
+   
+         messagingService = createService(true);
+   
+         messagingService.start();
+   
+         SimpleString queue[] = new SimpleString[] {new SimpleString("queue1"), new SimpleString("queue2") };
+         
+         ClientSessionFactory sf = createInVMFactory();
+   
+         ClientSession session = sf.createSession(false, true, true, false);
+   
+         session.createQueue(ADDRESS, queue[0], null, true, false);
+         session.createQueue(ADDRESS, queue[1], null, true, false);
+   
+         FileClientMessage clientFile = session.createFileMessage(true);
+   
+         File tmpFile = new File(temporaryDir + "/" + "tmpUpload.data");
+   
+         RandomAccessFile random = new RandomAccessFile(tmpFile, "rw");
+         FileChannel channel = random.getChannel();
+   
+         ByteBuffer buffer = ByteBuffer.allocate(4);
+         
+         int numberOfIntegers = 10000; 
+   
+         for (int i = 0; i < numberOfIntegers; i++)
+         {
+            buffer.rewind();
+            buffer.putInt(i);
+            buffer.rewind();
+            channel.write(buffer);
+         }
+   
+         channel.close();
+         random.close();
+   
+         clientFile.setFile(tmpFile);
+   
+         ClientProducer producer = session.createProducer(ADDRESS);
+         producer.send(clientFile);
+         
+         producer.close();
+         
+         readMessage(session, queue[0], numberOfIntegers);
+   
+         session.close();
+         
+         messagingService.stop();
+   
+         messagingService = createService(true);
+   
+         messagingService.start();
+         
+         sf = createInVMFactory();
+   
+         session = sf.createSession(false, true, true, false);
+         
+         readMessage(session, queue[1], numberOfIntegers);
+         
+         session.close();
+         
+         File largeMessagesFileDir = new File(largeMessagesDir);
+         assertEquals(0, largeMessagesFileDir.listFiles().length);
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+
+   }
+
+   /**
+    * @param session
+    * @param queueToRead
+    * @param numberOfIntegers
+    * @throws MessagingException
+    * @throws FileNotFoundException
+    * @throws IOException
+    */
+   private void readMessage(ClientSession session, SimpleString queueToRead, int numberOfIntegers) throws MessagingException,
+                                                                                                  FileNotFoundException,
+                                                                                                  IOException
+   {
+      ClientConsumer consumer = session.createConsumer(queueToRead);
       
-      // Validate Message counters
+      consumer.setLargeMessagesAsFiles(true);
+      consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
+      
+      session.start();
+      
+      FileClientMessage clientMessage = (FileClientMessage) consumer.receive(4000);
+      
+      assertNotNull(clientMessage);
+      File receivedFile = clientMessage.getFile();
+      
+      checkFileRead(receivedFile, numberOfIntegers);
+      
+      clientMessage.processed();
+      
+      consumer.close();
    }
 
+   /**
+    * @param receivedFile
+    * @throws FileNotFoundException
+    * @throws IOException
+    */
+   private void checkFileRead(File receivedFile, int numberOfIntegers) throws FileNotFoundException, IOException
+   {
+      RandomAccessFile random2 = new RandomAccessFile(receivedFile, "r");
+      FileChannel channel2 = random2.getChannel();
+
+      ByteBuffer buffer2 = ByteBuffer.allocate(4);
+      
+      channel2.position(0l);
+      
+      for (int i=0;i<numberOfIntegers;i++)
+      {
+         buffer2.rewind();
+         channel2.read(buffer2);
+         buffer2.rewind();
+         
+         assertEquals(i, buffer2.getInt());
+      }
+      
+      
+      channel2.close();
+   }
+
    public void testInternal(final boolean realFiles,
                             final boolean useFile,
                             final int numberOfIntegers,
@@ -217,7 +355,7 @@
          session.start();
 
          ClientMessage message2 = consumer.receive(0);
-         
+
          message2.processed();
 
          assertNotNull(message2);

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	2008-10-16 21:42:42 UTC (rev 5125)
@@ -160,9 +160,9 @@
       do
       {
          msg = consumer.receive(1000);
-         msg.processed();
          if (msg != null)
          {
+            msg.processed();
             if (++msgs % 10000 == 0)
             {
                System.out.println("received " + msgs);




More information about the jboss-cvs-commits mailing list