[jboss-cvs] JBoss Messaging SVN: r1960 - in branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742: tests/src/org/jboss/test/messaging/jms and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 11 12:27:24 EST 2007


Author: clebert.suconic at jboss.com
Date: 2007-01-11 12:27:19 -0500 (Thu, 11 Jan 2007)
New Revision: 1960

Modified:
   branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
   branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java
   branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-742 - fix

Modified: branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
===================================================================
--- branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java	2007-01-11 17:08:16 UTC (rev 1959)
+++ branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java	2007-01-11 17:27:19 UTC (rev 1960)
@@ -1,6 +1,6 @@
 /*
  * JBoss, the OpenSource J2EE webOS
- * 
+ *
  * Distributable under LGPL license.
  * See terms of license at gnu.org.
  */
@@ -70,100 +70,100 @@
 public class JDBCChannelMapper extends ServiceMBeanSupport implements ChannelMapper
 {
    // Constants -----------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(JDBCChannelMapper.class);
-   
+
    protected static final char TYPE_QUEUE = 'Q';
-   
+
    protected static final char TYPE_TOPIC = 'T';
-   
+
    protected static final char TYPE_DURABLE_SUB = 'D';
-   
+
    //========================================
-   
+
    //FIXME - user-role table management should be handled by a different service
    //It doesn't belong here
-   
+
    private String createUserTable =
       "CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128),"
       + " PRIMARY KEY(USERID))";
-   
+
    private String createRoleTable = "CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL,"
       + " PRIMARY KEY(USERID, ROLEID))";
-   
-   private String selectPreConfClientId = 
+
+   private String selectPreConfClientId =
       "SELECT CLIENTID FROM JMS_USER WHERE USERID=?";
-   
-   
+
+
    // =============================================================================
-   
-   private String createMappingTable = 
+
+   private String createMappingTable =
       "CREATE TABLE JMS_CHANNEL_MAPPING (ID BIGINT, TYPE CHAR(1), " +
       "JMS_DEST_NAME VARCHAR(1024), JMS_SUB_NAME VARCHAR(1024), " +
       "CLIENT_ID VARCHAR(128), SELECTOR VARCHAR(1024), NO_LOCAL CHAR(1), PRIMARY KEY(ID))";
-   
-   private String insertMapping = 
+
+   private String insertMapping =
       "INSERT INTO JMS_CHANNEL_MAPPING (ID, TYPE, JMS_DEST_NAME, JMS_SUB_NAME, CLIENT_ID, SELECTOR, NO_LOCAL) " +
       "VALUES (?, ?, ?, ?, ?, ?, ?)";
-   
+
    private String deleteMapping =
       "DELETE FROM JMS_CHANNEL_MAPPING WHERE ID = ?";
-   
-   private String selectIdForDestination = 
+
+   private String selectIdForDestination =
       "SELECT ID FROM JMS_CHANNEL_MAPPING WHERE TYPE=? AND JMS_DEST_NAME=?";
-   
-   private String selectDurableSub = 
+
+   private String selectDurableSub =
       "SELECT JMS_DEST_NAME, ID, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE CLIENT_ID=? AND JMS_SUB_NAME=?";
-   
-   private String selectSubscriptionsForTopic = 
+
+   private String selectSubscriptionsForTopic =
       "SELECT ID, CLIENT_ID, JMS_SUB_NAME, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE TYPE='D' AND J" +
       "MS_DEST_NAME=?";
-   
+
    // Static --------------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------
-   
+
    // Map<clientID - Map<subscriptionName - CoreDurableSubscription>>
    protected Map subscriptions;
-   
+
    // Map<name - Queue>
    protected Map queues;
-   
+
    // Map<name - Topic>
    protected Map topics;
-   
+
    // Map<id - JBossDestination>
    protected Map idMap;
-   
+
    protected String dataSourceJNDIName;
    protected DataSource ds;
    protected ObjectName tmObjectName;
 
    protected IdManager channelIDManager;
-   
+
    protected boolean createTablesOnStartup = true;
    protected Properties sqlProperties;
    protected List populateTables;
-   
+
    protected QueuedExecutorPool queuedExecutorPool;
 
    // Constructors --------------------------------------------------
-   
+
    public JDBCChannelMapper()
    {
       subscriptions = new ConcurrentReaderHashMap();
-      
+
       queues = new ConcurrentReaderHashMap();
-      
+
       topics = new ConcurrentReaderHashMap();
-      
+
       idMap = new ConcurrentReaderHashMap();
-      
+
       sqlProperties = new Properties();
-      
+
       populateTables = new ArrayList();
    }
-   
+
    /*
     * Only to be used by tests to create an instance
     */
@@ -177,23 +177,23 @@
    }
 
    //Injection
-   
+
    //TODO
    //Since channel mapper requires knowledge of the persistence manager anyway so it can get ids for new channels
    //there seems to me no reason why channel mapper should handle it's own persistence
    //IMHO all persistence should be handled by the pm
    public void setPersistenceManager(PersistenceManager pm) throws Exception
-   {      
+   {
       this.channelIDManager = new IdManager("CHANNEL_ID", 10, pm);
    }
-   
+
    public void setQueuedExecutorPool(QueuedExecutorPool pool) throws Exception
    {
       this.queuedExecutorPool = pool;
    }
-   
+
    // ServiceMBeanSupport overrides ---------------------------------
-   
+
    protected void startService() throws Exception
    {
       try
@@ -204,67 +204,67 @@
             ds = (DataSource)ic.lookup(dataSourceJNDIName);
             ic.close();
          }
-         
+
          if (ds == null)
          {
             throw new IllegalStateException("No DataSource found. This service dependencies must " +
             "have not been enforced correctly!");
          }
-   
+
          initSqlProperties();
-         
+
          if (createTablesOnStartup)
          {
             createSchema();
          }
-         
+
          log.debug(this + " started");
       }
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
-      } 
+      }
    }
-   
+
    protected void stopService() throws Exception
    {
       log.debug(this + " stopped");
    }
-   
+
    // DurableSubscriptionStore implementation ---------------
-   
+
    public Object getInstance()
    {
       return this;
    }
-   
+
    public CoreDestination getCoreDestination(JBossDestination jbDest)
    {
       return getCoreDestinationInternal(jbDest.isQueue(), jbDest.getName());
    }
-   
+
    public JBossDestination getJBossDestination(long coreDestinationId)
    {
       return (JBossDestination)idMap.get(new Long(coreDestinationId));
-   }      
-    
-   public void deployCoreDestination(boolean isQueue, 
+   }
+
+   public void deployCoreDestination(boolean isQueue,
                                      String destName,
-                                     MessageStore ms, 
+                                     MessageStore ms,
                                      PersistenceManager pm,
                                      MemoryManager mm,
-                                     int fullSize, 
-                                     int pageSize, 
+                                     int fullSize,
+                                     int pageSize,
                                      int downCacheSize) throws Exception
-   {        
+   {
       if (log.isTraceEnabled()) { log.trace("creating core destination for " + destName); }
-      
+
       CoreDestination cd = getCoreDestinationInternal(isQueue, destName);
       if (cd != null)
       {
          throw new JMSException("Destination " + destName + " already deployed");
       }
-      
+
       // Might already be in db
       long id;
       Long l = getIdForDestination(isQueue, destName);
@@ -272,7 +272,7 @@
       {
          // Not in db - insert a new mapping row
          id = this.getNextId();
-         
+
          insertMappingRow(id, isQueue ? TYPE_QUEUE : TYPE_TOPIC, destName, null, null, null, null);
 
       }
@@ -280,15 +280,30 @@
       {
          id = l.longValue();
       }
-      
+
+      // Put in id map too
+
+      JBossDestination jbd ;
+
+      if (isQueue)
+      {
+         jbd = new JBossQueue(destName);
+      }
+      else
+      {
+         jbd =  new JBossTopic(destName);
+      }
+
+      idMap.put(new Long(id), jbd);
+
       // TODO I am using LocalQueues for the time being, switch to distributed Queues
       if (isQueue)
       {
          // We allocate an executor for the queue from the rotating pool
          QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(destName);
-         
+
          cd = new Queue(id, ms, pm, mm, true, fullSize, pageSize, downCacheSize, executor);
-         
+
          try
          {
             // we load the queue with any state it might have in the db
@@ -301,22 +316,22 @@
             e2.setLinkedException(e);
             throw e2;
          }
-         
+
          queues.put(destName, cd);
       }
       else
       {
          // TODO I am using LocalTopics for the time being, switch to distributed Topics
          cd = new Topic(id, fullSize, pageSize, downCacheSize);
-         
+
          topics.put(destName, cd);
-         
+
          // TODO: The following piece of code may be better placed either in the Topic itself or in
          //       the DurableSubscriptionStore - I'm not sure it really belongs here
-         
+
          // Load any durable subscriptions for the Topic
          List durableSubs = loadDurableSubscriptionsForTopic(destName, ms, pm, mm);
-         
+
          Iterator iter = durableSubs.iterator();
          while (iter.hasNext())
          {
@@ -337,138 +352,123 @@
             sub.connect();
          }
       }
-      
-      // Put in id map too
-      
-      JBossDestination jbd ;
-      
-      if (isQueue)
-      {
-         jbd = new JBossQueue(destName);
-      }
-      else
-      {
-         jbd =  new JBossTopic(destName);
-      }
-      
-      idMap.put(new Long(id), jbd);
 
       log.debug("core destination " + cd + " (fullSize=" + fullSize + ", pageSize=" +
                 pageSize  + ", downCacheSize=" + downCacheSize + ") deployed");
    }
-         
+
    public CoreDestination undeployCoreDestination(boolean isQueue, String destName)
       throws Exception
    {
       Map m = isQueue ? queues : topics;
-      
+
       CoreDestination dest = (CoreDestination)m.remove(destName);
-      
+
       if (dest != null)
-      {      
+      {
          idMap.remove(new Long(dest.getId()));
-         
+
          //If topic need to remove durable subs from map too
-         
+
          if (!isQueue)
          {
             Topic topic = (Topic)dest;
-            
+
             Iterator iter = topic.iterator();
-            
+
             while (iter.hasNext())
             {
                CoreSubscription sub = (CoreSubscription)iter.next();
-               
+
                if (sub.isRecoverable())
                {
                   DurableSubscription dursub = (DurableSubscription)sub;
-                  
+
                   String clientID = dursub.getClientID();
-                  
+
                   String name = dursub.getName();
-                  
+
                   removeDurableSubscriptionInMemory(clientID, name);
                }
             }
          }
-         
+
       }
-      
+
       return dest;
    }
-   
-   
-   public void deployTemporaryCoreDestination(boolean isQueue, 
+
+
+   public void deployTemporaryCoreDestination(boolean isQueue,
             String destName,
             long id,
-            MessageStore ms, 
+            MessageStore ms,
             PersistenceManager pm,
             MemoryManager mm,
-            int fullSize, 
-            int pageSize, 
+            int fullSize,
+            int pageSize,
             int downCacheSize) throws Exception
-   {        
+   {
       if (log.isTraceEnabled()) { log.trace("creating temporary core destination for " + destName); }
-      
+
+      //Put in id map too
+
+      JBossDestination jbd ;
+
+      if (isQueue)
+      {
+         jbd = new JBossQueue(destName);
+      }
+      else
+      {
+         jbd =  new JBossTopic(destName);
+      }
+
+      idMap.put(new Long(id), jbd);
+
       CoreDestination cd = getCoreDestinationInternal(isQueue, destName);
       if (cd != null)
       {
          throw new JMSException("Destination " + destName + " already deployed");
-      }      
-      
+      }
+
       if (isQueue)
       {
          //We allocate an executor for the queue from the rotating pool
          QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(destName);
-         
-         cd = new Queue(id, ms, pm, mm, false, fullSize, pageSize, downCacheSize, executor);                 
-         
+
+         cd = new Queue(id, ms, pm, mm, false, fullSize, pageSize, downCacheSize, executor);
+
          queues.put(destName, cd);
       }
       else
       {
          cd = new Topic(id, fullSize, pageSize, downCacheSize);
-         
-         topics.put(destName, cd);         
+
+         topics.put(destName, cd);
       }
-      
-      //Put in id map too
-      
-      JBossDestination jbd ;
-      
-      if (isQueue)
-      {
-         jbd = new JBossQueue(destName);
-      }
-      else
-      {
-         jbd =  new JBossTopic(destName);
-      }
-      
-      idMap.put(new Long(id), jbd);
-      
+
       log.debug("core destination " + cd + " (fullSize=" + fullSize + ", pageSize=" +
                pageSize  + ", downCacheSize=" + downCacheSize + ") deployed");
    }
-   
+
    public CoreDestination undeployTemporaryCoreDestination(boolean isQueue, String destName)
       throws Exception
    {
       Map m = isQueue ? queues : topics;
-      
+
       CoreDestination dest = (CoreDestination)m.remove(destName);
-      
+
       if (dest != null)
-      {      
-         idMap.remove(new Long(dest.getId())); 
+      {
+         idMap.remove(new Long(dest.getId()));
       }
-      
+
       return dest;
    }
-   
+
    public DurableSubscription getDurableSubscription(String clientID,
-                                                     String subscriptionName,                                                         
+                                                     String subscriptionName,
                                                      MessageStore ms,
                                                      PersistenceManager pm,
                                                      MemoryManager mm)
@@ -476,29 +476,29 @@
    {
       // look in memory first
       DurableSubscription sub = getDurableSubscription(clientID, subscriptionName);
-      
+
       if (sub != null)
       {
          return sub;
       }
-      
+
       // now look in the db
       Connection conn = null;
       PreparedStatement ps  = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(selectDurableSub);
-         
+
          ps.setString(1, clientID);
          ps.setString(2, subscriptionName);
-         
+
          boolean exists = false;
-         
+
          try
          {
             rs = ps.executeQuery();
@@ -512,29 +512,29 @@
                log.trace(s + (rs == null ? " failed!" : (exists ? " returned rows" : " did NOT return rows")));
             }
          }
-         
+
          if (exists)
          {
             String topicName = rs.getString(1);
             long id = rs.getLong(2);
             String selector = rs.getString(3);
             boolean noLocal = rs.getString(4).equals("Y");
-            
+
             Selector sel = selector == null ? null : new Selector(selector);
-            
+
             Map subs = (Map)subscriptions.get(clientID);
             if (subs == null)
             {
                subs = new ConcurrentReaderHashMap();
                subscriptions.put(clientID, subs);
             }
-            
+
             //The subscription might be in the database, but the Topic which owns the subscription
             //might not be deployed.
             //In this case the user needs to make sure the Topic is deployed
-            
+
             Topic topic = (Topic)getCoreDestinationInternal(false, topicName);
-            
+
             if (topic == null)
             {
                throw new MessagingJMSException("Unable to get subscription: " + subscriptionName
@@ -542,16 +542,16 @@
                      + clientID + " which belongs to topic: " + topicName
                      + " since this topic is not currently deployed. Please deploy the topic and try again");
             }
-            
-            
+
+
             // create in memory
             sub = createDurableSubscriptionInternal(id, topicName, clientID, subscriptionName, sel,
                                                     noLocal, ms, pm, mm);
-            
+
             // load its state
             sub.load();
          }
-         
+
          return sub;
       }
       finally
@@ -569,7 +569,7 @@
             conn.close();
          }
          wrap.end();
-      }      
+      }
    }
 
    public DurableSubscription createDurableSubscription(String topicName,
@@ -582,17 +582,17 @@
                                                         MemoryManager mm) throws Exception
    {
       Selector sel = selector == null ? null : new Selector(selector);
-      
+
       long id;
 
       // first insert a row in the db
       id = this.getNextId();
-      
+
       insertMappingRow(id, TYPE_DURABLE_SUB, topicName, subscriptionName, clientID,
                        selector, new Boolean(noLocal));
-            
+
       return createDurableSubscriptionInternal(id, topicName, clientID, subscriptionName, sel,
-                                               noLocal, ms, pm, mm);          
+                                               noLocal, ms, pm, mm);
    }
 
    public Subscription createSubscription(String topicName, String selector, boolean noLocal,
@@ -603,9 +603,9 @@
       Selector sel = selector == null ? null : new Selector(selector);
 
       long id = this.getNextId();
-      
+
       Topic topic = (Topic)getCoreDestinationInternal(false, topicName);
-      
+
       if (topic == null)
       {
          throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
@@ -614,24 +614,24 @@
       //We allocate an executor for the subscription from the rotating pool
       //Currently all subscriptions for the same topic share the same executor
       QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
-               
+
       return new Subscription(id, topic, ms, pm, mm,
                               topic.getFullSize(), topic.getPageSize(),
                               topic.getDownCacheSize(), executor, sel, noLocal);
    }
-   
-   
-   
+
+
+
    public boolean removeDurableSubscription(String clientID, String subscriptionName)
       throws Exception
    {
       DurableSubscription removed = removeDurableSubscriptionInMemory(clientID, subscriptionName);
-      
+
       if (removed != null)
       {
          //Now remove from db
          deleteMappingRow(removed.getChannelID());
-         
+
          return true;
       }
       else
@@ -639,7 +639,7 @@
          return false;
       }
    }
-   
+
    //FIXME - This doesn't belong here
    public String getPreConfiguredClientID(String username) throws Exception
    {
@@ -649,24 +649,24 @@
          PreparedStatement ps  = null;
          ResultSet rs = null;
          TransactionWrapper wrap = new TransactionWrapper();
-         
+
          try
          {
             conn = ds.getConnection();
-            
+
             ps = conn.prepareStatement(selectPreConfClientId);
-            
+
             ps.setString(1, username);
-            
+
             rs = ps.executeQuery();
-            
+
             String clientID = null;
-            
+
             if (rs.next())
             {
                clientID = rs.getString(1);
             }
-            
+
             return clientID;
          }
          catch (SQLException e)
@@ -700,11 +700,11 @@
          throw e2;
       }
    }
-   
- 
-   
+
+
+
    // MBean operations ----------------------------------------------
-   
+
    public String getSqlProperties()
    {
       try
@@ -718,25 +718,25 @@
          return "";
       }
    }
-   
+
    public void setSqlProperties(String value)
    {
       try
       {
-         
+
          ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
          sqlProperties = new Properties();
          sqlProperties.load(is);
-         
+
       }
       catch (IOException shouldnothappen)
       {
          log.error("Caught IOException", shouldnothappen);
       }
    }
-   
+
    // Public --------------------------------------------------------
-   
+
    /**
     * Managed attribute.
     */
@@ -744,7 +744,7 @@
    {
       this.dataSourceJNDIName = dataSourceJNDIName;
    }
-   
+
    /**
     * Managed attribute.
     */
@@ -752,7 +752,7 @@
    {
       return dataSourceJNDIName;
    }
-   
+
    /**
     * Managed attribute.
     */
@@ -760,7 +760,7 @@
    {
       this.tmObjectName = tmObjectName;
    }
-   
+
    /**
     * Managed attribute.
     */
@@ -784,7 +784,7 @@
    {
       createTablesOnStartup = b;
    }
-   
+
    /**
     * @return a Set<String>. It may return an empty Set, but never null.
     */
@@ -800,75 +800,75 @@
       result.addAll(m.keySet());
       return result;
    }
-   
+
    public String toString()
    {
       return "JDBCChannelMapper[" + Integer.toHexString(hashCode()) + "]";
    }
-   
+
    // Package protected ---------------------------------------------
-   
+
    // Protected -----------------------------------------------------
-   
+
    protected DurableSubscription removeDurableSubscriptionInMemory(String clientID, String subscriptionName)
       throws JMSException
    {
       if (log.isTraceEnabled()) { log.trace("removing durable subscription " + subscriptionName); }
-      
+
       if (clientID == null)
       {
          throw new JMSException("Client ID must be set for connection!");
       }
-   
+
       Map subs = (Map)subscriptions.get(clientID);
-   
+
       if (subs == null)
       {
          return null;
       }
-   
+
       if (log.isTraceEnabled()) { log.trace("removing durable subscription " + subscriptionName); }
-   
+
       DurableSubscription removed = (DurableSubscription)subs.remove(subscriptionName);
-      
+
       if (removed == null)
       {
          return null;
       }
-   
+
       if (subs.size() == 0)
       {
          subscriptions.remove(clientID);
       }
-                  
+
       return removed;
    }
-   
-   protected List loadDurableSubscriptionsForTopic(String topicName,                                               
+
+   protected List loadDurableSubscriptionsForTopic(String topicName,
                                                    MessageStore ms,
                                                    PersistenceManager pm,
                                                    MemoryManager mm) throws Exception
-   {              
+   {
       List result = new ArrayList();
-      
+
       Connection conn = null;
       PreparedStatement ps  = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
       int rowCount = -1;
-      
+
       try
       {
          conn = ds.getConnection();
 
          ps = conn.prepareStatement(selectSubscriptionsForTopic);
-         
+
          ps.setString(1, topicName);
-         
+
          rs = ps.executeQuery();
-         
+
          rowCount = 0;
-         
+
          while (rs.next())
          {
             rowCount ++;
@@ -877,11 +877,11 @@
             String subName = rs.getString(3);
             String selector = rs.getString(4);
             boolean noLocal = rs.getString(5).equals("Y");
-            
+
             Selector sel = selector == null ? null : new Selector(selector);
-                              
+
             DurableSubscription sub = getDurableSubscription(clientId, subName);
-            
+
             if (sub == null)
             {
                sub = createDurableSubscriptionInternal(id, topicName,
@@ -891,7 +891,7 @@
                                                       noLocal,
                                                       ms, pm, mm);
                result.add(sub);
-            }                  
+            }
          }
       }
       catch (SQLException e)
@@ -905,7 +905,7 @@
          {
             String s = JDBCUtil.statementToString(selectSubscriptionsForTopic, topicName);
             log.trace(s + " returned " + rowCount + " rows");
-         }            
+         }
          if (rs != null)
          {
             rs.close();
@@ -920,12 +920,12 @@
          }
          wrap.end();
       }
-      
+
       return result;
    }
 
-   
 
+
    protected void insertMappingRow(long id, char type, String jmsDestName, String jmsSubName, String clientID,
                                    String selector, Boolean noLocal) throws Exception
    {
@@ -933,27 +933,27 @@
       PreparedStatement ps  = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       boolean failed = true;
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(insertMapping);
-         
+
          ps.setLong(1, id);
-         
-         ps.setString(2, String.valueOf(type)); 
-         
+
+         ps.setString(2, String.valueOf(type));
+
          ps.setString(3, jmsDestName);
-         
+
          ps.setString(4, jmsSubName);
-         
+
          ps.setString(5, clientID);
-         
+
          ps.setString(6, selector);
-         
+
          if (noLocal == null)
          {
             ps.setNull(7, Types.VARCHAR);
@@ -962,14 +962,14 @@
          {
             ps.setString(7, noLocal.booleanValue() ? "Y" : "N");
          }
-                  
+
          int rows = ps.executeUpdate();
-         
+
          failed = rows != 1;
-          
+
          ps.close();
-         
-         ps = null;        
+
+         ps = null;
       }
       finally
       {
@@ -980,7 +980,7 @@
                   clientID, selector, noLocal);
             log.trace(s + (failed ? " failed!" : " executed successfully"));
          }
-         
+
          if (rs != null)
          {
             try
@@ -1011,26 +1011,26 @@
          wrap.end();
       }
    }
-   
+
    protected Long getIdForDestination(boolean isQueue, String destName) throws Exception
    {
       Connection conn = null;
       PreparedStatement ps  = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(selectIdForDestination);
-         
+
          ps.setString(1, String.valueOf(isQueue ? TYPE_QUEUE : TYPE_TOPIC));
-         
+
          ps.setString(2, destName);
-        
-         rs = ps.executeQuery();  
-         
+
+         rs = ps.executeQuery();
+
          if (rs.next())
          {
             return new Long(rs.getLong(1));
@@ -1038,7 +1038,7 @@
          else
          {
             return null;
-         }         
+         }
       }
       finally
       {
@@ -1072,26 +1072,26 @@
          wrap.end();
       }
    }
-   
-   
+
+
    protected boolean deleteMappingRow(long id) throws Exception
    {
       Connection conn = null;
       PreparedStatement ps  = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       boolean failed = true;
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(deleteMapping);
-         
+
          ps.setLong(1, id);
-        
-         failed = ps.executeUpdate() != -1;      
-         
+
+         failed = ps.executeUpdate() != -1;
+
          return !failed;
       }
       finally
@@ -1122,7 +1122,7 @@
          wrap.end();
       }
    }
-   
+
    protected DurableSubscription getDurableSubscription(String clientID,
                                                         String subscriptionName) throws JMSException
    {
@@ -1147,98 +1147,98 @@
          subs = new ConcurrentReaderHashMap();
          subscriptions.put(clientID, subs);
       }
-      
+
       Topic topic = (Topic)getCoreDestinationInternal(false, topicName);
-      
+
       if (topic == null)
       {
          throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
       }
-       
+
       // We allocate an executor for the subscription from the rotating pool.
       // Currently all subscriptions for the same topic share the same executor.
       QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
-             
+
       DurableSubscription subscription =
          new DurableSubscription(id, topic, ms, pm, mm,
                topic.getFullSize(), topic.getPageSize(), topic.getDownCacheSize(),
                executor, selector,
                noLocal, subscriptionName, clientID);
-      
+
       subs.put(subscriptionName, subscription);
-      
+
       return subscription;
-   }   
-   
+   }
+
    protected CoreDestination getCoreDestinationInternal(boolean isQueue, String destName)
    {
       Map m = isQueue ? queues : topics;
-      
+
       return (CoreDestination)m.get(destName);
    }
-   
+
    protected void initSqlProperties()
    {
       createUserTable = sqlProperties.getProperty("CREATE_USER_TABLE", createUserTable);
       createRoleTable = sqlProperties.getProperty("CREATE_ROLE_TABLE", createRoleTable);
-      selectPreConfClientId = sqlProperties.getProperty("SELECT_PRECONF_CLIENTID", selectPreConfClientId);      
+      selectPreConfClientId = sqlProperties.getProperty("SELECT_PRECONF_CLIENTID", selectPreConfClientId);
       createMappingTable = sqlProperties.getProperty("CREATE_MAPPING_TABLE", createMappingTable);
       insertMapping = sqlProperties.getProperty("INSERT_MAPPING", insertMapping);
       deleteMapping = sqlProperties.getProperty("DELETE_MAPPING", deleteMapping);
       selectIdForDestination = sqlProperties.getProperty("SELECT_ID_FOR_DESTINATION", selectIdForDestination);
       selectDurableSub = sqlProperties.getProperty("SELECT_DURABLE_SUB", selectDurableSub);
       selectSubscriptionsForTopic = sqlProperties.getProperty("SELECT_SUBSCRIPTIONS_FOR_TOPIC", selectSubscriptionsForTopic);
-   
+
       for (Iterator i = sqlProperties.entrySet().iterator(); i.hasNext();)
       {
          Map.Entry entry = (Map.Entry) i.next();
          String key = (String) entry.getKey();
          if (key.startsWith("POPULATE.TABLES."))
             populateTables.add(entry.getValue());
-      }      
+      }
    }
-   
+
    protected void createSchema() throws Exception
-   {      
-      Connection conn = null;      
+   {
+      Connection conn = null;
       TransactionWrapper tx = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          try
          {
-            if (log.isTraceEnabled()) { log.trace("Creating JMS_USERS table"); }            
+            if (log.isTraceEnabled()) { log.trace("Creating JMS_USERS table"); }
             conn.createStatement().executeUpdate(createUserTable);
          }
-         catch (SQLException e) 
+         catch (SQLException e)
          {
             log.debug("Failed to create users table: " + createUserTable, e);
          }
-         
+
          try
          {
             if (log.isTraceEnabled()) { log.trace("Creating JMS_ROLES table"); }
             conn.createStatement().executeUpdate(createRoleTable);
          }
-         catch (SQLException e) 
+         catch (SQLException e)
          {
             log.debug("Failed to create roles table: " + createRoleTable, e);
          }
-         
+
          try
          {
             if (log.isTraceEnabled()) { log.trace("Creating JMS_SUBSCRIPTIONS table"); }
             conn.createStatement().
             executeUpdate(createMappingTable);
          }
-         catch (SQLException e) 
+         catch (SQLException e)
          {
             log.debug("Failed to create subscriptions table: " + createMappingTable, e);
          }
-         
-         
+
+
          //Insert user-role data
          Iterator iter = populateTables.iterator();
          String nextQry = null;
@@ -1247,8 +1247,8 @@
             Statement st = null;
             try
             {
-               nextQry = (String) iter.next();               
-               st = conn.createStatement();              
+               nextQry = (String) iter.next();
+               st = conn.createStatement();
                st.executeUpdate(nextQry);
             }
             catch (SQLException ignored)
@@ -1256,7 +1256,7 @@
                log.debug("Error populating tables: " + nextQry, ignored);
             }
          }
-         
+
       }
       finally
       {
@@ -1271,14 +1271,14 @@
          }
          tx.end();
       }
-      
+
    }
-   
+
    protected long getNextId() throws Exception
    {
       return channelIDManager.getId();
    }
-   
+
    // Private -------------------------------------------------------
 
    // never access directly
@@ -1300,23 +1300,23 @@
    }
 
    // Inner classes -------------------------------------------------
-   
+
    /*
     * TODO This inner class is duplicated from HSQLPersistenceManager - need to avoid code duplication
     */
    class TransactionWrapper
    {
       private javax.transaction.Transaction oldTx;
-      
+
       private TransactionWrapper() throws Exception
       {
          TransactionManager tm = getTransactionManagerReference();
 
          oldTx = tm.suspend();
-         
+
          tm.begin();
       }
-      
+
       private void end() throws Exception
       {
          TransactionManager tm = getTransactionManagerReference();
@@ -1340,11 +1340,11 @@
             }
          }
       }
-      
+
       private void exceptionOccurred() throws Exception
       {
          getTransactionManagerReference().setRollbackOnly();
       }
    }
-   
+
 }

Modified: branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java
===================================================================
--- branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java	2007-01-11 17:08:16 UTC (rev 1959)
+++ branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/QueueTest.java	2007-01-11 17:27:19 UTC (rev 1960)
@@ -31,6 +31,7 @@
 import javax.naming.InitialContext;
 
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.message.JBossMessage;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
@@ -43,39 +44,39 @@
 public class QueueTest extends MessagingTestCase
 {
    // Constants -----------------------------------------------------
-   
+
    // Static --------------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------
 
    protected InitialContext ic;
    protected ConnectionFactory cf;
 
    // Constructors --------------------------------------------------
-   
+
    public QueueTest(String name)
    {
       super(name);
    }
-   
+
    // TestCase overrides -------------------------------------------
-   
+
    public void setUp() throws Exception
    {
-      super.setUp();                  
-      
+      super.setUp();
+
       ServerManagement.start("all");
-      
-      
+
+
       ic = new InitialContext(ServerManagement.getJNDIEnvironment());
       cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
-      
+
       ServerManagement.undeployQueue("TestQueue");
       ServerManagement.deployQueue("TestQueue");
 
       log.debug("setup done");
    }
-   
+
    public void tearDown() throws Exception
    {
       ServerManagement.undeployQueue("TestQueue");
@@ -83,8 +84,8 @@
 
       log.debug("tear down done");
    }
-   
-   
+
+
    // Public --------------------------------------------------------
 
    /**
@@ -117,6 +118,62 @@
       }
    }
 
+   /**
+    * The simplest possible queue test.
+    */
+   public void testRedeployQueue() throws Exception
+   {
+      Queue queue = (Queue)ic.lookup("/queue/TestQueue");
+
+      Connection conn = cf.createConnection();
+
+      try
+      {
+         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer p = s.createProducer(queue);
+         MessageConsumer c = s.createConsumer(queue);
+         conn.start();
+
+         for (int i = 0; i < 500; i++)
+         {
+            p.send(s.createTextMessage("payload " + i));
+         }
+
+         //ServerManagement.undeployQueue("TestQueue");
+
+         log.info("Stopping server");
+         ServerManagement.stopServerPeer();
+
+         log.info("Starting server");
+         ServerManagement.startServerPeer();
+         ServerManagement.deployQueue("TestQueue");
+
+         ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+         cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+         conn = cf.createConnection();
+         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         p = s.createProducer(queue);
+         c = s.createConsumer(queue);
+         conn.start();
+
+         for (int i = 0; i < 500; i++)
+         {
+            TextMessage message = (TextMessage)c.receive(3000);
+            assertNotNull(message);
+            assertNotNull(message.getJMSDestination());
+         }
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
    public void testQueueName() throws Exception
    {
       Queue queue = (Queue)ic.lookup("/queue/TestQueue");
@@ -124,12 +181,12 @@
    }
 
    // Package protected ---------------------------------------------
-   
+
    // Protected -----------------------------------------------------
-   
+
    // Private -------------------------------------------------------
-   
+
    // Inner classes -------------------------------------------------
-   
+
 }
 

Modified: branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java
===================================================================
--- branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java	2007-01-11 17:08:16 UTC (rev 1959)
+++ branches/JBOSS_MESSAGING_1_0_2_SP2_JBMESSAGING-742/tests/src/org/jboss/test/messaging/jms/server/plugin/JDBCChannelMapperTest.java	2007-01-11 17:27:19 UTC (rev 1960)
@@ -23,6 +23,9 @@
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
 
 import javax.naming.InitialContext;
 import javax.sql.DataSource;
@@ -35,13 +38,18 @@
 import org.jboss.jms.server.DestinationManager;
 import org.jboss.jms.server.plugin.contract.ChannelMapper;
 import org.jboss.jms.server.subscription.DurableSubscription;
+import org.jboss.jms.message.JBossTextMessage;
 import org.jboss.messaging.core.local.CoreDestination;
 import org.jboss.messaging.core.local.Queue;
 import org.jboss.messaging.core.local.Topic;
 import org.jboss.messaging.core.persistence.JDBCUtil;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.Channel;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
 import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.core.SimpleChannel;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.tm.TransactionManagerService;
 import org.jboss.util.id.GUID;
@@ -50,9 +58,10 @@
  * These tests must not be run in remote mode!
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>1.1</tt>
+ * @author <a href="mailto:Galder.Zamarreno at jboss.com">Galder Zamarreno</a>
+ * @version <tt>$Release$</tt>
  *
- * JDBCChannelMapperTest.java,v 1.1 2006/02/28 16:48:15 timfox Exp
+ * $Id: JDBCChannelMapperTest.java 1958 2007-01-11 16:55:40Z clebert.suconic at jboss.com $
  */
 public class JDBCChannelMapperTest extends MessagingTestCase
 {
@@ -87,18 +96,66 @@
       ms = ServerManagement.getMessageStore();
       pm = ServerManagement.getPersistenceManager();
       channelMapper = ServerManagement.getChannelMapper();
-      
 
+
       log.debug("setup done");
    }
 
    public void tearDown() throws Exception
    {
       log.debug("starting tear down");
-      
+
       super.tearDown();
    }
 
+   // I had to place this test as the firs one one the list..
+   // it's dependent on the channelMapper empty..
+   // It won't affect the other tests
+   public void testSendRedeployAndConsume() throws Throwable
+   {
+      /* Deploy 'paradise' as destination queue */
+      channelMapper.deployCoreDestination(true, "paradise", ms, pm, null, 100, 20, 10);
+      /* Deploy 'earth' as queue to reply to */
+      channelMapper.deployCoreDestination(true, "earth", ms, pm, null, 100, 20, 10);
+
+      Channel channel = new SimpleChannel(0, ms);
+      /* Create a message for 'paradise' queue destination */
+      Message[] m = createMessages(1);
+      List refs = new ArrayList();
+      MessageReference ref1 = ms.reference(m[0]);
+
+      ref1.setOrdering(11);
+      refs.add(ref1);
+
+      /* We persist the message */
+      pm.addReferences(channel.getChannelID(), refs, false);
+
+      /* Delete the message from memory so that on redeployment, messaging is forced
+      * to retrieve it from the database */
+      ms.forgetMessage(0);
+
+      log.info("********************** UnDeploying coreDestination");
+      channelMapper.undeployCoreDestination(true, "paradise");
+      channelMapper.undeployCoreDestination(true, "earth");
+
+      log.info("********************** Deploying coreDestination");
+
+      /* We redeploy the target destination which will load the message from the database */
+      channelMapper.deployCoreDestination(true, "paradise", ms, pm, null, 100, 20, 10);
+
+      JBossDestination jd = channelMapper.getJBossDestination(0);
+      Queue q = (Queue) channelMapper.getCoreDestination(jd);
+
+      /* Browse the pending messages */
+      List l = q.browse();
+      JBossTextMessage message = (JBossTextMessage)l.get(0);
+      /* Browse the pending messages. There's only one and its destination should not be
+      * null, in fact, it should be 'paradise' */
+      assertNotNull(message.getJMSDestination());
+      assertEquals("Destination should be 'paradise'","paradise",
+         ((JBossDestination)message.getJMSDestination()).getName());
+   }
+
    public void testCreateGetRemoveDurableSubscription() throws Exception
    {
       String topicName = new GUID().toString();
@@ -214,133 +271,131 @@
 
       assertNotNull(theClientID);
       assertEquals(clientID, theClientID);
-           
+
    }
-   
+
    public void testGetDeployCoreDestinationTest() throws Exception
    {
       //Lookup a non existent core destination -  verify returns null
-      
+
       JBossQueue queue = new JBossQueue("queue1");
-      
+
       JBossTopic topic = new JBossTopic("topic1");
-      
+
       CoreDestination cd = channelMapper.getCoreDestination(queue);
-      
+
       assertNull(cd);
-      
+
       cd = channelMapper.getCoreDestination(topic);
-      
+
       assertNull(cd);
-      
+
       //Lookup a non existent jboss destination - verify returns null
-      
+
       JBossDestination jbd = channelMapper.getJBossDestination(123);
-      
+
       assertNull(jbd);
-      
+
       //Deploy core destinations
-      
+
       channelMapper.deployCoreDestination(true, "queue1", ms, pm, null, 100, 20, 10);
-      
+
       channelMapper.deployCoreDestination(false, "topic1", ms, pm, null, 100, 20, 10);
-      
+
       //Lookup core dest
-      
+
       CoreDestination cd1 = channelMapper.getCoreDestination(queue);
-      
+
       assertNotNull(cd1);
-      
+
       assertTrue(cd1 instanceof Queue);
-      
+
       Queue q = (Queue)cd1;
-      
+
       CoreDestination cd2 = channelMapper.getCoreDestination(topic);
-      
+
       assertNotNull(cd2);
-      
+
       assertTrue(cd2 instanceof Topic);
-      
+
       Topic t = (Topic)cd2;
-                  
+
       //Lookup jboss dest
-      
+
       JBossDestination jb1 = channelMapper.getJBossDestination(q.getChannelID());
-      
+
       assertNotNull(jb1);
-      
+
       assertEquals("queue1", jb1.getName());
-      
+
       JBossDestination jb2 = channelMapper.getJBossDestination(t.getId());
-      
+
       assertNotNull(jb2);
-      
+
       assertEquals("topic1", jb2.getName());
-      
+
       //undeploy core dest
-      
+
       CoreDestination cd3 = channelMapper.undeployCoreDestination(true, "queue1");
-      
+
       assertNotNull(cd3);
-      
+
       assertEquals(q.getChannelID(), cd3.getId());
-      
+
       CoreDestination cd4 = channelMapper.undeployCoreDestination(false, "topic1");
-      
+
       assertNotNull(cd3);
-      
+
       assertEquals(t.getId(), cd4.getId());
-      
+
       cd3 = channelMapper.undeployCoreDestination(true, "queue1");
-      
+
       assertNull(cd3);
-      
+
       cd4 = channelMapper.undeployCoreDestination(false, "topic1");
-      
+
       assertNull(cd4);
-      
+
       //Lookup core dest - null
-      
+
       cd3 = channelMapper.getCoreDestination(queue);
-      
+
       assertNull(cd3);
-      
+
       cd4 = channelMapper.getCoreDestination(topic);
-      
+
       assertNull(cd4);
-      
+
       //lookup jboss dest - null
-      
+
       jb1 = channelMapper.getJBossDestination(q.getChannelID());
-      
+
       assertNull(jb1);
-      
+
       jb2 = channelMapper.getJBossDestination(t.getId());
-      
+
       assertNull(jb2);
-      
+
       //Deploy a core dest
-      
+
       channelMapper.deployCoreDestination(true, "queue1", ms, pm, null, 100, 20, 10);
-      
+
       channelMapper.deployCoreDestination(false, "topic1", ms, pm, null, 100, 20, 10);
-            
+
       //lookup core dest - verify has same id
-      
+
       cd3 = channelMapper.getCoreDestination(queue);
-      
+
       assertNotNull(cd3);
-      
+
       assertEquals(cd3.getId(), cd1.getId());
-      
+
       cd4 = channelMapper.getCoreDestination(topic);
-      
+
       assertNotNull(cd4);
-      
+
       assertEquals(cd4.getId(), cd2.getId());
    }
-   
-   
 
    // Package protected ---------------------------------------------
 
@@ -348,6 +403,35 @@
 
    // Private -------------------------------------------------------
 
+   private Message[] createMessages(int num) throws Throwable
+   {
+      //Generate some messages with a good range of attribute values
+      Message[] messages = new Message[num];
+      for (int i = 0; i < num; i++)
+      {
+         messages[i] = createMessage((byte)i, true);
+      }
+      return messages;
+   }
+
+   private Message createMessage(byte i, boolean reliable) throws Throwable
+   {
+      return new JBossTextMessage(i,
+                           reliable,
+                           System.currentTimeMillis() + 1000 * 60 * 60,
+                           System.currentTimeMillis(),
+                           (byte)(i % 10),
+                           new HashMap(),
+                           "hello".getBytes(),
+                           0,
+                           "msg",
+                           "1",
+                           "1".getBytes(),
+                           new JBossQueue("paradise"),
+                           new JBossQueue("earth"),
+                           new HashMap());
+   }
+
    // Inner classes -------------------------------------------------
 
 }




More information about the jboss-cvs-commits mailing list