[jboss-cvs] JBoss Messaging SVN: r4244 - trunk/src/main/org/jboss/messaging/core/remoting/impl/mina.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon May 19 20:17:54 EDT 2008
Author: trustin
Date: 2008-05-19 20:17:53 -0400 (Mon, 19 May 2008)
New Revision: 4244
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
Log:
MinaHandler.blocked should be managed on a per-session basis rather than on a per-handler basis.
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-19 21:11:54 UTC (rev 4243)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-20 00:17:53 UTC (rev 4244)
@@ -11,24 +11,23 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import org.apache.mina.common.AttributeKey;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.util.OrderedExecutorFactory;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
public class MinaHandler extends IoHandlerAdapter implements
PacketHandlerRegistrationListener
@@ -37,13 +36,15 @@
private static final Logger log = Logger.getLogger(MinaHandler.class);
+ private static final AttributeKey BLOCKED = new AttributeKey(MinaHandler.class, "blocked");
+
private static boolean trace = log.isTraceEnabled();
-
+
// Attributes ----------------------------------------------------
private final PacketDispatcher dispatcher;
- private CleanUpNotifier failureNotifier;
+ private final CleanUpNotifier failureNotifier;
private final boolean closeSessionOnExceptionCaught;
@@ -51,17 +52,15 @@
// Note! must use ConcurrentMap here to avoid race condition
private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-
- private boolean blocked;
-
+
private final long blockTimeout;
-
+
//TODO - this is screwed - I want this to be zero, but unfortunately in messageSent, the current
- //messages byts haven't been substracted so this won't work!!
+ //messages bytes haven't been subtracted so this won't work!!
private final long bytesLow;
-
+
private final long bytesHigh;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -86,11 +85,11 @@
this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
if (useExecutor)
{
- this.executorFactory = new OrderedExecutorFactory(executorService);
+ executorFactory = new OrderedExecutorFactory(executorService);
}
else
{
- this.executorFactory = null;
+ executorFactory = null;
}
this.dispatcher.setListener(this);
}
@@ -112,6 +111,12 @@
// IoHandlerAdapter overrides ------------------------------------
@Override
+ public void sessionCreated(IoSession session) throws Exception {
+ // Initialize the default attributes.
+ session.setAttribute(BLOCKED, Boolean.FALSE);
+ }
+
+ @Override
public void exceptionCaught(final IoSession session, final Throwable cause)
throws Exception
{
@@ -136,25 +141,25 @@
throws Exception
{
final Packet packet = (Packet) message;
-
+
if (executorFactory != null)
- {
+ {
long executorID = packet.getExecutorID();
-
+
Executor executor = executors.get(executorID);
if (executor == null)
{
executor = executorFactory.getOrderedExecutor();
-
+
Executor oldExecutor = executors.putIfAbsent(executorID, executor);
-
+
if (oldExecutor != null)
{
//Avoid race
executor = oldExecutor;
}
}
-
+
executor.execute(new Runnable()
{
public void run()
@@ -178,54 +183,55 @@
@Override
public synchronized void messageSent(final IoSession session, final Object message) throws Exception
- {
+ {
+ boolean blocked = (Boolean) session.getAttribute(BLOCKED);
if (blocked)
{
long bytes = session.getScheduledWriteBytes();
-
+
if (bytes <= bytesLow)
{
- blocked = false;
-
+ session.setAttribute(BLOCKED, Boolean.FALSE);
+
//Note that we need to notify all since there may be more than one thread waiting on this
//E.g. the response from a blocking acknowledge and a delivery
- notifyAll();
+ notifyAll();
}
}
}
-
+
public synchronized void checkWrite(final IoSession session) throws Exception
{
while (session.getScheduledWriteBytes() >= bytesHigh)
{
- blocked = true;
-
+ session.setAttribute(BLOCKED, Boolean.TRUE);
+
long start = System.currentTimeMillis();
-
+
long toWait = blockTimeout;
-
+
do
{
wait(toWait);
-
+
if (session.getScheduledWriteBytes() < bytesHigh)
{
break;
}
-
+
long now = System.currentTimeMillis();
-
+
toWait -= now - start;
-
+
start = now;
}
while (toWait > 0);
-
+
if (toWait <= 0)
{
throw new IllegalStateException("Timed out waiting for MINA queue to free");
}
- }
+ }
}
// Package protected ---------------------------------------------
@@ -239,7 +245,7 @@
{
PacketReturner returner;
- if (packet.getResponseTargetID() != EmptyPacket.NO_ID_SET)
+ if (packet.getResponseTargetID() != Packet.NO_ID_SET)
{
returner = new PacketReturner()
{
@@ -253,7 +259,7 @@
{
log.error("Failed to acquire sem", e);
}
-
+
dispatcher.callFilters(p);
session.write(p);
@@ -275,7 +281,9 @@
returner = null;
}
- if (trace) log.trace("received packet " + packet);
+ if (trace) {
+ log.trace("received packet " + packet);
+ }
dispatcher.dispatch(packet, returner);
}
More information about the jboss-cvs-commits
mailing list