[jboss-cvs] JBoss Messaging SVN: r5125 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/client/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 16 17:42:43 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-16 17:42:42 -0400 (Thu, 16 Oct 2008)
New Revision: 5125
Removed:
branches/Branch_Chunk_Clebert/jbm-large-messages/
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
Log:
Bug fixes and a few tests
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -208,7 +208,7 @@
FileClientMessageImpl message = new FileClientMessageImpl();
message.decodeProperties(propertiesBuffer);
- message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.getID() + ".jbm"));
+ message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
return message;
}
else
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -401,9 +401,16 @@
headerBuffer.array(),
bodyBuffer.array(),
bodyLength < bodySize,
- true);
+ sendBlocking);
- channel.sendBlocking(chunk);
+ if (sendBlocking)
+ {
+ channel.sendBlocking(chunk);
+ }
+ else
+ {
+ channel.send(chunk);
+ }
for (int pos = bodyLength; pos < bodySize;)
{
@@ -412,7 +419,7 @@
msg.encodeBody(bodyBuffer, pos, bodyLength);
- chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
+ chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, sendBlocking);
if (sendBlocking)
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -715,7 +715,12 @@
public ClientMessage createLargeMessage(final long consumerID, final MessagingBuffer header) throws Exception
{
ClientConsumerInternal consumer = consumers.get(consumerID);
-
+
+ if (consumer == null)
+ {
+ throw new IllegalStateException("No Consumer with ID = " + consumerID);
+ }
+
return consumer.createLargeMessage(header);
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -86,7 +86,6 @@
}
case SESS_CHUNK_SEND:
{
- System.out.println("received a chunk");
SessionSendChunkMessage chunk = (SessionSendChunkMessage)packet;
ClientMessage currentChunkMessage = null;
@@ -115,7 +114,7 @@
{
ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
- currentChunkMessage = currentChunk.get(chunk.getMessageID());
+ currentChunkMessage = currentChunk.get(chunk.getTargetID());
if (currentChunkMessage instanceof FileClientMessage)
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -155,7 +155,7 @@
@Override
public void decodeBody(final MessagingBuffer buffer)
{
- buffer.putLong(targetID);
+ targetID = buffer.getLong();
final int headerLength = buffer.getInt();
@@ -164,6 +164,10 @@
header = new byte[headerLength];
buffer.getBytes(header);
}
+ else
+ {
+ header = null;
+ }
final int bodyLength = buffer.getInt();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -356,6 +356,15 @@
iterator.remove();
removed = ref;
+
+ try
+ {
+ referenceRemoved(removed);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
break;
}
@@ -423,16 +432,8 @@
public void referenceAcknowledged(MessageReference ref) throws Exception
{
- deliveringCount.decrementAndGet();
+ referenceRemoved(ref);
- sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
-
-
- if (ref.getMessage().decrementRefCount() == 0)
- {
- pagingManager.messageDone(ref.getMessage());
- }
-
// if (flowController != null)
// {
// flowController.messageAcknowledged();
@@ -840,6 +841,24 @@
}
}
+ /**
+ * To be called when a reference is removed from the queue.
+ * @param ref
+ * @throws Exception
+ */
+ private void referenceRemoved(MessageReference ref) throws Exception
+ {
+ deliveringCount.decrementAndGet();
+
+ sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
+
+
+ if (ref.getMessage().decrementRefCount() == 0)
+ {
+ pagingManager.messageDone(ref.getMessage());
+ }
+ }
+
// Inner classes
// --------------------------------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -23,6 +23,8 @@
package org.jboss.messaging.tests.integration.chunkmessage;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -35,6 +37,7 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -111,17 +114,152 @@
{
testInternal(true, false, 4, false);
}
-
+
public void testTwoBindingsOneAckAndrestart() throws Exception
{
- // TODO: Write a test where there are two bindings.. one is ACKed, the other is not, the server is restarted
- // The other binding is acked... and we must delete the file.
+ // there are two bindings.. one is ACKed, the other is not, the server is restarted
+ // The other binding is acked... The file must be deleted
+
+ clearData();
- /// Play with the scenario over XA also.
+ try
+ {
+
+ messagingService = createService(true);
+
+ messagingService.start();
+
+ SimpleString queue[] = new SimpleString[] {new SimpleString("queue1"), new SimpleString("queue2") };
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true, false);
+
+ session.createQueue(ADDRESS, queue[0], null, true, false);
+ session.createQueue(ADDRESS, queue[1], null, true, false);
+
+ FileClientMessage clientFile = session.createFileMessage(true);
+
+ File tmpFile = new File(temporaryDir + "/" + "tmpUpload.data");
+
+ RandomAccessFile random = new RandomAccessFile(tmpFile, "rw");
+ FileChannel channel = random.getChannel();
+
+ ByteBuffer buffer = ByteBuffer.allocate(4);
+
+ int numberOfIntegers = 10000;
+
+ for (int i = 0; i < numberOfIntegers; i++)
+ {
+ buffer.rewind();
+ buffer.putInt(i);
+ buffer.rewind();
+ channel.write(buffer);
+ }
+
+ channel.close();
+ random.close();
+
+ clientFile.setFile(tmpFile);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+ producer.send(clientFile);
+
+ producer.close();
+
+ readMessage(session, queue[0], numberOfIntegers);
+
+ session.close();
+
+ messagingService.stop();
+
+ messagingService = createService(true);
+
+ messagingService.start();
+
+ sf = createInVMFactory();
+
+ session = sf.createSession(false, true, true, false);
+
+ readMessage(session, queue[1], numberOfIntegers);
+
+ session.close();
+
+ File largeMessagesFileDir = new File(largeMessagesDir);
+ assertEquals(0, largeMessagesFileDir.listFiles().length);
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+
+ }
+
+ /**
+ * @param session
+ * @param queueToRead
+ * @param numberOfIntegers
+ * @throws MessagingException
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private void readMessage(ClientSession session, SimpleString queueToRead, int numberOfIntegers) throws MessagingException,
+ FileNotFoundException,
+ IOException
+ {
+ ClientConsumer consumer = session.createConsumer(queueToRead);
- // Validate Message counters
+ consumer.setLargeMessagesAsFiles(true);
+ consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
+
+ session.start();
+
+ FileClientMessage clientMessage = (FileClientMessage) consumer.receive(4000);
+
+ assertNotNull(clientMessage);
+ File receivedFile = clientMessage.getFile();
+
+ checkFileRead(receivedFile, numberOfIntegers);
+
+ clientMessage.processed();
+
+ consumer.close();
}
+ /**
+ * @param receivedFile
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private void checkFileRead(File receivedFile, int numberOfIntegers) throws FileNotFoundException, IOException
+ {
+ RandomAccessFile random2 = new RandomAccessFile(receivedFile, "r");
+ FileChannel channel2 = random2.getChannel();
+
+ ByteBuffer buffer2 = ByteBuffer.allocate(4);
+
+ channel2.position(0l);
+
+ for (int i=0;i<numberOfIntegers;i++)
+ {
+ buffer2.rewind();
+ channel2.read(buffer2);
+ buffer2.rewind();
+
+ assertEquals(i, buffer2.getInt());
+ }
+
+
+ channel2.close();
+ }
+
public void testInternal(final boolean realFiles,
final boolean useFile,
final int numberOfIntegers,
@@ -217,7 +355,7 @@
session.start();
ClientMessage message2 = consumer.receive(0);
-
+
message2.processed();
assertNotNull(message2);
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-10-16 19:15:01 UTC (rev 5124)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-10-16 21:42:42 UTC (rev 5125)
@@ -160,9 +160,9 @@
do
{
msg = consumer.receive(1000);
- msg.processed();
if (msg != null)
{
+ msg.processed();
if (++msgs % 10000 == 0)
{
System.out.println("received " + msgs);
More information about the jboss-cvs-commits
mailing list