Author: timfox
Date: 2010-05-18 12:30:21 -0400 (Tue, 18 May 2010)
New Revision: 9244
Modified:
trunk/docs/user-manual/en/paging.xml
trunk/docs/user-manual/en/thread-pooling.xml
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
Log:
https://jira.jboss.org/browse/HORNETQ-390 and some tweaks
Modified: trunk/docs/user-manual/en/paging.xml
===================================================================
--- trunk/docs/user-manual/en/paging.xml 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/docs/user-manual/en/paging.xml 2010-05-18 16:30:21 UTC (rev 9244)
@@ -155,6 +155,7 @@
able to continue sending.</para>
<para>To do this just set the
<literal>address-full-policy</literal> to <literal
>BLOCK</literal> in the address settings</para>
+
<para>In the default configuration, all addresses are configured to block
producers after 10 MiB of data are in the address.</para>
</section>
<section>
<title>Caution with Addresses with Multiple Queues</title>
Modified: trunk/docs/user-manual/en/thread-pooling.xml
===================================================================
--- trunk/docs/user-manual/en/thread-pooling.xml 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/docs/user-manual/en/thread-pooling.xml 2010-05-18 16:30:21 UTC (rev 9244)
@@ -28,14 +28,18 @@
thread pool for scheduled use. A Java scheduled thread pool cannot be
configured to use
a standard thread pool, otherwise we could use a single thread pool for both
scheduled
and non scheduled activity.</para>
- <para>When using old (blocking) IO, a separate thread pool is also used to
service connections. Since old IO requires a thread per connection
- it does not make sense to get them from the standard pool as the pool will easily
get exhausted if too many connections are made, resulting in the
- server "hanging" since it has no remaining threads to do anything
else.</para>
- <para>When using new IO (NIO), HornetQ will, by default, use a number of
threads equal to three times the number of cores (or hyper-threads)
- as reported by Runtime.getRuntime().availableProcessors() for processing
incoming packets.
- If you want to override this value, you can set the number of threads by
specifying the parameter <literal>nio-remoting-threads</literal>
- in the transport configuration. See the
- <xref linkend="configuring-transports"/> for more information
on this.</para>
+ <para>When using old (blocking) IO, a separate thread pool is also used to
service
+ connections. Since old IO requires a thread per connection it does not make
sense to get
+ them from the standard pool as the pool will easily get exhausted if too
many
+ connections are made, resulting in the server "hanging" since it
has no remaining
+ threads to do anything else. If you require the server to handle many
concurrent
+ connections you should make sure you use NIO, not old IO.</para>
+ <para>When using new IO (NIO), HornetQ will, by default, use a number of
threads equal to
+ three times the number of cores (or hyper-threads) as reported by
+ Runtime.getRuntime().availableProcessors() for processing incoming packets.
If you want
+ to override this value, you can set the number of threads by specifying the
parameter
+ <literal>nio-remoting-threads</literal> in the transport
configuration. See the
+ <xref linkend="configuring-transports"/> for more
information on this.</para>
<para>There are also a small number of other places where threads are used
directly, we'll
discuss each in turn.</para>
<section id="server.scheduled.thread.pool">
@@ -111,8 +115,9 @@
myFactory.setUseGlobalPools(false);
myFactory.setScheduledThreadPoolMaxSize(10);
myFactory.setThreadPoolMaxSize(-1); </programlisting>
- <para>If you're using the JMS API, you can set the same parameters on
the ClientSessionFactory and use it to create the <literal
- >ConnectionFactory</literal> instance, for
example:</para>
+ <para>If you're using the JMS API, you can set the same parameters on
the
+ ClientSessionFactory and use it to create the
<literal>ConnectionFactory</literal>
+ instance, for example:</para>
<programlisting>ConnectionFactory myConnectionFactory =
HornetQJMSClient.createConnectionFactory(myFactory); </programlisting>
<para>If you're using JNDI to instantiate
<literal>HornetQConnectionFactory</literal>
instances, you can also set these parameters in the
<literal>hornetq-jms.xml</literal>
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-18
13:38:01 UTC (rev 9243)
+++
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-18
16:30:21 UTC (rev 9244)
@@ -165,7 +165,7 @@
try
{
- session.close();
+ session.close(true);
}
catch (Exception e)
{
@@ -181,7 +181,7 @@
try
{
- session.close();
+ session.close(false);
}
catch (Exception e)
{
@@ -422,7 +422,7 @@
case SESS_CLOSE:
{
requiresResponse = true;
- session.close();
+ session.close(false);
removeConnectionListeners();
response = new NullResponseMessage();
flush = true;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-18
13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-18
16:30:21 UTC (rev 9244)
@@ -54,7 +54,7 @@
// TODO use same value than HornetQConnection
private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
+
// Attributes ----------------------------------------------------
private final HornetQServer server;
@@ -108,7 +108,7 @@
this.frameDecoder = new StompFrameDecoder();
this.executor = server.getExecutorFactory().getExecutor();
}
-
+
// ProtocolManager implementation --------------------------------
public ConnectionEntry createConnectionEntry(final Connection connection)
@@ -131,26 +131,25 @@
if (frame == null)
{
return -1;
- }
+ }
else
{
return buffer.readerIndex() - start;
}
}
-
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer
buffer)
{
try
{
doHandleBuffer(connection, buffer);
- }
+ }
finally
{
server.getStorageManager().clearContext();
}
}
-
+
private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer
buffer)
{
StompConnection conn = (StompConnection)connection;
@@ -162,7 +161,7 @@
{
log.trace("received " + request);
}
-
+
String command = request.getCommand();
StompFrame response = null;
@@ -269,6 +268,7 @@
}
}
}
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -290,11 +290,12 @@
}
if (noLocal)
{
- String noLocalFilter = CONNECTION_ID_PROP + " <> '" +
connection.getID().toString() + "'";
+ String noLocalFilter = CONNECTION_ID_PROP + " <> '" +
connection.getID().toString() + "'";
if (selector == null)
{
- selector = noLocalFilter;
- } else
+ selector = noLocalFilter;
+ }
+ else
{
selector += " AND " + noLocalFilter;
}
@@ -325,7 +326,13 @@
}
long consumerID = server.getStorageManager().generateUniqueID();
String clientID = (connection.getClientID() != null) ? connection.getClientID() :
null;
- stompSession.addSubscription(consumerID, subscriptionID, clientID,
durableSubscriptionName, destination, selector, ack);
+ stompSession.addSubscription(consumerID,
+ subscriptionID,
+ clientID,
+ durableSubscriptionName,
+ destination,
+ selector,
+ ack);
return null;
}
@@ -423,7 +430,7 @@
}
StompSession session = getTransactedSession(connection, txID);
-
+
if (session == null)
{
throw new StompException("No transaction started: " + txID);
@@ -567,15 +574,16 @@
{
connection.setValid(false);
- try {
+ try
+ {
StompSession session = sessions.remove(connection.getID());
if (session != null)
{
try
{
session.getSession().rollback(true);
- session.getSession().close();
- }
+ session.getSession().close(false);
+ }
catch (Exception e)
{
log.warn(e.getMessage(), e);
@@ -593,7 +601,7 @@
try
{
serverSession.rollback(true);
- serverSession.close();
+ serverSession.close(false);
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-05-18 13:38:01 UTC
(rev 9243)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-05-18 16:30:21 UTC
(rev 9244)
@@ -28,9 +28,9 @@
{
long getID();
- void close() throws Exception;
+ void close(boolean failed) throws Exception;
- List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction
tx) throws Exception;
+ List<MessageReference> cancelRefs(boolean failed, boolean
lastConsumedAsDelivered, Transaction tx) throws Exception;
void setStarted(boolean started);
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-18 13:38:01 UTC (rev
9243)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-18 16:30:21 UTC (rev
9244)
@@ -107,7 +107,7 @@
void requestProducerCredits(SimpleString address, int credits) throws Exception;
- void close() throws Exception;
+ void close(boolean failed) throws Exception;
void setTransferring(boolean transferring);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-05-18
13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-05-18
16:30:21 UTC (rev 9244)
@@ -262,7 +262,7 @@
return filter;
}
- public void close() throws Exception
+ public void close(final boolean failed) throws Exception
{
setStarted(false);
@@ -278,7 +278,7 @@
session.removeConsumer(id);
- LinkedList<MessageReference> refs = cancelRefs(false, null);
+ LinkedList<MessageReference> refs = cancelRefs(failed, false, null);
Iterator<MessageReference> iter = refs.iterator();
@@ -356,7 +356,7 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean
lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed, final
boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
@@ -374,7 +374,13 @@
}
else
{
- ref.decrementDeliveryCount();
+ if (!failed)
+ {
+ //We don't decrement delivery count if the client failed, since
there's a possibility that refs were actually delivered but we just didn't get any
acks for them
+ //before failure
+ log.info("decrementing delivery count");
+ ref.decrementDeliveryCount();
+ }
refs.add(ref);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-18 13:38:01
UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-18 16:30:21
UTC (rev 9244)
@@ -241,20 +241,20 @@
}
}
- private synchronized void doClose() throws Exception
+ private synchronized void doClose(final boolean failed) throws Exception
{
if (tx != null && tx.getXid() == null)
{
// We only rollback local txs on close, not XA tx branches
- rollback(false);
+ rollback(failed);
}
Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
for (ServerConsumer consumer : consumersClone)
{
- consumer.close();
+ consumer.close(failed);
}
consumers.clear();
@@ -905,7 +905,7 @@
setStarted(false);
}
- public void close()
+ public void close(final boolean failed)
{
storageManager.afterCompleteOperations(new IOAsyncTask()
{
@@ -917,7 +917,7 @@
{
try
{
- doClose();
+ doClose(failed);
}
catch (Exception e)
{
@@ -933,7 +933,7 @@
if (consumer != null)
{
- consumer.close();
+ consumer.close(false);
}
else
{
@@ -1065,7 +1065,7 @@
{
ServerSessionImpl.log.warn("Client connection failed, clearing up resources
for session " + name);
- close();
+ close(true);
ServerSessionImpl.log.warn("Cleared up resources for session " +
name);
}
@@ -1134,7 +1134,7 @@
consumer.setStarted(false);
}
- toCancel.addAll(consumer.cancelRefs(lastMessageAsDelived, theTx));
+ toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
}
for (MessageReference ref : toCancel)
Modified: trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2010-05-18
13:38:01 UTC (rev 9243)
+++
trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2010-05-18
16:30:21 UTC (rev 9244)
@@ -61,8 +61,7 @@
{
assertActiveConnections(0);
- // spawn a JVM that creates a Core client, which waits to receive a test
- // message
+ // spawn a JVM that creates a Core client, which sends a message
Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
ClientSession session = sf.createSession(false, true, true);
@@ -72,7 +71,7 @@
session.start();
- // send the message to the queue
+ // receive a message from the queue
Message messageFromClient = consumer.receive(5000);
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT,
messageFromClient.getBodyBuffer().readString());
@@ -110,7 +109,42 @@
// FIXME
https://jira.jboss.org/jira/browse/JBMESSAGING-1421
assertActiveSession(0);
}
+
+ public void testCrashClient2() throws Exception
+ {
+ assertActiveConnections(0);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
+
+ // spawn a JVM that creates a Core client, which sends a message
+ Process p = SpawnedVMSupport.spawnVM(CrashClient2.class.getName());
+
+ ClientCrashTest.log.debug("waiting for the client VM to crash ...");
+ p.waitFor();
+
+ Assert.assertEquals(9, p.exitValue());
+
+ System.out.println("VM Exited");
+
+ Thread.sleep(3 * ClientCrashTest.CONNECTION_TTL);
+
+ ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
+
+ session.start();
+
+ // receive a message from the queue
+ ClientMessage messageFromClient = consumer.receive(10000);
+ Assert.assertNotNull("no message received", messageFromClient);
+ Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT,
messageFromClient.getBodyBuffer().readString());
+
+ assertEquals(2, messageFromClient.getDeliveryCount());
+
+ session.close();
+
+ }
+
// Package protected ---------------------------------------------
@Override
Modified: trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java 2010-05-18
13:38:01 UTC (rev 9243)
+++ trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java 2010-05-18
16:30:21 UTC (rev 9244)
@@ -23,8 +23,6 @@
/**
* Code to be run in an external VM, via main().
*
- * This client will open a connection, receive a message and crash.
- *
* @author <a href="mailto:ovidiu@feodorov.com">Ovidiu
Feodorov</a>
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*