[jboss-cvs] JBoss Messaging SVN: r3188 - in trunk: src/main/org/jboss/jms/server/bridge and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 12 08:25:54 EDT 2007


Author: timfox
Date: 2007-10-12 08:25:53 -0400 (Fri, 12 Oct 2007)
New Revision: 3188

Added:
   trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java
   trunk/src/main/org/jboss/jms/server/bridge/DestinationFactory.java
   trunk/src/main/org/jboss/jms/server/bridge/JNDIDestinationFactory.java
   trunk/src/main/org/jboss/jms/server/bridge/JNDIFactorySupport.java
Removed:
   trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java
Modified:
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
   trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java
   trunk/src/main/org/jboss/jms/server/bridge/JNDIConnectionFactoryFactory.java
   trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-999


Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -1332,7 +1332,7 @@
       if (started)
       {
          throw new IllegalAccessException("supportsFailover can only be changed when " +
-                                          "connection factory is stopped");
+                                          "server peer is stopped");
       }
       this.supportsFailover = supportsFailover;
    }

Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -30,6 +30,7 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -165,7 +166,7 @@
    
    private boolean started;
    
-   private LinkedList messages;
+   private LinkedList<Message> messages;
    
    private Object lock;
    
@@ -173,6 +174,10 @@
    
    private ConnectionFactoryFactory targetCff;
    
+   private DestinationFactory sourceDestinationFactory;
+   
+   private DestinationFactory targetDestinationFactory;
+   
    private Connection sourceConn; 
    
    private Connection targetConn;
@@ -201,20 +206,26 @@
    
    private boolean failed;
    
-   private boolean usingXA;
+   private int forwardMode;
    
+   private static final int FORWARD_MODE_XA = 0;
+   
+   private static final int FORWARD_MODE_LOCALTX = 1;
+   
+   private static final int FORWARD_MODE_NONTX = 2;
+   
    /*
     * Constructor for MBean
     */
    public Bridge()
    {      
-      this.messages = new LinkedList();      
+      this.messages = new LinkedList<Message>();      
       
       this.lock = new Object();      
    }
    
    public Bridge(ConnectionFactoryFactory sourceCff, ConnectionFactoryFactory destCff,
-                 Destination sourceDestination, Destination targetDestination,         
+                 DestinationFactory sourceDestinationFactory, DestinationFactory targetDestinationFactory,         
                  String sourceUsername, String sourcePassword,
                  String targetUsername, String targetPassword,
                  String selector, long failureRetryInterval,
@@ -230,9 +241,9 @@
       
       this.targetCff = destCff;
       
-      this.sourceDestination = sourceDestination;
+      this.sourceDestinationFactory = sourceDestinationFactory;
       
-      this.targetDestination = targetDestination;
+      this.targetDestinationFactory = targetDestinationFactory;
       
       this.sourceUsername = sourceUsername;
       
@@ -259,7 +270,7 @@
       this.clientID = clientID;
       
       this.addMessageIDInHeader = addMessageIDInHeader;
-            
+              
       if (trace)
       {
          log.trace("Created " + this);
@@ -435,35 +446,35 @@
       
       if (trace) { log.trace("Resumed " + this); }
    }
-   
-   public Destination getSourceDestination()
+      
+   public DestinationFactory getSourceDestinationFactory()
    {
-      return sourceDestination;
+   	return sourceDestinationFactory;
    }
-   
-   public void setSourceDestination(Destination dest)
+
+   public void setSourceDestinationFactory(DestinationFactory dest)
    {
-      if (started)
-      {
-         log.warn("Cannot set SourceDestination while bridge is started");
-         return;
-      }
-      this.sourceDestination = dest;
+   	if (started)
+   	{
+   		log.warn("Cannot set SourceDestinationFactory while bridge is started");
+   		return;
+   	}
+   	sourceDestinationFactory = dest;
    }
    
-   public Destination getTargetDestination()
+   public DestinationFactory getTargetDestinationFactory()
    {
-      return targetDestination;
+   	return targetDestinationFactory;
    }
-   
-   public void setTargetDestination(Destination dest)
+
+   public void setTargetDestinationFactory(DestinationFactory dest)
    {
-      if (started)
-      {
-         log.warn("Cannot set TargetDestination while bridge is started");
-         return;
-      }
-      this.targetDestination = dest;
+   	if (started)
+   	{
+   		log.warn("Cannot set TargetDestinationFactory while bridge is started");
+   		return;
+   	}
+   	targetDestinationFactory = dest;
    }
    
    public String getSourceUsername()
@@ -704,19 +715,19 @@
    {
       if (sourceCff == null)
       {
-         throw new IllegalArgumentException("sourceCfFactory cannot be null");
+         throw new IllegalArgumentException("sourceCff cannot be null");
       }
       if (targetCff == null)
       {
-         throw new IllegalArgumentException("destCfFactory cannot be null");
+         throw new IllegalArgumentException("targetCff cannot be null");
       }
-      if (sourceDestination == null)
+      if (sourceDestinationFactory == null)
       {
-         throw new IllegalArgumentException("destSource cannot be null");
+         throw new IllegalArgumentException("sourceDestinationFactory cannot be null");
       }
-      if (targetDestination == null)
+      if (targetDestinationFactory == null)
       {
-         throw new IllegalArgumentException("destDest cannot be null");
+         throw new IllegalArgumentException("targetDestinationFactory cannot be null");
       }
       if (failureRetryInterval < 0 && failureRetryInterval != -1)
       {
@@ -813,6 +824,8 @@
       return tm;
    }
    
+   
+   
    private Connection createConnection(String username, String password, ConnectionFactoryFactory cff)
       throws Exception
    {
@@ -852,6 +865,9 @@
             conn = cf.createConnection(username, password);            
          }  
       }
+      
+      conn.setExceptionListener(new BridgeExceptionListener());
+      
       return conn;
    }
     
@@ -892,25 +908,59 @@
    {
       try
       {  
-         //Are source and target destinations on the server? If so we can get once and only once
-         //just using a local transacted session
-         boolean sourceAndTargetSameServer = sourceCff == targetCff;
+      	//Lookup the destinations
+      	sourceDestination = sourceDestinationFactory.createDestination();
+      	
+      	targetDestination = targetDestinationFactory.createDestination();
+      	      
+         if (sourceCff == targetCff)
+         {
+            //Source and target destinations are on the same server - we can get once and only once
+            //just using a local transacted session
+         	//everything becomes once and only once
+         	
+         	forwardMode = FORWARD_MODE_LOCALTX;
+         }
+         else
+         {
+         	//Different servers
+         	if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+         	{
+         		//Use XA
+         		
+         		forwardMode = FORWARD_MODE_XA;
+         	}
+         	else
+         	{
+         		forwardMode = FORWARD_MODE_NONTX;
+         	}
+         }
          
+      	//Lookup the destinations
+      	sourceDestination = sourceDestinationFactory.createDestination();
+      	
+      	targetDestination = targetDestinationFactory.createDestination();
+      	      
+         
          sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff);
          
-         if (!sourceAndTargetSameServer)
+         if (forwardMode != FORWARD_MODE_LOCALTX)
          {
             targetConn = createConnection(targetUsername, targetPassword, targetCff);
+            
+          //  targetConn.setExceptionListener(exceptionListener); 
          }
                   
          if (clientID != null)
          {
             sourceConn.setClientID(clientID);
          }
+         
+        // sourceConn.setExceptionListener(exceptionListener);         
           
          Session sess;
          
-         if (sourceAndTargetSameServer)
+         if (forwardMode == FORWARD_MODE_LOCALTX)
          {
             //We simply use a single local transacted session for consuming and sending      
             
@@ -920,9 +970,7 @@
          }
          else
          {
-            //Source and destination are on different resource managers
-            
-            if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+            if (forwardMode == FORWARD_MODE_XA)
             {
                //Create an XASession for consuming from the source
                if (trace) { log.trace("Creating XA source session"); }
@@ -930,8 +978,6 @@
                sourceSession = ((XAConnection)sourceConn).createXASession();
                
                sess = ((XASession)sourceSession).getSession();
-               
-               usingXA = true;
             }
             else
             {
@@ -939,35 +985,15 @@
                
                //Create a standard session for consuming from the source
                
-               //If the QoS is at_most_once, and max batch size is 1 then we use AUTO_ACKNOWLEDGE
-               //If the QoS is at_most_once, and max batch size > 1 or -1, then we use CLIENT_ACKNOWLEDGE
-               //We could use CLIENT_ACKNOWLEDGE for both the above but AUTO_ACKNOWLEGE may be slightly more
-               //performant in some implementations that manually acking every time but it really depends
-               //on the implementation.
-               //We could also use local transacted for both the above but don't for the same reasons.
+               //We use ack mode client ack
+                              
+               sourceSession = sourceConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                
-               //If the QoS is duplicates_ok, we use CLIENT_ACKNOWLEDGE
-               //We could use local transacted, whether one is faster than the other probably depends on the
-               //messaging implementation but there's probably not much in it
-               
-               int ackMode;
-               if (qualityOfServiceMode == QOS_AT_MOST_ONCE && maxBatchSize == 1)
-               {
-                  ackMode = Session.AUTO_ACKNOWLEDGE;
-               }
-               else
-               {
-                  ackMode = Session.CLIENT_ACKNOWLEDGE;
- 
-               }
-               
-               sourceSession = sourceConn.createSession(false, ackMode);
-               
                sess = sourceSession;
             }
          }
          
-         if (usingXA && sourceSession instanceof JBossSession)
+         if (forwardMode == FORWARD_MODE_XA && sourceSession instanceof JBossSession)
          {
          	JBossSession jsession = (JBossSession)sourceSession;
          	
@@ -1002,9 +1028,10 @@
          
          //Now the sending session
          
-         if (!sourceAndTargetSameServer)
+         
+         if (forwardMode != FORWARD_MODE_LOCALTX)
          {            
-            if (usingXA)
+            if (forwardMode == FORWARD_MODE_XA)
             {
                if (trace) { log.trace("Creating XA dest session"); }
                
@@ -1018,21 +1045,19 @@
             {
                if (trace) { log.trace("Creating non XA dest session"); }
                
-               //Create a standard session for sending to the destination
+               //Create a standard session for sending to the target
+                                             
+               //If batch size > 1 we use a transacted session since it is more efficient
                
-               //If maxBatchSize == 1 we just create a non transacted session, otherwise we
-               //create a transacted session for the send, since sending the batch in a transaction
-               //is likely to be more efficient than sending messages individually
+               boolean transacted = maxBatchSize > 1;
                
-               boolean manualCommit = maxBatchSize == 1;
+               targetSession = targetConn.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
                
-               targetSession = targetConn.createSession(manualCommit, manualCommit ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-               
                sess = targetSession;
             }       
          }
          
-         if (usingXA)
+         if (forwardMode == FORWARD_MODE_XA)
          {
             if (trace) { log.trace("Starting JTA transaction"); }
             
@@ -1151,124 +1176,177 @@
       //If we get here then we exceed maxRetries
       return false;      
    }
-    
-   private void sendBatch() 
+      
+   private void sendBatchNonTransacted()
    {
-      if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
-        
-      if (paused)
-      {
-         //Don't send now
-         if (trace) { log.trace("Paused, so not sending now"); }
-         
-         return;            
-      }
-         
-      try
+   	try
       {         
-         if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
-         {
-            //We ack *before* we send
-            if (sourceSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-            {
-               //Ack on the last message
-               ((Message)messages.getLast()).acknowledge();       
-            }
-         }
-         
-         //Now send the message(s)   
+   		if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
+   		{
+   			//We client ack before sending
+   			
+            if (trace) { log.trace("Client acking source session"); }
+               			
+            ((Message)messages.getLast()).acknowledge();
             
-         Iterator iter = messages.iterator();
+            if (trace) { log.trace("Client acked source session"); }            
+   		}
+   		   		
+         sendMessages();
          
-         Message msg = null;
-         
-         while (iter.hasNext())
+         if (maxBatchSize > 1)
          {
-            msg = (Message)iter.next();
-            
-            if (addMessageIDInHeader)
-            {
-            	addMessageIDInHeader(msg);            	
-            }
-            
-            if (trace) { log.trace("Sending message " + msg); }
-            
-            long timeToLive = msg.getJMSExpiration();
-            
-   			if (timeToLive != 0)
-   			{
-   				timeToLive -=  System.currentTimeMillis();
-   				
-   				if (timeToLive <= 0)
-   				{
-   					timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
-   				}
-   			}
-            
-   			producer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
-   			
-            if (trace) { log.trace("Sent message " + msg); }                    
+         	//The sending session is transacted - we need to commit it
+         	
+            if (trace) { log.trace("Committing target session"); }
+                     	
+         	targetSession.commit();
+         	
+            if (trace) { log.trace("Committed source session"); }            
          }
          
          if (qualityOfServiceMode == QOS_DUPLICATES_OK)
-         {
-            //We ack the source message(s) after sending
+   		{
+   			//We client ack after sending
+         	
+         	//Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
+         	//For a slightly less strong delivery guarantee
+   		
+            if (trace) { log.trace("Client acking source session"); }
+               			
+            ((Message)messages.getLast()).acknowledge();
             
-            if (sourceSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-            {               
-               //Ack on the last message
-               ((Message)messages.getLast()).acknowledge();
-            }                  
-         }
+            if (trace) { log.trace("Client acked source session"); }            
+   		}
+                                         
+         //Clear the messages
+         messages.clear();            
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+      
+         handleFailureOnSend();                                                 
+      }	
+   }
+   
+   private void sendBatchXA()
+   {
+   	try
+      {         
+         sendMessages();
          
-         //Now we commit the sending session if necessary
-         if (targetSession != null && targetSession.getTransacted() && !usingXA)
-         {
-            if (trace) { log.trace("Committing target session"); }
+         //Commit the JTA transaction and start another
+                                 
+         delistResources(tx);
             
-            targetSession.commit();    
-            
-            if (trace) { log.trace("Committed target session"); }
-         }
+         if (trace) { log.trace("Committing JTA transaction"); }
          
-         //And commit the consuming session if necessary
-         if (sourceSession.getTransacted() && !usingXA)
-         {
-            if (trace) { log.trace("Committing source session"); }
-            
-            sourceSession.commit();
-            
-            if (trace) { log.trace("Committed source session"); }
-         }
-         
-         if (usingXA)
-         {
-            //Commit the JTA transaction and start another
-                                    
-            delistResources(tx);
-               
-            if (trace) { log.trace("Committing JTA transaction"); }
-            
-            tx.commit();
+         tx.commit();
 
-            if (trace) { log.trace("Committed JTA transaction"); }
-            
-            tx = startTx();  
-            
-            enlistResources(tx);
-         }
+         if (trace) { log.trace("Committed JTA transaction"); }
          
+         tx = startTx();  
+         
+         enlistResources(tx);
+                  
          //Clear the messages
          messages.clear();            
       }
       catch (Exception e)
       {
          log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+      
+         handleFailureOnSend();                                                 
+      }
+   }
+   
+   private void sendBatchLocalTx()
+   {
+   	try
+      {         
+         sendMessages();
+                     
+         if (trace) { log.trace("Committing source session"); }
          
+         sourceSession.commit();
+         
+         if (trace) { log.trace("Committed source session"); }     
+         
+         //Clear the messages
+         messages.clear();           
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+      
          handleFailureOnSend();                                                 
       }
    }
    
+   private void sendBatch() 
+   {
+      if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
+        
+      if (paused)
+      {
+         //Don't send now
+         if (trace) { log.trace("Paused, so not sending now"); }
+         
+         return;            
+      }
+      
+      if (forwardMode == FORWARD_MODE_LOCALTX)
+      {
+      	sendBatchLocalTx();
+      }
+      else if (forwardMode == FORWARD_MODE_XA)      	
+      {
+      	sendBatchXA();
+      }
+      else
+      {
+      	sendBatchNonTransacted();
+      }
+   }
+   
+   private void sendMessages() throws Exception
+   {
+      Iterator iter = messages.iterator();
+      
+      Message msg = null;
+      
+      while (iter.hasNext())
+      {
+      	msg = (Message)iter.next();
+      	
+      	if (addMessageIDInHeader)
+         {
+         	addMessageIDInHeader(msg);            	
+         }
+         
+         if (trace) { log.trace("Sending message " + msg); }
+         
+         //Make sure the correct time to live gets propagated
+         
+         long timeToLive = msg.getJMSExpiration();
+         
+   		if (timeToLive != 0)
+   		{
+   			timeToLive -=  System.currentTimeMillis();
+   			
+   			if (timeToLive <= 0)
+   			{
+   				timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+   			}
+   		}
+         
+   		producer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
+   		
+         if (trace) { log.trace("Sent message " + msg); }     
+      }
+   }
+   
    private void handleFailureOnSend()
    {
       handleFailure(new FailureHandler());
@@ -1308,7 +1386,7 @@
    	
    	Enumeration en = msg.getPropertyNames();
    	
-   	Map oldProps = null;
+   	Map<String, Object> oldProps = null;
    	
    	while (en.hasMoreElements())
    	{
@@ -1316,7 +1394,7 @@
    		
    		if (oldProps == null)
    		{
-   			oldProps = new HashMap();
+   			oldProps = new HashMap<String, Object>();
    		}
    		
    		oldProps.put(propName, msg.getObjectProperty(propName));
@@ -1389,7 +1467,7 @@
 
       protected void succeeded()
       {
-         log.debug("Succeeded in reconnecting to servers");
+         log.info("Succeeded in reconnecting to servers");
          
          synchronized (lock)
          {
@@ -1456,7 +1534,7 @@
       protected void succeeded()
       {
          // Don't call super - a bit ugly in this case but better than taking the lock twice.
-         log.debug("Succeeded in connecting to servers");
+         log.info("Succeeded in connecting to servers");
          
          synchronized (lock)
          {
@@ -1564,4 +1642,24 @@
       }      
    }   
    
+   private class BridgeExceptionListener implements ExceptionListener
+   {
+		public void onException(JMSException e)
+		{
+			log.warn("Detected failure on connection", e);
+			
+			synchronized (lock)
+			{
+				if (failed)
+				{
+					//The failure has already been detected and is being handled
+					if (trace) { log.trace("Failure recovery already in progress"); }
+				}
+				else
+			   {				
+					handleFailure(new FailureHandler());
+			   }
+			}
+		}   	
+   }   
 }

Modified: trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -23,10 +23,7 @@
 
 import java.util.Properties;
 
-import javax.jms.Destination;
 import javax.management.ObjectName;
-import javax.naming.Context;
-import javax.naming.InitialContext;
 
 import org.jboss.messaging.core.contract.MessagingComponent;
 import org.jboss.system.ServiceMBeanSupport;
@@ -316,14 +313,6 @@
       
       Properties targetProps = (Properties)server.getAttribute(targetProviderLoader, "Properties");
       
-      Context icSource = new InitialContext(sourceProps);
-      
-      Context icTarget = new InitialContext(targetProps);
-      
-      Destination sourceDest = (Destination)icSource.lookup(sourceDestinationLookup);
-      
-      Destination targetDest = (Destination)icTarget.lookup(targetDestinationLookup);
-            
       String sourceCFRef = (String)server.getAttribute(sourceProviderLoader, "FactoryRef");
       
       String targetCFRef = (String)server.getAttribute(targetProviderLoader, "FactoryRef");
@@ -342,14 +331,18 @@
       	destCff= new JNDIConnectionFactoryFactory(targetProps, targetCFRef);
       }
       
-      bridge.setSourceDestination(sourceDest);
-      
-      bridge.setTargetDestination(targetDest);
-      
       bridge.setSourceConnectionFactoryFactory(sourceCff);
       
       bridge.setDestConnectionFactoryFactory(destCff);
       
+      DestinationFactory sourceDestinationFactory = new JNDIDestinationFactory(sourceProps, sourceDestinationLookup);
+      
+      DestinationFactory targetDestinationFactory = new JNDIDestinationFactory(targetProps, targetDestinationLookup);
+      
+      bridge.setSourceDestinationFactory(sourceDestinationFactory);
+      
+      bridge.setTargetDestinationFactory(targetDestinationFactory);
+
       bridge.start();      
       
       log.info("Started bridge " + this.getName() + ". Source: " + sourceDestinationLookup + " Target: " + targetDestinationLookup);

Deleted: trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -1,38 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server.bridge;
-
-import javax.jms.ConnectionFactory;
-
-/**
- * A ConnectionFactoryFactory
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public interface ConnectionFactoryFactory
-{
-   ConnectionFactory createConnectionFactory() throws Exception;
-}

Added: trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/bridge/ConnectionFactoryFactory.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.bridge;
+
+import javax.jms.ConnectionFactory;
+
+/**
+ * A ConnectionFactoryFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ *
+ */
+public interface ConnectionFactoryFactory
+{
+   ConnectionFactory createConnectionFactory() throws Exception;
+}

Added: trunk/src/main/org/jboss/jms/server/bridge/DestinationFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/DestinationFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/bridge/DestinationFactory.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.bridge;
+
+import javax.jms.Destination;
+
+/**
+ * A factory for the JMS Destination a bridge consumes from or forwards to.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>10 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public interface DestinationFactory
+{
+	Destination createDestination() throws Exception;
+}

Modified: trunk/src/main/org/jboss/jms/server/bridge/JNDIConnectionFactoryFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/JNDIConnectionFactoryFactory.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/src/main/org/jboss/jms/server/bridge/JNDIConnectionFactoryFactory.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -24,7 +24,6 @@
 import java.util.Hashtable;
 
 import javax.jms.ConnectionFactory;
-import javax.naming.InitialContext;
 
 /**
  * A JNDIConnectionFactoryFactory
@@ -35,46 +34,16 @@
  * $Id$
  *
  */
-public class JNDIConnectionFactoryFactory implements ConnectionFactoryFactory
+public class JNDIConnectionFactoryFactory extends JNDIFactorySupport implements ConnectionFactoryFactory
 {
-   private Hashtable jndiProperties;
-   
-   private String lookup;
-   
-   public JNDIConnectionFactoryFactory(Hashtable jndiProperties, String lookup)
+	public JNDIConnectionFactoryFactory(Hashtable jndiProperties, String lookup)
    {
-      this.jndiProperties = jndiProperties;
-      
-      this.lookup = lookup;       
+      super(jndiProperties, lookup);      
    }
 
    public ConnectionFactory createConnectionFactory() throws Exception
    {
-      InitialContext ic = null;
-      
-      ConnectionFactory cf = null;
-      
-      try
-      {
-         if (jndiProperties == null)
-         {
-            ic = new InitialContext();
-         }
-         else
-         {
-            ic = new InitialContext(jndiProperties);
-         }
-         
-         cf = (ConnectionFactory)ic.lookup(lookup);         
-      }
-      finally
-      {
-         if (ic != null)
-         {
-            ic.close();
-         }
-      }
-      return cf;      
+   	return (ConnectionFactory)createObject();   	
    }
 
 }

Added: trunk/src/main/org/jboss/jms/server/bridge/JNDIDestinationFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/JNDIDestinationFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/bridge/JNDIDestinationFactory.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.bridge;
+
+import java.util.Hashtable;
+
+import javax.jms.Destination;
+
+/**
+ * A {@link DestinationFactory} whose destination comes from the JNDI lookup in JNDIFactorySupport.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>10 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public class JNDIDestinationFactory extends JNDIFactorySupport implements DestinationFactory
+{
+   public JNDIDestinationFactory(Hashtable jndiProperties, String lookup)
+   {
+      super(jndiProperties, lookup);      
+   }
+
+   public Destination createDestination() throws Exception
+   {
+   	return (Destination)createObject();   	
+   }
+}

Added: trunk/src/main/org/jboss/jms/server/bridge/JNDIFactorySupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/JNDIFactorySupport.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/bridge/JNDIFactorySupport.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.bridge;
+
+import java.util.Hashtable;
+
+import javax.naming.InitialContext;
+
+/**
+ * Base class for factories that obtain their object by looking up a name in a JNDI InitialContext.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>10 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public abstract class JNDIFactorySupport
+{
+	protected Hashtable jndiProperties;
+   // Name handed to InitialContext.lookup() in createObject().
+   protected String lookup;
+   
+   protected JNDIFactorySupport(Hashtable jndiProperties, String lookup)
+   {
+      this.jndiProperties = jndiProperties;
+      
+      this.lookup = lookup;       
+   }
+
+   protected Object createObject() throws Exception
+   {
+      InitialContext ic = null;
+      
+      Object obj = null;
+      // A null jndiProperties means: use the no-argument (default) InitialContext.
+      try
+      {
+         if (jndiProperties == null)
+         {
+            ic = new InitialContext();
+         }
+         else
+         {
+            ic = new InitialContext(jndiProperties);
+         }
+         // Resolve the configured name; the finally block closes the context either way.
+         obj = ic.lookup(lookup);         
+      }
+      finally
+      {
+         if (ic != null)
+         {
+            ic.close();
+         }
+      }
+      return obj;      
+   }
+}

Modified: trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -90,9 +90,18 @@
          advised.send(msg, checkForDuplicates, sequence);
       }
       else
-      {
-      	//Since NP messages are sent one way, there is a possibility the session has closed
-      	//by the time the message arrives, so we ignore this
+      {      	
+      	if (sequence == -1)
+      	{
+      		//Persistent message
+
+      		throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+      	}
+      	else
+      	{
+      		// Since NP messages are sent one way, there is a possibility the session has closed
+         	//by the time the message arrives, so we ignore this
+      	}
       }
       
       return null;

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -100,7 +100,7 @@
    	try
    	{
 	      on = deployBridge(0, "Bridge1", sourceProviderLoader, targetProviderLoader,
-	                                   "/queue/sourceQueue", "/queue/destQueue",
+	                                   "/queue/sourceQueue", "/queue/targetQueue",
 	                                   null, null, null, null,
 	                                   Bridge.QOS_AT_MOST_ONCE, null, 1,
 	                                   -1, null, null, 5000, -1, false);
@@ -126,7 +126,7 @@
          
          Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer cons = sessTarget.createConsumer(destQueue);
+         MessageConsumer cons = sessTarget.createConsumer(targetQueue);
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
@@ -137,7 +137,7 @@
          
          //It's stopped so no messages should be received
          
-         checkEmpty(destQueue, 1);
+         checkEmpty(targetQueue, 1);
          
          //Start it
          
@@ -156,7 +156,7 @@
             assertEquals("message" + i, tm.getText());
          }
          
-         checkEmpty(destQueue, 1);
+         checkEmpty(targetQueue, 1);
                   
          //Send some more
          
@@ -178,7 +178,7 @@
             assertEquals("message" + i, tm.getText());
          }
          
-         checkEmpty(destQueue, 1);
+         checkEmpty(targetQueue, 1);
          
          //Pause it
          
@@ -199,7 +199,7 @@
          
          //These shouldn't be received
          
-         checkEmpty(destQueue, 1);
+         checkEmpty(targetQueue, 1);
          
          // Resume
          
@@ -216,7 +216,7 @@
             assertEquals("message" + i, tm.getText());
          }
          
-         checkEmpty(destQueue, 1);
+         checkEmpty(targetQueue, 1);
          
          isPaused = ((Boolean)ServerManagement.getAttribute(on, "Paused")).booleanValue();
          
@@ -264,7 +264,7 @@
       try
       {                
          on = deployBridge(0, "Bridge2", sourceProviderLoader, targetProviderLoader,
-                           "/queue/sourceQueue", "/queue/destQueue",
+                           "/queue/sourceQueue", "/queue/targetQueue",
                            null, null, null, null,
                            Bridge.QOS_ONCE_AND_ONLY_ONCE, null, 1,
                            -1, null, null, 5000, -1, false);
@@ -305,11 +305,11 @@
          
          {
             String destLookup = (String)ServerManagement.getAttribute(on, "TargetDestinationLookup");
-            assertEquals("/queue/destQueue", destLookup);
+            assertEquals("/queue/targetQueue", destLookup);
             ServerManagement.setAttribute(on, "TargetDestinationLookup", "/queue/WibbleQueue");
             destLookup = (String)ServerManagement.getAttribute(on, "TargetDestinationLookup");
             assertEquals("/queue/WibbleQueue", destLookup);
-            ServerManagement.setAttribute(on, "TargetDestinationLookup", "/queue/destQueue");
+            ServerManagement.setAttribute(on, "TargetDestinationLookup", "/queue/targetQueue");
          }
          
          {
@@ -451,10 +451,10 @@
          
          {
             String destLookup = (String)ServerManagement.getAttribute(on, "TargetDestinationLookup");
-            assertEquals("/queue/destQueue", destLookup);
+            assertEquals("/queue/targetQueue", destLookup);
             ServerManagement.setAttribute(on, "TargetDestinationLookup", "/queue/WibbleQueue");
             destLookup = (String)ServerManagement.getAttribute(on, "TargetDestinationLookup");
-            assertEquals("/queue/destQueue", destLookup);
+            assertEquals("/queue/targetQueue", destLookup);
          }
          
          {
@@ -563,7 +563,7 @@
          log.trace("Checking bridged bridge");
          
          checkBridged(icSource, icTarget, "/ConnectionFactory", "/ConnectionFactory",
-                      "/queue/sourceQueue", "/queue/destQueue");
+                      "/queue/sourceQueue", "/queue/targetQueue");
          
          log.trace("Checked bridge");
          

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -366,7 +366,7 @@
          
          try
          {
-            bridge= new Bridge(null, cff1, sourceQueue, destQueue,
+            bridge= new Bridge(null, cff1, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -379,7 +379,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, null, sourceQueue, destQueue,
+            bridge= new Bridge(cff0, null, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -392,7 +392,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, null, destQueue,
+            bridge= new Bridge(cff0, cff1, null, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -405,7 +405,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueue, null,
+            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -418,7 +418,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueue, destQueue,
+            bridge= new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, -2, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -431,7 +431,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueue, destQueue,
+            bridge= new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, -1, 10, qosMode,
                                batchSize, maxBatchTime,
@@ -444,7 +444,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueue, null,
+            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, -2,
                                batchSize, maxBatchTime,
@@ -457,7 +457,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueue, null,
+            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, 3,
                                batchSize, maxBatchTime,
@@ -470,7 +470,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueue, null,
+            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, 3,
                                0, maxBatchTime,
@@ -483,7 +483,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueue, null,
+            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, 3,
                                batchSize, -2,
@@ -517,7 +517,7 @@
          
          String selector = "vegetable='radish'";
          
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   selector, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
@@ -551,7 +551,7 @@
          
          Session sessRec = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer cons = sessRec.createConsumer(destQueue);
+         MessageConsumer cons = sessRec.createConsumer(targetQueue);
          
          connTarget.start();
                                  
@@ -611,7 +611,7 @@
            
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceTopic, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
@@ -621,7 +621,7 @@
          
          this.sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false);
             
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);                          
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);                          
       }
       finally
       {      
@@ -663,7 +663,7 @@
       {   
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceTopic, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
@@ -673,7 +673,7 @@
             
          sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false);
          
-         checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);                    
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);                    
       }
       finally
       {                        
@@ -692,7 +692,7 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceTopic, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
@@ -702,7 +702,7 @@
             
          sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, true);
          
-         checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);              
+         checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);              
       }
       finally
       {                      
@@ -742,7 +742,7 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
@@ -785,7 +785,7 @@
          
          Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer cons = sessTarget.createConsumer(destQueue);
+         MessageConsumer cons = sessTarget.createConsumer(targetQueue);
          
          connTarget.start();
          
@@ -913,7 +913,7 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
@@ -931,7 +931,7 @@
          
          Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer cons = sessTarget.createConsumer(destQueue);
+         MessageConsumer cons = sessTarget.createConsumer(targetQueue);
          
          connTarget.start();
          
@@ -1031,7 +1031,7 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
@@ -1065,7 +1065,7 @@
          
          Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer cons = sessTarget.createConsumer(destQueue);
+         MessageConsumer cons = sessTarget.createConsumer(targetQueue);
          
          connTarget.start();
          
@@ -1118,7 +1118,7 @@
             
       try
       {      
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   batchSize, -1,
@@ -1144,7 +1144,7 @@
          
          t.start();
          
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);
                                               
          t.join();
          
@@ -1191,7 +1191,7 @@
             
       try
       {      
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   2, maxBatchTime,
@@ -1217,7 +1217,7 @@
          
          t.start();
          
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);
                                               
          t.join();
          
@@ -1265,7 +1265,7 @@
             
       try
       {  
-         bridge = new Bridge(cff0, cff0, sourceQueue, localDestQueue,
+         bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   batchSize, -1,
@@ -1291,7 +1291,7 @@
          
          t.start();
          
-         this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES);
                          
          t.join();
          
@@ -1337,7 +1337,7 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   NUM_MESSAGES, -1,
@@ -1351,7 +1351,7 @@
                          
          //Verify none are received
          
-         this.checkEmpty(destQueue, 1);
+         this.checkEmpty(targetQueue, 1);
          
          //Send the other half
          
@@ -1359,7 +1359,7 @@
          
          //This should now be receivable
          
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);
          
          //Send another batch with one more than batch size
          
@@ -1367,15 +1367,15 @@
                   
          //Make sure only batch size are received
          
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);
          
          //Final batch
          
          this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES - 1, persistent);
          
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, NUM_MESSAGES, 1);
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, NUM_MESSAGES, 1);
          
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES - 1);
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES - 1);
       }
       finally
       {      
@@ -1395,7 +1395,7 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff0, sourceQueue, localDestQueue,
+         bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   NUM_MESSAGES, -1,
@@ -1405,7 +1405,7 @@
             
          this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
          
-         this.checkEmpty(destQueue, 1);                
+         this.checkEmpty(targetQueue, 1);                
          
          //Send the other half
          
@@ -1414,9 +1414,9 @@
          
          //This should now be receivable
          
-         this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES);
          
-         this.checkEmpty(localDestQueue, 0);
+         this.checkEmpty(localTargetQueue, 0);
          
          this.checkEmpty(sourceQueue, 0);
          
@@ -1426,15 +1426,15 @@
          
          //Make sure only batch size are received
          
-         this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES);
          
          //Final batch
          
          this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES - 1, persistent);
          
-         this.checkAllMessageReceivedInOrder(cf0, localDestQueue, NUM_MESSAGES, 1);
+         this.checkAllMessageReceivedInOrder(cf0, localTargetQueue, NUM_MESSAGES, 1);
          
-         this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES - 1);
+         this.checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES - 1);
       }
       finally
       {               
@@ -1455,7 +1455,7 @@
          
          final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
          
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 3000, 10, qosMode,
                   MAX_BATCH_SIZE, MAX_BATCH_TIME,
@@ -1471,11 +1471,11 @@
          
          //Verify none are received
          
-         this.checkEmpty(destQueue, 1);
+         this.checkEmpty(targetQueue, 1);
          
          //Messages should now be receivable
          
-         this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);         
+         this.checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES);         
       }
       finally
       {      
@@ -1496,7 +1496,7 @@
          
          final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
          
-         bridge = new Bridge(cff0, cff0, sourceQueue, localDestQueue,
+         bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
                   null, null, null, null,
                   null, 3000, 10, qosMode,
                   MAX_BATCH_SIZE, MAX_BATCH_TIME,
@@ -1514,11 +1514,11 @@
          
          //Verify none are received
          
-         this.checkEmpty(localDestQueue, 0);;
+         this.checkEmpty(localTargetQueue, 0);;
          
          //Messages should now be receivable
          
-         this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
+         this.checkAllMessageReceivedInOrder(cf0, localTargetQueue, 0, NUM_MESSAGES);
       }
       finally
       {              

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -39,7 +39,9 @@
 
 import org.jboss.jms.server.bridge.Bridge;
 import org.jboss.jms.server.bridge.ConnectionFactoryFactory;
+import org.jboss.jms.server.bridge.DestinationFactory;
 import org.jboss.jms.server.bridge.JNDIConnectionFactoryFactory;
+import org.jboss.jms.server.bridge.JNDIDestinationFactory;
 import org.jboss.logging.Logger;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
@@ -65,8 +67,10 @@
    
    protected static ConnectionFactory cf0, cf1;
    
-   protected static Queue sourceQueue, destQueue, localDestQueue;
+   protected static DestinationFactory sourceQueueFactory, targetQueueFactory, localTargetQueueFactory, sourceTopicFactory;
    
+   protected static Queue sourceQueue, targetQueue, localTargetQueue;
+   
    protected static Topic sourceTopic;
    
    protected static boolean firstTime = true;
@@ -92,9 +96,9 @@
 
       	ServerManagement.deployTopic("sourceTopic", 0);  
 
-      	ServerManagement.deployQueue("localDestQueue", 0);
+      	ServerManagement.deployQueue("localTargetQueue", 0);
 
-      	ServerManagement.deployQueue("destQueue", 1);     
+      	ServerManagement.deployQueue("targetQueue", 1);     
       	
       	setUpAdministeredObjects();
       	
@@ -114,12 +118,10 @@
    protected void tearDown() throws Exception
    {       
       super.tearDown(); 
-                  
-      //sc.stop();  
-      
+                             
       checkEmpty(sourceQueue);
-      checkEmpty(localDestQueue);
-      checkEmpty(destQueue, 1);
+      checkEmpty(localTargetQueue);
+      checkEmpty(targetQueue, 1);
       
       // Check no subscriptions left lying around
             
@@ -147,13 +149,21 @@
          
          cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
          
-         sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+         sourceQueueFactory = new JNDIDestinationFactory(props0, "/queue/sourceQueue");
          
-         destQueue = (Queue)ic1.lookup("/queue/destQueue");
+         sourceQueue = (Queue)sourceQueueFactory.createDestination();
          
-         sourceTopic = (Topic)ic0.lookup("/topic/sourceTopic");
+         targetQueueFactory = new JNDIDestinationFactory(props1, "/queue/targetQueue");
          
-         localDestQueue = (Queue)ic0.lookup("/queue/localDestQueue");         
+         targetQueue = (Queue)targetQueueFactory.createDestination();
+         
+         sourceTopicFactory = new JNDIDestinationFactory(props0, "/topic/sourceTopic");
+         
+         sourceTopic = (Topic)sourceTopicFactory.createDestination();
+         
+         localTargetQueueFactory = new JNDIDestinationFactory(props0, "/queue/localTargetQueue"); 
+         
+         localTargetQueue = (Queue)localTargetQueueFactory.createDestination();
       }
       finally
       {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -97,7 +97,7 @@
    {
       ServerManagement.kill(1);
 
-      Bridge bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+      Bridge bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
             null, null, null, null,
             null, 1000, -1, Bridge.QOS_DUPLICATES_OK,
             10, -1,
@@ -110,7 +110,7 @@
          assertTrue(bridge.isFailed());
 
          ServerManagement.start(1, "all", false);
-         ServerManagement.deployQueue("destQueue", 1);         
+         ServerManagement.deployQueue("targetQueue", 1);         
          setUpAdministeredObjects();
          
          Thread.sleep(3000);
@@ -144,7 +144,7 @@
          
       try
       {   
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 1000, -1, qosMode,
                   10, -1,
@@ -160,7 +160,7 @@
          
          //Verify none are received
          
-         checkEmpty(destQueue, 1);
+         checkEmpty(targetQueue, 1);
          
          //Now crash the dest server
          
@@ -179,7 +179,7 @@
          
          ServerManagement.start(1, "all", false);
          
-         ServerManagement.deployQueue("destQueue", 1);
+         ServerManagement.deployQueue("targetQueue", 1);
          
          setUpAdministeredObjects();
          
@@ -191,7 +191,7 @@
          
          log.info("Sent messages");
          
-         checkMessagesReceived(cf1, destQueue, qosMode, NUM_MESSAGES, false);                  
+         checkMessagesReceived(cf1, targetQueue, qosMode, NUM_MESSAGES, false);                  
       }
       finally
       {      
@@ -224,7 +224,7 @@
             
       try
       {
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 1000, -1, Bridge.QOS_ONCE_AND_ONLY_ONCE,
                   10, 5000,
@@ -239,7 +239,7 @@
                   
          //verify none are received
          
-         checkEmpty(destQueue, 1);
+         checkEmpty(targetQueue, 1);
                   
          //Now crash the dest server
          
@@ -256,13 +256,13 @@
          
          ServerManagement.start(1, "all", false);
          
-         ServerManagement.deployQueue("destQueue", 1);
+         ServerManagement.deployQueue("targetQueue", 1);
          
          setUpAdministeredObjects();
          
          sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
                            
-         checkMessagesReceived(cf1, destQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false);         
+         checkMessagesReceived(cf1, targetQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false);         
       }
       finally
       {      

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java	2007-10-11 16:46:54 UTC (rev 3187)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java	2007-10-12 12:25:53 UTC (rev 3188)
@@ -89,7 +89,7 @@
       {
          final int NUM_MESSAGES = 10;         
          
-         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 1000, -1, Bridge.QOS_ONCE_AND_ONLY_ONCE,
                   NUM_MESSAGES, -1,
@@ -123,7 +123,7 @@
          
          log.info("Restarted server");    
          
-         ServerManagement.deployQueue("destQueue", 1);
+         ServerManagement.deployQueue("targetQueue", 1);
                   
          this.setUpAdministeredObjects();
                         
@@ -132,7 +132,7 @@
          log.info("*** waiting for recovery");
              
          //There may be a long wait for the first time (need to let recovery kick in)
-         checkMessagesReceived(cf1, destQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES, true);
+         checkMessagesReceived(cf1, targetQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES, true);
       }
       finally
       {      




More information about the jboss-cvs-commits mailing list