[jboss-cvs] JBoss Messaging SVN: r7669 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 6 04:44:47 EDT 2009
Author: ataylor
Date: 2009-08-06 04:44:47 -0400 (Thu, 06 Aug 2009)
New Revision: 7669
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1693 - fix
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-08-05 15:15:54 UTC (rev 7668)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-08-06 08:44:47 UTC (rev 7669)
@@ -22,9 +22,20 @@
package org.jboss.messaging.core.remoting.impl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -36,25 +47,10 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
/**
* A ChannelImpl
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
*/
public class ChannelImpl implements Channel
{
@@ -114,7 +110,7 @@
this.windowSize = windowSize;
- this.confWindowSize = (int)(0.75 * windowSize);
+ this.confWindowSize = (int) (0.75 * windowSize);
if (this.windowSize != -1)
{
@@ -186,22 +182,10 @@
packet.setChannelID(id);
final MessagingBuffer buffer = connection.getTransportConnection()
- .createBuffer(packet.getRequiredBufferSize());
+ .createBuffer(packet.getRequiredBufferSize());
int size = packet.encode(buffer);
- // Must block on semaphore outside the main lock or this can prevent failover from occurring
- if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
- {
- try
- {
- sendSemaphore.acquire(size);
- }
- catch (InterruptedException e)
- {
- throw new IllegalStateException("Semaphore interrupted");
- }
- }
lock.lock();
@@ -233,6 +217,20 @@
{
lock.unlock();
}
+ // Must block on semaphore outside the main lock or this can prevent failover from occurring, also after the
+ // packet is sent to assure we get some credits back
+ if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
+ {
+ try
+ {
+ sendSemaphore.acquire(size);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("Semaphore interrupted");
+ }
+ }
+
}
}
@@ -255,23 +253,10 @@
packet.setChannelID(id);
final MessagingBuffer buffer = connection.getTransportConnection()
- .createBuffer(packet.getRequiredBufferSize());
+ .createBuffer(packet.getRequiredBufferSize());
int size = packet.encode(buffer);
- // Must block on semaphore outside the main lock or this can prevent failover from occurring
- if (sendSemaphore != null)
- {
- try
- {
- sendSemaphore.acquire(size);
- }
- catch (InterruptedException e)
- {
- throw new IllegalStateException("Semaphore interrupted");
- }
- }
-
lock.lock();
try
@@ -331,19 +316,29 @@
if (response.getType() == PacketImpl.EXCEPTION)
{
- final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+ final MessagingExceptionMessage mem = (MessagingExceptionMessage) response;
throw mem.getException();
}
- else
- {
- return response;
- }
}
finally
{
lock.unlock();
}
+ // Must block on semaphore outside the main lock or this can prevent failover from occurring, also after the
+ // packet is sent to assure we get some credits back
+ if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
+ {
+ try
+ {
+ sendSemaphore.acquire(size);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("Semaphore interrupted");
+ }
+ }
+ return response;
}
}
@@ -373,7 +368,7 @@
}
final MessagingBuffer buffer = connection.getTransportConnection()
- .createBuffer(packet.getRequiredBufferSize());
+ .createBuffer(packet.getRequiredBufferSize());
packet.encode(buffer);
@@ -388,7 +383,7 @@
action.run();
}
}
-
+
public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
{
this.commandConfirmationHandler = handler;
@@ -488,7 +483,7 @@
// And switch it
- final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+ final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl) newConnection;
rnewConnection.putChannel(newChannelID, this);
@@ -549,28 +544,54 @@
}
}
- public void confirm(final Packet packet)
+ public synchronized void confirm(final Packet packet)
{
- if (resendCache != null && packet.isRequiresConfirmations())
+ if (packet.getType() == PacketImpl.SESS_ACKNOWLEDGE || packet.getType() == PacketImpl.CREATESESSION || packet.getType() == PacketImpl.SESS_CREATECONSUMER
+ || packet.getType() == PacketImpl.SESS_START || packet.getType() == PacketImpl.PING || packet.getType() == PacketImpl.SESS_FLOWTOKEN)
{
- lastReceivedCommandID++;
+ if (resendCache != null && packet.isRequiresConfirmations())
+ {
+ lastReceivedCommandID++;
- receivedBytes += packet.getPacketSize();
+ receivedBytes += packet.getPacketSize();
+ if (receivedBytes >= confWindowSize)
+ {
+ receivedBytes = 0;
- if (receivedBytes >= confWindowSize)
+ if (connection.isActive())
+ {
+ final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+
+ confirmed.setChannelID(id);
+
+ doWrite(confirmed);
+ }
+ }
+ }
+ }
+ else
+ {
+ if (resendCache != null && packet.isRequiresConfirmations())
{
- receivedBytes = 0;
+ lastReceivedCommandID++;
- if (connection.isActive())
+ receivedBytes += packet.getPacketSize();
+ if (receivedBytes >= confWindowSize)
{
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ receivedBytes = 0;
- confirmed.setChannelID(id);
+ if (connection.isActive())
+ {
+ final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ confirmed.setChannelID(id);
- doWrite(confirmed);
+ doWrite(confirmed);
+ }
}
}
+
}
+
}
public void handlePacket(final Packet packet)
@@ -579,7 +600,7 @@
{
if (resendCache != null)
{
- final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+ final PacketsConfirmedMessage msg = (PacketsConfirmedMessage) packet;
clearUpTo(msg.getCommandID());
}
@@ -624,7 +645,7 @@
replicateComplete();
}
-
+
public void waitForAllReplicationResponse()
{
synchronized (replicationLock)
@@ -675,6 +696,7 @@
// TODO it's not ideal synchronizing this since it forms a contention point with replication
// but we need to do this to protect it w.r.t. the check on replicatingChannel
+
private void replicateResponseReceived()
{
Runnable result = null;
@@ -711,7 +733,7 @@
}
}
}
-
+
private void doWrite(final Packet packet)
{
final MessagingBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java 2009-08-05 15:15:54 UTC (rev 7668)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java 2009-08-06 08:44:47 UTC (rev 7669)
@@ -26,12 +26,21 @@
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -222,7 +231,7 @@
((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
}
- /*public void testAcksWithSmallSendWindow() throws Exception
+ public void testAcksWithSmallSendWindow() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -232,26 +241,48 @@
ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 1000;
+ final int numMessages = 10000;
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
- System.out.println("-----------------------------------------------------------------------------");
+ session.close();
+ sf.close();
+ final CountDownLatch latch = new CountDownLatch(numMessages);
+ server.getRemotingService().addInterceptor(new Interceptor()
+ {
+ public boolean intercept(Packet packet, RemotingConnection connection) throws MessagingException
+ {
+ if(packet.getType() == PacketImpl.SESS_ACKNOWLEDGE)
+ {
+ latch.countDown();
+ }
+ return true;
+ }
+ });
ClientSessionFactory sfReceive = createInVMFactory();
sfReceive.setProducerWindowSize(100);
sfReceive.setAckBatchSize(-1);
ClientSession sessionRec = sfReceive.createSession(false, true, true);
ClientConsumer consumer = sessionRec.createConsumer(QUEUE);
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
sessionRec.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
- System.out.println("message2 = " + message2);
- message2.acknowledge();
- }
- }*/
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java 2009-08-06 08:44:47 UTC (rev 7669)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ProducerTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ConsumerTest.class);
+
+ private MessagingServer server;
+
+ private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer(false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ public void testProducerWithSmallWindowSizeAndLargeMessage() throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ server.getRemotingService().addInterceptor(new Interceptor()
+ {
+ public boolean intercept(Packet packet, RemotingConnection connection) throws MessagingException
+ {
+ if(packet.getType() == PacketImpl.SESS_SEND)
+ {
+ latch.countDown();
+ }
+ return true;
+ }
+ });
+ ClientSessionFactory cf = createInVMFactory();
+ cf.setProducerWindowSize(100);
+ ClientSession session = cf.createSession(false, true, true);
+ ClientProducer producer = session.createProducer(QUEUE);
+ ClientMessage message = session.createClientMessage(true);
+ byte[] body = new byte[1000];
+ message.getBody().writeBytes(body);
+ producer.send(message);
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+}
More information about the jboss-cvs-commits
mailing list