[jboss-cvs] JBoss Messaging SVN: r4230 - in trunk/src: main/org/jboss/messaging/core/client/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun May 18 09:34:40 EDT 2008
Author: timfox
Date: 2008-05-18 09:34:39 -0400 (Sun, 18 May 2008)
New Revision: 4230
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
Log:
Various tweaks and reverted OrderedExecutorFactory change
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/config/jbm-configuration.xml 2008-05-18 13:34:39 UTC (rev 4230)
@@ -44,9 +44,9 @@
<remoting-writequeue-block-timeout>10000</remoting-writequeue-block-timeout>
- <remoting-writequeue-minbytes>32768</remoting-writequeue-minbytes>
+ <remoting-writequeue-minbytes>4096</remoting-writequeue-minbytes>
- <remoting-writequeue-maxbytes>65536</remoting-writequeue-maxbytes>
+ <remoting-writequeue-maxbytes>8192</remoting-writequeue-maxbytes>
<!-- if ssl is enabled, all remoting-ssl-* properties must be set -->
<remoting-enable-ssl>false</remoting-enable-ssl>
@@ -69,7 +69,7 @@
<create-journal-dir>true</create-journal-dir>
- <journal-type>asyncio</journal-type>
+ <journal-type>nio</journal-type>
<journal-sync>true</journal-sync>
@@ -84,4 +84,4 @@
</configuration>
-</deployment>
\ No newline at end of file
+</deployment>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -835,7 +835,7 @@
remotingConnection.sendBlocking(serverTargetID, serverTargetID, message);
}
else
- {
+ {
remotingConnection.sendOneWay(serverTargetID, serverTargetID, message);
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -123,9 +123,11 @@
if (config.getJournalType() == JournalType.ASYNCIO)
{
+ log.info("AIO journal selected");
if (!AIOSequentialFileFactory.isSupported())
{
- log.warn("AIO wasn't located on this platform, using just standard Java NIO. If you are on Linux, install LibAIO and the required wrapper and you will get a lot of performance benefit");
+ log.warn("AIO wasn't located on this platform, will fall back to Java NIO." +
+ "If you are on Linux, install LibAIO to enable the AIO journal");
journalFF = new NIOSequentialFileFactory(journalDir);
}
else
@@ -136,10 +138,12 @@
}
else if (config.getJournalType() == JournalType.NIO)
{
+ log.info("NIO Journal selected");
journalFF = new NIOSequentialFileFactory(bindingsDir);
}
else if (config.getJournalType() == JournalType.JDBC)
{
+ log.info("JDBC Journal selected");
// Sanity check only... this is previously tested
throw new IllegalArgumentException("JDBC Journal is not supported yet");
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -25,12 +25,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionAttributeMap;
import org.apache.mina.common.IoSessionDataStructureFactory;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteRequestQueue;
+import org.apache.mina.util.CircularQueue;
/**
*
@@ -134,34 +137,34 @@
}
-// private static class DefaultWriteRequestQueue implements WriteRequestQueue
-// {
-// private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
-//
-// public void dispose(IoSession session) {
-// }
-//
-// public void clear(IoSession session) {
-// q.clear();
-// }
-//
-// public synchronized boolean isEmpty(IoSession session) {
-// return q.isEmpty();
-// }
-//
-// public synchronized void offer(IoSession session, WriteRequest writeRequest) {
-// q.offer(writeRequest);
-// }
-//
-// public synchronized WriteRequest poll(IoSession session) {
-// return q.poll();
-// }
-//
-// @Override
-// public String toString() {
-// return q.toString();
-// }
-// }
+ private static class DefaultWriteRequestQueue implements WriteRequestQueue
+ {
+ private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
+
+ public void dispose(IoSession session) {
+ }
+
+ public void clear(IoSession session) {
+ q.clear();
+ }
+
+ public synchronized boolean isEmpty(IoSession session) {
+ return q.isEmpty();
+ }
+
+ public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+ q.offer(writeRequest);
+ }
+
+ public synchronized WriteRequest poll(IoSession session) {
+ return q.poll();
+ }
+
+ @Override
+ public String toString() {
+ return q.toString();
+ }
+ }
private static class ConcurrentWriteRequestQueue implements WriteRequestQueue
{
@@ -191,5 +194,55 @@
return q.toString();
}
}
+
+ private static class SynchronousWriteRequestQueue implements WriteRequestQueue
+ {
+ private final LinkedBlockingQueue<WriteRequest> q = new LinkedBlockingQueue<WriteRequest>(1);
+
+ public void dispose(IoSession session) {
+ }
+
+ public void clear(IoSession session) {
+ q.clear();
+ }
+ public synchronized boolean isEmpty(IoSession session) {
+ return q.isEmpty();
+ }
+
+ public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+ try
+ {
+ boolean ok = q.offer(writeRequest, 5000L, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new IllegalStateException("Timed out trying to offer to queue");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("Failed to offer");
+ }
+ }
+
+ public synchronized WriteRequest poll(IoSession session) {
+ try
+ {
+ WriteRequest request = q.poll(5000L, TimeUnit.MILLISECONDS);
+
+ return request;
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("Failed to offer");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return q.toString();
+ }
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -51,7 +51,7 @@
// Note! must use ConcurrentMap here to avoid race condition
private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
- private volatile boolean blocked;
+ private boolean blocked;
private final long blockTimeout;
@@ -133,10 +133,9 @@
throws Exception
{
final Packet packet = (Packet) message;
-
+
if (executorFactory != null)
- {
-
+ {
long executorID = packet.getExecutorID();
Executor executor = executors.get(executorID);
@@ -152,7 +151,7 @@
executor = oldExecutor;
}
}
-
+
executor.execute(new Runnable()
{
public void run()
@@ -160,7 +159,8 @@
try
{
messageReceivedInternal(session, packet);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
log.error("unexpected error", e);
}
@@ -173,42 +173,51 @@
}
}
+ private final int maxSize = 2;
+
+ private int size;
+
@Override
- public void messageSent(final IoSession session, final Object message) throws Exception
+ public synchronized void messageSent(final IoSession session, final Object message) throws Exception
{
- if (blocked)
+// if (blocked)
+// {
+// long bytes = session.getScheduledWriteBytes();
+//
+// if (bytes <= bytesLow)
+// {
+// blocked = false;
+//
+// //Note that we need to notify all since there may be more than one thread waiting on this
+// //E.g. the response from a blocking acknowledge and a delivery
+// notifyAll();
+// }
+// }
+
+ size--;
+
+ if (blocked && size == 0)
{
- long bytes = session.getScheduledWriteBytes();
-
- if (bytes <= bytesLow)
- {
- blocked = false;
-
- synchronized (this)
- {
- notify();
- }
- }
+ notifyAll();
}
}
- public void checkWrite(final IoSession session) throws Exception
+ public synchronized void checkWrite(final IoSession session) throws Exception
{
- if (session.getScheduledWriteBytes() >= bytesHigh)
+// if (session.getScheduledWriteBytes() >= bytesHigh)
+// {
+// blocked = true;
+//
+// wait();
+// }
+ if (size == maxSize)
{
blocked = true;
-
- synchronized (this)
- {
- wait(blockTimeout);
- }
- if (session.getScheduledWriteBytes() >= bytesHigh)
- {
- //TODO should really cope with spurious wakeups
- throw new IllegalStateException("Timed out waiting for MINA write queue to free up");
- }
- }
+ wait();
+ }
+
+ size++;
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -21,7 +21,7 @@
// Attributes ----------------------------------------------------
private int tokens;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -208,7 +208,7 @@
{
// We delivered all the messages - go into direct delivery
direct = true;
-
+
promptDelivery = false;
}
return;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -273,12 +273,12 @@
}
public void receiveTokens(final int tokens) throws Exception
- {
+ {
if (availableTokens != null)
{
int previous = availableTokens.getAndAdd(tokens);
-
- if (previous <= 0)
+
+ if (previous <= 0 && (previous + tokens) > 0)
{
promptDelivery();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.server.impl;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
@@ -38,6 +39,8 @@
*/
public class ServerConsumerPacketHandler extends ServerPacketHandlerSupport
{
+ private static final Logger log = Logger.getLogger(ServerConsumerPacketHandler.class);
+
private final ServerConsumer consumer;
public ServerConsumerPacketHandler(final ServerConsumer consumer)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.XAException;
@@ -228,6 +229,7 @@
public void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
Delivery delivery;
+
synchronized (rollbackCancelLock)
{
long nextID = deliveryIDSequence.getAndIncrement();
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -399,7 +399,7 @@
ackBatchSize = 1;
- blockOnAcknowledge = true;
+ blockOnAcknowledge = false;
}
else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
{
Modified: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-05-18 00:35:55 UTC (rev 4229)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-05-18 13:34:39 UTC (rev 4230)
@@ -8,10 +8,9 @@
import java.util.Collections;
import java.util.HashSet;
-import java.util.Queue;
+import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* This factory creates a hierarchy of Executor which shares the threads of the
@@ -40,14 +39,17 @@
private final class ChildExecutor implements Executor, Runnable
{
- private final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
-
+ private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+
public void execute(Runnable command)
{
- tasks.offer(command);
- if (tasks.size() == 1 && runningChildren.add(this))
+ synchronized (tasks)
{
- parent.execute(this);
+ tasks.add(command);
+ if (tasks.size() == 1 && runningChildren.add(this))
+ {
+ parent.execute(this);
+ }
}
}
@@ -55,11 +57,15 @@
{
for (;;)
{
- final Runnable task = tasks.poll();
- if (task == null)
+ final Runnable task;
+ synchronized (tasks)
{
- runningChildren.remove(this);
- return;
+ task = tasks.poll();
+ if (task == null)
+ {
+ runningChildren.remove(this);
+ return;
+ }
}
task.run();
}
More information about the jboss-cvs-commits
mailing list