[jboss-cvs] JBoss Messaging SVN: r7730 - in trunk: src/main/org/jboss/messaging/core/client/impl and 15 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 14 10:51:59 EDT 2009
Author: timfox
Date: 2009-08-14 10:51:58 -0400 (Fri, 14 Aug 2009)
New Revision: 7730
Added:
trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java
trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
Modified:
trunk/src/config/common/schema/jbm-configuration.xsd
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
multiple changes to pinging
Modified: trunk/src/config/common/schema/jbm-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/jbm-configuration.xsd 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/config/common/schema/jbm-configuration.xsd 2009-08-14 14:51:58 UTC (rev 7730)
@@ -47,6 +47,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl-override" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="async-connection-execution-enabled" type="xsd:boolean">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="transaction-timeout" type="xsd:long">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="transaction-timeout-scan-period" type="xsd:long">
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -58,7 +58,7 @@
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = "org.jboss.messaging.core.client.impl.RoundRobinConnectionLoadBalancingPolicy";
- public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 5000;
+ public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 30000;
// 5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
// or backup without fear of session having already been closed when connection having timed out.
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -51,7 +51,6 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
-import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -146,11 +145,13 @@
private volatile boolean closed;
private Set<FailureListener> listeners = new ConcurrentHashSet<FailureListener>();
-
+
private Connector connector;
- private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
+ private Future<?> pingerFuture;
+ private PingRunnable pingRunnable;
+
// debug
private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
@@ -229,7 +230,7 @@
this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
}
-
+
// ConnectionLifeCycleListener implementation --------------------------------------------------
public void connectionCreated(final Connection connection)
@@ -450,7 +451,7 @@
}
public int numSessions()
- {
+ {
return sessions.size();
}
@@ -473,7 +474,7 @@
closed = true;
}
-
+
public void addFailureListener(FailureListener listener)
{
listeners.add(listener);
@@ -484,17 +485,9 @@
return listeners.remove(listener);
}
-
// Public
// ---------------------------------------------------------------------------------------
- public void cancelPingerForConnectionID(final Object connectionID)
- {
- Pinger pinger = pingers.get(connectionID);
-
- pinger.close();
- }
-
@Override
protected void finalize() throws Throwable
{
@@ -504,6 +497,13 @@
super.finalize();
}
+ private volatile boolean stopPingingAfterOne;
+
+ public void stopPingingAfterOne()
+ {
+ this.stopPingingAfterOne = true;
+ }
+
// Protected
// ------------------------------------------------------------------------------------
@@ -610,7 +610,7 @@
oldConnections.add(entry.connection);
}
- closePingers();
+ // closePingers();
connections.clear();
@@ -661,8 +661,8 @@
{
connection.destroy();
}
-
- closeConnectionsAndCallFailureListeners(me);
+
+ closeConnectionsAndCallFailureListeners(me);
}
}
else
@@ -671,24 +671,24 @@
}
}
}
-
+
private void closeConnectionsAndCallFailureListeners(final MessagingException me)
{
refCount = 0;
mapIterator = null;
checkCloseConnections();
-
- //TODO (after beta5) should really execute on different thread then remove the async in JBossConnection
-
-// threadPool.execute(new Runnable()
-// {
-// public void run()
-// {
- callFailureListeners(me);
-// }
-// });
+
+ // TODO (after beta5) should really execute on different thread then remove the async in JBossConnection
+
+ // threadPool.execute(new Runnable()
+ // {
+ // public void run()
+ // {
+ callFailureListeners(me);
+ // }
+ // });
}
-
+
private void callFailureListeners(final MessagingException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(listeners);
@@ -709,16 +709,6 @@
}
}
- private void closePingers()
- {
- for (Pinger pinger : pingers.values())
- {
- pinger.close();
- }
-
- pingers.clear();
- }
-
/*
* Re-attach sessions all pre-existing sessions to new remoting connections
*/
@@ -867,8 +857,17 @@
Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
- closePingers();
+ if (pingerFuture != null)
+ {
+ pingRunnable.cancel();
+ pingerFuture.cancel(false);
+
+ pingRunnable = null;
+
+ pingerFuture = null;
+ }
+
connections.clear();
for (ConnectionEntry entry : copy)
@@ -989,33 +988,21 @@
conn.addFailureListener(new DelegatingFailureListener(conn.getID()));
- connections.put(conn.getID(), new ConnectionEntry(conn, connector));
+ conn.getChannel(0, -1, false).setHandler(new Channel0Handler(conn));
- // Send the initial ping, we always do this it contains connectionTTL and clientFailureInterval -
- // the server needs this in order to do pinging and failure checking
+ connections.put(conn.getID(), new ConnectionEntry(conn,
+ connector,
+ clientFailureCheckPeriod,
+ System.currentTimeMillis()));
- Pinger pinger = new Pinger(conn,
- clientFailureCheckPeriod,
- new Channel0Handler(conn),
- new FailedConnectionAction(conn),
- 0);
-
- pingers.put(conn.getID(), pinger);
-
- Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
-
- Channel channel0 = conn.getChannel(0, -1, false);
-
- channel0.send(ping);
-
- if (clientFailureCheckPeriod != -1)
+ if (clientFailureCheckPeriod != -1 && pingerFuture == null)
{
- Future<?> pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(pinger,
- clientFailureCheckPeriod,
- clientFailureCheckPeriod,
- TimeUnit.MILLISECONDS);
+ pingRunnable = new PingRunnable();
- pinger.setFuture(pingerFuture);
+ pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(pingRunnable,
+ 0,
+ clientFailureCheckPeriod,
+ TimeUnit.MILLISECONDS);
}
if (debug)
@@ -1144,51 +1131,33 @@
"The connection was closed by the server"));
}
});
- }
- else
- {
- throw new IllegalArgumentException("Invalid packet: " + packet);
- }
+ }
}
}
- private class FailedConnectionAction implements Runnable
- {
- private RemotingConnection conn;
-
- FailedConnectionAction(final RemotingConnection conn)
- {
- this.conn = conn;
- }
-
- public synchronized void run()
- {
- final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
- "Did not receive ping from server for " + conn.getTransportConnection());
-
- threadPool.execute(new Runnable()
- {
- // Must be executed on different thread
- public void run()
- {
- conn.fail(me);
- }
- });
- }
- }
-
private static class ConnectionEntry
{
- ConnectionEntry(final RemotingConnection connection, final Connector connector)
+ ConnectionEntry(final RemotingConnection connection,
+ final Connector connector,
+ final long expiryPeriod,
+ final long createTime)
{
this.connection = connection;
this.connector = connector;
+
+ this.expiryPeriod = expiryPeriod;
+
+ this.lastCheck = createTime;
}
final RemotingConnection connection;
final Connector connector;
+
+ volatile long lastCheck;
+
+ final long expiryPeriod;
}
private class DelegatingBufferHandler extends AbstractBufferHandler
@@ -1263,4 +1232,68 @@
}
}
+ private class PingRunnable implements Runnable
+ {
+ private boolean cancelled;
+
+ private boolean first;
+
+ public synchronized void run()
+ {
+ if (cancelled || (stopPingingAfterOne && !first))
+ {
+ return;
+ }
+
+ first = false;
+
+ synchronized (connections)
+ {
+ long now = System.currentTimeMillis();
+
+ for (ConnectionEntry entry : connections.values())
+ {
+ final RemotingConnection connection = entry.connection;
+
+ if (entry.expiryPeriod != -1 && now >= entry.lastCheck + entry.expiryPeriod)
+ {
+ if (!connection.checkDataReceived())
+ {
+ final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Did not receive data from server for " + connection.getTransportConnection());
+
+ threadPool.execute(new Runnable()
+ {
+ // Must be executed on different thread
+ public void run()
+ {
+ connection.fail(me);
+ }
+ });
+
+ return;
+ }
+ else
+ {
+ entry.lastCheck = now;
+ }
+ }
+
+ // Send a ping
+
+ Ping ping = new Ping(connectionTTL);
+
+ Channel channel0 = connection.getChannel(0, -1, false);
+
+ channel0.send(ping);
+ }
+ }
+ }
+
+ public synchronized void cancel()
+ {
+ cancelled = true;
+ }
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -103,6 +103,10 @@
long getConnectionTTLOverride();
void setConnectionTTLOverride(long ttl);
+
+ boolean isAsyncConnectionExecutionEnabled();
+
+ void setEnabledAsyncConnectionExecution(boolean enabled);
Set<TransportConfiguration> getAcceptorConfigurations();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -66,6 +66,8 @@
public static final boolean DEFAULT_JMX_MANAGEMENT_ENABLED = true;
public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
+
+ public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;
public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
@@ -182,7 +184,9 @@
protected boolean jmxManagementEnabled = DEFAULT_JMX_MANAGEMENT_ENABLED;
protected long connectionTTLOverride = DEFAULT_CONNECTION_TTL_OVERRIDE;
-
+
+ protected boolean asyncConnectionExecutionEnabled = DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED;
+
protected long messageExpiryScanPeriod = DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD;
protected int messageExpiryThreadPriority = DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
@@ -410,6 +414,16 @@
{
connectionTTLOverride = ttl;
}
+
+ public boolean isAsyncConnectionExecutionEnabled()
+ {
+ return asyncConnectionExecutionEnabled;
+ }
+
+ public void setEnabledAsyncConnectionExecution(final boolean enabled)
+ {
+ asyncConnectionExecutionEnabled = enabled;
+ }
public List<String> getInterceptorClassNames()
{
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -135,6 +135,8 @@
securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval, GT_ZERO);
connectionTTLOverride = getLong(e, "connection-ttl-override", connectionTTLOverride, MINUS_ONE_OR_GT_ZERO);
+
+ asyncConnectionExecutionEnabled = getBoolean(e, "async-connection-execution-enabled", asyncConnectionExecutionEnabled);
transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout, GT_ZERO);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -390,7 +390,7 @@
if (info == null)
{
- throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+ return;
}
info.decrementConsumers();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -75,5 +75,7 @@
long getBlockingCallTimeout();
- Object getTransferLock();
+ Object getTransferLock();
+
+ boolean checkDataReceived();
}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -1,143 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.remoting.impl;
-
-import java.util.concurrent.Future;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-
-/**
- * A Pinger
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class Pinger implements Runnable, ChannelHandler
-{
- private static final Logger log = Logger.getLogger(Pinger.class);
-
- private volatile boolean closed;
-
- private Future<?> future;
-
- private long lastPingReceived;
-
- private final long expiryPeriod;
-
- private final ChannelHandler extraHandler;
-
- private final Runnable connectionFailedAction;
-
- private final Channel channel0;
-
- private boolean first = true;
-
- private boolean stopPinging;
-
- public Pinger(final RemotingConnection conn, final long expiryPeriod, final ChannelHandler extraHandler,
- final Runnable connectionFailedAction, final long lastPingReceived)
- {
- this.expiryPeriod = expiryPeriod;
-
- this.extraHandler = extraHandler;
-
- this.connectionFailedAction = connectionFailedAction;
-
- this.channel0 = conn.getChannel(0, -1, false);
-
- this.lastPingReceived = lastPingReceived;
-
- channel0.setHandler(this);
- }
-
- public synchronized void setFuture(final Future<?> future)
- {
- this.future = future;
- }
-
- public synchronized void handlePacket(final Packet packet)
- {
- if (closed)
- {
- return;
- }
-
- if (packet.getType() == PacketImpl.PING)
- {
- lastPingReceived = System.currentTimeMillis();
- }
- else if (extraHandler != null)
- {
- extraHandler.handlePacket(packet);
- }
- else
- {
- throw new IllegalStateException("Invalid packet " + packet.getType());
- }
- }
-
- public synchronized void run()
- {
- if (closed)
- {
- return;
- }
-
- if (!first && ( System.currentTimeMillis() - lastPingReceived > expiryPeriod))
- {
- connectionFailedAction.run();
- }
- else if (!stopPinging)
- {
- channel0.send(new Ping());
- }
-
- first = false;
- }
-
- public void close()
- {
- if (future != null)
- {
- future.cancel(false);
- }
-
- channel0.setHandler(null);
-
- closed = true;
- }
-
- public synchronized void stopPinging()
- {
- this.stopPinging = true;
- }
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -17,6 +17,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -26,6 +27,7 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.SimpleIDGenerator;
@@ -79,9 +81,13 @@
private boolean frozen;
private final Object failLock = new Object();
-
+
private final PacketDecoder decoder = new PacketDecoder();
-
+
+ private volatile boolean dataReceived;
+
+ private final Executor executor;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -92,7 +98,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, blockingCallTimeout, interceptors, true, true);
+ this(transportConnection, blockingCallTimeout, interceptors, true, true, null);
}
/*
@@ -100,17 +106,19 @@
*/
public RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
- final boolean active)
+ final boolean active,
+ final Executor executor)
{
- this(transportConnection, -1, interceptors, active, false);
+ this(transportConnection, -1, interceptors, active, false, executor);
}
private RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean active,
- final boolean client)
+ final boolean client,
+ final Executor executor)
{
this.transportConnection = transportConnection;
@@ -122,6 +130,8 @@
this.active = active;
this.client = client;
+
+ this.executor = executor;
}
// RemotingConnection implementation
@@ -294,27 +304,36 @@
{
return transferLock;
}
-
+
public boolean isActive()
{
return active;
}
-
+
public boolean isClient()
{
return client;
}
-
+
public boolean isDestroyed()
{
return destroyed;
}
-
+
public long getBlockingCallTimeout()
{
return blockingCallTimeout;
}
+ public boolean checkDataReceived()
+ {
+ boolean res = dataReceived;
+
+ dataReceived = false;
+
+ return res;
+ }
+
// Buffer Handler implementation
// ----------------------------------------------------
@@ -322,6 +341,28 @@
{
final Packet packet = decoder.decode(buffer);
+ if (executor == null || packet.getType() == PacketImpl.PING)
+ {
+ // Pings must always be handled out of band so we can send pings back to the client quickly
+ // otherwise they would get in the queue with everything else which might give a intolerable delay
+ doBufferReceived(packet);
+ }
+ else
+ {
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ doBufferReceived(packet);
+ }
+ });
+ }
+
+ dataReceived = true;
+ }
+
+ private void doBufferReceived(final Packet packet)
+ {
synchronized (transferLock)
{
if (!frozen)
@@ -432,5 +473,5 @@
channel.close();
}
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -40,19 +40,15 @@
private long connectionTTL;
- private long clientFailureCheckPeriod;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public Ping(final long clientFailureCheckPeriod, final long connectionTTL)
+ public Ping(final long connectionTTL)
{
super(PING);
this.connectionTTL = connectionTTL;
-
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
}
public Ping()
@@ -66,32 +62,25 @@
{
return true;
}
-
+
public long getConnectionTTL()
{
return connectionTTL;
}
- public long getClientFailureCheckPeriod()
- {
- return clientFailureCheckPeriod;
- }
-
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + 2 * DataConstants.SIZE_LONG;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
}
public void encodeBody(final MessagingBuffer buffer)
{
buffer.writeLong(connectionTTL);
- buffer.writeLong(clientFailureCheckPeriod);
}
public void decodeBody(final MessagingBuffer buffer)
{
connectionTTL = buffer.readLong();
- clientFailureCheckPeriod = buffer.readLong();
}
@Override
@@ -99,7 +88,6 @@
{
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", connectionTTL=" + connectionTTL);
- buf.append(", clientFailureCheckPeriod=" + clientFailureCheckPeriod);
buf.append("]");
return buf.toString();
}
@@ -113,8 +101,7 @@
Ping r = (Ping)other;
- return super.equals(other) && this.connectionTTL == r.connectionTTL &&
- this.clientFailureCheckPeriod == r.clientFailureCheckPeriod;
+ return super.equals(other) && this.connectionTTL == r.connectionTTL;
}
public final boolean isRequiresConfirmations()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -36,8 +36,6 @@
*/
public interface RemotingService extends MessagingComponent
{
- RemotingConnection getConnection(Object remotingConnectionID);
-
/**
* Remove a connection from the connections held by the remoting service.
* <strong>This method must be used only from the management API.
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -13,7 +13,6 @@
package org.jboss.messaging.core.remoting.server.impl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
import java.util.ArrayList;
import java.util.HashMap;
@@ -23,9 +22,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
@@ -38,7 +35,6 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
-import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
@@ -66,7 +62,7 @@
private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
- private static final long INITIAL_PING_TIMEOUT = 10000;
+ private static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
// Attributes ----------------------------------------------------
@@ -78,7 +74,7 @@
private final Set<Acceptor> acceptors = new HashSet<Acceptor>();
- private final Map<Object, RemotingConnection> connections = new ConcurrentHashMap<Object, RemotingConnection>();
+ private final Map<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>();
private final BufferHandler bufferHandler = new DelegatingBufferHandler();
@@ -94,10 +90,10 @@
private final ScheduledExecutorService scheduledThreadPool;
- private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
-
private final int managementConnectorID;
+ private FailureCheckThread failureCheckThread;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -152,7 +148,11 @@
AcceptorFactory factory = (AcceptorFactory)clazz.newInstance();
- Acceptor acceptor = factory.createAcceptor(info.getParams(), bufferHandler, this, threadPool, scheduledThreadPool);
+ Acceptor acceptor = factory.createAcceptor(info.getParams(),
+ bufferHandler,
+ this,
+ threadPool,
+ scheduledThreadPool);
acceptors.add(acceptor);
@@ -198,6 +198,10 @@
a.start();
}
+ failureCheckThread = new FailureCheckThread(CONNECTION_TTL_CHECK_INTERVAL);
+
+ failureCheckThread.start();
+
started = true;
}
@@ -224,32 +228,22 @@
{
return;
}
-
+
+ failureCheckThread.close();
+
// We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
acceptor.stop();
}
- for (RemotingConnection connection : connections.values())
+ for (ConnectionEntry entry : connections.values())
{
- connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
+ entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
}
-// for (Acceptor acceptor : acceptors)
-// {
-// acceptor.stop();
-// }
-
acceptors.clear();
- for (Pinger pinger : pingers.values())
- {
- pinger.close();
- }
-
- pingers.clear();
-
connections.clear();
started = false;
@@ -260,19 +254,30 @@
return started;
}
- public RemotingConnection getConnection(final Object remotingConnectionID)
+ public RemotingConnection removeConnection(final Object remotingConnectionID)
{
- return connections.get(remotingConnectionID);
- }
+ ConnectionEntry entry = this.connections.remove(remotingConnectionID);
- public RemotingConnection removeConnection(final Object remotingConnectionID)
- {
- return closeConnection(remotingConnectionID);
+ if (entry != null)
+ {
+ return entry.connection;
+ }
+ else
+ {
+ return null;
+ }
}
public synchronized Set<RemotingConnection> getConnections()
{
- return new HashSet<RemotingConnection>(connections.values());
+ Set<RemotingConnection> conns = new HashSet<RemotingConnection>();
+
+ for (ConnectionEntry entry : this.connections.values())
+ {
+ conns.add(entry.connection);
+ }
+
+ return conns;
}
public RemotingConnection getServerSideReplicatingConnection()
@@ -288,25 +293,48 @@
{
throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
}
+
+ RemotingConnection rc = new RemotingConnectionImpl(connection,
+ interceptors,
+ !config.isBackup(),
+ server.getConfiguration().isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
+ .getExecutor()
+ : null);
- RemotingConnection rc = new RemotingConnectionImpl(connection, interceptors, !config.isBackup());
-
Channel channel1 = rc.getChannel(1, -1, false);
ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
channel1.setHandler(handler);
- connections.put(connection.getID(), rc);
+ final ConnectionEntry entry = new ConnectionEntry(rc,
+ System.currentTimeMillis(),
+ config.getConnectionTTLOverride());
- InitialPingTimeout runnable = new InitialPingTimeout(rc);
+ connections.put(connection.getID(), entry);
- // We schedule an initial ping timeout. An inital ping is always sent from the client as the first thing it
- // does after creating a connection, this contains the ping period and connection TTL, if it doesn't
- // arrive the connection will get closed
+ final Channel channel0 = rc.getChannel(0, -1, false);
- scheduledThreadPool.schedule(runnable, INITIAL_PING_TIMEOUT, TimeUnit.MILLISECONDS);
-
+ channel0.setHandler(new ChannelHandler()
+ {
+ public void handlePacket(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.PING)
+ {
+ Ping ping = (Ping)packet;
+
+ if (config.getConnectionTTLOverride() == -1)
+ {
+ // Allow clients to specify connection ttl
+ entry.ttl = ping.getConnectionTTL();
+ }
+
+ // Just send a ping back
+ channel0.send(packet);
+ }
+ }
+ });
+
if (config.isBackup())
{
serverSideReplicatingConnection = rc;
@@ -315,19 +343,19 @@
public void connectionDestroyed(final Object connectionID)
{
- RemotingConnection conn = connections.get(connectionID);
-
+ ConnectionEntry conn = connections.get(connectionID);
+
if (conn != null)
{
// if the connection has no failure listeners it means the sesssions etc were already closed so this is a clean
// shutdown, therefore we can destroy the connection
// otherwise client might have crashed/exited without closing connections so we leave them for connection TTL
- if (conn.getFailureListeners().isEmpty())
+ if (conn.connection.getFailureListeners().isEmpty())
{
- closeConnection(connectionID);
+ connections.remove(connectionID);
- conn.destroy();
+ conn.connection.destroy();
}
}
}
@@ -356,142 +384,137 @@
// Public --------------------------------------------------------
- public void stopPingingForConnectionID(final Object connectionID)
- {
- Pinger pinger = pingers.get(connectionID);
-
- pinger.stopPinging();
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
- private void setupPinger(final RemotingConnection conn,
- final long clientFailureCheckPeriod,
- final long connectionTTL)
+ // Inner classes -------------------------------------------------
+
+ private final class DelegatingBufferHandler extends AbstractBufferHandler
{
- if ((connectionTTL <= 0 || clientFailureCheckPeriod <= 0) && connectionTTL != -1 &&
- clientFailureCheckPeriod != -1)
+ public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
- log.warn("Invalid values of connectionTTL/clientFailureCheckPeriod");
+ ConnectionEntry conn = connections.get(connectionID);
- closeConnection(conn.getID());
-
- return;
+ if (conn != null)
+ {
+ conn.connection.bufferReceived(connectionID, buffer);
+ }
}
+ }
- long connectionTTLToUse = config.getConnectionTTLOverride() != -1 ? config.getConnectionTTLOverride()
- : connectionTTL;
+ private static final class ConnectionEntry
+ {
+ final RemotingConnection connection;
- long pingPeriod = clientFailureCheckPeriod == -1 ? -1 : clientFailureCheckPeriod / 2;
+ volatile long lastCheck;
- Pinger pingRunnable = new Pinger(conn, connectionTTLToUse, null, new FailedConnectionAction(conn), System.currentTimeMillis());
+ volatile long ttl;
- Future<?> pingFuture = scheduledThreadPool.scheduleWithFixedDelay(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
+ ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
+ {
+ this.connection = connection;
- pingRunnable.setFuture(pingFuture);
+ this.lastCheck = lastCheck;
- pingers.put(conn.getID(), pingRunnable);
- }
-
- private RemotingConnection closeConnection(final Object connectionID)
- {
- RemotingConnection connection = connections.remove(connectionID);
-
- Pinger pinger = pingers.remove(connectionID);
-
- if (pinger != null)
- {
- pinger.close();
+ this.ttl = ttl;
}
-
- return connection;
}
- // Inner classes -------------------------------------------------
-
- private class InitialPingTimeout implements Runnable, ChannelHandler
+ private final class FailureCheckThread extends Thread
{
- private final RemotingConnection conn;
+ private long pauseInterval;
- private boolean gotInitialPing;
+ private volatile boolean closed;
- private InitialPingTimeout(final RemotingConnection conn)
+ FailureCheckThread(final long pauseInterval)
{
- this.conn = conn;
-
- conn.getChannel(0, -1, false).setHandler(this);
+ this.pauseInterval = pauseInterval;
}
-
- public synchronized void handlePacket(final Packet packet)
+
+ public synchronized void close()
{
- final byte type = packet.getType();
+ closed = true;
- if (type == PING)
+ synchronized (this)
{
- if (!gotInitialPing)
- {
- Ping ping = (Ping)packet;
+ notify();
+ }
- setupPinger(conn, ping.getClientFailureCheckPeriod(), ping.getConnectionTTL());
-
- gotInitialPing = true;
- }
+ try
+ {
+ join();
}
- else
+ catch (InterruptedException ignore)
{
- throw new IllegalArgumentException("Invalid packet: " + packet);
}
}
- public synchronized void run()
+ public void run()
{
- if (!gotInitialPing)
- {
- // Never received initial ping
- log.warn("Did not receive initial ping from " + conn.getRemoteAddress() + ", connection will be closed");
+ while (!closed)
+ {
+ long now = System.currentTimeMillis();
- closeConnection(conn);
+ Set<Object> idsToRemove = new HashSet<Object>();
- conn.destroy();
- }
- }
- }
+ for (ConnectionEntry entry : connections.values())
+ {
+ if (entry.ttl != -1)
+ {
+ if (now >= entry.lastCheck + entry.ttl)
+ {
+ RemotingConnection conn = entry.connection;
- private class FailedConnectionAction implements Runnable
- {
- private RemotingConnection conn;
+ if (!conn.checkDataReceived())
+ {
+ idsToRemove.add(conn.getID());
+ }
+ else
+ {
+ entry.lastCheck = now;
+ }
+ }
+ }
+ }
- FailedConnectionAction(final RemotingConnection conn)
- {
- this.conn = conn;
- }
+ for (Object id : idsToRemove)
+ {
+ RemotingConnection conn = removeConnection(id);
- public synchronized void run()
- {
- removeConnection(conn.getID());
+ MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Did not receive ping from " + conn.getRemoteAddress() +
+ ". It is likely the client has exited or crashed without " +
+ "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+ conn.fail(me);
+ }
- MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
- "Did not receive ping from " + conn.getRemoteAddress() + ". It is likely the client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+ synchronized (this)
+ {
+ long toWait = pauseInterval;
- conn.fail(me);
- }
- }
+ long start = System.currentTimeMillis();
- private class DelegatingBufferHandler extends AbstractBufferHandler
- {
- public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
- {
- RemotingConnection conn = connections.get(connectionID);
+ while (!closed && toWait > 0)
+ {
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
- if (conn != null)
- {
- conn.bufferReceived(connectionID, buffer);
+ now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+ }
}
}
}
-
}
\ No newline at end of file
Added: trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/MemoryManager.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+
+/**
+ * A MemoryManager
+
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2796 $</tt>
+ *
+ * $Id: MemoryManager.java 2796 2007-06-25 22:24:41Z timfox $
+ *
+ */
+public interface MemoryManager extends MessagingComponent
+{
+ boolean isMemoryLow();
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -18,7 +18,6 @@
import javax.management.MBeanServer;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.deployers.DeploymentManager;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.impl.MessagingServerControlImpl;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -35,6 +34,7 @@
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUID;
@@ -63,11 +63,11 @@
Version getVersion();
MessagingServerControlImpl getMessagingServerControl();
-
+
void registerActivateCallback(ActivateCallback callback);
-
+
void unregisterActivateCallback(ActivateCallback callback);
-
+
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
CreateSessionResponseMessage createSession(String name,
@@ -141,9 +141,10 @@
SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
-
+
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
+ ExecutorFactory getExecutorFactory();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -565,13 +565,6 @@
this.bridge = bridge;
}
- // public synchronized void reset() throws Exception
- // {
- // clearBindings();
- //
- // firstReset = false;
- // }
-
public synchronized void onMessage(final ClientMessage message)
{
try
Added: trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MemoryManagerImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MemoryManager;
+
+/**
+ * A MemoryManager
+
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ *
+ */
+public class MemoryManagerImpl implements MemoryManager
+{
+ private static final Logger log = Logger.getLogger(MemoryManagerImpl.class);
+
+ private static final long DEFAULT_MEASURE_INTERVAL = 3000;
+
+ private static final int DEFAULT_FREE_MEMORY_PERCENT = 25;
+
+ private Runtime runtime;
+
+ //TODO Should be configurable
+ private long measureInterval;
+
+ //TODO Should be configurable
+ private int freeMemoryPercent;
+
+ private volatile boolean started;
+
+ private Thread thread;
+
+ private volatile boolean low;
+
+ public MemoryManagerImpl()
+ {
+ runtime = Runtime.getRuntime();
+
+ this.measureInterval = DEFAULT_MEASURE_INTERVAL;
+
+ this.freeMemoryPercent = DEFAULT_FREE_MEMORY_PERCENT;
+ }
+
+ public boolean isMemoryLow()
+ {
+ return low;
+ }
+
+ public synchronized boolean isStarted()
+ {
+ return started;
+ }
+
+ public synchronized void start()
+ {
+ log.debug("Starting MemoryManager with MEASURE_INTERVAL: " + measureInterval
+ + " FREE_MEMORY_PERCENT: " + freeMemoryPercent);
+
+ if (started)
+ {
+ //Already started
+ return;
+ }
+
+ started = true;
+
+ thread = new Thread(new MemoryRunnable());
+
+ thread.setDaemon(true);
+
+ thread.start();
+ }
+
+ public synchronized void stop()
+ {
+ if (!started)
+ {
+ //Already stopped
+ return;
+ }
+
+ started = false;
+
+ thread.interrupt();
+
+ try
+ {
+ thread.join();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+
+ private class MemoryRunnable implements Runnable
+ {
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ if (thread.isInterrupted() && !started)
+ {
+ break;
+ }
+
+ Thread.sleep(measureInterval);
+ }
+ catch (InterruptedException ignore)
+ {
+ if (!started)
+ {
+ break;
+ }
+ }
+
+ long maxMemory = runtime.maxMemory();
+
+ long totalMemory = runtime.totalMemory();
+
+ long availableMemory = maxMemory - totalMemory;
+
+ double currentFreeMemoryPercent = 100 * (double)availableMemory / maxMemory;
+
+ log.info("Free memory " + currentFreeMemoryPercent + "% " +
+ " Calculated available memory " + availableMemory +
+ " Total mem " + totalMemory);
+
+ if (currentFreeMemoryPercent <= freeMemoryPercent)
+ {
+ log.warn("Less than " + freeMemoryPercent + "% (" + currentFreeMemoryPercent + "% total: " + totalMemory +
+ " available " + availableMemory
+ + ") of total available memory free. " +
+ "You are in danger of running out of RAM. Have you set paging parameters " +
+ "on your addresses? (See user manual \"Paging\" chapter)");
+
+ low = true;
+ }
+ else
+ {
+ low = false;
+ }
+
+ }
+ }
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -82,6 +82,7 @@
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
import org.jboss.messaging.core.server.ActivateCallback;
import org.jboss.messaging.core.server.Divert;
+import org.jboss.messaging.core.server.MemoryManager;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
@@ -173,9 +174,11 @@
private RemotingService remotingService;
private ManagementService managementService;
+
+ private MemoryManager memoryManager;
private DeploymentManager deploymentManager;
-
+
private Deployer basicUserCredentialsDeployer;
private Deployer addressSettingsDeployer;
@@ -382,6 +385,8 @@
threadPool = null;
pagingManager.stop();
+
+ memoryManager.stop();
pagingManager = null;
securityStore = null;
@@ -392,6 +397,7 @@
queueFactory = null;
resourceManager = null;
messagingServerControl = null;
+ memoryManager = null;
sessions.clear();
@@ -747,6 +753,11 @@
{
activateCallbacks.remove(callback);
}
+
+ public ExecutorFactory getExecutorFactory()
+ {
+ return executorFactory;
+ }
// Public
// ---------------------------------------------------------------------------------------
@@ -874,6 +885,10 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration, managementConnectorID);
remotingService = new RemotingServiceImpl(configuration, this, managementService, threadPool, scheduledPool, managementConnectorID);
+
+ memoryManager = new MemoryManagerImpl();
+
+ memoryManager.start();
}
private void initialisePart2() throws Exception
@@ -1349,11 +1364,11 @@
{
if (version.getIncrementingVersion() != incrementingVersion)
{
- throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
- "Client with version " + incrementingVersion + " is not compatible with server version " +
+ log.warn("Client with version " + incrementingVersion + " is not compatible with server version " +
version.getFullVersion() + ". " +
"Please ensure all clients and servers are upgraded to the same version for them to " +
"interoperate");
+ return null;
}
// Is this comment relevant any more ?
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -212,6 +212,11 @@
request.isPreAcknowledge(),
request.isXA(),
request.getWindowSize());
+
+ if (response == null)
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS));
+ }
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -125,7 +125,7 @@
public void write(final MessagingBuffer buffer, final boolean flush)
{
- ChannelFuture future = channel.write(buffer.getUnderlyingBuffer());
+ ChannelFuture future = channel.write(buffer.getUnderlyingBuffer());
if (flush)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -40,7 +40,6 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.jms.client.JBossBytesMessage;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -107,7 +107,7 @@
final RemotingConnectionImpl conn1 = (RemotingConnectionImpl)((ClientSessionImpl)session1).getConnection();
- ((ConnectionManagerImpl)sf1.getConnectionManagers()[0]).cancelPingerForConnectionID(conn1.getID());
+ ((ConnectionManagerImpl)sf1.getConnectionManagers()[0]).stopPingingAfterOne();
for (int i = 0; i < 1000; i++)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/FloodServerTest.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -24,14 +24,12 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
@@ -49,7 +47,6 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.Topic;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
@@ -157,7 +154,7 @@
serverManager.createConnectionFactory("ManualReconnectionToSingleServerTest",
connectorConfigs,
null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ 1000,
DEFAULT_CONNECTION_TTL,
callTimeout,
DEFAULT_MAX_CONNECTIONS,
@@ -185,116 +182,100 @@
jndiBindings);
}
-
- public void testFoo()
- {
- }
-
- public void _testFlood() throws Exception
- {
- Connection connection = null;
- try
+ public void testFoo()
+ {
+ }
+
+ public void _testFlood() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/cf");
+
+ final int numProducers = 20;
+
+ final int numConsumers = 20;
+
+ final int numMessages = 10000;
+
+ ProducerThread[] producers = new ProducerThread[numProducers];
+
+ for (int i = 0; i < numProducers; i++)
{
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/cf");
-
- connection = cf.createConnection();
-
- final int numProducers = 20;
-
- final int numConsumers = 20;
-
- final int numMessages = 100000;
-
- ProducerThread[] producers = new ProducerThread[numProducers];
-
- for (int i = 0; i < numProducers; i++)
- {
- producers[i] = new ProducerThread(cf, numMessages);
- }
-
- ConsumerThread[] consumers = new ConsumerThread[numConsumers];
-
- for (int i = 0; i < numConsumers; i++)
- {
- consumers[i] = new ConsumerThread(cf, numMessages);
- }
-
-
- for (int i = 0; i < numConsumers; i++)
- {
- consumers[i].start();
- }
-
- for (int i = 0; i < numProducers; i++)
- {
- producers[i].start();
- }
-
- for (int i = 0; i < numConsumers; i++)
- {
- consumers[i].join();
- }
-
- for (int i = 0; i < numProducers; i++)
- {
- producers[i].join();
- }
-
+ producers[i] = new ProducerThread(cf, numMessages);
}
- finally
+
+ ConsumerThread[] consumers = new ConsumerThread[numConsumers];
+
+ for (int i = 0; i < numConsumers; i++)
{
- if (connection != null)
- {
- connection.close();
- }
+ consumers[i] = new ConsumerThread(cf, numMessages);
}
-
+
+ for (int i = 0; i < numConsumers; i++)
+ {
+ consumers[i].start();
+ }
+
+ for (int i = 0; i < numProducers; i++)
+ {
+ producers[i].start();
+ }
+
+ for (int i = 0; i < numConsumers; i++)
+ {
+ consumers[i].join();
+ }
+
+ for (int i = 0; i < numProducers; i++)
+ {
+ producers[i].join();
+ }
+
}
-
+
class ProducerThread extends Thread
{
private Connection connection;
-
+
private Session session;
-
+
private MessageProducer producer;
-
+
private int numMessages;
-
+
ProducerThread(ConnectionFactory cf, int numMessages) throws Exception
{
connection = cf.createConnection();
-
+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
producer = session.createProducer(new JBossTopic("my-topic"));
-
+
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
+
this.numMessages = numMessages;
}
-
+
public void run()
{
try
{
byte[] bytes = new byte[1000];
-
+
BytesMessage message = session.createBytesMessage();
-
+
message.writeBytes(bytes);
-
+
for (int i = 0; i < numMessages; i++)
{
producer.send(message);
-
- if (i % 1000 == 0)
- {
- log.info("Producer " + this + " sent " + i);
- }
+
+// if (i % 1000 == 0)
+// {
+// log.info("Producer " + this + " sent " + i);
+// }
}
-
+
connection.close();
}
catch (Exception e)
@@ -303,50 +284,50 @@
}
}
}
-
+
class ConsumerThread extends Thread
{
private Connection connection;
-
+
private Session session;
-
+
private MessageConsumer consumer;
-
+
private int numMessages;
-
+
ConsumerThread(ConnectionFactory cf, int numMessages) throws Exception
{
connection = cf.createConnection();
-
+
connection.start();
-
+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
consumer = session.createConsumer(new JBossTopic("my-topic"));
-
+
this.numMessages = numMessages;
}
-
+
public void run()
{
try
- {
+ {
for (int i = 0; i < numMessages; i++)
{
Message msg = consumer.receive();
-
+
if (msg == null)
{
log.error("message is null");
break;
}
-
- if (i % 1000 == 0)
- {
- log.info("Consumer " + this + " received " + i);
- }
+
+// if (i % 1000 == 0)
+// {
+// log.info("Consumer " + this + " received " + i);
+// }
}
-
+
connection.close();
}
catch (Exception e)
@@ -355,5 +336,5 @@
}
}
}
-
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-08-14 14:29:26 UTC (rev 7729)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-08-14 14:51:58 UTC (rev 7730)
@@ -28,7 +28,6 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.client.impl.ClientSessionInternal;
import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
@@ -36,8 +35,6 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.tests.util.ServiceTestBase;
@@ -111,6 +108,8 @@
csf.setConnectionTTL(CLIENT_FAILURE_CHECK_PERIOD * 2);
ClientSession session = csf.createSession(false, true, true);
+
+ log.info("Created session");
assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
@@ -231,12 +230,10 @@
session.addFailureListener(clientListener);
- RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+ // We need to get it to stop pinging after one
- // We need to get it to stop pinging
+ ((ConnectionManagerImpl)csf.getConnectionManagers()[0]).stopPingingAfterOne();
- ((ConnectionManagerImpl)csf.getConnectionManagers()[0]).cancelPingerForConnectionID(conn.getID());
-
RemotingConnection serverConn = null;
while (serverConn == null)
@@ -330,7 +327,10 @@
serverConn.addFailureListener(serverListener);
- ((RemotingServiceImpl)server.getRemotingService()).stopPingingForConnectionID(serverConn.getID());
+ //((RemotingServiceImpl)server.getRemotingService()).stopPingingForConnectionID(serverConn.getID());
+
+ //Setting the handler to null will prevent server sending pings back to client
+ serverConn.getChannel(0, -1, false).setHandler(null);
for (int i = 0; i < 1000; i++)
{
@@ -344,6 +344,7 @@
}
assertNotNull(clientListener.getException());
+
//Server connection will be closed too, when client closes client side connection after failure is detected
assertTrue(server.getRemotingService().getConnections().isEmpty());
More information about the jboss-cvs-commits
mailing list