[jboss-cvs] JBoss Messaging SVN: r3197 - in trunk/src: main/org/jboss/jms/client and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 18 11:51:21 EDT 2007


Author: timfox
Date: 2007-10-18 11:51:20 -0400 (Thu, 18 Oct 2007)
New Revision: 3197

Modified:
   trunk/src/etc/remoting/remoting-http-service.xml
   trunk/src/etc/remoting/remoting-sslbisocket-service.xml
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Few tweaks etc


Modified: trunk/src/etc/remoting/remoting-http-service.xml
===================================================================
--- trunk/src/etc/remoting/remoting-http-service.xml	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/etc/remoting/remoting-http-service.xml	2007-10-18 15:51:20 UTC (rev 3197)
@@ -14,14 +14,16 @@
       <attribute name="Configuration">
          <config>
             <invoker transport="http">
+               <!-- There should be no reason to change these parameters - warning!
+                    Changing them may stop JBoss Messaging working correctly -->
                <attribute name="marshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
                <attribute name="unmarshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
                <attribute name="dataType" isParam="true">jms</attribute>
                <attribute name="serverBindAddress">${jboss.bind.address}</attribute>
                <attribute name="serverBindPort">4458</attribute>
-               <attribute name="leasePeriod">10000</attribute>
                <attribute name="socket.check_connection" isParam="true">false</attribute>
                <attribute name="callbackStore">org.jboss.remoting.callback.BlockingCallbackStore</attribute>
+               
                <attribute name="callbackPollPeriod" isParam="true">102</attribute>
                <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
             </invoker>

Modified: trunk/src/etc/remoting/remoting-sslbisocket-service.xml
===================================================================
--- trunk/src/etc/remoting/remoting-sslbisocket-service.xml	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/etc/remoting/remoting-sslbisocket-service.xml	2007-10-18 15:51:20 UTC (rev 3197)
@@ -14,21 +14,35 @@
           display-name="SSL Bisocket Transport Connector">
       <attribute name="Configuration">
          <config>
-            <invoker transport="sslbisocket">            
+            <invoker transport="sslbisocket">   
+            
+               <!-- There should be no reason to change these parameters - warning!
+                    Changing them may stop JBoss Messaging working correctly -->          
                <attribute name="marshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
                <attribute name="unmarshaller" isParam="true">org.jboss.jms.wireformat.JMSWireFormat</attribute>
                <attribute name="dataType" isParam="true">jms</attribute>
                <attribute name="socket.check_connection" isParam="true">false</attribute>
                <attribute name="timeout" isParam="true">0</attribute>
                <attribute name="serverBindAddress">${jboss.bind.address}</attribute>
-               <attribute name="serverBindPort">5457</attribute>
-               <attribute name="leasePeriod">10000</attribute>
+               <attribute name="serverBindPort">5457</attribute>               
                <attribute name="clientSocketClass" isParam="true">org.jboss.jms.client.remoting.ClientSocketWrapper</attribute>
                <attribute name="serverSocketClass">org.jboss.jms.server.remoting.ServerSocketWrapper</attribute>
-               <attribute name="numberOfRetries" isParam="true">1</attribute>
+               <attribute name="serverSocketFactory">jboss.messaging:service=ServerSocketFactory,type=SSL</attribute>
                <attribute name="numberOfCallRetries" isParam="true">1</attribute>
-               <attribute name="clientMaxPoolSize" isParam="true">50</attribute>
-               <attribute name="serverSocketFactory">jboss.messaging:service=ServerSocketFactory,type=SSL</attribute>
+               <attribute name="pingFrequency" isParam="true">214748364</attribute>
+               <attribute name="pingWindowFactor" isParam="true">10</attribute>
+               <attribute name="onewayThreadPool">org.jboss.jms.server.remoting.DirectThreadPool</attribute>
+                              
+               <!-- Periodicity of client pings. Server window by default is twice this figure -->                               
+               <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+
+               <!-- Number of seconds to wait for a connection in the client pool to become free -->
+               <attribute name="numberOfRetries" isParam="true">10</attribute>
+
+               <!-- Max Number of connections in client pool. This should be significantly higher than
+                    the max number of sessions/consumers you expect -->
+               <attribute name="clientMaxPoolSize" isParam="true">200</attribute> 
+               
             </invoker>
             <handlers>
                <handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>

Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-10-18 15:51:20 UTC (rev 3197)
@@ -121,40 +121,51 @@
          broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));        
          
          int failedNodeID = state.getServerID();
-         ConnectionFactoryDelegate clusteredDelegate =
-            state.getClusteredConnectionFactoryDelegate();
-         
-         // re-try creating the connection
+                  
+         ConnectionFactoryDelegate clusteredDelegate = state.getClusteredConnectionFactoryDelegate();
+                           
+         // try recreating the connection
+         log.trace("Creating new connection");
          res = clusteredDelegate.
             createConnectionDelegate(state.getUsername(), state.getPassword(), failedNodeID);
+         log.trace("Created connection");
          
          if (res == null)
          {
             // Failover did not occur
             failoverSuccessful = false;
+            log.trace("No failover");
          }
          else
          {      
             // recursively synchronize state
             ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
             
+            log.trace("Synchronizing state");
             state.getDelegate().synchronizeWith(newDelegate);
+            log.trace("Synchronized state");
                            
+            log.trace("Opening valve");
             valve.open();
+            log.trace("Opened valve");
             valveOpened = true;
             
             //Now start the connection - note! this can't be done while the valve is closed
             //or it will block itself
             
-            // start the connection again on the serverEndpoint if necessary
+            // start the connection again on the serverEndpoint if necessary            
             if (state.isStarted())
             {
+            	log.trace("Starting new connection");
                newDelegate.start();
+               log.trace("Started new connection");
             }
             
             failoverSuccessful = true;                        
          }
          
+         log.trace("failureDetected() complete");
+         
          return failoverSuccessful;
       }
       catch (Exception e)
@@ -167,7 +178,9 @@
       {
          if (!valveOpened)
          {
+         	log.trace("finally opening valve");
             valve.open();
+            log.trace("valve opened");
          }
 
          if (failoverSuccessful)

Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-18 15:51:20 UTC (rev 3197)
@@ -215,7 +215,7 @@
    private boolean shouldAck;
    private boolean handleFlowControl;
    private long redeliveryDelay;
-        
+   private volatile int currentToken;
    
    // Constructors ---------------------------------------------------------------------------------
 
@@ -261,21 +261,8 @@
       //       failover where a message is sent then the valve is locked, and the message send cause
       //       a message delivery back to the same client which tries to ack but can't get through
       //       the valve. This won't be necessary when we move to a non blocking transport
-      this.sessionExecutor.execute(
-         new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  handleMessageInternal(message);
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to handle message", e);
-               }
-            }
-         });
+   	
+      sessionExecutor.execute(new HandleMessageRunnable(currentToken, message));         
    }
    
    public void setMessageListener(MessageListener listener) throws JMSException
@@ -546,21 +533,23 @@
 
    /**
     * Needed for failover
+    * Note this can't lock the mainLock since receive() also locks the main lock
+    * and this would prevent failover occurring when a consumer is blocked on receive()
     */
    public void synchronizeWith(ClientConsumer newHandler)
    {
+      currentToken++;
+   	
       consumerID = newHandler.consumerID;
 
-      // Clear the buffer. This way the non persistent messages that managed to arive are
-      // irremendiably lost, while the peristent ones are failed-over on the server and will be
+      // Clear the buffer. This way the non persistent messages that managed to arrive are
+      // irredeemably lost, while the persistent ones are failed-over on the server and will be
       // resent
 
-      // TODO If we don't zap this buffer, we may be able to salvage some non-persistent messages
-
       buffer.clear();
       
       // need to reset toggle state
-      serverSending = true;      
+      serverSending = true;
    }
    
    public long getRedeliveryDelay()
@@ -623,41 +612,6 @@
       }
    }
    
-   private void handleMessageInternal(Object message) throws Exception
-   {
-      MessageProxy proxy = (MessageProxy) message;
-
-      if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
-
-      synchronized (mainLock)
-      {
-         if (closed)
-         {
-            // Sanity - this should never happen - we should always wait for all deliveries to arrive
-            // when closing
-            throw new IllegalStateException(this + " is closed, so ignoring message");
-         }
-
-         proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
-         proxy.getMessage().doBeforeReceive();
-
-         //Add it to the buffer
-         buffer.addLast(proxy, proxy.getJMSPriority());
-
-         lastDeliveryId = proxy.getDeliveryId();
-         
-         if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
-
-         messageAdded();
-
-         if (handleFlowControl)
-         {
-         	checkStop();
-         }
-      }
-   }
-
    private void checkStop()
    {
       int size = buffer.size();
@@ -885,6 +839,69 @@
       }
    }
    
+   private class HandleMessageRunnable implements Runnable
+   {
+   	private int token;
+   	
+   	private Object message;
+   	
+   	HandleMessageRunnable(int token, Object message)
+   	{
+   		this.token = token;
+   		
+   		this.message = message;
+   	}
+   	
+   	public void run()
+      {
+         try
+         {
+         	 MessageProxy proxy = (MessageProxy) message;
+
+             if (trace) { log.trace(this + " receiving message " + proxy + " from the remoting layer"); }
+
+             synchronized (mainLock)
+             {
+                if (closed)
+                {
+                   // Sanity - this should never happen - we should always wait for all deliveries to arrive
+                   // when closing
+                   throw new IllegalStateException(this + " is closed, so ignoring message");
+                }
+                
+                if (token != currentToken)
+                {
+               	 //This message was queued up from before failover - we don't want to add it
+               	 log.info("Ignoring message " + message);
+               	 return;
+                }
+                
+                proxy.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+
+                proxy.getMessage().doBeforeReceive();
+
+                //Add it to the buffer
+                buffer.addLast(proxy, proxy.getJMSPriority());
+
+                lastDeliveryId = proxy.getDeliveryId();
+                
+                if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+                messageAdded();
+
+                if (handleFlowControl)
+                {
+                	checkStop();
+                }
+             }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to handle message", e);
+         }
+      }
+   }
+   
    /*
     * This class handles the execution of onMessage methods
     */
@@ -911,24 +928,7 @@
             
             // remove a message from the buffer
 
-            mp = (MessageProxy)buffer.removeFirst();
-                          
-//            if (!buffer.isEmpty())
-//            {
-//            	//Queue up the next runner to run
-//            	
-//            	if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
-//            	
-//            	queueRunner(this);
-//            	
-//            	if (trace) { log.trace("Queued next onMessage to run"); }
-//            }
-//            else
-//            {
-//            	if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
-//            	
-//            	listenerRunning  = false;
-//            }               
+            mp = (MessageProxy)buffer.removeFirst();                                       
          }
          
          /*
@@ -940,8 +940,7 @@
           * 
           * Solution - don't use a session executor - have a sesion thread instead much nicer
           */
-         
-                        
+                                
          if (mp != null)
          {
             try

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-10-18 15:51:20 UTC (rev 3197)
@@ -228,6 +228,8 @@
       		if (underlying.getMessage() != null &&
       			 underlying.getMessage().startsWith("Can not obtain client socket connection from pool"))
       		{
+      			log.warn("Timed out getting a connection from the pool. Try increasing clientMaxPoolSize and/or numberOfRetries " +
+      					   "attributes in remoting-xxx-service.xml");
       			failover = false;
       		}
       	}

Modified: trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/message/MessageIdGenerator.java	2007-10-18 15:51:20 UTC (rev 3197)
@@ -49,7 +49,7 @@
 
    private boolean trace = log.isTraceEnabled();
 
-   protected long high;
+   protected long high = -1;
    protected long nextID;
    protected int blockSize;
 
@@ -74,7 +74,7 @@
 
    public synchronized long getId(ConnectionEndpoint connection) throws JMSException
    {
-      if (nextID == high)
+      if (nextID == high + 1)
       {
          getNextBlock(connection);
       }

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-10-18 15:51:20 UTC (rev 3197)
@@ -162,7 +162,7 @@
             "with the connection to remote client ":
             "trying to send a message to remote client ") +
          remotingSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
-         "its connection(s) or there is a network problem. All connection resources " +
+         "its connection(s) or the network has failed. All connection resources " +
          "corresponding to that client process will now be removed.");
 
       closeConsumersForClientVMID(jmsClientID);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-18 15:51:20 UTC (rev 3197)
@@ -512,7 +512,7 @@
    
    public void recoverDeliveries(List deliveryRecoveryInfos, String oldSessionID) throws JMSException
    {
-      if (trace) { log.trace(this + "recovers deliveries " + deliveryRecoveryInfos); }
+      if (trace) { log.trace(this + " recovers deliveries " + deliveryRecoveryInfos); }
 
       try
       {
@@ -1744,8 +1744,7 @@
       
       if (rec == null)
       {
-         log.warn("Cannot find " + ack + " to acknowledge, " +
-            "maybe it was already acknowledged before failover!");
+         log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before ");
          return;
       }
       

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-18 09:13:13 UTC (rev 3196)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-18 15:51:20 UTC (rev 3197)
@@ -619,7 +619,10 @@
    	//This is to prevent overwhelming JGroups
    	//See http://jira.jboss.com/jira/browse/JBMESSAGING-1112
    	
-   	replicateSemaphore.acquire();
+   	if (reply)
+   	{
+   		replicateSemaphore.acquire();
+   	}
    	
    	try
    	{	   	   	
@@ -651,7 +654,10 @@
    	}
    	catch (Exception e)
    	{
-   		replicateSemaphore.release();
+   		if (reply)
+   		{
+   			replicateSemaphore.release();
+   		}
    		
    		throw e;
    	}




More information about the jboss-cvs-commits mailing list