[jboss-cvs] JBoss Messaging SVN: r3197 - in trunk/src: main/org/jboss/jms/client and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 18 11:51:21 EDT 2007
Author: timfox
Date: 2007-10-18 11:51:20 -0400 (Thu, 18 Oct 2007)
New Revision: 3197
Modified:
trunk/src/etc/remoting/remoting-http-service.xml
trunk/src/etc/remoting/remoting-sslbisocket-service.xml
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Few tweaks etc
Modified: trunk/src/etc/remoting/remoting-http-service.xml
===================================================================
--- trunk/src/etc/remoting/remoting-http-service.xml 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/etc/remoting/remoting-http-service.xml 2007-10-18 15:51:20 UTC (rev 3197)
@@ -14,14 +14,16 @@
<attribute name="Configuration">
<config>
<invoker transport="http">
+ <!-- There should be no reason to change these parameters - warning!
+ Changing them may stop JBoss Messaging working correctly -->
<attribute name="marshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
<attribute name="unmarshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
<attribute name="dataType" isParam="true">jms</attribute>
<attribute name="serverBindAddress">${jboss.bind.address}</attribute>
<attribute name="serverBindPort">4458</attribute>
- <attribute name="leasePeriod">10000</attribute>
<attribute name="socket.check_connection" isParam="true">false</attribute>
<attribute name="callbackStore">org.jboss.remoting.callback.BlockingCallbackStore</attribute>
+
<attribute name="callbackPollPeriod" isParam="true">102</attribute>
<attribute name="clientLeasePeriod" isParam="true">10000</attribute>
</invoker>
Modified: trunk/src/etc/remoting/remoting-sslbisocket-service.xml
===================================================================
--- trunk/src/etc/remoting/remoting-sslbisocket-service.xml 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/etc/remoting/remoting-sslbisocket-service.xml 2007-10-18 15:51:20 UTC (rev 3197)
@@ -14,21 +14,35 @@
display-name="SSL Bisocket Transport Connector">
<attribute name="Configuration">
<config>
- <invoker transport="sslbisocket">
+ <invoker transport="sslbisocket">
+
+ <!-- There should be no reason to change these parameters - warning!
+ Changing them may stop JBoss Messaging working correctly -->
<attribute name="marshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
<attribute name="unmarshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
<attribute name="dataType" isParam="true">jms</attribute>
<attribute name="socket.check_connection" isParam="true">false</attribute>
<attribute name="timeout" isParam="true">0</attribute>
<attribute name="serverBindAddress">${jboss.bind.address}</attribute>
- <attribute name="serverBindPort">5457</attribute>
- <attribute name="leasePeriod">10000</attribute>
+ <attribute name="serverBindPort">5457</attribute>
<attribute name="clientSocketClass" isParam="true">org.jboss.jms.client.remoting.ClientSocketWrapper</attribute>
<attribute name="serverSocketClass">org.jboss.jms.server.remoting.ServerSocketWrapper</attribute>
- <attribute name="numberOfRetries" isParam="true">1</attribute>
+ <attribute name="serverSocketFactory">jboss.messaging:service=ServerSocketFactory,type=SSL</attribute>
<attribute name="numberOfCallRetries" isParam="true">1</attribute>
- <attribute name="clientMaxPoolSize" isParam="true">50</attribute>
- <attribute name="serverSocketFactory">jboss.messaging:service=ServerSocketFactory,type=SSL</attribute>
+ <attribute name="pingFrequency" isParam="true">214748364</attribute>
+ <attribute name="pingWindowFactor" isParam="true">10</attribute>
+ <attribute name="onewayThreadPool">org.jboss.jms.server.remoting.DirectThreadPool</attribute>
+
+ <!-- Periodicity of client pings. Server window by default is twice this figure -->
+ <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+
+ <!-- Number of seconds to wait for a connection in the client pool to become free -->
+ <attribute name="numberOfRetries" isParam="true">10</attribute>
+
+ <!-- Max Number of connections in client pool. This should be significantly higher than
+ the max number of sessions/consumers you expect -->
+ <attribute name="clientMaxPoolSize" isParam="true">200</attribute>
+
</invoker>
<handlers>
<handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-10-18 15:51:20 UTC (rev 3197)
@@ -121,40 +121,51 @@
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));
int failedNodeID = state.getServerID();
- ConnectionFactoryDelegate clusteredDelegate =
- state.getClusteredConnectionFactoryDelegate();
-
- // re-try creating the connection
+
+ ConnectionFactoryDelegate clusteredDelegate = state.getClusteredConnectionFactoryDelegate();
+
+ // try recreating the connection
+ log.trace("Creating new connection");
res = clusteredDelegate.
createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
+ log.trace("Created connection");
if (res == null)
{
// Failover did not occur
failoverSuccessful = false;
+ log.trace("No failover");
}
else
{
// recursively synchronize state
ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
+ log.trace("Synchronizing state");
state.getDelegate().synchronizeWith(newDelegate);
+ log.trace("Synchronized state");
+ log.trace("Opening valve");
valve.open();
+ log.trace("Opened valve");
valveOpened = true;
//Now start the connection - note! this can't be done while the valve is closed
//or it will block itself
- // start the connection again on the serverEndpoint if necessary
+ // start the connection again on the serverEndpoint if necessary
if (state.isStarted())
{
+ log.trace("Starting new connection");
newDelegate.start();
+ log.trace("Started new connection");
}
failoverSuccessful = true;
}
+ log.trace("failureDetected() complete");
+
return failoverSuccessful;
}
catch (Exception e)
@@ -167,7 +178,9 @@
{
if (!valveOpened)
{
+ log.trace("finally opening valve");
valve.open();
+ log.trace("valve opened");
}
if (failoverSuccessful)
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-18 15:51:20 UTC (rev 3197)
@@ -215,7 +215,7 @@
private boolean shouldAck;
private boolean handleFlowControl;
private long redeliveryDelay;
-
+ private volatile int currentToken;
// Constructors ---------------------------------------------------------------------------------
@@ -261,21 +261,8 @@
// failover where a message is sent then the valve is locked, and the message send cause
// a message delivery back to the same client which tries to ack but can't get through
// the valve. This won't be necessary when we move to a non blocking transport
- this.sessionExecutor.execute(
- new Runnable()
- {
- public void run()
- {
- try
- {
- handleMessageInternal(message);
- }
- catch (Exception e)
- {
- log.error("Failed to handle message", e);
- }
- }
- });
+
+ sessionExecutor.execute(new HandleMessageRunnable(currentToken, message));
}
public void setMessageListener(MessageListener listener) throws JMSException
@@ -546,21 +533,23 @@
/**
* Needed for failover
+ * Note this can't lock the mainLock since receive() also locks the main lock
+ * and this would prevent failover occuring when a consumer is blocked on receive()
*/
public void synchronizeWith(ClientConsumer newHandler)
{
+ currentToken++;
+
consumerID = newHandler.consumerID;
- // Clear the buffer. This way the non persistent messages that managed to arive are
- // irremendiably lost, while the peristent ones are failed-over on the server and will be
+ // Clear the buffer. This way the non persistent messages that managed to arrive are
+ // irredeemably lost, while the persistent ones are failed-over on the server and will be
// resent
- // TODO If we don't zap this buffer, we may be able to salvage some non-persistent messages
-
buffer.clear();
// need to reset toggle state
- serverSending = true;
+ serverSending = true;
}
public long getRedeliveryDelay()
@@ -623,41 +612,6 @@
}
}
- private void handleMessageInternal(Object message) throws Exception
- {
- MessageProxy proxy = (MessageProxy) message;
-
- if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
-
- synchronized (mainLock)
- {
- if (closed)
- {
- // Sanity - this should never happen - we should always wait for all deliveries to arrive
- // when closing
- throw new IllegalStateException(this + " is closed, so ignoring message");
- }
-
- proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
- proxy.getMessage().doBeforeReceive();
-
- //Add it to the buffer
- buffer.addLast(proxy, proxy.getJMSPriority());
-
- lastDeliveryId = proxy.getDeliveryId();
-
- if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
-
- messageAdded();
-
- if (handleFlowControl)
- {
- checkStop();
- }
- }
- }
-
private void checkStop()
{
int size = buffer.size();
@@ -885,6 +839,69 @@
}
}
+ private class HandleMessageRunnable implements Runnable
+ {
+ private int token;
+
+ private Object message;
+
+ HandleMessageRunnable(int token, Object message)
+ {
+ this.token = token;
+
+ this.message = message;
+ }
+
+ public void run()
+ {
+ try
+ {
+ MessageProxy proxy = (MessageProxy) message;
+
+ if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
+
+ synchronized (mainLock)
+ {
+ if (closed)
+ {
+ // Sanity - this should never happen - we should always wait for all deliveries to arrive
+ // when closing
+ throw new IllegalStateException(this + " is closed, so ignoring message");
+ }
+
+ if (token != currentToken)
+ {
+ //This message was queued up from before failover - we don't want to add it
+ log.info("Ignoring message " + message);
+ return;
+ }
+
+ proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+
+ proxy.getMessage().doBeforeReceive();
+
+ //Add it to the buffer
+ buffer.addLast(proxy, proxy.getJMSPriority());
+
+ lastDeliveryId = proxy.getDeliveryId();
+
+ if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+ messageAdded();
+
+ if (handleFlowControl)
+ {
+ checkStop();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle message", e);
+ }
+ }
+ }
+
/*
* This class handles the execution of onMessage methods
*/
@@ -911,24 +928,7 @@
// remove a message from the buffer
- mp = (MessageProxy)buffer.removeFirst();
-
-// if (!buffer.isEmpty())
-// {
-// //Queue up the next runner to run
-//
-// if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
-//
-// queueRunner(this);
-//
-// if (trace) { log.trace("Queued next onMessage to run"); }
-// }
-// else
-// {
-// if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
-//
-// listenerRunning = false;
-// }
+ mp = (MessageProxy)buffer.removeFirst();
}
/*
@@ -940,8 +940,7 @@
*
* Solution - don't use a session executor - have a sesion thread instead much nicer
*/
-
-
+
if (mp != null)
{
try
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-10-18 15:51:20 UTC (rev 3197)
@@ -228,6 +228,8 @@
if (underlying.getMessage() != null &&
underlying.getMessage().startsWith("Can not obtain client socket connection from pool"))
{
+ log.warn("Timed out getting a connection from the pool. Try increasing clientMaxPoolSize and/or numberOfRetries " +
+ "attributes in remoting-xxx-service.xml");
failover = false;
}
}
Modified: trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java 2007-10-18 15:51:20 UTC (rev 3197)
@@ -49,7 +49,7 @@
private boolean trace = log.isTraceEnabled();
- protected long high;
+ protected long high = -1;
protected long nextID;
protected int blockSize;
@@ -74,7 +74,7 @@
public synchronized long getId(ConnectionEndpoint connection) throws JMSException
{
- if (nextID == high)
+ if (nextID == high + 1)
{
getNextBlock(connection);
}
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-10-18 15:51:20 UTC (rev 3197)
@@ -162,7 +162,7 @@
"with the connection to remote client ":
"trying to send a message to remote client ") +
remotingSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
- "its connection(s) or there is a network problem. All connection resources " +
+ "its connection(s) or the network has failed. All connection resources " +
"corresponding to that client process will now be removed.");
closeConsumersForClientVMID(jmsClientID);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-18 15:51:20 UTC (rev 3197)
@@ -512,7 +512,7 @@
public void recoverDeliveries(List deliveryRecoveryInfos, String oldSessionID) throws JMSException
{
- if (trace) { log.trace(this + "recovers deliveries " + deliveryRecoveryInfos); }
+ if (trace) { log.trace(this + " recovers deliveries " + deliveryRecoveryInfos); }
try
{
@@ -1744,8 +1744,7 @@
if (rec == null)
{
- log.warn("Cannot find " + ack + " to acknowledge, " +
- "maybe it was already acknowledged before failover!");
+ log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before ");
return;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-18 15:51:20 UTC (rev 3197)
@@ -619,7 +619,10 @@
//This is to prevent overwhelming JGroups
//See http://jira.jboss.com/jira/browse/JBMESSAGING-1112
- replicateSemaphore.acquire();
+ if (reply)
+ {
+ replicateSemaphore.acquire();
+ }
try
{
@@ -651,7 +654,10 @@
}
catch (Exception e)
{
- replicateSemaphore.release();
+ if (reply)
+ {
+ replicateSemaphore.release();
+ }
throw e;
}
More information about the jboss-cvs-commits
mailing list